1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The original author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page record numbers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
187 unsigned char len; // this can be changed to a ushort or uint
188 unsigned char key[0];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
195 unsigned char len; // this can be changed to a ushort or uint
196 unsigned char value[0];
199 #define BT_maxkey 255 // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
202 // The first part of an index page.
203 // It is immediately followed
204 // by the BtSlot array of keys.
206 // note that this structure size
207 // must be a multiple of 8 bytes
208 // in order to place dups correctly.
210 typedef struct BtPage_ {
211 uint cnt; // count of keys in page
212 uint act; // count of active keys
213 uint min; // next key offset
214 uint garbage; // page garbage in bytes
215 unsigned char bits:7; // page size in bits
216 unsigned char free:1; // page is on free chain
217 unsigned char lvl:7; // level of page
218 unsigned char kill:1; // page is being deleted
219 unsigned char left[BtId]; // page number to left
220 unsigned char filler[2]; // padding to multiple of 8
221 unsigned char right[BtId]; // page number to right
224 // The memory mapping pool table buffer manager entry
227 uid basepage; // mapped base page number
228 char *map; // mapped memory pointer
229 ushort slot; // slot index in this array
230 ushort pin; // mapped page pin counter
231 void *hashprev; // previous pool entry for the same hash idx
232 void *hashnext; // next pool entry for the same hash idx
234 HANDLE hmap; // Windows memory mapping handle
238 #define CLOCK_bit 0x8000 // bit in pool->pin
240 // The loadpage interface object
243 uid page_no; // current page number
244 BtPage page; // current page pointer
245 BtPool *pool; // current page pool
246 BtLatchSet *latch; // current page latch set
249 // structure for latch manager on ALLOC_page
252 struct BtPage_ alloc[1]; // next page_no in right ptr
253 unsigned long long dups[1]; // global duplicate key uniqueifier
254 unsigned char chain[BtId]; // head of free page_nos chain
255 BtSpinLatch lock[1]; // allocation area lite latch
256 ushort latchdeployed; // highest number of latch entries deployed
257 ushort nlatchpage; // number of latch pages at BT_latch
258 ushort latchtotal; // number of page latch entries
259 ushort latchhash; // number of latch hash table slots
260 ushort latchvictim; // next latch entry to examine
261 BtHashEntry table[0]; // the hash table
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
269 uint seg_bits; // seg size in pages in bits
270 uint mode; // read-write mode
276 ushort poolcnt; // highest page pool node in use
277 ushort poolmax; // highest page pool node allocated
278 ushort poolmask; // total number of pages in mmap segment - 1
279 ushort hashsize; // size of Hash Table for pool entries
280 volatile uint evicted; // last evicted hash table slot
281 ushort *hash; // pool index for hash entries
282 BtSpinLatch *latch; // latches for hash table slots
283 BtLatchMgr *latchmgr; // mapped latch page from allocation page
284 BtLatchSet *latchsets; // mapped latch set from latch pages
285 BtPool *pool; // memory pool page segments
287 HANDLE halloc; // allocation and latch table handle
292 BtMgr *mgr; // buffer manager for thread
293 BtPage cursor; // cached frame for start/next (never mapped)
294 BtPage frame; // spare frame for the page split (never mapped)
295 uid cursor_page; // current cursor page number
296 unsigned char *mem; // frame, cursor, page memory buffer
297 unsigned char key[BT_keyarray]; // last found complete key
298 int found; // last delete or insert was found
299 int err; // last error
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // BTree page number constants
333 #define ALLOC_page 0 // allocation & latch manager hash table
334 #define ROOT_page 1 // root of the btree
335 #define LEAF_page 2 // first page of leaves
336 #define LATCH_page 3 // pages for latch manager
338 // Number of levels to create in a new BTree
342 // The page is allocated from low and hi ends.
343 // The key slots are allocated from the bottom,
344 // while the text and value of the key
345 // are allocated from the top. When the two
346 // areas meet, the page is split into two.
348 // A key consists of a length byte, two bytes of
349 // index number (0 - 65534), and up to 253 bytes
352 // Associated with each key is a value byte string
353 // containing any value desired.
355 // The b-tree root is always located at page 1.
356 // The first leaf page of level zero is always
357 // located on page 2.
359 // The b-tree pages are linked with next
360 // pointers to facilitate enumerators,
361 // and provide for concurrency.
363 // When the root page fills, it is split in two and
364 // the tree height is raised by a new root at page
365 // one with two keys.
367 // Deleted keys are marked with a dead bit until
368 // page cleanup. The fence key for a leaf node is
371 // Groups of pages called segments from the btree are optionally
372 // cached with a memory mapped pool. A hash table is used to keep
373 // track of the cached segments. This behaviour is controlled
374 // by the segment size (segsize) parameter to bt_mgr.
376 // To achieve maximum concurrency one page is locked at a time
377 // as the tree is traversed to find leaf key in question. The right
378 // page numbers are used in cases where the page is being split,
381 // Page 0 is dedicated to lock for new page extensions,
382 // and chains empty pages together for reuse. It also
383 // contains the latch manager hash table.
385 // The ParentModification lock on a node is obtained to serialize posting
386 // or changing the fence key for a node.
388 // Empty pages are chained together through the ALLOC page and reused.
390 // Access macros to address slot and key values from the page
391 // Page slots use 1 based indexing.
// slotptr: address of the slot'th BtSlot (1-based). The slot array begins
// immediately after the page header, hence (page+1).
// NOTE(review): the macro arguments are not parenthesized, so callers should
// pass simple expressions only.
393 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
// keyptr: the BtKey for a slot lives at byte offset slot->off from the
// start of the page.
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
// valptr: the BtVal for a slot starts directly after the key bytes
// (key->key + key->len).
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// Serialize the page id `id` into the byte array `dest`.
// Each step stores the low-order byte of id into dest[i] and then shifts id
// right by 8 bits.
// NOTE(review): the loop header is not visible in this extract. bt_getid reads
// the most-significant byte first, so presumably i runs from BtId-1 down to 0
// -- confirm.
397 void bt_putid(unsigned char *dest, uid id)
402 dest[i] = (unsigned char)id, id >>= 8;
// Rebuild a page id from the BtId bytes at `src`.
// Bytes are consumed in increasing address order and shifted in from the
// right, so src[0] ends up as the most-significant byte.
// NOTE(review): the initialization of id is not visible here; presumably 0.
405 uid bt_getid(unsigned char *src)
410 for( i = 0; i < BtId; i++ )
411 id <<= 8, id |= *src++;
// Return the next value of the global duplicate-key uniqueifier by atomically
// incrementing bt->mgr->latchmgr->dups.
// The two return statements presumably sit in different preprocessor branches
// (unix / Windows); the #ifdef lines are not visible in this extract.
416 uid bt_newdup (BtDb *bt)
// fetch-and-add yields the value before the add, so +1 gives the new value
419 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
// NOTE(review): the documented _InterlockedIncrement64 intrinsic takes a single
// pointer argument; the extra ", 1" looks like it would not compile -- confirm.
421 return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
425 // Phase-Fair reader/writer lock implementation
// Acquire `lock` for writing.
// Steps visible here:
//  1. take a ticket by atomically adding 1 to lock->ticket;
//  2. spin until lock->serving[0] equals that ticket;
//  3. atomically add PRES plus the ticket's PHID bit to lock->rin, keeping the
//     prior rin value in r;
//  4. spin until *lock->rout equals r.
// ReadLock adds RINC to rin and ReadRelease adds RINC to rout, so step 4
// waits for readers counted in rin before this writer to be counted in rout.
// The bodies of the two spin loops are not visible in this extract.
427 void WriteLock (RWLock *lock)
432 tix = __sync_fetch_and_add (lock->ticket, 1);
434 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
436 // wait for our ticket to come up
438 while( tix != lock->serving[0] )
// writer-present flag combined with the phase bit taken from the ticket
445 w = PRES | (tix & PHID);
447 r = __sync_fetch_and_add (lock->rin, w);
449 r = _InterlockedExchangeAdd16 (lock->rin, w);
451 while( r != *lock->rout )
// Release a write lock: atomically clear the MASK bits in lock->rin
// (WriteLock set PRES and a PHID bit there).
// NOTE(review): advancing lock->serving for the next ticket holder is
// presumably done on a line not visible in this extract -- confirm.
459 void WriteRelease (RWLock *lock)
462 __sync_fetch_and_and (lock->rin, ~MASK);
464 _InterlockedAnd16 (lock->rin, ~MASK);
// Acquire `lock` for reading.
// Atomically adds RINC to lock->rin and keeps the MASK bits that were set
// before the add in w. It then spins for as long as the MASK bits of rin
// still equal w.
// NOTE(review): a guard such as "if( w )" before the spin is presumably on a
// line not shown; without it a reader would spin when no writer is present.
469 void ReadLock (RWLock *lock)
473 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
475 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
478 while( w == (*lock->rin & MASK) )
// Release a read lock: atomically add RINC to lock->rout.
// WriteLock spins until rout matches the rin value it observed.
486 void ReadRelease (RWLock *lock)
489 __sync_fetch_and_add (lock->rout, RINC);
491 _InterlockedExchangeAdd16 (lock->rout, RINC);
495 // Spin Latch Manager
497 // wait until write lock mode is clear
498 // and add 1 to the share count
//
// The latch is treated as a single ushort. Each attempt optimistically adds
// SHARE and inspects the previous value; when the attempt is rejected the
// SHARE is subtracted again and the thread yields (sched_yield on unix,
// SwitchToThread on Windows) before retrying.
// The test that decides between success and retry is not visible here.
500 void bt_spinreadlock(BtSpinLatch *latch)
506 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
508 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
510 // see if exclusive request is granted or pending
// undo the optimistic share count before yielding
515 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
517 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
// comma expression: yield, then loop unconditionally
520 } while( sched_yield(), 1 );
522 } while( SwitchToThread(), 1 );
526 // wait for other read and write latches to relinquish
//
// Each attempt atomically ORs PEND | XCL into the latch word and inspects the
// previous value. `!(prev & ~BOTH)` is true when the previous value had no
// bits set outside BOTH. On a failed attempt XCL is cleared again and the
// thread yields before retrying; the clear mask is ~XCL only, so PEND is not
// cleared by that step.
// NOTE(review): an additional test on XCL in prev is presumably on a line not
// visible in this extract.
528 void bt_spinwritelock(BtSpinLatch *latch)
534 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
536 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
539 if( !(prev & ~BOTH) )
543 __sync_fetch_and_and ((ushort *)latch, ~XCL);
545 _InterlockedAnd16((ushort *)latch, ~XCL);
// comma expression: yield, then loop unconditionally
548 } while( sched_yield(), 1 );
550 } while( SwitchToThread(), 1 );
554 // try to obtain write lock
556 // return 1 if obtained,
// (presumably 0 otherwise -- the rest of this comment and the return
// statements are missing from this extract)
//
// Single, non-blocking attempt: atomically OR XCL into the latch word and
// inspect the previous value. On the failure path XCL is cleared again.
559 int bt_spinwritetry(BtSpinLatch *latch)
564 prev = __sync_fetch_and_or((ushort *)latch, XCL);
566 prev = _InterlockedOr16((ushort *)latch, XCL);
568 // take write access if all bits are clear
// true when prev had no bits set outside BOTH
571 if( !(prev & ~BOTH) )
575 __sync_fetch_and_and ((ushort *)latch, ~XCL);
577 _InterlockedAnd16((ushort *)latch, ~XCL);
// Release a write latch: atomically clear the BOTH bits in the latch word.
584 void bt_spinreleasewrite(BtSpinLatch *latch)
587 __sync_fetch_and_and((ushort *)latch, ~BOTH);
589 _InterlockedAnd16((ushort *)latch, ~BOTH);
593 // decrement reader count
// Atomically subtracts SHARE from the latch word, undoing the add performed
// in bt_spinreadlock.
595 void bt_spinreleaseread(BtSpinLatch *latch)
598 __sync_fetch_and_add((ushort *)latch, -SHARE);
600 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
604 // link latch table entry into latch hash table
//
// Pushes latch set number `victim` onto the head of hash chain `hashidx` and
// records `page_no` in it. bt_pinlatch calls this while holding the write
// latch on table[hashidx].
// NOTE(review): initialization of set->prev and set->hash is presumably on
// lines not visible in this extract.
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
608 BtLatchSet *set = bt->mgr->latchsets + victim;
// intentional assignment: a non-zero old head gets its prev pointed back at us
610 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611 bt->mgr->latchsets[set->next].prev = victim;
613 bt->mgr->latchmgr->table[hashidx].slot = victim;
614 set->page_no = page_no;
// Drop one pin on a latch set by atomically decrementing set->pin.
// bt_pinlatch only takes over entries whose pin count is zero.
621 void bt_unpinlatch (BtLatchSet *set)
624 __sync_fetch_and_add(&set->pin, -1);
626 _InterlockedDecrement16 (&set->pin);
630 // find existing latchset or inspire new one
631 // return with latchset pinned
//
// Phases visible in this extract:
//  1. search chain `hashidx` under a read latch; pin the entry if found;
//  2. search again under the write latch, remembering the first unpinned
//     entry (avail) so it can be taken over for page_no;
//  3. otherwise claim a never-used entry by bumping latchdeployed;
//  4. otherwise walk latchvictim round-robin looking for an unpinned entry,
//     unlink it from its old chain and relink it under hashidx.
// The return statements, loop headers and several branch bodies are on
// lines not shown here.
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
639 // obtain read lock on hash table entry
641 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
// a slot value of 0 terminates the chain walk
643 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
645 set = bt->mgr->latchsets + slot;
646 if( page_no == set->page_no )
648 } while( slot = set->next );
652 __sync_fetch_and_add(&set->pin, 1);
654 _InterlockedIncrement16 (&set->pin);
658 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
663 // try again, this time with write lock
665 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
667 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
669 set = bt->mgr->latchsets + slot;
670 if( page_no == set->page_no )
// remember only the first unpinned entry on this chain
672 if( !set->pin && !avail )
674 } while( slot = set->next );
676 // found our entry, or take over an unpinned one
678 if( slot || (slot = avail) ) {
679 set = bt->mgr->latchsets + slot;
681 __sync_fetch_and_add(&set->pin, 1);
683 _InterlockedIncrement16 (&set->pin);
685 set->page_no = page_no;
686 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
690 // see if there are any unused entries
// both variants leave victim equal to the incremented latchdeployed value
692 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
694 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
697 if( victim < bt->mgr->latchmgr->latchtotal ) {
698 set = bt->mgr->latchsets + victim;
700 __sync_fetch_and_add(&set->pin, 1);
702 _InterlockedIncrement16 (&set->pin);
704 bt_latchlink (bt, hashidx, victim, page_no);
705 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
// table exhausted: take back the increment made above
710 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
712 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
714 // find and reuse previous lock entry
// both variants leave victim equal to the pre-increment latchvictim value
718 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
720 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
722 // we don't use slot zero
// intentional assignment: a result of 0 after the modulo is rejected
724 if( victim %= bt->mgr->latchmgr->latchtotal )
725 set = bt->mgr->latchsets + victim;
729 // take control of our slot
730 // from other threads
732 if( set->pin || !bt_spinwritetry (set->busy) )
737 // try to get write lock on hash chain
738 // skip entry if not obtained
739 // or has outstanding locks
// idx is presumably set->hash, assigned on a line not shown -- confirm
741 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742 bt_spinreleasewrite (set->busy);
747 bt_spinreleasewrite (set->busy);
748 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
752 // unlink our available victim from its hash chain
755 bt->mgr->latchsets[set->prev].next = set->next;
757 bt->mgr->latchmgr->table[idx].slot = set->next;
760 bt->mgr->latchsets[set->next].prev = set->prev;
762 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
764 __sync_fetch_and_add(&set->pin, 1);
766 _InterlockedIncrement16 (&set->pin);
768 bt_latchlink (bt, hashidx, victim, page_no);
769 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770 bt_spinreleasewrite (set->busy);
// Tear down a buffer manager: unmap every pool segment, unmap the latch
// manager and latch set pages, close the index file and free the manager's
// tables. Unix (munmap/free) and Windows (UnmapViewOfFile/CloseHandle/
// GlobalFree) variants are interleaved here; the #ifdef lines are not visible.
//
// NOTE(review): bt_mgr calls this function on error paths that run before
// mgr->latchmgr, mgr->latchsets and mgr->pool are assigned. The visible unix
// lines dereference mgr->latchmgr unconditionally -- confirm a guard exists
// on lines not shown.
775 void bt_mgrclose (BtMgr *mgr)
780 // release mapped pages
781 // note that slot zero is never used
783 for( slot = 1; slot < mgr->poolmax; slot++ ) {
784 pool = mgr->pool + slot;
// segment length = (poolmask + 1) pages, widened to uid before the shift
787 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
790 FlushViewOfFile(pool->map, 0);
791 UnmapViewOfFile(pool->map);
792 CloseHandle(pool->hmap);
// sizes mirror the mmap calls made in bt_mgr
798 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799 munmap (mgr->latchmgr, 2 * mgr->page_size);
801 FlushViewOfFile(mgr->latchmgr, 0);
802 UnmapViewOfFile(mgr->latchmgr);
803 CloseHandle(mgr->halloc);
// cast drops the qualifier carried by the BtSpinLatch typedef
809 free ((void *)mgr->latch);
812 FlushFileBuffers(mgr->idx);
813 CloseHandle(mgr->idx);
814 GlobalFree (mgr->pool);
815 GlobalFree (mgr->hash);
816 GlobalFree ((void *)mgr->latch);
821 // close and release memory
//
// Only the Windows release of bt->mem is visible in this extract; bt_open
// allocates that buffer with VirtualAlloc on Windows and malloc otherwise.
// Freeing of the BtDb object itself is presumably on lines not shown.
823 void bt_close (BtDb *bt)
830 VirtualFree (bt->mem, 0, MEM_RELEASE);
835 // open/create new btree buffer manager
837 // call with file_name, BT_openmode, bits in page size (e.g. 16),
838 // size of mapped page pool (e.g. 8192)
//
// Parameters (as used in the visible lines):
//   name     - path of the index file, opened or created read/write
//   mode     - BT_ro or BT_rw; BT_ro fails on an empty file
//   bits     - requested page size in bits, checked against BT_minbits and
//              BT_maxbits; replaced by the value stored in an existing file
//   poolmax  - number of BtPool entries to allocate
//   segsize  - log2 of pages per mapped segment (lower bound from cacheblk)
//   hashsize - number of pool hash table slots
// Returns the new BtMgr, or NULL on failure.
//
// For an empty file the function writes the allocation/latch-manager page,
// MIN_lvl tree pages each holding one stopper key, and zero-filled pages up
// to the end of the first segment. It then maps the latch manager and latch
// set pages. Unix and Windows variants are interleaved; the #ifdef lines and
// a number of statements are not visible in this extract.
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
854 SYSTEM_INFO sysinfo[1];
857 // determine sanity of page size and buffer pool
859 if( bits > BT_maxbits )
861 else if( bits < BT_minbits )
865 return NULL; // must have buffer pool
868 mgr = calloc (1, sizeof(BtMgr));
870 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
873 return free(mgr), NULL;
875 cacheblk = 4096; // minimum mmap segment size for unix
878 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879 attr = FILE_ATTRIBUTE_NORMAL;
880 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
882 if( mgr->idx == INVALID_HANDLE_VALUE )
883 return GlobalFree(mgr), NULL;
885 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886 GetSystemInfo(sysinfo);
887 cacheblk = sysinfo->dwAllocationGranularity;
// scratch buffer sized for the largest possible page
891 latchmgr = malloc (BT_maxpage);
894 // read minimum page size to get root info
895 // to support raw disk partition files
896 // check if bits == 0 on the disk.
// whence 2 is SEEK_END: a non-zero result means the file already has content
898 if( size = lseek (mgr->idx, 0L, 2) ) {
899 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900 if( latchmgr->alloc->bits )
901 bits = latchmgr->alloc->bits;
// NOTE(review): this path frees mgr and latchmgr but does not appear to
// close mgr->idx, which was opened above -- confirm.
905 return free(mgr), free(latchmgr), NULL;
906 } else if( mode == BT_ro )
907 return free(latchmgr), bt_mgrclose (mgr), NULL;
911 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
912 size = GetFileSize(mgr->idx, amt);
915 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
916 return bt_mgrclose (mgr), NULL;
917 bits = latchmgr->alloc->bits;
918 } else if( mode == BT_ro )
919 return bt_mgrclose (mgr), NULL;
924 mgr->page_size = 1 << bits;
925 mgr->page_bits = bits;
927 mgr->poolmax = poolmax;
// a mapped segment is never smaller than one page
930 if( cacheblk < mgr->page_size )
931 cacheblk = mgr->page_size;
933 // mask for partial memmaps
935 mgr->poolmask = (cacheblk >> bits) - 1;
937 // see if requested size of pages per memmap is greater
// NOTE(review): poolmask is declared ushort, so a segsize above 16 would be
// truncated by the assignment below -- confirm callers stay in range.
939 if( (1 << segsize) > mgr->poolmask )
940 mgr->poolmask = (1 << segsize) - 1;
// count the bits needed to cover poolmask
944 while( (1 << mgr->seg_bits) <= mgr->poolmask )
947 mgr->hashsize = hashsize;
950 mgr->pool = calloc (poolmax, sizeof(BtPool));
951 mgr->hash = calloc (hashsize, sizeof(ushort));
952 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
954 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
955 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
956 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
962 // initialize an empty b-tree with latch page, root page, page of leaves
963 // and page(s) of latches
965 memset (latchmgr, 0, 1 << bits);
// enough whole pages to hold BT_latchtable latch sets, plus one
966 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
// alloc->right holds the next page number to hand out (see bt_newpage)
967 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
968 latchmgr->alloc->bits = mgr->page_bits;
970 latchmgr->nlatchpage = nlatchpage;
971 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
973 // initialize latch manager
// hash table occupies the rest of page 0 after the BtLatchMgr header
975 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
977 // size of hash table = total number of latchsets
979 if( latchhash > latchmgr->latchtotal )
980 latchhash = latchmgr->latchtotal;
982 latchmgr->latchhash = latchhash;
984 // initialize left-most LEAF page in
987 bt_putid (latchmgr->alloc->left, LEAF_page);
990 if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
991 return bt_mgrclose (mgr), NULL;
993 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
994 return bt_mgrclose (mgr), NULL;
996 if( *amt < mgr->page_size )
997 return bt_mgrclose (mgr), NULL;
1000 memset (latchmgr, 0, 1 << bits);
1001 latchmgr->alloc->bits = mgr->page_bits;
// lvl counts down from MIN_lvl-1 to 0; the page for level lvl is written
// at page number MIN_lvl - lvl
1003 for( lvl=MIN_lvl; lvl--; ) {
// place the key and value at the very end of the page; pages above the leaf
// level reserve BtId extra bytes for the child page number
1004 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1005 key = keyptr(latchmgr->alloc, 1);
1006 key->len = 2; // create stopper key
// value is the page number of the next level down
1010 bt_putid(value, MIN_lvl - lvl + 1);
1011 val = valptr(latchmgr->alloc, 1);
1012 val->len = lvl ? BtId : 0;
1013 memcpy (val->value, value, val->len);
1015 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1016 latchmgr->alloc->lvl = lvl;
1017 latchmgr->alloc->cnt = 1;
1018 latchmgr->alloc->act = 1;
1020 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1021 return bt_mgrclose (mgr), NULL;
1023 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1024 return bt_mgrclose (mgr), NULL;
1026 if( *amt < mgr->page_size )
1027 return bt_mgrclose (mgr), NULL;
1031 // clear out latch manager locks
1032 // and rest of pages to round out segment
1034 memset(latchmgr, 0, mgr->page_size);
// OR-ing with poolmask yields the last page number of the containing segment
1037 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
// NOTE(review): unlike the Windows branch below, the result of this pwrite
// is not checked.
1039 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1041 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1042 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1043 return bt_mgrclose (mgr), NULL;
1044 if( *amt < mgr->page_size )
1045 return bt_mgrclose (mgr), NULL;
1052 // mlock the root page and the latchmgr page
1054 flag = PROT_READ | PROT_WRITE;
1055 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1056 if( mgr->latchmgr == MAP_FAILED )
1057 return bt_mgrclose (mgr), NULL;
1058 mlock (mgr->latchmgr, 2 * mgr->page_size);
1060 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1061 if( mgr->latchsets == MAP_FAILED )
1062 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1065 flag = PAGE_READWRITE;
1066 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1068 return bt_mgrclose (mgr), NULL;
1070 flag = FILE_MAP_WRITE;
1071 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1072 if( !mgr->latchmgr )
1073 return GetLastError(), bt_mgrclose (mgr), NULL;
// on Windows a single view covers pages 0..LATCH_page+nlatchpage, so the
// latch sets are addressed by offset into that view
1075 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
// release the scratch page buffer
1081 VirtualFree (latchmgr, 0, MEM_RELEASE);
1086 // open BTree access method
1087 // based on buffer manager
//
// Allocates a zeroed BtDb plus a two-page scratch buffer (bt->mem).
// bt->frame is the first page of that buffer and bt->cursor the second.
// NOTE(review): in the visible lines the malloc result is passed to memset
// without a NULL check; the return statement and the assignment of bt->mgr
// are not visible in this extract.
1089 BtDb *bt_open (BtMgr *mgr)
1091 BtDb *bt = malloc (sizeof(*bt));
1093 memset (bt, 0, sizeof(*bt));
1096 bt->mem = malloc (2 *mgr->page_size);
1098 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1100 bt->frame = (BtPage)bt->mem;
1101 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1105 // compare two keys, returning > 0, = 0, or < 0
1106 // as the comparison value
//
// key1 is a stored BtKey (length byte + bytes); key2/len2 is a raw byte
// string. The visible line compares only the common prefix, i.e. the shorter
// of the two lengths, with memcmp.
// NOTE(review): the tie-break on length when the prefixes match is presumably
// on lines not visible in this extract.
1108 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1110 uint len1 = key1->len;
// intentional assignment: a non-zero memcmp result decides the comparison
1113 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1126 // find segment in pool
1127 // must be called with hashslot idx locked
1128 // return NULL if not there
1129 // otherwise return node
//
// Walks the hash chain that starts at bt->mgr->hash[idx], following hashnext
// links, until an entry whose basepage equals the segment base of page_no is
// found.
1131 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1136 // compute start of hash chain in pool
// a hash slot value of 0 means the chain is empty
1138 if( slot = bt->mgr->hash[idx] )
1139 pool = bt->mgr->pool + slot;
// round page_no down to the first page of its segment
1143 page_no &= ~bt->mgr->poolmask;
1145 while( pool->basepage != page_no )
1146 if( pool = pool->hashnext )
1154 // add segment to hash table
//
// Initializes `pool` for the segment containing page_no and pushes it onto
// the head of hash chain `idx`. bt_pinpool calls this while holding the write
// latch for that chain.
1156 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1161 pool->hashprev = pool->hashnext = NULL;
1162 pool->basepage = page_no & ~bt->mgr->poolmask;
// start with the clock bit set and a pin count of one
1163 pool->pin = CLOCK_bit + 1;
// intentional assignment: link in front of the current head, if any
1165 if( slot = bt->mgr->hash[idx] ) {
1166 node = bt->mgr->pool + slot;
1167 pool->hashnext = node;
1168 node->hashprev = pool;
1171 bt->mgr->hash[idx] = pool->slot;
1174 // map new buffer pool segment to virtual memory
//
// Maps the (poolmask+1)-page segment that contains page_no into pool->map.
// The mapping is read-only when the manager mode is BT_ro and read/write
// otherwise. Sets bt->err to BTERR_map and returns it on failure; the success
// return is not visible in this extract.
1176 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
// byte offset of the segment's first page
1178 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
// NOTE(review): (poolmask+1) is promoted to int here and shifted before being
// added to off, whereas the mmap call below widens it to uid first; a large
// segment size could overflow int in this expression -- confirm.
1179 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1183 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1184 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1185 if( pool->map == MAP_FAILED )
1186 return bt->err = BTERR_map;
1189 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
// the mapping object must extend to the end of the segment (limit)
1190 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1192 return bt->err = BTERR_map;
1194 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1195 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1197 return bt->err = BTERR_map;
1202 // calculate page within pool
//
// Returns the address of page_no inside the segment mapped at pool->map:
// the page's index within the segment (page_no & poolmask) times the page
// size.
1204 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1206 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1209 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
// Drop one pin on a pool segment by atomically decrementing pool->pin.
// CLOCK_bit shares this field with the pin count and is not changed here.
1215 void bt_unpinpool (BtPool *pool)
1218 __sync_fetch_and_add(&pool->pin, -1);
1220 _InterlockedDecrement16 (&pool->pin);
1224 // find or place requested page in segment-pool
1225 // return pool table entry, incrementing pin
//
// Phases visible in this extract:
//  1. write-latch the hash chain for page_no's segment and look it up;
//     on a hit set CLOCK_bit, bump the pin count and return;
//  2. otherwise claim a never-used pool entry by bumping poolcnt, map the
//     segment into it and link it into the chain;
//  3. otherwise step bt->mgr->evicted round-robin over the pool, unlink a
//     victim from its chain, unmap it, map the requested segment and link it
//     under hashidx.
// Return statements, the eviction loop header and the victim-skip tests are
// on lines not shown here.
1227 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1229 uint slot, hashidx, idx, victim;
1230 BtPool *pool, *node, *next;
1232 // lock hash table chain
// hash on the segment number (page_no with the in-segment bits shifted out)
1234 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1235 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1237 // look up in hash table
1239 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1241 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1242 __sync_fetch_and_add(&pool->pin, 1);
1244 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1245 _InterlockedIncrement16 (&pool->pin);
1247 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1251 // allocate a new pool node
1252 // and add to hash table
// both variants leave slot equal to the pre-increment poolcnt
1255 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1257 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
// pre-increment skips pool entry zero
1260 if( ++slot < bt->mgr->poolmax ) {
1261 pool = bt->mgr->pool + slot;
1264 if( bt_mapsegment(bt, pool, page_no) )
1267 bt_linkhash(bt, pool, page_no, hashidx);
1268 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1272 // pool table is full
1273 // find best pool entry to evict
// take back the increment made above
1276 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1278 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1283 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1285 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1287 victim %= bt->mgr->poolmax;
1288 pool = bt->mgr->pool + victim;
// hash chain the victim currently lives on
1289 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1294 // try to get write lock
1295 // skip entry if not obtained
1297 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1300 // skip this entry if
1302 // or clock bit is set
// clear the clock bit on a skipped entry
1306 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1308 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1310 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1314 // unlink victim pool node from hash table
1316 if( node = pool->hashprev )
1317 node->hashnext = pool->hashnext;
1318 else if( node = pool->hashnext )
1319 bt->mgr->hash[idx] = node->slot;
1321 bt->mgr->hash[idx] = 0;
1323 if( node = pool->hashnext )
1324 node->hashprev = pool->hashprev;
1326 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1328 // remove old file mapping
1330 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1332 // FlushViewOfFile(pool->map, 0);
1333 UnmapViewOfFile(pool->map);
1334 CloseHandle(pool->hmap);
1338 // create new pool mapping
1339 // and link into hash table
1341 if( bt_mapsegment(bt, pool, page_no) )
1344 bt_linkhash(bt, pool, page_no, hashidx);
1345 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1350 // place write, read, or parent lock on requested page_no.
//
// Dispatches on `mode` to one of the three RWLocks in the latch set:
// readwr (read or write), access (read or write) and parent (write only).
// NOTE(review): the switch/case labels are not visible in this extract;
// presumably they are the BtLock values, in the order BtLockRead,
// BtLockWrite, BtLockAccess, BtLockDelete, BtLockParent -- confirm.
1352 void bt_lockpage(BtLock mode, BtLatchSet *set)
1356 ReadLock (set->readwr);
1359 WriteLock (set->readwr);
1362 ReadLock (set->access);
1365 WriteLock (set->access);
1368 WriteLock (set->parent);
1373 // remove write, read, or parent lock on requested page
//
// Mirror of bt_lockpage: dispatches on `mode` to the matching release call
// on readwr, access or parent. The switch/case labels are not visible in
// this extract.
1375 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1379 ReadRelease (set->readwr);
1382 WriteRelease (set->readwr);
1385 ReadRelease (set->access);
1388 WriteRelease (set->access);
1391 WriteRelease (set->parent);
1396 // allocate a new page
1397 // return with page pinned.
//
// Fills in set->page_no, set->pool and set->page. A page is taken from the
// free chain headed at latchmgr->chain when that chain is non-empty;
// otherwise the next page number is taken from latchmgr->alloc->right and
// the file is extended when a new segment begins. The whole operation runs
// under the latchmgr->lock spin latch.
// NOTE(review): the "return bt->err = BTERR_struct" below is reached while
// latchmgr->lock is held, and no release is visible before it -- confirm the
// latch is not leaked on that path.
1399 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1403 // lock allocation page
1405 bt_spinwritelock(bt->mgr->latchmgr->lock);
1407 // use empty chain first
1408 // else allocate empty page
// intentional assignment: a zero id means the free chain is empty
1410 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1411 if( set->pool = bt_pinpool (bt, set->page_no) )
1412 set->page = bt_page (bt, set->pool, set->page_no);
1414 return bt->err = BTERR_struct;
// pop: the free page's right link becomes the new head of the chain
1416 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1418 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1419 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1421 // if writing first page of pool block, set file length thru last page
1423 if( (set->page_no & bt->mgr->poolmask) == 0 )
// NOTE(review): the ftruncate result is not checked.
1424 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1426 if( set->pool = bt_pinpool (bt, set->page_no) )
1427 set->page = bt_page (bt, set->pool, set->page_no);
1432 // unlock allocation latch
1434 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1438 // find slot in page for given key at a given level
//
// Binary search over the 1-based slot array of set->page for the first slot
// whose key is not less than key/len. Returns that slot number, or 0 when no
// slot was confirmed as >= the key (the key belongs on the right sibling).
// The branch bodies that adjust `low` and pre-set `good` are on lines not
// visible in this extract.
1440 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1442 uint diff, higher = set->page->cnt, low = 1, slot;
1445 // make stopper key an infinite fence value
// a non-zero right link means this page has a right sibling
1447 if( bt_getid (set->page->right) )
1452 // low is the lowest candidate.
1453 // loop ends when they meet
1455 // higher is already
1456 // tested as .ge. the passed key.
// intentional assignment: loop until low and higher coincide
1458 while( diff = higher - low ) {
1459 slot = low + ( diff >> 1 );
1460 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
// slot's key is >= the search key: it becomes the new upper bound
1463 higher = slot, good++;
1466 // return zero if key is on right link page
1468 return good ? higher : 0;
1471 // find and load page at given level for given key
1472 // leave page rd or wr locked as requested
// Starts at ROOT_page. Each visited page is locked in BtLockRead mode,
// except at the requested level `lvl`, where the caller's `lock` is used.
// On the error paths visible here bt->err is set to BTERR_struct and the
// function returns 0.
// NOTE(review): the enclosing loop, the declaration of `prevpool` and the
// successful return are not part of this excerpt.
1474 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1476 uid page_no = ROOT_page, prevpage = 0;
// 0xff is a starting value for drill; it is replaced by the root page's
// own lvl field further down
1477 uint drill = 0xff, slot;
1478 BtLatchSet *prevlatch;
1479 uint mode, prevmode;
1482 // start at root of btree and drill down
1485 // determine lock mode of drill level
1486 mode = (drill == lvl) ? lock : BtLockRead;
1488 set->latch = bt_pinlatch (bt, page_no);
1489 set->page_no = page_no;
1491 // pin page contents
1493 if( set->pool = bt_pinpool (bt, page_no) )
1494 set->page = bt_page (bt, set->pool, page_no);
1498 // obtain access lock using lock chaining with Access mode
// only pages other than the root take the access lock
1500 if( page_no > ROOT_page )
1501 bt_lockpage(BtLockAccess, set->latch);
1503 // release & unpin parent page
1506 bt_unlockpage(prevmode, prevlatch);
1507 bt_unpinlatch (prevlatch);
1508 bt_unpinpool (prevpool);
1512 // obtain read lock using lock chaining
1514 bt_lockpage(mode, set->latch);
// a page flagged free is treated as a structure error
1516 if( set->page->free )
1517 return bt->err = BTERR_struct, 0;
// the access lock is dropped once the page lock above is held
1519 if( page_no > ROOT_page )
1520 bt_unlockpage(BtLockAccess, set->latch);
1522 // re-read and re-lock root after determining actual level of root
1524 if( set->page->lvl != drill) {
// a level mismatch on any page other than the root is a structure error
1525 if( set->page_no != ROOT_page )
1526 return bt->err = BTERR_struct, 0;
1528 drill = set->page->lvl;
// root is itself the requested level and a non-read lock was asked for:
// give up the lock just taken and the pins
1530 if( lock != BtLockRead && drill == lvl ) {
1531 bt_unlockpage(mode, set->latch);
1532 bt_unpinlatch (set->latch);
1533 bt_unpinpool (set->pool);
// remember this page so it can be released after the next page is pinned
1538 prevpage = set->page_no;
1539 prevlatch = set->latch;
1540 prevpool = set->pool;
1543 // find key on page at this level
1544 // and descend to requested level
1546 if( set->page->kill )
1549 if( slot = bt_findslot (set, key, len) ) {
// step over slots marked dead
1553 while( slotptr(set->page, slot)->dead )
1554 if( slot++ < set->page->cnt )
// the slot's value holds the page number to visit next
1559 page_no = bt_getid(valptr(set->page, slot)->value);
1564 // or slide right into next page
1567 page_no = bt_getid(set->page->right);
1571 // return error on end of right chain
1573 bt->err = BTERR_struct;
1574 return 0; // return error
1577 // return page to free list
1578 // page must be delete & write locked
// Pushes set->page_no onto the free chain kept in the latch manager,
// marks the page free, then releases the page's delete and write locks
// and its latch and pool pins. The allocation lock is held throughout.
1580 void bt_freepage (BtDb *bt, BtPageSet *set)
1582 // lock allocation page
1584 bt_spinwritelock (bt->mgr->latchmgr->lock);
// the freed page's right link takes the old chain head, and the chain
// head becomes this page
1587 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1588 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1589 set->page->free = 1;
1591 // unlock released page
1593 bt_unlockpage (BtLockDelete, set->latch);
1594 bt_unlockpage (BtLockWrite, set->latch);
1595 bt_unpinlatch (set->latch);
1596 bt_unpinpool (set->pool);
1598 // unlock allocation page
1600 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1603 // a fence key was deleted from a page
1604 // push new fence value upwards
// Drops the last slot of set->page, inserts the page's new last key at
// level lvl+1 with set->page_no as its value, and deletes the old fence
// key from level lvl+1. The page's write lock is exchanged for its parent
// lock before the two parent-level calls; the parent lock and the pins
// are released at the end.
// NOTE(review): the declaration of `ptr`, the bodies of the two error
// checks and the final return are not part of this excerpt.
1606 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1608 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1609 unsigned char value[BtId];
1613 // remove the old fence value
1615 ptr = keyptr(set->page, set->page->cnt);
1616 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
// zero the last slot and shrink the slot count by one
1617 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1619 // cache new fence value
1621 ptr = keyptr(set->page, set->page->cnt);
1622 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the parent lock before giving up the write lock
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // insert new (now smaller) fence key
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)leftkey;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // now delete old fence key
1637 ptr = (BtKey*)rightkey;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 bt_unlockpage (BtLockParent, set->latch);
1643 bt_unpinlatch(set->latch);
1644 bt_unpinpool (set->pool);
1648 // root has a single child
1649 // collapse a level from the tree
// Repeats while the root is above level 1 and has exactly one active key:
// locates a slot that is not dead, locks the child page it names for
// delete and write, copies the whole child page over the root page and
// frees the child. Afterwards the root's write lock and pins are released.
// NOTE(review): the declarations of `child` and `idx`, the loop opening
// and the return value are not part of this excerpt.
1651 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1656 // find the child entry and promote as new root contents
// idx is incremented in the loop test, so the body sees 1..cnt
1659 for( idx = 0; idx++ < root->page->cnt; )
1660 if( !slotptr(root->page, idx)->dead )
1663 child->page_no = bt_getid (valptr(root->page, idx)->value);
1665 child->latch = bt_pinlatch (bt, child->page_no);
1666 bt_lockpage (BtLockDelete, child->latch);
1667 bt_lockpage (BtLockWrite, child->latch);
1669 if( child->pool = bt_pinpool (bt, child->page_no) )
1670 child->page = bt_page (bt, child->pool, child->page_no);
// the child's full page image replaces the root's contents
1674 memcpy (root->page, child->page, bt->mgr->page_size);
1675 bt_freepage (bt, child);
1677 } while( root->page->lvl > 1 && root->page->act == 1 );
1679 bt_unlockpage (BtLockWrite, root->latch);
1680 bt_unpinlatch (root->latch);
1681 bt_unpinpool (root->pool);
1685 // maintain reverse scan pointers by
1686 // linking left pointer of far right node
// When right_page_no is zero, left_page_no is recorded in the allocation
// page's `left` field. Otherwise the page right_page_no is write-locked,
// its `left` field is set to left_page_no, and it is unlocked and unpinned.
// NOTE(review): the return statements are not part of this excerpt.
1688 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1690 BtPageSet right2[1];
1692 // keep track of rightmost leaf page
1694 if( !right_page_no ) {
1695 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1699 // link right page to left page
1701 right2->latch = bt_pinlatch (bt, right_page_no);
1702 bt_lockpage (BtLockWrite, right2->latch);
1704 if( right2->pool = bt_pinpool (bt, right_page_no) )
1705 right2->page = bt_page (bt, right2->pool, right_page_no);
1709 bt_putid(right2->page->left, left_page_no);
1710 bt_unlockpage (BtLockWrite, right2->latch);
1711 bt_unpinlatch (right2->latch);
1712 bt_unpinpool (right2->pool);
1716 // find and delete key on page by marking delete flag bit
1717 // if page becomes empty, delete it from the btree
// Loads the page for `key` at level `lvl` write-locked. A matching slot
// that is not already dead is marked dead and its key and value bytes are
// added to the page's garbage count. bt->found receives the `found` flag
// on the return paths visible here.
// If the page still has active keys it is unlocked and unpinned. If not,
// the right sibling's contents are copied into this page, the parent
// level is updated through bt_insertkey and bt_deletekey at lvl+1, and
// the right sibling is freed.
// NOTE(review): the declarations of `ptr` and `val`, several branch
// bodies and the final return are not part of this excerpt.
1719 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1721 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1722 uint slot, idx, found, fence;
1723 BtPageSet set[1], right[1];
1724 unsigned char value[BtId];
1728 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1729 ptr = keyptr(set->page, slot);
1733 // if librarian slot, advance to real slot
1735 if( slotptr(set->page, slot)->type == Librarian )
1736 ptr = keyptr(set->page, ++slot);
// fence is true when the slot is the last one on the page
1738 fence = slot == set->page->cnt;
1740 // if key is found delete it, otherwise ignore request
1742 if( found = !keycmp (ptr, key, len) )
// a slot that is already dead counts as not found
1743 if( found = slotptr(set->page, slot)->dead == 0 ) {
1744 val = valptr(set->page,slot);
1745 slotptr(set->page, slot)->dead = 1;
1746 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1749 // collapse empty slots beneath the fence
// a dead slot just below the last slot is overwritten by the last slot,
// then the vacated last slot is zeroed and the count reduced
1751 while( idx = set->page->cnt - 1 )
1752 if( slotptr(set->page, idx)->dead ) {
1753 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1754 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1759 // did we delete a fence key in an upper level?
// requires: key was deleted, lvl is non-zero, the page still has active
// keys, and the deleted slot was the last slot
1761 if( found && lvl && set->page->act && fence )
1762 if( bt_fixfence (bt, set, lvl) )
1765 return bt->found = found, 0;
1767 // do we need to collapse root?
1769 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1770 if( bt_collapseroot (bt, set) )
1773 return bt->found = found, 0;
1775 // return if page is not empty
1777 if( set->page->act ) {
1778 bt_unlockpage(BtLockWrite, set->latch);
1779 bt_unpinlatch (set->latch);
1780 bt_unpinpool (set->pool);
1781 return bt->found = found, 0;
1784 // cache copy of fence key
1785 // to post in parent
1787 ptr = keyptr(set->page, set->page->cnt);
1788 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1790 // obtain lock on right page
1792 right->page_no = bt_getid(set->page->right);
1793 right->latch = bt_pinlatch (bt, right->page_no);
1794 bt_lockpage (BtLockWrite, right->latch);
1796 // pin page contents
1798 if( right->pool = bt_pinpool (bt, right->page_no) )
1799 right->page = bt_page (bt, right->pool, right->page_no);
// NOTE(review): no unlock or unpin of either page is visible before this
// return, so the write locks taken above may remain held - confirm.
1803 if( right->page->kill )
1804 return bt->err = BTERR_struct;
1806 // transfer left link
1808 memcpy (right->page->left, set->page->left, BtId);
1810 // pull contents of right peer into our empty page
1812 memcpy (set->page, right->page, bt->mgr->page_size);
// set->page now carries the right peer's right link, so this points the
// page beyond the peer back at set->page_no
1817 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1820 // cache copy of key to update
1822 ptr = keyptr(right->page, right->page->cnt);
1823 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1825 // mark right page deleted and point it to left page
1826 // until we can post parent updates
1828 bt_putid (right->page->right, set->page_no);
1829 right->page->kill = 1;
// on both pages the parent lock is taken before the write lock is dropped
1831 bt_lockpage (BtLockParent, right->latch);
1832 bt_unlockpage (BtLockWrite, right->latch);
1834 bt_lockpage (BtLockParent, set->latch);
1835 bt_unlockpage (BtLockWrite, set->latch);
1837 // redirect higher key directly to our new node contents
1839 bt_putid (value, set->page_no);
1840 ptr = (BtKey*)higherfence;
1842 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1845 // delete old lower key to our node
1847 ptr = (BtKey*)lowerfence;
1849 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1852 // obtain delete and write locks to right node
1854 bt_unlockpage (BtLockParent, right->latch);
1855 bt_lockpage (BtLockDelete, right->latch);
1856 bt_lockpage (BtLockWrite, right->latch);
// bt_freepage is called with the delete and write locks just taken
1857 bt_freepage (bt, right);
1859 bt_unlockpage (BtLockParent, set->latch);
1860 bt_unpinlatch (set->latch);
1861 bt_unpinpool (set->pool);
// Returns the buffer bt->key viewed as a BtKey.
1866 BtKey *bt_foundkey (BtDb *bt)
1868 return (BtKey*)(bt->key);
1871 // advance to next slot
// When `slot` is the last slot of the current page, moves `set` to the
// page named by the current page's right link: the new page is pinned
// and access-locked, the previous page's read lock and pins are released,
// then the new page is read-locked and its access lock is dropped.
// A zero right link sets bt->err to BTERR_struct and returns 0.
// NOTE(review): the declarations of `page_no` and `prevpool`, the body of
// the first test and the final return are not part of this excerpt.
1873 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1875 BtLatchSet *prevlatch;
1879 if( slot < set->page->cnt )
1882 prevlatch = set->latch;
1883 prevpool = set->pool;
1885 if( page_no = bt_getid(set->page->right) )
1886 set->latch = bt_pinlatch (bt, page_no);
1888 return bt->err = BTERR_struct, 0;
1890 // pin page contents
1892 if( set->pool = bt_pinpool (bt, page_no) )
1893 set->page = bt_page (bt, set->pool, page_no);
1897 // obtain access lock using lock chaining with Access mode
1899 bt_lockpage(BtLockAccess, set->latch);
1901 bt_unlockpage(BtLockRead, prevlatch);
1902 bt_unpinlatch (prevlatch);
1903 bt_unpinpool (prevpool);
1905 bt_lockpage(BtLockRead, set->latch);
1906 bt_unlockpage(BtLockAccess, set->latch);
1908 set->page_no = page_no;
1912 // find unique key or first duplicate key in
1913 // leaf level and return number of value bytes
1914 // or (-1) if not found. Setup key for bt_foundkey
// Loads the leaf page (level 0) for `key` read-locked and walks slots via
// bt_findnext. The key found at each step is copied into bt->key. On a
// match, bytes of the stored value are copied into `value`. The page is
// unlocked and unpinned before returning.
// NOTE(review): `len` is compared against on the memcmp line but neither
// its declaration nor its assignment is part of this excerpt; the return
// statements and the loop opening are also not shown.
1916 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1924 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1926 ptr = keyptr(set->page, slot);
1928 // skip librarian slot place holder
1930 if( slotptr(set->page, slot)->type == Librarian )
1931 ptr = keyptr(set->page, ++slot);
1933 // return actual key found
1935 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1938 if( slotptr(set->page, slot)->type == Duplicate )
1941 // not there if we reach the stopper key
// last slot of a page with a zero right link
1943 if( slot == set->page->cnt )
1944 if( !bt_getid (set->page->right) )
1947 // if key exists, return >= 0 value bytes copied
1948 // otherwise return (-1)
1950 if( slotptr(set->page, slot)->dead )
1954 if( !memcmp (ptr->key, key, len) ) {
1955 val = valptr (set->page,slot);
// presumably clamps valmax to the stored value length; the statement
// controlled by this test is not part of this excerpt
1956 if( valmax > val->len )
1958 memcpy (value, val->value, valmax);
1964 } while( slot = bt_findnext (bt, set, slot) );
1966 bt_unlockpage (BtLockRead, set->latch);
1967 bt_unpinlatch (set->latch);
1968 bt_unpinpool (set->pool);
1972 // check page for space available,
1973 // clean if necessary and return
1974 // 0 - page needs splitting
1975 // >0 new slot value
// The page is first tested for room as it stands. If cleaning goes ahead,
// the page is copied to bt->frame, everything after the page header is
// zeroed, and keys and values are copied back from bt->frame packed
// downward from the end of the page. The space test is then repeated
// with the rebuilt slot count.
// NOTE(review): the declarations of `key` and `val`, the return
// statements and several branch bodies are not part of this excerpt.
1977 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1979 uint nxt = bt->mgr->page_size;
1980 uint cnt = 0, idx = 0;
1981 uint max = page->cnt;
// compares the lowest used offset (page->min) with the space needed for
// the header, max+2 slots, and the new key and value with their headers
1986 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1989 // skip cleanup and proceed to split
1990 // if there's not enough garbage
// threshold is one fifth of the page size
1993 if( page->garbage < nxt / 5 )
1996 memcpy (bt->frame, page, bt->mgr->page_size);
1998 // skip page info and set rest of page to zero
2000 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2004 // clean up page first by
2005 // removing deleted keys
2007 while( cnt++ < max ) {
// the dead test is applied to the last slot only when page->lvl is
// non-zero
2011 if( cnt < max || page->lvl )
2012 if( slotptr(bt->frame,cnt)->dead )
2015 // copy the value across
2017 val = valptr(bt->frame, cnt);
2018 nxt -= val->len + sizeof(BtVal);
2019 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2021 // copy the key across
2023 key = keyptr(bt->frame, cnt);
2024 nxt -= key->len + sizeof(BtKey);
2025 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2027 // make a librarian slot
// the librarian slot is marked dead and shares the key offset of the
// slot that follows it
2029 slotptr(page, ++idx)->off = nxt;
2030 slotptr(page, idx)->type = Librarian;
2031 slotptr(page, idx)->dead = 1;
2035 slotptr(page, ++idx)->off = nxt;
2036 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
// carries the dead flag over from the saved copy
2038 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2045 // see if page has enough space now, or does it need splitting?
2047 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2053 // split the root and raise the height of the btree
// Copies the current root contents into a newly allocated page (`left`),
// points right->page->left at that page, then rebuilds the root with two
// slots: slot 1 holds `leftkey` with the left page number as its value,
// slot 2 holds the stopper key with the right page number as its value.
// The root's write lock and pins are released at the end.
// NOTE(review): the declarations of `left`, `ptr` and `val`, the writes of
// the value and key length bytes, the stopper key bytes, the level
// increment and the return value are not part of this excerpt.
2055 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
2057 uint nxt = bt->mgr->page_size;
2058 unsigned char value[BtId];
2063 // Obtain an empty page to use, and copy the current
2064 // root contents into it, e.g. lower keys
2066 if( bt_newpage(bt, left) )
2069 memcpy(left->page, root->page, bt->mgr->page_size);
2070 bt_unpinpool (left->pool);
2072 // set left from higher to lower page
2074 bt_putid (right->page->left, left->page_no);
2075 bt_unpinpool (right->pool);
2077 // preserve the page info at the bottom
2078 // of higher keys and set rest to zero
2080 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2082 // insert stopper key at top of newroot page
2083 // and increase the root height
2085 nxt -= BtId + sizeof(BtVal);
2086 bt_putid (value, right->page_no);
2087 val = (BtVal *)((unsigned char *)root->page + nxt);
2088 memcpy (val->value, value, BtId);
// reserves two key bytes plus the key header for the stopper key
2091 nxt -= 2 + sizeof(BtKey);
2092 slotptr(root->page, 2)->off = nxt;
2093 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2098 // insert lower keys page fence key on newroot page as first key
2100 nxt -= BtId + sizeof(BtVal);
2101 bt_putid (value, left->page_no);
2102 val = (BtVal *)((unsigned char *)root->page + nxt);
2103 memcpy (val->value, value, BtId);
2106 nxt -= leftkey->len + sizeof(BtKey);
2107 slotptr(root->page, 1)->off = nxt;
2108 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
// the root has no right sibling
2110 bt_putid(root->page->right, 0);
2111 root->page->min = nxt; // reset lowest used offset and key count
2112 root->page->cnt = 2;
2113 root->page->act = 2;
2116 // release and unpin root
2118 bt_unlockpage(BtLockWrite, root->latch);
2119 bt_unpinlatch (root->latch);
2120 bt_unpinpool (root->pool);
2124 // split already locked full node
// Builds the page of higher keys in bt->frame, obtains a new page for it
// (`right`) and copies the frame there, then rebuilds set->page from a
// saved copy with the lower keys. For the root page the work is handed
// to bt_splitroot. Otherwise both pages take their parent locks and two
// keys are inserted at level lvl+1: the fence of the rebuilt left page
// (value set->page_no) and the original fence key (value right->page_no).
// Locks and pins on both pages are released at the end.
// NOTE(review): the declarations of `right`, `key`, `src` and `ptr`, the
// initial value of `cnt` before each copy loop, several branch bodies and
// the return value are not part of this excerpt.
2127 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2129 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2130 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2131 unsigned char value[BtId];
2132 uint lvl = set->page->lvl;
2139 // split higher half of keys to bt->frame
2141 memset (bt->frame, 0, bt->mgr->page_size);
2142 max = set->page->cnt;
2146 while( cnt++ < max ) {
// the dead test is applied to the last slot only when the page is above
// leaf level
2147 if( cnt < max || set->page->lvl )
2148 if( slotptr(set->page, cnt)->dead )
2150 src = valptr(set->page, cnt);
2151 nxt -= src->len + sizeof(BtVal);
2152 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2154 key = keyptr(set->page, cnt);
2155 nxt -= key->len + sizeof(BtKey);
// NOTE(review): this key copy sizes itself with sizeof(BtVal) while the
// line above and every other key copy in this function use sizeof(BtKey);
// harmless only if the two sizes are equal - confirm and align.
2156 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
2158 // add librarian slot
2160 slotptr(bt->frame, ++idx)->off = nxt;
2161 slotptr(bt->frame, idx)->type = Librarian;
2162 slotptr(bt->frame, idx)->dead = 1;
2166 slotptr(bt->frame, ++idx)->off = nxt;
2167 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2169 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2173 // remember existing fence key for new page to the right
2175 memcpy (rightkey, key, key->len + sizeof(BtKey));
2177 bt->frame->bits = bt->mgr->page_bits;
2178 bt->frame->min = nxt;
2179 bt->frame->cnt = idx;
2180 bt->frame->lvl = lvl;
// sibling links are set only when the page being split is not the root
2184 if( set->page_no > ROOT_page ) {
2185 bt_putid (bt->frame->right, bt_getid (set->page->right));
2186 bt_putid(bt->frame->left, set->page_no);
2189 // get new free page and write higher keys to it.
2191 if( bt_newpage(bt, right) )
// for non-root leaf pages, the old right neighbour's left link is pointed
// at the new page
2196 if( set->page_no > ROOT_page && !lvl )
2197 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2200 memcpy (right->page, bt->frame, bt->mgr->page_size);
2202 // update lower keys to continue in old page
2204 memcpy (bt->frame, set->page, bt->mgr->page_size);
2205 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2206 nxt = bt->mgr->page_size;
2207 set->page->garbage = 0;
2213 if( slotptr(bt->frame, max)->type == Librarian )
2216 // assemble page of smaller keys
2218 while( cnt++ < max ) {
2219 if( slotptr(bt->frame, cnt)->dead )
2221 src = valptr(bt->frame, cnt);
2222 nxt -= src->len + sizeof(BtVal);
2223 memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
2225 key = keyptr(bt->frame, cnt);
2226 nxt -= key->len + sizeof(BtKey);
2227 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2229 // add librarian slot
2231 slotptr(set->page, ++idx)->off = nxt;
2232 slotptr(set->page, idx)->type = Librarian;
2233 slotptr(set->page, idx)->dead = 1;
2237 slotptr(set->page, ++idx)->off = nxt;
2238 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2242 // remember fence key for smaller page
2244 memcpy(fencekey, key, key->len + sizeof(BtKey));
2246 bt_putid(set->page->right, right->page_no);
2247 set->page->min = nxt;
2248 set->page->cnt = idx;
2250 // if current page is the root page, split it
2252 if( set->page_no == ROOT_page )
2253 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2255 // insert new fences in their parent pages
2257 right->latch = bt_pinlatch (bt, right->page_no);
2258 bt_lockpage (BtLockParent, right->latch);
// on the left page the parent lock is taken before the write lock is
// dropped
2260 bt_lockpage (BtLockParent, set->latch);
2261 bt_unlockpage (BtLockWrite, set->latch);
2263 // insert new fence for reformulated left block of smaller keys
2265 bt_putid (value, set->page_no);
2266 ptr = (BtKey*)fencekey;
2268 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2271 // switch fence for right block of larger keys to new right page
2273 bt_putid (value, right->page_no);
2274 ptr = (BtKey*)rightkey;
2276 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2279 bt_unlockpage (BtLockParent, set->latch);
2280 bt_unpinlatch (set->latch);
2281 bt_unpinpool (set->pool);
2283 bt_unlockpage (BtLockParent, right->latch);
2284 bt_unpinlatch (right->latch);
2285 bt_unpinpool (right->pool);
2289 // install new key and value onto page
2290 // page must already be checked for
// Writes the value and then the key just below page->min (moving min
// downward), makes room in the slot array at `slot`, and stores the new
// offset there. When the slot array had to grow, a librarian slot is
// written in front of the new entry. The page's write lock and pins are
// released before returning.
// NOTE(review): the declarations of `node`, `ptr` and `val`, the writes of
// the length bytes, the alternative to the grow-by-two branch and the
// return value are not part of this excerpt.
2293 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2295 uint idx, librarian;
2300 // if found slot > desired slot and previous slot
2301 // is a librarian slot, use it
2304 if( slotptr(set->page, slot-1)->type == Librarian )
2307 // copy value onto page
2309 set->page->min -= vallen + sizeof(BtVal);
2310 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2311 memcpy (val->value, value, vallen);
2314 // copy key onto page
2316 set->page->min -= keylen + sizeof(BtKey);
2317 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2318 memcpy (ptr->key, key, keylen);
2321 // find first empty slot
2323 for( idx = slot; idx < set->page->cnt; idx++ )
2324 if( slotptr(set->page, idx)->dead )
2327 // now insert key into array before slot
// no dead slot was found: extend the slot array by two entries
2329 if( idx == set->page->cnt )
2330 idx += 2, set->page->cnt += 2, librarian = 2;
// shift slots upward by `librarian` positions to open the gap
2336 while( idx > slot + librarian - 1 )
2337 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2339 // add librarian slot
2341 if( librarian > 1 ) {
2342 node = slotptr(set->page, slot++);
2343 node->off = set->page->min;
2344 node->type = Librarian;
2350 node = slotptr(set->page, slot);
2351 node->off = set->page->min;
2355 bt_unlockpage (BtLockWrite, set->latch);
2356 bt_unpinlatch (set->latch);
2357 bt_unpinpool (set->pool);
2361 // Insert new key into the btree at given level.
2362 // either add a new key or update/add an existing one
// Copies the caller's key into a local BtKey buffer and loads the target
// page write-locked. A key that is not already present (or any key when
// `unique` is zero) goes through bt_cleanpage, bt_splitpage if needed,
// and bt_insertslot. For an existing key the stored value is overwritten
// in place when the new value fits; otherwise the old bytes are counted
// as garbage and fresh key and value bytes are written below page->min.
// NOTE(review): the declarations of `set`, `ins`, `ptr`, `val`, `type` and
// `sequence`, the assignments to `len` and to the length bytes, the retry
// after a split and the return values are not part of this excerpt.
2364 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2366 unsigned char newkey[BT_keyarray];
2367 uint slot, idx, len;
2374 // set up the key we're working on
2376 ins = (BtKey*)newkey;
2377 memcpy (ins->key, key, keylen);
2380 // is this a non-unique index value?
2386 sequence = bt_newdup (bt);
2387 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2391 while( 1 ) { // find the page and slot for the current key
2392 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2393 ptr = keyptr(set->page, slot);
2396 bt->err = BTERR_ovflw;
2400 // if librarian slot == found slot, advance to real slot
2402 if( slotptr(set->page, slot)->type == Librarian )
2403 if( !keycmp (ptr, key, keylen) )
2404 ptr = keyptr(set->page, ++slot);
2408 if( slotptr(set->page, slot)->type == Duplicate )
2411 // if inserting a duplicate key or unique key
2412 // check for adequate space on the page
2413 // and insert the new key before slot.
// && binds tighter than ||: true when unique is zero, or when unique is
// set and the key on the page differs from the key being inserted
2415 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2416 if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2417 if( bt_splitpage (bt, set) )
2422 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2425 // if key already exists, update value and return
2427 val = valptr(set->page, slot);
// the existing value area is large enough to be reused
2429 if( val->len >= vallen ) {
2430 if( slotptr(set->page, slot)->dead )
// the bytes no longer used by the shorter value count as garbage
2432 set->page->garbage += val->len - vallen;
2433 slotptr(set->page, slot)->dead = 0;
2435 memcpy (val->value, value, vallen);
2436 bt_unlockpage(BtLockWrite, set->latch);
2437 bt_unpinlatch (set->latch);
2438 bt_unpinpool (set->pool);
2442 // new update value doesn't fit in existing value area
// for a slot that is not dead, its whole key and value are added to the
// garbage count
2444 if( !slotptr(set->page, slot)->dead )
2445 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2447 slotptr(set->page, slot)->dead = 0;
2451 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2452 if( bt_splitpage (bt, set) )
// write the new value, then the key, below the lowest used offset
2457 set->page->min -= vallen + sizeof(BtVal);
2458 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2459 memcpy (val->value, value, vallen);
2462 set->page->min -= keylen + sizeof(BtKey);
2463 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2464 memcpy (ptr->key, key, keylen);
2467 slotptr(set->page, slot)->off = set->page->min;
2468 bt_unlockpage(BtLockWrite, set->latch);
2469 bt_unpinlatch (set->latch);
2470 bt_unpinpool (set->pool);
2476 // set cursor to highest slot on highest page
// Reads the page number stored in the allocation page's `left` field,
// copies that page into bt->cursor under a read lock, and takes the
// page's slot count as the slot.
// NOTE(review): the declarations of `set` and `slot`, any update of
// bt->cursor_page and the return statement are not part of this excerpt.
2478 uint bt_lastkey (BtDb *bt)
2480 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2484 if( set->pool = bt_pinpool (bt, page_no) )
2485 set->page = bt_page (bt, set->pool, page_no);
2489 set->latch = bt_pinlatch (bt, page_no);
2490 bt_lockpage(BtLockRead, set->latch);
2492 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2493 slot = set->page->cnt;
2495 bt_unlockpage(BtLockRead, set->latch);
2496 bt_unpinlatch (set->latch);
2497 bt_unpinpool (set->pool);
2502 // return previous slot on cursor page
// When the cursor has to move left, the page named by the cursor's `left`
// field becomes bt->cursor_page, is copied into bt->cursor under a read
// lock, and the copied page's slot count is returned.
// NOTE(review): the declarations of `set` and `left`, the handling of
// `slot` within the current page and the path taken when the left link is
// zero are not part of this excerpt.
2504 uint bt_prevkey (BtDb *bt, uint slot)
2512 if( left = bt_getid(bt->cursor->left) )
2513 bt->cursor_page = left;
2517 if( set->pool = bt_pinpool (bt, left) )
2518 set->page = bt_page (bt, set->pool, left);
2522 set->latch = bt_pinlatch (bt, left);
2523 bt_lockpage(BtLockRead, set->latch);
2525 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2527 bt_unlockpage(BtLockRead, set->latch);
2528 bt_unpinlatch (set->latch);
2529 bt_unpinpool (set->pool);
2530 return bt->cursor->cnt;
2533 // return next slot on cursor page
2534 // or slide cursor right into next page
// Scans forward over the cached cursor page, testing each slot's dead
// flag. When the cursor moves on, the page named by the cursor's `right`
// field becomes bt->cursor_page and is copied into bt->cursor under a
// read lock.
// NOTE(review): the declarations of `set` and `right`, the branch bodies
// of the scan, the enclosing loop and the return statements are not part
// of this excerpt.
2536 uint bt_nextkey (BtDb *bt, uint slot)
2542 right = bt_getid(bt->cursor->right);
2544 while( slot++ < bt->cursor->cnt )
2545 if( slotptr(bt->cursor,slot)->dead )
// the last slot is accepted only when the page has a right link
2547 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2555 bt->cursor_page = right;
2557 if( set->pool = bt_pinpool (bt, right) )
2558 set->page = bt_page (bt, set->pool, right);
2562 set->latch = bt_pinlatch (bt, right);
2563 bt_lockpage(BtLockRead, set->latch);
2565 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2567 bt_unlockpage(BtLockRead, set->latch);
2568 bt_unpinlatch (set->latch);
2569 bt_unpinpool (set->pool);
2577 // cache page of keys into cursor and return starting slot for given key
// Loads the leaf page (level 0) for `key` read-locked, copies it into
// bt->cursor, records its page number in bt->cursor_page, then unlocks
// and unpins the page.
// NOTE(review): the declarations of `set` and `slot`, the failure path of
// bt_loadpage and the return statement are not part of this excerpt.
2579 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2584 // cache page for retrieval
2586 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2587 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2591 bt->cursor_page = set->page_no;
2593 bt_unlockpage(BtLockRead, set->latch);
2594 bt_unpinlatch (set->latch);
2595 bt_unpinpool (set->pool);
// Returns the key stored at `slot` of the cached cursor page (bt->cursor).
2599 BtKey *bt_key(BtDb *bt, uint slot)
2601 return keyptr(bt->cursor, slot);
// Returns the value stored at `slot` of the cached cursor page (bt->cursor).
2604 BtVal *bt_val(BtDb *bt, uint slot)
2606 return valptr(bt->cursor,slot);
// Returns a time reading in seconds as a double, built from the fields of
// a SYSTEMTIME. Three sources appear below: the current system time, the
// process user time and the process system time.
// NOTE(review): the dispatch on `type`, the declarations of `ans` and
// `crtime` and the return statement are not part of this excerpt; which
// `type` value selects which source should be confirmed.
2612 double getCpuTime(int type)
2615 FILETIME xittime[1];
2616 FILETIME systime[1];
2617 FILETIME usrtime[1];
2618 SYSTEMTIME timeconv[1];
2621 memset (timeconv, 0, sizeof(SYSTEMTIME));
// current system time; the day of week is folded in as whole days of
// seconds
2625 GetSystemTimeAsFileTime (xittime);
2626 FileTimeToSystemTime (xittime, timeconv);
2627 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time
2630 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2631 FileTimeToSystemTime (usrtime, timeconv);
// process system (kernel) time
2634 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2635 FileTimeToSystemTime (systime, timeconv);
// accumulate hours, minutes, seconds and milliseconds as seconds
2639 ans += (double)timeconv->wHour * 3600;
2640 ans += (double)timeconv->wMinute * 60;
2641 ans += (double)timeconv->wSecond;
2642 ans += (double)timeconv->wMilliseconds / 1000;
2647 #include <sys/resource.h>
// Returns a time reading in seconds as a double. Three sources appear
// below: gettimeofday, the process user time and the process system time,
// the latter two taken from getrusage(RUSAGE_SELF).
// NOTE(review): the dispatch on `type` and the fall-through return are not
// part of this excerpt; which `type` value selects which source should be
// confirmed.
2649 double getCpuTime(int type)
2651 struct rusage used[1];
2652 struct timeval tv[1];
// wall-clock time of day
2656 gettimeofday(tv, NULL);
2657 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
2660 getrusage(RUSAGE_SELF, used);
2661 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
2664 getrusage(RUSAGE_SELF, used);
2665 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Walks pool slots 1..mgr->poolcnt and reports on stderr every slot whose
// pin field has bits set other than CLOCK_bit.
// NOTE(review): the declarations of `pool` and `slot` are not part of this
// excerpt; if `slot` is unsigned, "%d" below is a mismatched conversion.
2672 void bt_poolaudit (BtMgr *mgr)
// slot is incremented in the test, so the body sees 1..poolcnt
2677 while( slot++ < mgr->poolcnt ) {
2678 pool = mgr->pool + slot;
2679 if( pool->pin & ~CLOCK_bit )
2680 fprintf(stderr, "pool slot %d pinned\n", slot);
// Audits the latch manager and the page file, reporting on stderr:
//  - the allocation-page lock, if its lock word is non-zero;
//  - each deployed latch set whose readwr, access or parent lock has bits
//    under MASK set in its rin field;
//  - each hash table entry whose latch word is non-zero, and each latch
//    set on its chain whose busy word is non-zero;
//  - pages that cannot be read, and pages with adjacent keys out of order.
// Lock words are cleared after being checked. `cnt` accumulates the `act`
// count of pages whose lvl is zero.
// NOTE(review): the declarations of `latch`, `ptr`, `amt`, `cnt`, `next`
// and `page_no`, the tests guarding the two "pinned" messages, the page
// advance and the return statement are not part of this excerpt.
// NOTE(review): page numbers are printed with "%.8x"; if they are of type
// uid (unsigned long long) that conversion is mismatched - confirm.
2684 uint bt_latchaudit (BtDb *bt)
2686 ushort idx, hashidx;
2693 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2695 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2696 fprintf(stderr, "Alloc page locked\n");
// the lock word is reset after the check
2697 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2699 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2700 latch = bt->mgr->latchsets + idx;
2701 if( *latch->readwr->rin & MASK )
2702 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2703 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2705 if( *latch->access->rin & MASK )
2706 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2707 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2709 if( *latch->parent->rin & MASK )
2710 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2711 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2714 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2719 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2720 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2721 fprintf(stderr, "hash entry %d locked\n", hashidx);
2723 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
// follow the chain of latch sets hanging off this hash entry
2725 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2726 latch = bt->mgr->latchsets + idx;
2727 if( *(ushort *)latch->busy )
2728 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2729 *(ushort *)latch->busy = 0;
2731 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2732 } while( idx = latch->next );
2735 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2736 page_no = LEAF_page;
// visit every page number below the allocation page's `right` counter
2738 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2739 off64_t off = page_no << bt->mgr->page_bits;
// NOTE(review): the result of pread is not checked on this line.
2741 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2745 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2747 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2748 fprintf(stderr, "page %.8x unable to read\n", page_no);
2750 if( *amt < bt->mgr->page_size )
2751 fprintf(stderr, "page %.8x unable to read\n", page_no);
2753 if( !bt->frame->free ) {
// NOTE(review): if cnt is unsigned and zero, `cnt - 1` wraps and this
// loop would run far past the slot array - confirm.
2754 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
// compare each key with its successor
2755 ptr = keyptr(bt->frame, idx+1);
2756 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
2757 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2759 if( !bt->frame->lvl )
2760 cnt += bt->frame->act;
2762 if( page_no > LEAF_page )
2777 // standalone program to index file of keys
2778 // then list them onto std-out
2781 void *index_file (void *arg)
2783 uint __stdcall index_file (void *arg)
2786 int line = 0, found = 0, cnt = 0, unique;
2787 uid next, page_no = LEAF_page; // start on first page of leaves
2788 unsigned char key[BT_maxkey];
2789 ThreadArg *args = arg;
2790 int ch, len = 0, slot;
2797 bt = bt_open (args->mgr);
2799 unique = (args->type[1] | 0x20) != 'd';
2801 switch(args->type[0] | 0x20)
2804 fprintf(stderr, "started latch mgr audit\n");
2805 cnt = bt_latchaudit (bt);
2806 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2810 fprintf(stderr, "started pennysort for %s\n", args->infile);
2811 if( in = fopen (args->infile, "rb") )
2812 while( ch = getc(in), ch != EOF )
2817 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2818 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2821 else if( len < BT_maxkey )
2823 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2827 fprintf(stderr, "started indexing for %s\n", args->infile);
2828 if( in = fopen (args->infile, "rb") )
2829 while( ch = getc(in), ch != EOF )
2834 if( args->num == 1 )
2835 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2837 else if( args->num )
2838 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2840 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2841 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2844 else if( len < BT_maxkey )
2846 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2850 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2851 if( in = fopen (args->infile, "rb") )
2852 while( ch = getc(in), ch != EOF )
2856 if( args->num == 1 )
2857 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2859 else if( args->num )
2860 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2862 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2863 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2864 ptr = (BtKey*)(bt->key);
2867 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2868 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2871 else if( len < BT_maxkey )
2874 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2878 fprintf(stderr, "started finding keys for %s\n", args->infile);
2879 if( in = fopen (args->infile, "rb") )
2880 while( ch = getc(in), ch != EOF )
2884 if( args->num == 1 )
2885 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2887 else if( args->num )
2888 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2890 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2893 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2896 else if( len < BT_maxkey )
2898 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2902 fprintf(stderr, "started scanning\n");
2904 if( set->pool = bt_pinpool (bt, page_no) )
2905 set->page = bt_page (bt, set->pool, page_no);
2908 madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
2909 set->latch = bt_pinlatch (bt, page_no);
2910 bt_lockpage (BtLockRead, set->latch);
2911 next = bt_getid (set->page->right);
2913 for( slot = 0; slot++ < set->page->cnt; )
2914 if( next || slot < set->page->cnt )
2915 if( !slotptr(set->page, slot)->dead ) {
2916 ptr = keyptr(set->page, slot);
2919 if( slotptr(set->page, slot)->type == Duplicate )
2922 fwrite (ptr->key, len, 1, stdout);
2923 val = valptr(set->page, slot);
2924 fwrite (val->value, val->len, 1, stdout);
2925 fputc ('\n', stdout);
2929 bt_unlockpage (BtLockRead, set->latch);
2930 bt_unpinlatch (set->latch);
2931 bt_unpinpool (set->pool);
2932 } while( page_no = next );
2934 fprintf(stderr, " Total keys read %d\n", cnt);
2938 fprintf(stderr, "started reverse scan\n");
2939 if( slot = bt_lastkey (bt) )
2940 while( slot = bt_prevkey (bt, slot) ) {
2941 if( slotptr(bt->cursor, slot)->dead )
2944 ptr = keyptr(bt->cursor, slot);
2947 if( slotptr(bt->cursor, slot)->type == Duplicate )
2950 fwrite (ptr->key, len, 1, stdout);
2951 val = valptr(bt->cursor, slot);
2952 fwrite (val->value, val->len, 1, stdout);
2953 fputc ('\n', stdout);
2957 fprintf(stderr, " Total keys read %d\n", cnt);
2962 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2964 fprintf(stderr, "started counting\n");
2965 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2966 page_no = LEAF_page;
2968 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2969 uid off = page_no << bt->mgr->page_bits;
2971 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2975 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2977 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2978 return bt->err = BTERR_map;
2980 if( *amt < bt->mgr->page_size )
2981 return bt->err = BTERR_map;
2983 if( !bt->frame->free && !bt->frame->lvl )
2984 cnt += bt->frame->act;
2985 if( page_no > LEAF_page )
2990 cnt--; // remove stopper key
2991 fprintf(stderr, " Total keys read %d\n", cnt);
// Convenience alias: `timer` is another name for `struct timeval`.
// NOTE(review): no use of `timer` is visible in this part of the file --
// confirm it is still referenced before relying on or removing it.
3003 typedef struct timeval timer;
3005 int main (int argc, char **argv)
3007 int idx, cnt, len, slot, err;
3008 int segsize, bits = 16;
3025 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3026 fprintf (stderr, " where page_bits is the page size in bits\n");
3027 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
3028 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3029 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3030 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3034 start = getCpuTime(0);
3037 bits = atoi(argv[3]);
3040 poolsize = atoi(argv[4]);
3043 fprintf (stderr, "Warning: no mapped_pool\n");
3045 if( poolsize > 65535 )
3046 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3049 segsize = atoi(argv[5]);
3051 segsize = 4; // 16 pages per mmap segment
3054 num = atoi(argv[6]);
3058 threads = malloc (cnt * sizeof(pthread_t));
3060 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3062 args = malloc (cnt * sizeof(ThreadArg));
3064 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3067 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3073 for( idx = 0; idx < cnt; idx++ ) {
3074 args[idx].infile = argv[idx + 7];
3075 args[idx].type = argv[2];
3076 args[idx].mgr = mgr;
3077 args[idx].num = num;
3078 args[idx].idx = idx;
3080 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3081 fprintf(stderr, "Error creating thread %d\n", err);
3083 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3087 // wait for termination
3090 for( idx = 0; idx < cnt; idx++ )
3091 pthread_join (threads[idx], NULL);
3093 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3095 for( idx = 0; idx < cnt; idx++ )
3096 CloseHandle(threads[idx]);
3099 elapsed = getCpuTime(0) - start;
3100 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3101 elapsed = getCpuTime(1);
3102 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3103 elapsed = getCpuTime(2);
3104 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);