1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The orginal author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page record numbers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // it cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
187 unsigned char len; // this can be changed to a ushort or uint
188 unsigned char key[0];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
195 unsigned char len; // this can be changed to a ushort or uint
196 unsigned char value[0];
199 #define BT_maxkey 255 // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
202 // The first part of an index page.
203 // It is immediately followed
204 // by the BtSlot array of keys.
206 // note that this structure size
207 // must be a multiple of 8 bytes
208 // in order to place dups correctly.
210 typedef struct BtPage_ {
211 uint cnt; // count of keys in page
212 uint act; // count of active keys
213 uint min; // next key offset
214 uint garbage; // page garbage in bytes
215 unsigned char bits:7; // page size in bits
216 unsigned char free:1; // page is on free chain
217 unsigned char lvl:7; // level of page
218 unsigned char kill:1; // page is being deleted
219 unsigned char left[BtId]; // page number to left
220 unsigned char filler[2]; // padding to multiple of 8
221 unsigned char right[BtId]; // page number to right
224 // The memory mapping pool table buffer manager entry
227 uid basepage; // mapped base page number
228 char *map; // mapped memory pointer
229 ushort slot; // slot index in this array
230 ushort pin; // mapped page pin counter
231 void *hashprev; // previous pool entry for the same hash idx
232 void *hashnext; // next pool entry for the same hash idx
234 HANDLE hmap; // Windows memory mapping handle
238 #define CLOCK_bit 0x8000 // bit in pool->pin
240 // The loadpage interface object
243 uid page_no; // current page number
244 BtPage page; // current page pointer
245 BtPool *pool; // current page pool
246 BtLatchSet *latch; // current page latch set
249 // structure for latch manager on ALLOC_page
252 struct BtPage_ alloc[1]; // next page_no in right ptr
253 unsigned long long dups[1]; // global duplicate key uniqueifier
254 unsigned char chain[BtId]; // head of free page_nos chain
255 BtSpinLatch lock[1]; // allocation area lite latch
256 ushort latchdeployed; // highest number of latch entries deployed
257 ushort nlatchpage; // number of latch pages at BT_latch
258 ushort latchtotal; // number of page latch entries
259 ushort latchhash; // number of latch hash table slots
260 ushort latchvictim; // next latch entry to examine
261 BtHashEntry table[0]; // the hash table
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
269 uint seg_bits; // seg size in pages in bits
270 uint mode; // read-write mode
276 ushort poolcnt; // highest page pool node in use
277 ushort poolmax; // highest page pool node allocated
278 ushort poolmask; // total number of pages in mmap segment - 1
279 ushort hashsize; // size of Hash Table for pool entries
280 volatile uint evicted; // last evicted hash table slot
281 ushort *hash; // pool index for hash entries
282 BtSpinLatch *latch; // latches for hash table slots
283 BtLatchMgr *latchmgr; // mapped latch page from allocation page
284 BtLatchSet *latchsets; // mapped latch set from latch pages
285 BtPool *pool; // memory pool page segments
287 HANDLE halloc; // allocation and latch table handle
292 BtMgr *mgr; // buffer manager for thread
293 BtPage cursor; // cached frame for start/next (never mapped)
294 BtPage frame; // spare frame for the page split (never mapped)
295 uid cursor_page; // current cursor page number
296 unsigned char *mem; // frame, cursor, page memory buffer
297 unsigned char key[BT_keyarray]; // last found complete key
298 int found; // last delete or insert was found
299 int err; // last error
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // BTree page number constants
333 #define ALLOC_page 0 // allocation & latch manager hash table
334 #define ROOT_page 1 // root of the btree
335 #define LEAF_page 2 // first page of leaves
336 #define LATCH_page 3 // pages for latch manager
338 // Number of levels to create in a new BTree
342 // The page is allocated from low and hi ends.
343 // The key slots are allocated from the bottom,
344 // while the text and value of the key
345 // are allocated from the top. When the two
346 // areas meet, the page is split into two.
348 // A key consists of a length byte, two bytes of
349 // index number (0 - 65534), and up to 253 bytes
352 // Associated with each key is a value byte string
353 // containing any value desired.
355 // The b-tree root is always located at page 1.
356 // The first leaf page of level zero is always
357 // located on page 2.
359 // The b-tree pages are linked with next
360 // pointers to facilitate enumerators,
361 // and provide for concurrency.
363 // When to root page fills, it is split in two and
364 // the tree height is raised by a new root at page
365 // one with two keys.
367 // Deleted keys are marked with a dead bit until
368 // page cleanup. The fence key for a leaf node is
371 // Groups of pages called segments from the btree are optionally
372 // cached with a memory mapped pool. A hash table is used to keep
373 // track of the cached segments. This behaviour is controlled
374 // by the cache block size parameter to bt_open.
376 // To achieve maximum concurrency one page is locked at a time
377 // as the tree is traversed to find leaf key in question. The right
378 // page numbers are used in cases where the page is being split,
381 // Page 0 is dedicated to lock for new page extensions,
382 // and chains empty pages together for reuse. It also
383 // contains the latch manager hash table.
385 // The ParentModification lock on a node is obtained to serialize posting
386 // or changing the fence key for a node.
388 // Empty pages are chained together through the ALLOC page and reused.
390 // Access macros to address slot and key values from the page
391 // Page slots use 1 based indexing.
393 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
397 void bt_putid(unsigned char *dest, uid id)
402 dest[i] = (unsigned char)id, id >>= 8;
405 uid bt_getid(unsigned char *src)
410 for( i = 0; i < BtId; i++ )
411 id <<= 8, id |= *src++;
416 uid bt_newdup (BtDb *bt)
419 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
421 return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
425 // Phase-Fair reader/writer lock implementation
427 void WriteLock (RWLock *lock)
432 tix = __sync_fetch_and_add (lock->ticket, 1);
434 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
436 // wait for our ticket to come up
438 while( tix != lock->serving[0] )
445 w = PRES | (tix & PHID);
447 r = __sync_fetch_and_add (lock->rin, w);
449 r = _InterlockedExchangeAdd16 (lock->rin, w);
451 while( r != *lock->rout )
459 void WriteRelease (RWLock *lock)
462 __sync_fetch_and_and (lock->rin, ~MASK);
464 _InterlockedAnd16 (lock->rin, ~MASK);
469 void ReadLock (RWLock *lock)
473 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
475 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
478 while( w == (*lock->rin & MASK) )
486 void ReadRelease (RWLock *lock)
489 __sync_fetch_and_add (lock->rout, RINC);
491 _InterlockedExchangeAdd16 (lock->rout, RINC);
495 // Spin Latch Manager
497 // wait until write lock mode is clear
498 // and add 1 to the share count
500 void bt_spinreadlock(BtSpinLatch *latch)
506 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
508 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
510 // see if exclusive request is granted or pending
515 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
517 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
520 } while( sched_yield(), 1 );
522 } while( SwitchToThread(), 1 );
526 // wait for other read and write latches to relinquish
528 void bt_spinwritelock(BtSpinLatch *latch)
534 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
536 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
539 if( !(prev & ~BOTH) )
543 __sync_fetch_and_and ((ushort *)latch, ~XCL);
545 _InterlockedAnd16((ushort *)latch, ~XCL);
548 } while( sched_yield(), 1 );
550 } while( SwitchToThread(), 1 );
554 // try to obtain write lock
556 // return 1 if obtained,
559 int bt_spinwritetry(BtSpinLatch *latch)
564 prev = __sync_fetch_and_or((ushort *)latch, XCL);
566 prev = _InterlockedOr16((ushort *)latch, XCL);
568 // take write access if all bits are clear
571 if( !(prev & ~BOTH) )
575 __sync_fetch_and_and ((ushort *)latch, ~XCL);
577 _InterlockedAnd16((ushort *)latch, ~XCL);
584 void bt_spinreleasewrite(BtSpinLatch *latch)
587 __sync_fetch_and_and((ushort *)latch, ~BOTH);
589 _InterlockedAnd16((ushort *)latch, ~BOTH);
593 // decrement reader count
595 void bt_spinreleaseread(BtSpinLatch *latch)
598 __sync_fetch_and_add((ushort *)latch, -SHARE);
600 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
604 // link latch table entry into latch hash table
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
608 BtLatchSet *set = bt->mgr->latchsets + victim;
610 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611 bt->mgr->latchsets[set->next].prev = victim;
613 bt->mgr->latchmgr->table[hashidx].slot = victim;
614 set->page_no = page_no;
621 void bt_unpinlatch (BtLatchSet *set)
624 __sync_fetch_and_add(&set->pin, -1);
626 _InterlockedDecrement16 (&set->pin);
630 // find existing latchset or inspire new one
631 // return with latchset pinned
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
639 // obtain read lock on hash table entry
641 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
643 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
645 set = bt->mgr->latchsets + slot;
646 if( page_no == set->page_no )
648 } while( slot = set->next );
652 __sync_fetch_and_add(&set->pin, 1);
654 _InterlockedIncrement16 (&set->pin);
658 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
663 // try again, this time with write lock
665 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
667 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
669 set = bt->mgr->latchsets + slot;
670 if( page_no == set->page_no )
672 if( !set->pin && !avail )
674 } while( slot = set->next );
676 // found our entry, or take over an unpinned one
678 if( slot || (slot = avail) ) {
679 set = bt->mgr->latchsets + slot;
681 __sync_fetch_and_add(&set->pin, 1);
683 _InterlockedIncrement16 (&set->pin);
685 set->page_no = page_no;
686 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
690 // see if there are any unused entries
692 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
694 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
697 if( victim < bt->mgr->latchmgr->latchtotal ) {
698 set = bt->mgr->latchsets + victim;
700 __sync_fetch_and_add(&set->pin, 1);
702 _InterlockedIncrement16 (&set->pin);
704 bt_latchlink (bt, hashidx, victim, page_no);
705 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
710 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
712 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
714 // find and reuse previous lock entry
718 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
720 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
722 // we don't use slot zero
724 if( victim %= bt->mgr->latchmgr->latchtotal )
725 set = bt->mgr->latchsets + victim;
729 // take control of our slot
730 // from other threads
732 if( set->pin || !bt_spinwritetry (set->busy) )
737 // try to get write lock on hash chain
738 // skip entry if not obtained
739 // or has outstanding locks
741 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742 bt_spinreleasewrite (set->busy);
747 bt_spinreleasewrite (set->busy);
748 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
752 // unlink our available victim from its hash chain
755 bt->mgr->latchsets[set->prev].next = set->next;
757 bt->mgr->latchmgr->table[idx].slot = set->next;
760 bt->mgr->latchsets[set->next].prev = set->prev;
762 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
764 __sync_fetch_and_add(&set->pin, 1);
766 _InterlockedIncrement16 (&set->pin);
768 bt_latchlink (bt, hashidx, victim, page_no);
769 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770 bt_spinreleasewrite (set->busy);
775 void bt_mgrclose (BtMgr *mgr)
780 // release mapped pages
781 // note that slot zero is never used
783 for( slot = 1; slot < mgr->poolmax; slot++ ) {
784 pool = mgr->pool + slot;
787 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
790 FlushViewOfFile(pool->map, 0);
791 UnmapViewOfFile(pool->map);
792 CloseHandle(pool->hmap);
798 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799 munmap (mgr->latchmgr, 2 * mgr->page_size);
801 FlushViewOfFile(mgr->latchmgr, 0);
802 UnmapViewOfFile(mgr->latchmgr);
803 CloseHandle(mgr->halloc);
809 free ((void *)mgr->latch);
812 FlushFileBuffers(mgr->idx);
813 CloseHandle(mgr->idx);
814 GlobalFree (mgr->pool);
815 GlobalFree (mgr->hash);
816 GlobalFree ((void *)mgr->latch);
821 // close and release memory
823 void bt_close (BtDb *bt)
830 VirtualFree (bt->mem, 0, MEM_RELEASE);
835 // open/create new btree buffer manager
837 // call with file_name, BT_openmode, bits in page size (e.g. 16),
838 // size of mapped page pool (e.g. 8192)
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
854 SYSTEM_INFO sysinfo[1];
857 // determine sanity of page size and buffer pool
859 if( bits > BT_maxbits )
861 else if( bits < BT_minbits )
865 return NULL; // must have buffer pool
868 mgr = calloc (1, sizeof(BtMgr));
870 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
873 return free(mgr), NULL;
875 cacheblk = 4096; // minimum mmap segment size for unix
878 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879 attr = FILE_ATTRIBUTE_NORMAL;
880 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
882 if( mgr->idx == INVALID_HANDLE_VALUE )
883 return GlobalFree(mgr), NULL;
885 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886 GetSystemInfo(sysinfo);
887 cacheblk = sysinfo->dwAllocationGranularity;
891 latchmgr = malloc (BT_maxpage);
894 // read minimum page size to get root info
895 // to support raw disk partition files
896 // check if bits == 0 on the disk.
898 if( size = lseek (mgr->idx, 0L, 2) ) {
899 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900 if( latchmgr->alloc->bits )
901 bits = latchmgr->alloc->bits;
905 return free(mgr), free(latchmgr), NULL;
906 } else if( mode == BT_ro )
907 return free(latchmgr), bt_mgrclose (mgr), NULL;
911 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
912 size = GetFileSize(mgr->idx, amt);
915 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
916 return bt_mgrclose (mgr), NULL;
917 bits = latchmgr->alloc->bits;
918 } else if( mode == BT_ro )
919 return bt_mgrclose (mgr), NULL;
924 mgr->page_size = 1 << bits;
925 mgr->page_bits = bits;
927 mgr->poolmax = poolmax;
930 if( cacheblk < mgr->page_size )
931 cacheblk = mgr->page_size;
933 // mask for partial memmaps
935 mgr->poolmask = (cacheblk >> bits) - 1;
937 // see if requested size of pages per memmap is greater
939 if( (1 << segsize) > mgr->poolmask )
940 mgr->poolmask = (1 << segsize) - 1;
944 while( (1 << mgr->seg_bits) <= mgr->poolmask )
947 mgr->hashsize = hashsize;
950 mgr->pool = calloc (poolmax, sizeof(BtPool));
951 mgr->hash = calloc (hashsize, sizeof(ushort));
952 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
954 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
955 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
956 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
962 // initialize an empty b-tree with latch page, root page, page of leaves
963 // and page(s) of latches
965 memset (latchmgr, 0, 1 << bits);
966 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
967 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
968 latchmgr->alloc->bits = mgr->page_bits;
970 latchmgr->nlatchpage = nlatchpage;
971 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
973 // initialize latch manager
975 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
977 // size of hash table = total number of latchsets
979 if( latchhash > latchmgr->latchtotal )
980 latchhash = latchmgr->latchtotal;
982 latchmgr->latchhash = latchhash;
984 // initialize left-most LEAF page in
987 bt_putid (latchmgr->alloc->left, LEAF_page);
990 if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
991 return bt_mgrclose (mgr), NULL;
993 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
994 return bt_mgrclose (mgr), NULL;
996 if( *amt < mgr->page_size )
997 return bt_mgrclose (mgr), NULL;
1000 memset (latchmgr, 0, 1 << bits);
1001 latchmgr->alloc->bits = mgr->page_bits;
1003 for( lvl=MIN_lvl; lvl--; ) {
1004 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1005 key = keyptr(latchmgr->alloc, 1);
1006 key->len = 2; // create stopper key
1010 bt_putid(value, MIN_lvl - lvl + 1);
1011 val = valptr(latchmgr->alloc, 1);
1012 val->len = lvl ? BtId : 0;
1013 memcpy (val->value, value, val->len);
1015 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1016 latchmgr->alloc->lvl = lvl;
1017 latchmgr->alloc->cnt = 1;
1018 latchmgr->alloc->act = 1;
1020 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1021 return bt_mgrclose (mgr), NULL;
1023 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1024 return bt_mgrclose (mgr), NULL;
1026 if( *amt < mgr->page_size )
1027 return bt_mgrclose (mgr), NULL;
1031 // clear out latch manager locks
1032 // and rest of pages to round out segment
1034 memset(latchmgr, 0, mgr->page_size);
1037 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1039 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1041 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1042 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1043 return bt_mgrclose (mgr), NULL;
1044 if( *amt < mgr->page_size )
1045 return bt_mgrclose (mgr), NULL;
1052 // mlock the root page and the latchmgr page
1054 flag = PROT_READ | PROT_WRITE;
1055 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1056 if( mgr->latchmgr == MAP_FAILED )
1057 return bt_mgrclose (mgr), NULL;
1058 mlock (mgr->latchmgr, 2 * mgr->page_size);
1060 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1061 if( mgr->latchsets == MAP_FAILED )
1062 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1065 flag = PAGE_READWRITE;
1066 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1068 return bt_mgrclose (mgr), NULL;
1070 flag = FILE_MAP_WRITE;
1071 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1072 if( !mgr->latchmgr )
1073 return GetLastError(), bt_mgrclose (mgr), NULL;
1075 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1081 VirtualFree (latchmgr, 0, MEM_RELEASE);
1086 // open BTree access method
1087 // based on buffer manager
1089 BtDb *bt_open (BtMgr *mgr)
1091 BtDb *bt = malloc (sizeof(*bt));
1093 memset (bt, 0, sizeof(*bt));
1096 bt->mem = malloc (2 *mgr->page_size);
1098 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1100 bt->frame = (BtPage)bt->mem;
1101 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1105 // compare two keys, returning > 0, = 0, or < 0
1106 // as the comparison value
1108 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1110 uint len1 = key1->len;
1113 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1126 // find segment in pool
1127 // must be called with hashslot idx locked
1128 // return NULL if not there
1129 // otherwise return node
1131 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1136 // compute start of hash chain in pool
1138 if( slot = bt->mgr->hash[idx] )
1139 pool = bt->mgr->pool + slot;
1143 page_no &= ~bt->mgr->poolmask;
1145 while( pool->basepage != page_no )
1146 if( pool = pool->hashnext )
1154 // add segment to hash table
1156 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1161 pool->hashprev = pool->hashnext = NULL;
1162 pool->basepage = page_no & ~bt->mgr->poolmask;
1163 pool->pin = CLOCK_bit + 1;
1165 if( slot = bt->mgr->hash[idx] ) {
1166 node = bt->mgr->pool + slot;
1167 pool->hashnext = node;
1168 node->hashprev = pool;
1171 bt->mgr->hash[idx] = pool->slot;
1174 // map new buffer pool segment to virtual memory
1176 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1178 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1179 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1183 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1184 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1185 if( pool->map == MAP_FAILED )
1186 return bt->err = BTERR_map;
1189 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1190 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1192 return bt->err = BTERR_map;
1194 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1195 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1197 return bt->err = BTERR_map;
1202 // calculate page within pool
1204 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1206 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1209 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1215 void bt_unpinpool (BtPool *pool)
1218 __sync_fetch_and_add(&pool->pin, -1);
1220 _InterlockedDecrement16 (&pool->pin);
1224 // find or place requested page in segment-pool
1225 // return pool table entry, incrementing pin
1227 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1229 uint slot, hashidx, idx, victim;
1230 BtPool *pool, *node, *next;
1232 // lock hash table chain
1234 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1235 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1237 // look up in hash table
1239 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1241 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1242 __sync_fetch_and_add(&pool->pin, 1);
1244 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1245 _InterlockedIncrement16 (&pool->pin);
1247 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1251 // allocate a new pool node
1252 // and add to hash table
1255 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1257 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1260 if( ++slot < bt->mgr->poolmax ) {
1261 pool = bt->mgr->pool + slot;
1264 if( bt_mapsegment(bt, pool, page_no) )
1267 bt_linkhash(bt, pool, page_no, hashidx);
1268 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1272 // pool table is full
1273 // find best pool entry to evict
1276 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1278 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1283 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1285 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1287 victim %= bt->mgr->poolmax;
1288 pool = bt->mgr->pool + victim;
1289 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1294 // try to get write lock
1295 // skip entry if not obtained
1297 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1300 // skip this entry if
1302 // or clock bit is set
1306 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1308 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1310 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1314 // unlink victim pool node from hash table
1316 if( node = pool->hashprev )
1317 node->hashnext = pool->hashnext;
1318 else if( node = pool->hashnext )
1319 bt->mgr->hash[idx] = node->slot;
1321 bt->mgr->hash[idx] = 0;
1323 if( node = pool->hashnext )
1324 node->hashprev = pool->hashprev;
1326 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1328 // remove old file mapping
1330 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1332 // FlushViewOfFile(pool->map, 0);
1333 UnmapViewOfFile(pool->map);
1334 CloseHandle(pool->hmap);
1338 // create new pool mapping
1339 // and link into hash table
1341 if( bt_mapsegment(bt, pool, page_no) )
1344 bt_linkhash(bt, pool, page_no, hashidx);
1345 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1350 // place write, read, or parent lock on requested page_no.
1352 void bt_lockpage(BtLock mode, BtLatchSet *set)
1356 ReadLock (set->readwr);
1359 WriteLock (set->readwr);
1362 ReadLock (set->access);
1365 WriteLock (set->access);
1368 WriteLock (set->parent);
1373 // remove write, read, or parent lock on requested page
1375 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1379 ReadRelease (set->readwr);
1382 WriteRelease (set->readwr);
1385 ReadRelease (set->access);
1388 WriteRelease (set->access);
1391 WriteRelease (set->parent);
1396 // allocate a new page
1397 // return with page pinned.
1399 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1403 // lock allocation page
1405 bt_spinwritelock(bt->mgr->latchmgr->lock);
1407 // use empty chain first
1408 // else allocate empty page
1410 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1411 if( set->pool = bt_pinpool (bt, set->page_no) )
1412 set->page = bt_page (bt, set->pool, set->page_no);
1414 return bt->err = BTERR_struct;
1416 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1418 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1419 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1421 // if writing first page of pool block, set file length thru last page
1423 if( (set->page_no & bt->mgr->poolmask) == 0 )
1424 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1426 if( set->pool = bt_pinpool (bt, set->page_no) )
1427 set->page = bt_page (bt, set->pool, set->page_no);
1432 // unlock allocation latch
1434 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1438 // find slot in page for given key at a given level
1440 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1442 uint diff, higher = set->page->cnt, low = 1, slot;
1445 // make stopper key an infinite fence value
1447 if( bt_getid (set->page->right) )
1452 // low is the lowest candidate.
1453 // loop ends when they meet
1455 // higher is already
1456 // tested as .ge. the passed key.
1458 while( diff = higher - low ) {
1459 slot = low + ( diff >> 1 );
1460 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1463 higher = slot, good++;
1466 // return zero if key is on right link page
1468 return good ? higher : 0;
1471 // find and load page at given level for given key
1472 // leave page rd or wr locked as requested
1474 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1476 uid page_no = ROOT_page, prevpage = 0;
1477 uint drill = 0xff, slot;
1478 BtLatchSet *prevlatch;
1479 uint mode, prevmode;
1482 // start at root of btree and drill down
1485 // determine lock mode of drill level
1486 mode = (drill == lvl) ? lock : BtLockRead;
1488 set->latch = bt_pinlatch (bt, page_no);
1489 set->page_no = page_no;
1491 // pin page contents
1493 if( set->pool = bt_pinpool (bt, page_no) )
1494 set->page = bt_page (bt, set->pool, page_no);
1498 // obtain access lock using lock chaining with Access mode
1500 if( page_no > ROOT_page )
1501 bt_lockpage(BtLockAccess, set->latch);
1503 // release & unpin parent page
1506 bt_unlockpage(prevmode, prevlatch);
1507 bt_unpinlatch (prevlatch);
1508 bt_unpinpool (prevpool);
1512 // obtain read lock using lock chaining
1514 bt_lockpage(mode, set->latch);
1516 if( set->page->free )
1517 return bt->err = BTERR_struct, 0;
1519 if( page_no > ROOT_page )
1520 bt_unlockpage(BtLockAccess, set->latch);
1522 // re-read and re-lock root after determining actual level of root
1524 if( set->page->lvl != drill) {
1525 if( set->page_no != ROOT_page )
1526 return bt->err = BTERR_struct, 0;
1528 drill = set->page->lvl;
1530 if( lock != BtLockRead && drill == lvl ) {
1531 bt_unlockpage(mode, set->latch);
1532 bt_unpinlatch (set->latch);
1533 bt_unpinpool (set->pool);
1538 prevpage = set->page_no;
1539 prevlatch = set->latch;
1540 prevpool = set->pool;
1543 // find key on page at this level
1544 // and descend to requested level
1546 if( set->page->kill )
1549 if( slot = bt_findslot (set, key, len) ) {
1553 while( slotptr(set->page, slot)->dead )
1554 if( slot++ < set->page->cnt )
1559 page_no = bt_getid(valptr(set->page, slot)->value);
1564 // or slide right into next page
1567 page_no = bt_getid(set->page->right);
1571 // return error on end of right chain
1573 bt->err = BTERR_struct;
1574 return 0; // return error
1577 // return page to free list
1578 // page must be delete & write locked
1580 void bt_freepage (BtDb *bt, BtPageSet *set)
1582 // lock allocation page
1584 bt_spinwritelock (bt->mgr->latchmgr->lock);
1587 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1588 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1589 set->page->free = 1;
1591 // unlock released page
1593 bt_unlockpage (BtLockDelete, set->latch);
1594 bt_unlockpage (BtLockWrite, set->latch);
1595 bt_unpinlatch (set->latch);
1596 bt_unpinpool (set->pool);
1598 // unlock allocation page
1600 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1603 // a fence key was deleted from a page
1604 // push new fence value upwards
1606 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1608 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1609 unsigned char value[BtId];
1613 // remove the old fence value
1615 ptr = keyptr(set->page, set->page->cnt);
1616 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1617 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1619 // cache new fence value
1621 ptr = keyptr(set->page, set->page->cnt);
1622 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // insert new (now smaller) fence key
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)leftkey;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // now delete old fence key
1637 ptr = (BtKey*)rightkey;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 bt_unlockpage (BtLockParent, set->latch);
1643 bt_unpinlatch(set->latch);
1644 bt_unpinpool (set->pool);
1648 // root has a single child
1649 // collapse a level from the tree
1651 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1656 // find the child entry and promote as new root contents
1659 for( idx = 0; idx++ < root->page->cnt; )
1660 if( !slotptr(root->page, idx)->dead )
1663 child->page_no = bt_getid (valptr(root->page, idx)->value);
1665 child->latch = bt_pinlatch (bt, child->page_no);
1666 bt_lockpage (BtLockDelete, child->latch);
1667 bt_lockpage (BtLockWrite, child->latch);
1669 if( child->pool = bt_pinpool (bt, child->page_no) )
1670 child->page = bt_page (bt, child->pool, child->page_no);
1674 memcpy (root->page, child->page, bt->mgr->page_size);
1675 bt_freepage (bt, child);
1677 } while( root->page->lvl > 1 && root->page->act == 1 );
1679 bt_unlockpage (BtLockWrite, root->latch);
1680 bt_unpinlatch (root->latch);
1681 bt_unpinpool (root->pool);
1685 // maintain reverse scan pointers by
1686 // linking left pointer of far right node
1688 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1690 BtPageSet right2[1];
1692 // keep track of rightmost leaf page
1694 if( !right_page_no ) {
1695 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1699 // link right page to left page
1701 right2->latch = bt_pinlatch (bt, right_page_no);
1702 bt_lockpage (BtLockWrite, right2->latch);
1704 if( right2->pool = bt_pinpool (bt, right_page_no) )
1705 right2->page = bt_page (bt, right2->pool, right_page_no);
1709 bt_putid(right2->page->left, left_page_no);
1710 bt_unlockpage (BtLockWrite, right2->latch);
1711 bt_unpinlatch (right2->latch);
1712 bt_unpinpool (right2->pool);
1716 // find and delete key on page by marking delete flag bit
1717 // if page becomes empty, delete it from the btree
1719 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1721 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1722 uint slot, idx, found, fence;
1723 BtPageSet set[1], right[1];
1724 unsigned char value[BtId];
1728 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1729 ptr = keyptr(set->page, slot);
1733 // if librarian slot, advance to real slot
1735 if( slotptr(set->page, slot)->type == Librarian )
1736 ptr = keyptr(set->page, ++slot);
1738 fence = slot == set->page->cnt;
1740 // if key is found delete it, otherwise ignore request
1742 if( found = !keycmp (ptr, key, len) )
1743 if( found = slotptr(set->page, slot)->dead == 0 ) {
1744 val = valptr(set->page,slot);
1745 slotptr(set->page, slot)->dead = 1;
1746 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1749 // collapse empty slots beneath the fence
1751 while( idx = set->page->cnt - 1 )
1752 if( slotptr(set->page, idx)->dead ) {
1753 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1754 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1759 // did we delete a fence key in an upper level?
1761 if( found && lvl && set->page->act && fence )
1762 if( bt_fixfence (bt, set, lvl) )
1765 return bt->found = found, 0;
1767 // do we need to collapse root?
1769 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1770 if( bt_collapseroot (bt, set) )
1773 return bt->found = found, 0;
1775 // return if page is not empty
1777 if( set->page->act ) {
1778 bt_unlockpage(BtLockWrite, set->latch);
1779 bt_unpinlatch (set->latch);
1780 bt_unpinpool (set->pool);
1781 return bt->found = found, 0;
1784 // cache copy of fence key
1785 // to post in parent
1787 ptr = keyptr(set->page, set->page->cnt);
1788 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1790 // obtain lock on right page
1792 right->page_no = bt_getid(set->page->right);
1793 right->latch = bt_pinlatch (bt, right->page_no);
1794 bt_lockpage (BtLockWrite, right->latch);
1796 // pin page contents
1798 if( right->pool = bt_pinpool (bt, right->page_no) )
1799 right->page = bt_page (bt, right->pool, right->page_no);
1803 if( right->page->kill )
1804 return bt->err = BTERR_struct;
1806 // transfer left link
1808 memcpy (right->page->left, set->page->left, BtId);
1810 // pull contents of right peer into our empty page
1812 memcpy (set->page, right->page, bt->mgr->page_size);
1817 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1820 // cache copy of key to update
1822 ptr = keyptr(right->page, right->page->cnt);
1823 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1825 // mark right page deleted and point it to left page
1826 // until we can post parent updates
1828 bt_putid (right->page->right, set->page_no);
1829 right->page->kill = 1;
1831 bt_lockpage (BtLockParent, right->latch);
1832 bt_unlockpage (BtLockWrite, right->latch);
1834 bt_lockpage (BtLockParent, set->latch);
1835 bt_unlockpage (BtLockWrite, set->latch);
1837 // redirect higher key directly to our new node contents
1839 bt_putid (value, set->page_no);
1840 ptr = (BtKey*)higherfence;
1842 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1845 // delete old lower key to our node
1847 ptr = (BtKey*)lowerfence;
1849 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1852 // obtain delete and write locks to right node
1854 bt_unlockpage (BtLockParent, right->latch);
1855 bt_lockpage (BtLockDelete, right->latch);
1856 bt_lockpage (BtLockWrite, right->latch);
1857 bt_freepage (bt, right);
1859 bt_unlockpage (BtLockParent, set->latch);
1860 bt_unpinlatch (set->latch);
1861 bt_unpinpool (set->pool);
1866 BtKey *bt_foundkey (BtDb *bt)
1868 return (BtKey*)(bt->key);
1871 // advance to next slot
1873 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1875 BtLatchSet *prevlatch;
1879 if( slot < set->page->cnt )
1882 prevlatch = set->latch;
1883 prevpool = set->pool;
1885 if( page_no = bt_getid(set->page->right) )
1886 set->latch = bt_pinlatch (bt, page_no);
1888 return bt->err = BTERR_struct, 0;
1890 // pin page contents
1892 if( set->pool = bt_pinpool (bt, page_no) )
1893 set->page = bt_page (bt, set->pool, page_no);
1897 // obtain access lock using lock chaining with Access mode
1899 bt_lockpage(BtLockAccess, set->latch);
1901 bt_unlockpage(BtLockRead, prevlatch);
1902 bt_unpinlatch (prevlatch);
1903 bt_unpinpool (prevpool);
1905 bt_lockpage(BtLockRead, set->latch);
1906 bt_unlockpage(BtLockAccess, set->latch);
1908 set->page_no = page_no;
1912 // find unique key or first duplicate key in
1913 // leaf level and return number of value bytes
1914 // or (-1) if not found. Setup key for bt_foundkey
1916 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1924 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1926 ptr = keyptr(set->page, slot);
1928 // skip librarian slot place holder
1930 if( slotptr(set->page, slot)->type == Librarian )
1931 ptr = keyptr(set->page, ++slot);
1933 // return actual key found
1935 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1938 if( slotptr(set->page, slot)->type == Duplicate )
1941 // not there if we reach the stopper key
1943 if( slot == set->page->cnt )
1944 if( !bt_getid (set->page->right) )
1947 // if key exists, return >= 0 value bytes copied
1948 // otherwise return (-1)
1950 if( slotptr(set->page, slot)->dead )
1954 if( !memcmp (ptr->key, key, len) ) {
1955 val = valptr (set->page,slot);
1956 if( valmax > val->len )
1958 memcpy (value, val->value, valmax);
1964 } while( slot = bt_findnext (bt, set, slot) );
1966 bt_unlockpage (BtLockRead, set->latch);
1967 bt_unpinlatch (set->latch);
1968 bt_unpinpool (set->pool);
1972 // check page for space available,
1973 // clean if necessary and return
1974 // 0 - page needs splitting
1975 // >0 new slot value
1977 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1979 uint nxt = bt->mgr->page_size;
1980 uint cnt = 0, idx = 0;
1981 uint max = page->cnt;
1986 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1989 // skip cleanup and proceed to split
1990 // if there's not enough garbage
1993 if( page->garbage < nxt / 5 )
1996 memcpy (bt->frame, page, bt->mgr->page_size);
1998 // skip page info and set rest of page to zero
2000 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2004 // clean up page first by
2005 // removing deleted keys
2007 while( cnt++ < max ) {
2010 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2013 // copy the value across
2015 val = valptr(bt->frame, cnt);
2016 nxt -= val->len + sizeof(BtVal);
2017 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2019 // copy the key across
2021 key = keyptr(bt->frame, cnt);
2022 nxt -= key->len + sizeof(BtKey);
2023 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2025 // make a librarian slot
2028 slotptr(page, ++idx)->off = nxt;
2029 slotptr(page, idx)->type = Librarian;
2030 slotptr(page, idx)->dead = 1;
2035 slotptr(page, ++idx)->off = nxt;
2036 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2038 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2045 // see if page has enough space now, or does it need splitting?
2047 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2053 // split the root and raise the height of the btree
2055 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
2057 uint nxt = bt->mgr->page_size;
2058 unsigned char value[BtId];
2063 // Obtain an empty page to use, and copy the current
2064 // root contents into it, e.g. lower keys
2066 if( bt_newpage(bt, left) )
2069 memcpy(left->page, root->page, bt->mgr->page_size);
2070 bt_unpinpool (left->pool);
2072 // set left from higher to lower page
2074 bt_putid (right->page->left, left->page_no);
2075 bt_unpinpool (right->pool);
2077 // preserve the page info at the bottom
2078 // of higher keys and set rest to zero
2080 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2082 // insert stopper key at top of newroot page
2083 // and increase the root height
2085 nxt -= BtId + sizeof(BtVal);
2086 bt_putid (value, right->page_no);
2087 val = (BtVal *)((unsigned char *)root->page + nxt);
2088 memcpy (val->value, value, BtId);
2091 nxt -= 2 + sizeof(BtKey);
2092 slotptr(root->page, 2)->off = nxt;
2093 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2098 // insert lower keys page fence key on newroot page as first key
2100 nxt -= BtId + sizeof(BtVal);
2101 bt_putid (value, left->page_no);
2102 val = (BtVal *)((unsigned char *)root->page + nxt);
2103 memcpy (val->value, value, BtId);
2106 nxt -= leftkey->len + sizeof(BtKey);
2107 slotptr(root->page, 1)->off = nxt;
2108 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
2110 bt_putid(root->page->right, 0);
2111 root->page->min = nxt; // reset lowest used offset and key count
2112 root->page->cnt = 2;
2113 root->page->act = 2;
2116 // release and unpin root
2118 bt_unlockpage(BtLockWrite, root->latch);
2119 bt_unpinlatch (root->latch);
2120 bt_unpinpool (root->pool);
2124 // split already locked full node
2127 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2129 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2130 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2131 unsigned char value[BtId];
2132 uint lvl = set->page->lvl;
2139 // split higher half of keys to bt->frame
2141 memset (bt->frame, 0, bt->mgr->page_size);
2142 max = set->page->cnt;
2146 while( cnt++ < max ) {
2147 if( slotptr(set->page, cnt)->dead && cnt < max )
2149 src = valptr(set->page, cnt);
2150 nxt -= src->len + sizeof(BtVal);
2151 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2153 key = keyptr(set->page, cnt);
2154 nxt -= key->len + sizeof(BtKey);
2155 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
2157 // add librarian slot
2160 slotptr(bt->frame, ++idx)->off = nxt;
2161 slotptr(bt->frame, idx)->type = Librarian;
2162 slotptr(bt->frame, idx)->dead = 1;
2167 slotptr(bt->frame, ++idx)->off = nxt;
2168 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2170 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2174 // remember existing fence key for new page to the right
2176 memcpy (rightkey, key, key->len + sizeof(BtKey));
2178 bt->frame->bits = bt->mgr->page_bits;
2179 bt->frame->min = nxt;
2180 bt->frame->cnt = idx;
2181 bt->frame->lvl = lvl;
2185 if( set->page_no > ROOT_page ) {
2186 bt_putid (bt->frame->right, bt_getid (set->page->right));
2187 bt_putid(bt->frame->left, set->page_no);
2190 // get new free page and write higher keys to it.
2192 if( bt_newpage(bt, right) )
2197 if( set->page_no > ROOT_page && !lvl )
2198 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2201 memcpy (right->page, bt->frame, bt->mgr->page_size);
2203 // update lower keys to continue in old page
2205 memcpy (bt->frame, set->page, bt->mgr->page_size);
2206 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2207 nxt = bt->mgr->page_size;
2208 set->page->garbage = 0;
2214 if( slotptr(bt->frame, max)->type == Librarian )
2217 // assemble page of smaller keys
2219 while( cnt++ < max ) {
2220 if( slotptr(bt->frame, cnt)->dead )
2222 src = valptr(bt->frame, cnt);
2223 nxt -= src->len + sizeof(BtVal);
2224 memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
2226 key = keyptr(bt->frame, cnt);
2227 nxt -= key->len + sizeof(BtKey);
2228 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2230 // add librarian slot
2233 slotptr(set->page, ++idx)->off = nxt;
2234 slotptr(set->page, idx)->type = Librarian;
2235 slotptr(set->page, idx)->dead = 1;
2240 slotptr(set->page, ++idx)->off = nxt;
2241 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2245 // remember fence key for smaller page
2247 memcpy(fencekey, key, key->len + sizeof(BtKey));
2249 bt_putid(set->page->right, right->page_no);
2250 set->page->min = nxt;
2251 set->page->cnt = idx;
2253 // if current page is the root page, split it
2255 if( set->page_no == ROOT_page )
2256 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2258 // insert new fences in their parent pages
2260 right->latch = bt_pinlatch (bt, right->page_no);
2261 bt_lockpage (BtLockParent, right->latch);
2263 bt_lockpage (BtLockParent, set->latch);
2264 bt_unlockpage (BtLockWrite, set->latch);
2266 // insert new fence for reformulated left block of smaller keys
2268 bt_putid (value, set->page_no);
2270 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2273 // switch fence for right block of larger keys to new right page
2275 bt_putid (value, right->page_no);
2277 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2280 bt_unlockpage (BtLockParent, set->latch);
2281 bt_unpinlatch (set->latch);
2282 bt_unpinpool (set->pool);
2284 bt_unlockpage (BtLockParent, right->latch);
2285 bt_unpinlatch (right->latch);
2286 bt_unpinpool (right->pool);
2290 // install new key and value onto page
2291 // page must already be checked for
2294 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2296 uint idx, librarian;
2301 // if found slot > desired slot and previous slot
2302 // is a librarian slot, use it
2305 if( slotptr(set->page, slot-1)->type == Librarian )
2308 // copy value onto page
2310 set->page->min -= vallen + sizeof(BtVal);
2311 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2312 memcpy (val->value, value, vallen);
2315 // copy key onto page
2317 set->page->min -= keylen + sizeof(BtKey);
2318 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2319 memcpy (ptr->key, key, keylen);
2322 // find first empty slot
2324 for( idx = slot; idx < set->page->cnt; idx++ )
2325 if( slotptr(set->page, idx)->dead )
2328 // now insert key into array before slot
2330 if( idx == set->page->cnt )
2331 idx += 2, set->page->cnt += 2, librarian = 2;
2337 while( idx > slot + librarian - 1 )
2338 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2340 // add librarian slot
2342 if( librarian > 1 ) {
2343 node = slotptr(set->page, slot++);
2344 node->off = set->page->min;
2345 node->type = Librarian;
2351 node = slotptr(set->page, slot);
2352 node->off = set->page->min;
2356 bt_unlockpage (BtLockWrite, set->latch);
2357 bt_unpinlatch (set->latch);
2358 bt_unpinpool (set->pool);
2362 // Insert new key into the btree at given level.
2363 // either add a new key or update/add an existing one
2365 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2367 unsigned char newkey[BT_keyarray];
2368 uint slot, idx, len;
2375 // set up the key we're working on
2377 ins = (BtKey*)newkey;
2378 memcpy (ins->key, key, keylen);
2381 // is this a non-unique index value?
2387 sequence = bt_newdup (bt);
2388 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2392 while( 1 ) { // find the page and slot for the current key
2393 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2394 ptr = keyptr(set->page, slot);
2397 bt->err = BTERR_ovflw;
2401 // if librarian slot == found slot, advance to real slot
2403 if( slotptr(set->page, slot)->type == Librarian )
2404 if( !keycmp (ptr, key, keylen) )
2405 ptr = keyptr(set->page, ++slot);
2409 if( slotptr(set->page, slot)->type == Duplicate )
2412 // if inserting a duplicate key or unique key
2413 // check for adequate space on the page
2414 // and insert the new key before slot.
2416 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2417 if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2418 if( bt_splitpage (bt, set) )
2423 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2426 // if key already exists, update value and return
2428 val = valptr(set->page, slot);
2430 if( val->len >= vallen ) {
2431 if( slotptr(set->page, slot)->dead )
2433 set->page->garbage += val->len - vallen;
2434 slotptr(set->page, slot)->dead = 0;
2436 memcpy (val->value, value, vallen);
2437 bt_unlockpage(BtLockWrite, set->latch);
2438 bt_unpinlatch (set->latch);
2439 bt_unpinpool (set->pool);
2443 // new update value doesn't fit in existing value area
2445 if( !slotptr(set->page, slot)->dead )
2446 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2448 slotptr(set->page, slot)->dead = 0;
2452 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2453 if( bt_splitpage (bt, set) )
2458 set->page->min -= vallen + sizeof(BtVal);
2459 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2460 memcpy (val->value, value, vallen);
2463 set->page->min -= keylen + sizeof(BtKey);
2464 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2465 memcpy (ptr->key, key, keylen);
2468 slotptr(set->page, slot)->off = set->page->min;
2469 bt_unlockpage(BtLockWrite, set->latch);
2470 bt_unpinlatch (set->latch);
2471 bt_unpinpool (set->pool);
2477 // set cursor to highest slot on highest page
2479 uint bt_lastkey (BtDb *bt)
2481 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2485 if( set->pool = bt_pinpool (bt, page_no) )
2486 set->page = bt_page (bt, set->pool, page_no);
2490 set->latch = bt_pinlatch (bt, page_no);
2491 bt_lockpage(BtLockRead, set->latch);
2493 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2494 slot = set->page->cnt;
2496 bt_unlockpage(BtLockRead, set->latch);
2497 bt_unpinlatch (set->latch);
2498 bt_unpinpool (set->pool);
2503 // return previous slot on cursor page
2505 uint bt_prevkey (BtDb *bt, uint slot)
2513 if( left = bt_getid(bt->cursor->left) )
2514 bt->cursor_page = left;
2518 if( set->pool = bt_pinpool (bt, left) )
2519 set->page = bt_page (bt, set->pool, left);
2523 set->latch = bt_pinlatch (bt, left);
2524 bt_lockpage(BtLockRead, set->latch);
2526 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2528 bt_unlockpage(BtLockRead, set->latch);
2529 bt_unpinlatch (set->latch);
2530 bt_unpinpool (set->pool);
2531 return bt->cursor->cnt;
2534 // return next slot on cursor page
2535 // or slide cursor right into next page
2537 uint bt_nextkey (BtDb *bt, uint slot)
2543 right = bt_getid(bt->cursor->right);
2545 while( slot++ < bt->cursor->cnt )
2546 if( slotptr(bt->cursor,slot)->dead )
2548 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2556 bt->cursor_page = right;
2558 if( set->pool = bt_pinpool (bt, right) )
2559 set->page = bt_page (bt, set->pool, right);
2563 set->latch = bt_pinlatch (bt, right);
2564 bt_lockpage(BtLockRead, set->latch);
2566 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2568 bt_unlockpage(BtLockRead, set->latch);
2569 bt_unpinlatch (set->latch);
2570 bt_unpinpool (set->pool);
2578 // cache page of keys into cursor and return starting slot for given key
2580 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2585 // cache page for retrieval
2587 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2588 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2592 bt->cursor_page = set->page_no;
2594 bt_unlockpage(BtLockRead, set->latch);
2595 bt_unpinlatch (set->latch);
2596 bt_unpinpool (set->pool);
2600 BtKey *bt_key(BtDb *bt, uint slot)
2602 return keyptr(bt->cursor, slot);
2605 BtVal *bt_val(BtDb *bt, uint slot)
2607 return valptr(bt->cursor,slot);
2613 double getCpuTime(int type)
2616 FILETIME xittime[1];
2617 FILETIME systime[1];
2618 FILETIME usrtime[1];
2619 SYSTEMTIME timeconv[1];
2622 memset (timeconv, 0, sizeof(SYSTEMTIME));
2626 GetSystemTimeAsFileTime (xittime);
2627 FileTimeToSystemTime (xittime, timeconv);
2628 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2631 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2632 FileTimeToSystemTime (usrtime, timeconv);
2635 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2636 FileTimeToSystemTime (systime, timeconv);
2640 ans += (double)timeconv->wHour * 3600;
2641 ans += (double)timeconv->wMinute * 60;
2642 ans += (double)timeconv->wSecond;
2643 ans += (double)timeconv->wMilliseconds / 1000;
2648 #include <sys/resource.h>
2650 double getCpuTime(int type)
2652 struct rusage used[1];
2653 struct timeval tv[1];
2657 gettimeofday(tv, NULL);
2658 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2661 getrusage(RUSAGE_SELF, used);
2662 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2665 getrusage(RUSAGE_SELF, used);
2666 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2673 void bt_poolaudit (BtMgr *mgr)
2678 while( slot++ < mgr->poolcnt ) {
2679 pool = mgr->pool + slot;
2680 if( pool->pin & ~CLOCK_bit )
2681 fprintf(stderr, "pool slot %d pinned\n", slot);
2685 uint bt_latchaudit (BtDb *bt)
2687 ushort idx, hashidx;
2694 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2696 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2697 fprintf(stderr, "Alloc page locked\n");
2698 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2700 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2701 latch = bt->mgr->latchsets + idx;
2702 if( *latch->readwr->rin & MASK )
2703 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2704 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2706 if( *latch->access->rin & MASK )
2707 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2708 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2710 if( *latch->parent->rin & MASK )
2711 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2712 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2715 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2720 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2721 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2722 fprintf(stderr, "hash entry %d locked\n", hashidx);
2724 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2726 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2727 latch = bt->mgr->latchsets + idx;
2728 if( *(ushort *)latch->busy )
2729 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2730 *(ushort *)latch->busy = 0;
2732 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2733 } while( idx = latch->next );
2736 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2737 page_no = LEAF_page;
2739 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2740 off64_t off = page_no << bt->mgr->page_bits;
2742 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2746 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2748 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2749 fprintf(stderr, "page %.8x unable to read\n", page_no);
2751 if( *amt < bt->mgr->page_size )
2752 fprintf(stderr, "page %.8x unable to read\n", page_no);
2754 if( !bt->frame->free ) {
2755 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2756 ptr = keyptr(bt->frame, idx+1);
2757 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
2758 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2760 if( !bt->frame->lvl )
2761 cnt += bt->frame->act;
2763 if( page_no > LEAF_page )
2778 // standalone program to index file of keys
2779 // then list them onto std-out
2782 void *index_file (void *arg)
2784 uint __stdcall index_file (void *arg)
2787 int line = 0, found = 0, cnt = 0, unique;
2788 uid next, page_no = LEAF_page; // start on first page of leaves
2789 unsigned char key[BT_maxkey];
2790 ThreadArg *args = arg;
2791 int ch, len = 0, slot;
2798 bt = bt_open (args->mgr);
2800 unique = (args->type[1] | 0x20) != 'd';
2802 switch(args->type[0] | 0x20)
2805 fprintf(stderr, "started latch mgr audit\n");
2806 cnt = bt_latchaudit (bt);
2807 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2811 fprintf(stderr, "started pennysort for %s\n", args->infile);
2812 if( in = fopen (args->infile, "rb") )
2813 while( ch = getc(in), ch != EOF )
2818 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2819 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2822 else if( len < BT_maxkey )
2824 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2828 fprintf(stderr, "started indexing for %s\n", args->infile);
2829 if( in = fopen (args->infile, "rb") )
2830 while( ch = getc(in), ch != EOF )
2835 if( args->num == 1 )
2836 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2838 else if( args->num )
2839 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2841 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2842 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2845 else if( len < BT_maxkey )
2847 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2851 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2852 if( in = fopen (args->infile, "rb") )
2853 while( ch = getc(in), ch != EOF )
2857 if( args->num == 1 )
2858 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2860 else if( args->num )
2861 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2863 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2864 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2865 ptr = (BtKey*)(bt->key);
2868 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2869 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2872 else if( len < BT_maxkey )
2875 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2879 fprintf(stderr, "started finding keys for %s\n", args->infile);
2880 if( in = fopen (args->infile, "rb") )
2881 while( ch = getc(in), ch != EOF )
2885 if( args->num == 1 )
2886 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2888 else if( args->num )
2889 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2891 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2894 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2897 else if( len < BT_maxkey )
2899 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2903 fprintf(stderr, "started scanning\n");
2905 if( set->pool = bt_pinpool (bt, page_no) )
2906 set->page = bt_page (bt, set->pool, page_no);
2909 madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
2910 set->latch = bt_pinlatch (bt, page_no);
2911 bt_lockpage (BtLockRead, set->latch);
2912 next = bt_getid (set->page->right);
2914 for( slot = 0; slot++ < set->page->cnt; )
2915 if( next || slot < set->page->cnt )
2916 if( !slotptr(set->page, slot)->dead ) {
2917 ptr = keyptr(set->page, slot);
2920 if( slotptr(set->page, slot)->type == Duplicate )
2923 fwrite (ptr->key, len, 1, stdout);
2924 val = valptr(set->page, slot);
2925 fwrite (val->value, val->len, 1, stdout);
2926 fputc ('\n', stdout);
2930 bt_unlockpage (BtLockRead, set->latch);
2931 bt_unpinlatch (set->latch);
2932 bt_unpinpool (set->pool);
2933 } while( page_no = next );
2935 fprintf(stderr, " Total keys read %d\n", cnt);
2939 fprintf(stderr, "started reverse scan\n");
2940 if( slot = bt_lastkey (bt) )
2941 while( slot = bt_prevkey (bt, slot) ) {
2942 if( slotptr(bt->cursor, slot)->dead )
2945 ptr = keyptr(bt->cursor, slot);
2948 if( slotptr(bt->cursor, slot)->type == Duplicate )
2951 fwrite (ptr->key, len, 1, stdout);
2952 val = valptr(bt->cursor, slot);
2953 fwrite (val->value, val->len, 1, stdout);
2954 fputc ('\n', stdout);
2958 fprintf(stderr, " Total keys read %d\n", cnt);
2963 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2965 fprintf(stderr, "started counting\n");
2966 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2967 page_no = LEAF_page;
2969 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2970 uid off = page_no << bt->mgr->page_bits;
2972 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2976 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2978 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2979 return bt->err = BTERR_map;
2981 if( *amt < bt->mgr->page_size )
2982 return bt->err = BTERR_map;
2984 if( !bt->frame->free && !bt->frame->lvl )
2985 cnt += bt->frame->act;
2986 if( page_no > LEAF_page )
2991 cnt--; // remove stopper key
2992 fprintf(stderr, " Total keys read %d\n", cnt);
3004 typedef struct timeval timer;
3006 int main (int argc, char **argv)
3008 int idx, cnt, len, slot, err;
3009 int segsize, bits = 16;
3026 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3027 fprintf (stderr, " where page_bits is the page size in bits\n");
3028 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
3029 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3030 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3031 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3035 start = getCpuTime(0);
3038 bits = atoi(argv[3]);
3041 poolsize = atoi(argv[4]);
3044 fprintf (stderr, "Warning: no mapped_pool\n");
3046 if( poolsize > 65535 )
3047 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3050 segsize = atoi(argv[5]);
3052 segsize = 4; // 16 pages per mmap segment
3055 num = atoi(argv[6]);
3059 threads = malloc (cnt * sizeof(pthread_t));
3061 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3063 args = malloc (cnt * sizeof(ThreadArg));
3065 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3068 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3074 for( idx = 0; idx < cnt; idx++ ) {
3075 args[idx].infile = argv[idx + 7];
3076 args[idx].type = argv[2];
3077 args[idx].mgr = mgr;
3078 args[idx].num = num;
3079 args[idx].idx = idx;
3081 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3082 fprintf(stderr, "Error creating thread %d\n", err);
3084 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3088 // wait for termination
3091 for( idx = 0; idx < cnt; idx++ )
3092 pthread_join (threads[idx], NULL);
3094 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3096 for( idx = 0; idx < cnt; idx++ )
3097 CloseHandle(threads[idx]);
3100 elapsed = getCpuTime(0) - start;
3101 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3102 elapsed = getCpuTime(1);
3103 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3104 elapsed = getCpuTime(2);
3105 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);