1 // btree version threadskv6 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and atomic key insert
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The original author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
87 // maximum number of keys to insert atomically in one call
89 #define MAX_atomic 256
91 #define BT_maxkey 255 // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
95 There are five lock types for each node in three independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
111 // definition for phase-fair reader/writer lock implementation
125 // definition for spin latch implementation
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
131 volatile typedef struct {
142 // The key structure occupies space at the upper end of
143 // each page. It's a length byte followed by the key
147 unsigned char len; // this can be changed to a ushort or uint
148 unsigned char key[0];
151 // the value structure also occupies space at the upper
152 // end of the page. Each key is immediately followed by a value.
155 unsigned char len; // this can be changed to a ushort or uint
156 unsigned char value[0];
159 // hash table entries
162 uint slot; // Latch table entry at head of chain
163 BtSpinLatch latch[1];
166 // latch manager table structure
169 uid page_no; // latch set page number
170 RWLock readwr[1]; // read/write page lock
171 RWLock access[1]; // Access Intent/Page delete
172 RWLock parent[1]; // Posting of fence key in parent
173 uint slot; // entry slot in latch table
174 uint next; // next entry in hash table chain
175 uint prev; // prev entry in hash table chain
176 volatile ushort pin; // number of outstanding threads
177 ushort dirty:1; // page in cache is dirty
180 // lock manager table structure
183 RWLock readwr[1]; // read/write key lock
186 uint pin; // count of readers waiting
187 uint hashidx; // hash idx for entry
188 unsigned char key[BT_keyarray];
191 // Define the length of the page record numbers
195 // Page key slot definition.
197 // Keys are marked dead, but remain on the page until
198 // cleanup is called. The fence key (highest key) for
199 // a leaf page is always present, even after cleanup.
203 // In addition to the Unique keys that occupy slots
204 // there are Librarian and Duplicate key
205 // slots occupying the key slot array.
207 // The Librarian slots are dead keys that
208 // serve as filler, available to add new Unique
209 // or Dup slots that are inserted into the B-tree.
211 // The Duplicate slots have had their key bytes extended
212 // by 6 bytes to contain a binary duplicate key uniqueifier.
221 uint off:BT_maxbits; // page offset for key start
222 uint type:3; // type of slot
223 uint dead:1; // set for deleted slot
226 // The first part of an index page.
227 // It is immediately followed
228 // by the BtSlot array of keys.
230 // note that this structure size
231 // must be a multiple of 8 bytes
232 // in order to place dups correctly.
234 typedef struct BtPage_ {
235 uint cnt; // count of keys in page
236 uint act; // count of active keys
237 uint min; // next key offset
238 uint garbage; // page garbage in bytes
239 unsigned char bits:7; // page size in bits
240 unsigned char free:1; // page is on free chain
241 unsigned char lvl:7; // level of page
242 unsigned char kill:1; // page is being deleted
243 unsigned char left[BtId]; // page number to left
244 unsigned char filler[2]; // padding to multiple of 8
245 unsigned char right[BtId]; // page number to right
248 // The loadpage interface object
251 uid page_no; // current page number
252 BtPage page; // current page pointer
253 BtLatchSet *latch; // current page latch set
256 // structure for latch manager on ALLOC_page
259 struct BtPage_ alloc[1]; // next page_no in right ptr
260 unsigned long long dups[1]; // global duplicate key uniqueifier
261 unsigned char chain[BtId]; // head of free page_nos chain
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
274 BtPageZero *pagezero; // mapped allocation page
275 BtSpinLatch alloclatch[1]; // allocation area lite latch
276 uint latchdeployed; // highest number of pool entries deployed
277 uint nlatchpage; // number of latch & lock & pool pages
278 uint latchtotal; // number of page latch entries
279 uint latchhash; // number of latch hash table slots
280 uint latchvictim; // next latch entry to examine
281 BtHashEntry *hashtable; // the anonymous mapping buffer pool
282 BtLatchSet *latchsets; // mapped latch set from latch pages
283 unsigned char *pagepool; // mapped to the buffer pool pages
284 uint lockhash; // number of lock hash table slots
285 uint lockfree; // next available lock table entry
286 BtSpinLatch locklatch[1]; // lock manager free chain latch
287 BtHashEntry *hashlock; // the lock manager hash table
288 BtLockSet *locktable; // the lock manager key table
290 HANDLE halloc; // allocation handle
291 HANDLE hpool; // buffer pool handle
296 BtMgr *mgr; // buffer manager for thread
297 BtPage cursor; // cached frame for start/next (never mapped)
298 BtPage frame; // spare frame for the page split (never mapped)
299 uid cursor_page; // current cursor page number
300 unsigned char *mem; // frame, cursor, page memory buffer
301 unsigned char key[BT_keyarray]; // last found complete key
302 int found; // last delete or insert was found
303 int err; // last error
304 int reads, writes; // number of reads and writes from the btree
318 #define CLOCK_bit 0x8000
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey (BtDb *bt, uint slot);
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
334 // Helper functions to return slot values
335 // from the cursor page.
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
340 // The page is allocated from low and hi ends.
341 // The key slots are allocated from the bottom,
342 // while the text and value of the key
343 // are allocated from the top. When the two
344 // areas meet, the page is split into two.
346 // A key consists of a length byte, two bytes of
347 // index number (0 - 65535), and up to 253 bytes
350 // Associated with each key is a value byte string
351 // containing any value desired.
353 // The b-tree root is always located at page 1.
354 // The first leaf page of level zero is always
355 // located on page 2.
357 // The b-tree pages are linked with next
358 // pointers to facilitate enumerators,
359 // and provide for concurrency.
361 // When the root page fills, it is split in two and
362 // the tree height is raised by a new root at page
363 // one with two keys.
365 // Deleted keys are marked with a dead bit until
366 // page cleanup. The fence key for a leaf node is
369 // To achieve maximum concurrency one page is locked at a time
370 // as the tree is traversed to find leaf key in question. The right
371 // page numbers are used in cases where the page is being split,
374 // Page 0 is dedicated to lock for new page extensions,
375 // and chains empty pages together for reuse. It also
376 // contains the latch manager hash table.
378 // The ParentModification lock on a node is obtained to serialize posting
379 // or changing the fence key for a node.
381 // Empty pages are chained together through the ALLOC page and reused.
383 // Access macros to address slot and key values from the page
384 // Page slots use 1 based indexing.
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
390 void bt_putid(unsigned char *dest, uid id)
395 dest[i] = (unsigned char)id, id >>= 8;
398 uid bt_getid(unsigned char *src)
403 for( i = 0; i < BtId; i++ )
404 id <<= 8, id |= *src++;
409 uid bt_newdup (BtDb *bt)
412 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
414 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
418 // Phase-Fair reader/writer lock implementation
420 void WriteLock (RWLock *lock)
425 tix = __sync_fetch_and_add (lock->ticket, 1);
427 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
429 // wait for our ticket to come up
431 while( tix != lock->serving[0] )
438 w = PRES | (tix & PHID);
440 r = __sync_fetch_and_add (lock->rin, w);
442 r = _InterlockedExchangeAdd16 (lock->rin, w);
444 while( r != *lock->rout )
452 void WriteRelease (RWLock *lock)
455 __sync_fetch_and_and (lock->rin, ~MASK);
457 _InterlockedAnd16 (lock->rin, ~MASK);
462 void ReadLock (RWLock *lock)
466 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
468 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
471 while( w == (*lock->rin & MASK) )
479 void ReadRelease (RWLock *lock)
482 __sync_fetch_and_add (lock->rout, RINC);
484 _InterlockedExchangeAdd16 (lock->rout, RINC);
488 // Spin Latch Manager
490 // wait until write lock mode is clear
491 // and add 1 to the share count
493 void bt_spinreadlock(BtSpinLatch *latch)
499 prev = __sync_fetch_and_add ((uint *)latch, SHARE);
501 prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
503 // see if exclusive request is granted or pending
508 prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
510 prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
513 } while( sched_yield(), 1 );
515 } while( SwitchToThread(), 1 );
519 // wait for other read and write latches to relinquish
521 void bt_spinwritelock(BtSpinLatch *latch)
527 prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
529 prev = _InterlockedOr((uint *)latch, PEND | XCL);
532 if( !(prev & ~BOTH) )
536 __sync_fetch_and_and ((uint *)latch, ~XCL);
538 _InterlockedAnd((uint *)latch, ~XCL);
541 } while( sched_yield(), 1 );
543 } while( SwitchToThread(), 1 );
547 // try to obtain write lock
549 // return 1 if obtained,
552 int bt_spinwritetry(BtSpinLatch *latch)
557 prev = __sync_fetch_and_or((uint *)latch, XCL);
559 prev = _InterlockedOr((uint *)latch, XCL);
561 // take write access if all bits are clear
564 if( !(prev & ~BOTH) )
568 __sync_fetch_and_and ((uint *)latch, ~XCL);
570 _InterlockedAnd((uint *)latch, ~XCL);
577 void bt_spinreleasewrite(BtSpinLatch *latch)
580 __sync_fetch_and_and((uint *)latch, ~BOTH);
582 _InterlockedAnd((uint *)latch, ~BOTH);
586 // decrement reader count
588 void bt_spinreleaseread(BtSpinLatch *latch)
591 __sync_fetch_and_add((uint *)latch, -SHARE);
593 _InterlockedExchangeAdd((uint *)latch, -SHARE);
597 // read page from permanent location in Btree file
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
601 off64_t off = page_no << mgr->page_bits;
604 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
612 memset (ovl, 0, sizeof(OVERLAPPED));
614 ovl->OffsetHigh = off >> 32;
616 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
620 if( *amt < mgr->page_size ) {
621 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
628 // write page to permanent location in Btree file
629 // clear the dirty bit
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
633 off64_t off = page_no << mgr->page_bits;
636 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
642 memset (ovl, 0, sizeof(OVERLAPPED));
644 ovl->OffsetHigh = off >> 32;
646 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
649 if( *amt < mgr->page_size )
655 // link latch table entry into head of latch hash table
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
662 if( latch->next = bt->mgr->hashtable[hashidx].slot )
663 bt->mgr->latchsets[latch->next].prev = slot;
665 bt->mgr->hashtable[hashidx].slot = slot;
666 latch->page_no = page_no;
672 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
680 // set CLOCK bit in latch
681 // decrement pin count
683 void bt_unpinlatch (BtLatchSet *latch)
686 if( ~latch->pin & CLOCK_bit )
687 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688 __sync_fetch_and_add(&latch->pin, -1);
690 if( ~latch->pin & CLOCK_bit )
691 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692 _InterlockedDecrement16 (&latch->pin);
696 // return the btree cached page address
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
705 // find existing latchset or inspire new one
706 // return with latchset pinned
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
710 uint hashidx = page_no % bt->mgr->latchhash;
718 // try to find our entry
720 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
722 if( slot = bt->mgr->hashtable[hashidx].slot ) do
724 latch = bt->mgr->latchsets + slot;
725 if( page_no == latch->page_no )
727 } while( slot = latch->next );
733 latch = bt->mgr->latchsets + slot;
735 __sync_fetch_and_add(&latch->pin, 1);
737 _InterlockedIncrement16 (&latch->pin);
739 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
743 // see if there are any unused pool entries
745 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
747 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
750 if( slot < bt->mgr->latchtotal ) {
751 latch = bt->mgr->latchsets + slot;
752 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
754 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
759 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
761 _InterlockedDecrement (&bt->mgr->latchdeployed);
763 // find and reuse previous entry on victim
767 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
769 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
771 // try to get write lock on hash chain
772 // skip entry if not obtained
773 // or has outstanding pins
775 slot %= bt->mgr->latchtotal;
780 latch = bt->mgr->latchsets + slot;
781 idx = latch->page_no % bt->mgr->latchhash;
783 // see if we are on same chain as hashidx
788 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
791 // skip this slot if it is pinned
792 // or the CLOCK bit is set
795 if( latch->pin & CLOCK_bit ) {
797 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
799 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
802 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
806 // update permanent page area in btree from buffer pool
808 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
811 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
814 latch->dirty = 0, bt->writes++;
816 // unlink our available slot from its hash chain
819 bt->mgr->latchsets[latch->prev].next = latch->next;
821 bt->mgr->hashtable[idx].slot = latch->next;
824 bt->mgr->latchsets[latch->next].prev = latch->prev;
826 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
828 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
831 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
836 void bt_mgrclose (BtMgr *mgr)
843 // flush dirty pool pages to the btree
845 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847 latch = mgr->latchsets + slot;
850 bt_writepage(mgr, page, latch->page_no);
851 latch->dirty = 0, num++;
853 // madvise (page, mgr->page_size, MADV_DONTNEED);
856 fprintf(stderr, "%d buffer pool pages flushed\n", num);
859 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860 munmap (mgr->pagezero, mgr->page_size);
862 FlushViewOfFile(mgr->pagezero, 0);
863 UnmapViewOfFile(mgr->pagezero);
864 UnmapViewOfFile(mgr->hashtable);
865 CloseHandle(mgr->halloc);
866 CloseHandle(mgr->hpool);
872 FlushFileBuffers(mgr->idx);
873 CloseHandle(mgr->idx);
878 // close and release memory
880 void bt_close (BtDb *bt)
887 VirtualFree (bt->mem, 0, MEM_RELEASE);
892 // open/create new btree buffer manager
894 // call with file_name, bits in page size (e.g. 16),
895 // size of page pool (e.g. 262144) and number of lock table entries.
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
909 // determine sanity of page size and buffer pool
911 if( bits > BT_maxbits )
913 else if( bits < BT_minbits )
917 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
922 mgr = calloc (1, sizeof(BtMgr));
924 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
926 if( mgr->idx == -1 ) {
927 fprintf (stderr, "Unable to open btree file\n");
928 return free(mgr), NULL;
931 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932 attr = FILE_ATTRIBUTE_NORMAL;
933 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
935 if( mgr->idx == INVALID_HANDLE_VALUE )
936 return GlobalFree(mgr), NULL;
940 pagezero = valloc (BT_maxpage);
943 // read minimum page size to get root info
944 // to support raw disk partition files
945 // check if bits == 0 on the disk.
947 if( size = lseek (mgr->idx, 0L, 2) )
948 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949 if( pagezero->alloc->bits )
950 bits = pagezero->alloc->bits;
954 return free(mgr), free(pagezero), NULL;
958 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959 size = GetFileSize(mgr->idx, amt);
962 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963 return bt_mgrclose (mgr), NULL;
964 bits = pagezero->alloc->bits;
969 mgr->page_size = 1 << bits;
970 mgr->page_bits = bits;
972 // calculate number of latch hash table entries
974 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
977 // add on the number of pages in buffer pool
978 // along with the corresponding latch table
980 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
981 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982 mgr->latchtotal = nodemax;
984 // add on the sizeof the lock manager hash table and the lock table
986 mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
988 mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
993 // initialize an empty b-tree with latch page, root page, page of leaves
994 // and page(s) of latches and page pool cache
996 memset (pagezero, 0, 1 << bits);
997 pagezero->alloc->bits = mgr->page_bits;
998 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1000 // initialize left-most LEAF page in
1003 bt_putid (pagezero->alloc->left, LEAF_page);
1005 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006 fprintf (stderr, "Unable to create btree page zero\n");
1007 return bt_mgrclose (mgr), NULL;
1010 memset (pagezero, 0, 1 << bits);
1011 pagezero->alloc->bits = mgr->page_bits;
1013 for( lvl=MIN_lvl; lvl--; ) {
1014 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015 key = keyptr(pagezero->alloc, 1);
1016 key->len = 2; // create stopper key
1020 bt_putid(value, MIN_lvl - lvl + 1);
1021 val = valptr(pagezero->alloc, 1);
1022 val->len = lvl ? BtId : 0;
1023 memcpy (val->value, value, val->len);
1025 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026 pagezero->alloc->lvl = lvl;
1027 pagezero->alloc->cnt = 1;
1028 pagezero->alloc->act = 1;
1030 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031 fprintf (stderr, "Unable to create btree page zero\n");
1032 return bt_mgrclose (mgr), NULL;
1040 VirtualFree (pagezero, 0, MEM_RELEASE);
1043 // mlock the pagezero page
1045 flag = PROT_READ | PROT_WRITE;
1046 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047 if( mgr->pagezero == MAP_FAILED ) {
1048 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049 return bt_mgrclose (mgr), NULL;
1051 mlock (mgr->pagezero, mgr->page_size);
1053 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054 if( mgr->hashtable == MAP_FAILED ) {
1055 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056 return bt_mgrclose (mgr), NULL;
1059 flag = PAGE_READWRITE;
1060 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061 if( !mgr->halloc ) {
1062 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063 return bt_mgrclose (mgr), NULL;
1066 flag = FILE_MAP_WRITE;
1067 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068 if( !mgr->pagezero ) {
1069 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070 return bt_mgrclose (mgr), NULL;
1073 flag = PAGE_READWRITE;
1074 size = (uid)mgr->nlatchpage << mgr->page_bits;
1075 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1077 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078 return bt_mgrclose (mgr), NULL;
1081 flag = FILE_MAP_WRITE;
1082 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083 if( !mgr->hashtable ) {
1084 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085 return bt_mgrclose (mgr), NULL;
1089 size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090 mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091 size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1093 mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094 mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095 mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1097 mgr->lockfree = lockmax - 1;
1098 mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1100 for( idx = 1; idx < lockmax; idx++ )
1101 mgr->locktable[idx].next = idx - 1;
1106 // open BTree access method
1107 // based on buffer manager
1109 BtDb *bt_open (BtMgr *mgr)
1111 BtDb *bt = malloc (sizeof(*bt));
1113 memset (bt, 0, sizeof(*bt));
1116 bt->mem = valloc (2 *mgr->page_size);
1118 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 bt->frame = (BtPage)bt->mem;
1121 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1125 // compare two keys, returning > 0, = 0, or < 0
1126 // as the comparison value
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1130 uint len1 = key1->len;
1133 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1144 // place write, read, or parent lock on requested page_no.
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1150 ReadLock (set->readwr);
1153 WriteLock (set->readwr);
1156 ReadLock (set->access);
1159 WriteLock (set->access);
1162 WriteLock (set->parent);
1167 // remove write, read, or parent lock on requested page
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1173 ReadRelease (set->readwr);
1176 WriteRelease (set->readwr);
1179 ReadRelease (set->access);
1182 WriteRelease (set->access);
1185 WriteRelease (set->parent);
1190 // allocate a new page
1191 // return with page latched.
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1197 // lock allocation page
1199 bt_spinwritelock(bt->mgr->alloclatch);
1201 // use empty chain first
1202 // else allocate empty page
1204 if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206 set->page = bt_mappage (bt, set->latch);
1208 return bt->err = BTERR_struct, -1;
1210 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211 bt_spinreleasewrite(bt->mgr->alloclatch);
1213 memcpy (set->page, contents, bt->mgr->page_size);
1214 set->latch->dirty = 1;
1218 set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219 bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1221 // unlock allocation latch
1223 bt_spinreleasewrite(bt->mgr->alloclatch);
1225 // don't load cache from btree page
1227 if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228 set->page = bt_mappage (bt, set->latch);
1230 return bt->err = BTERR_struct;
1232 memcpy (set->page, contents, bt->mgr->page_size);
1233 set->latch->dirty = 1;
1237 // find slot in page for given key at a given level
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1244 // make stopper key an infinite fence value
1246 if( bt_getid (set->page->right) )
1251 // low is the lowest candidate.
1252 // loop ends when they meet
1254 // higher is already
1255 // tested as .ge. the passed key.
1257 while( diff = higher - low ) {
1258 slot = low + ( diff >> 1 );
1259 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1262 higher = slot, good++;
1265 // return zero if key is on right link page
1267 return good ? higher : 0;
1270 // find and load page at given level for given key
1271 // leave page rd or wr locked as requested
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1280 // start at root of btree and drill down
1283 // determine lock mode of drill level
1284 mode = (drill == lvl) ? lock : BtLockRead;
1286 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287 set->page_no = page_no;
1291 // obtain access lock using lock chaining with Access mode
1293 if( page_no > ROOT_page )
1294 bt_lockpage(BtLockAccess, set->latch);
1296 set->page = bt_mappage (bt, set->latch);
1298 // release & unpin parent page
1301 bt_unlockpage(prevmode, prevlatch);
1302 bt_unpinlatch (prevlatch);
1306 // obtain read lock using lock chaining
1308 bt_lockpage(mode, set->latch);
1310 if( set->page->free )
1311 return bt->err = BTERR_struct, 0;
1313 if( page_no > ROOT_page )
1314 bt_unlockpage(BtLockAccess, set->latch);
1316 // re-read and re-lock root after determining actual level of root
1318 if( set->page->lvl != drill) {
1319 if( set->page_no != ROOT_page )
1320 return bt->err = BTERR_struct, 0;
1322 drill = set->page->lvl;
1324 if( lock != BtLockRead && drill == lvl ) {
1325 bt_unlockpage(mode, set->latch);
1326 bt_unpinlatch (set->latch);
1331 prevpage = set->page_no;
1332 prevlatch = set->latch;
1335 // find key on page at this level
1336 // and descend to requested level
1338 if( set->page->kill )
1341 if( slot = bt_findslot (set, key, len) ) {
1345 while( slotptr(set->page, slot)->dead )
1346 if( slot++ < set->page->cnt )
1351 page_no = bt_getid(valptr(set->page, slot)->value);
1356 // or slide right into next page
1359 page_no = bt_getid(set->page->right);
1363 // return error on end of right chain
1365 bt->err = BTERR_struct;
1366 return 0; // return error
1369 // return page to free list
1370 // page must be delete & write locked
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1374 // lock allocation page
1376 bt_spinwritelock (bt->mgr->alloclatch);
1379 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380 bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381 set->latch->dirty = 1;
1382 set->page->free = 1;
1384 // unlock released page
1386 bt_unlockpage (BtLockDelete, set->latch);
1387 bt_unlockpage (BtLockWrite, set->latch);
1388 bt_unpinlatch (set->latch);
1390 // unlock allocation page
1392 bt_spinreleasewrite (bt->mgr->alloclatch);
1395 // a fence key was deleted from a page
1396 // push new fence value upwards
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1405 // remove the old fence value
1407 ptr = keyptr(set->page, set->page->cnt);
1408 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410 set->latch->dirty = 1;
1412 // cache new fence value
1414 ptr = keyptr(set->page, set->page->cnt);
1415 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1417 bt_lockpage (BtLockParent, set->latch);
1418 bt_unlockpage (BtLockWrite, set->latch);
1420 // insert new (now smaller) fence key
1422 bt_putid (value, set->page_no);
1423 ptr = (BtKey*)leftkey;
1425 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1428 // now delete old fence key
1430 ptr = (BtKey*)rightkey;
1432 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1435 bt_unlockpage (BtLockParent, set->latch);
1436 bt_unpinlatch(set->latch);
1440 // root has a single child
1441 // collapse a level from the tree
// bt_collapseroot: reduce the tree height while the root holds a
// single active entry.
//   bt   - btree handle
//   root - page set for the root page; its BtLockWrite is released and
//          its latch unpinned at the end
// Each pass finds the first live slot of the root, pins and locks the
// child page that slot points to, copies the entire child page over
// the root page and frees the child.  Passes repeat while the root is
// above level 1 and still has exactly one active entry.
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1448 // find the child entry and promote as new root contents
// slots are numbered from 1; stop at the first one not marked dead
1451 for( idx = 0; idx++ < root->page->cnt; )
1452 if( !slotptr(root->page, idx)->dead )
// the slot's value holds the child's page number
1455 child->page_no = bt_getid (valptr(root->page, idx)->value);
1457 if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458 child->page = bt_mappage (bt, child->latch);
// delete + write locks are both taken before the child is freed
1462 bt_lockpage (BtLockDelete, child->latch);
1463 bt_lockpage (BtLockWrite, child->latch);
// overwrite the root page image with the child's page image
1465 memcpy (root->page, child->page, bt->mgr->page_size);
1466 root->latch->dirty = 1;
1468 bt_freepage (bt, child);
1470 } while( root->page->lvl > 1 && root->page->act == 1 );
1472 bt_unlockpage (BtLockWrite, root->latch);
1473 bt_unpinlatch (root->latch);
1477 // maintain reverse scan pointers by
1478 // linking left pointer of far right node
// bt_linkleft: record left_page_no as the left neighbour of page
// right_page_no.
//   bt            - btree handle
//   left_page_no  - page number to store as the left link
//   right_page_no - page whose left link is rewritten; when zero,
//                   left_page_no is stored in pagezero->alloc->left
//                   instead (the value bt_lastkey starts from)
// The right page is pinned, mapped and write-locked only for the
// duration of the update.
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1482 BtPageSet right2[1];
1484 // keep track of rightmost leaf page
1486 if( !right_page_no ) {
1487 bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1491 // link right page to left page
1493 if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494 right2->page = bt_mappage (bt, right2->latch);
1498 bt_lockpage (BtLockWrite, right2->latch);
1500 bt_putid(right2->page->left, left_page_no);
1501 bt_unlockpage (BtLockWrite, right2->latch);
1502 bt_unpinlatch (right2->latch);
1506 // find and delete key on page by marking delete flag bit
1507 // if page becomes empty, delete it from the btree
// bt_deletekey: delete a key from the page that covers it at level lvl.
//   bt  - btree handle; bt->found is set to whether a live matching
//         key was found on the return paths shown below
//   key - key bytes
//   len - key length in bytes
//   lvl - tree level to search (index_file deletes with lvl 0; this
//         function and bt_fixfence recurse with lvl+1)
// Outline of the visible steps:
//   1. load the page with BtLockWrite and mark the matching slot dead
//   2. drop dead slots sitting directly beneath the fence slot
//   3. if an upper-level fence key was deleted, call bt_fixfence
//   4. if the root is left with one entry above level 1, collapse it
//   5. if the page still has active keys, unlock and finish
//   6. otherwise pull the right sibling's contents into this page,
//      mark the sibling killed, repoint the parent entries at lvl+1
//      and free the sibling
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1518 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519 ptr = keyptr(set->page, slot);
1523 // if librarian slot, advance to real slot
1525 if( slotptr(set->page, slot)->type == Librarian )
1526 ptr = keyptr(set->page, ++slot);
// remember whether the target is the page's last (fence) slot
1528 fence = slot == set->page->cnt;
1530 // if key is found delete it, otherwise ignore request
// found ends up non-zero only for a matching key that was not already dead
1532 if( found = !keycmp (ptr, key, len) )
1533 if( found = slotptr(set->page, slot)->dead == 0 ) {
1534 val = valptr(set->page,slot);
1535 slotptr(set->page, slot)->dead = 1;
// the key and value bytes stay on the page and are counted as garbage
1536 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1539 // collapse empty slots beneath the fence
// a dead slot at cnt-1 is overwritten by the fence slot and cnt shrinks
1541 while( idx = set->page->cnt - 1 )
1542 if( slotptr(set->page, idx)->dead ) {
1543 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1549 // did we delete a fence key in an upper level?
1551 if( found && lvl && set->page->act && fence )
1552 if( bt_fixfence (bt, set, lvl) )
1555 return bt->found = found, 0;
1557 // do we need to collapse root?
1559 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560 if( bt_collapseroot (bt, set) )
1563 return bt->found = found, 0;
1565 // return if page is not empty
1567 if( set->page->act ) {
1568 set->latch->dirty = 1;
1569 bt_unlockpage(BtLockWrite, set->latch);
1570 bt_unpinlatch (set->latch);
1571 return bt->found = found, 0;
1574 // cache copy of fence key
1575 // to post in parent
1577 ptr = keyptr(set->page, set->page->cnt);
1578 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1580 // obtain lock on right page
1582 right->page_no = bt_getid(set->page->right);
1584 if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585 right->page = bt_mappage (bt, right->latch);
1589 bt_lockpage (BtLockWrite, right->latch);
// NOTE(review): no unlock or unpin of set or right is visible before
// this return, so both write locks and both pins appear to stay held
// on this error path - confirm that is acceptable for BTERR_struct
1591 if( right->page->kill )
1592 return bt->err = BTERR_struct;
1594 // transfer left link
// the sibling image is about to replace this page, so it must carry
// this page's left pointer
1596 memcpy (right->page->left, set->page->left, BtId);
1598 // pull contents of right peer into our empty page
1600 memcpy (set->page, right->page, bt->mgr->page_size);
1601 set->latch->dirty = 1;
// set->page->right is now the sibling's old right neighbour; make that
// page's left pointer refer to this page
1606 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1609 // cache copy of key to update
1611 ptr = keyptr(right->page, right->page->cnt);
1612 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1614 // mark right page deleted and point it to left page
1615 // until we can post parent updates
1617 bt_putid (right->page->right, set->page_no);
1618 right->latch->dirty = 1;
1619 right->page->kill = 1;
// on both pages, take the parent lock before dropping the write lock
1621 bt_lockpage (BtLockParent, right->latch);
1622 bt_unlockpage (BtLockWrite, right->latch);
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // redirect higher key directly to our new node contents
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)higherfence;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // delete old lower key to our node
1637 ptr = (BtKey*)lowerfence;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 // obtain delete and write locks to right node
1644 bt_unlockpage (BtLockParent, right->latch);
1645 bt_lockpage (BtLockDelete, right->latch);
1646 bt_lockpage (BtLockWrite, right->latch);
1647 bt_freepage (bt, right);
1649 bt_unlockpage (BtLockParent, set->latch);
1650 bt_unpinlatch (set->latch);
// bt_foundkey: return bt->key viewed as a BtKey.
// bt_findkey copies the key it stopped on into bt->key, so this gives
// the caller access to that copy.
1655 BtKey *bt_foundkey (BtDb *bt)
1657 return (BtKey*)(bt->key);
1660 // advance to next slot
// bt_findnext: advance past `slot`, moving `set` onto the right
// sibling page when needed.
//   bt   - btree handle (bt->err is set to BTERR_struct on the failure
//          return shown below)
//   set  - current page set, read-locked; replaced in place by the
//          right sibling's latch, page and page_no when the page changes
//   slot - slot just examined
// bt_findkey keeps looping while the value returned here is non-zero.
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1664 BtLatchSet *prevlatch;
// slots remain on this page when slot is below cnt
1667 if( slot < set->page->cnt )
// remember the current latch so it can be released after the sibling
// has been secured
1670 prevlatch = set->latch;
1672 if( page_no = bt_getid(set->page->right) )
1673 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674 set->page = bt_mappage (bt, set->latch);
1678 return bt->err = BTERR_struct, 0;
1680 // obtain access lock using lock chaining with Access mode
// order: Access on new page, release old page, Read on new page,
// release Access
1682 bt_lockpage(BtLockAccess, set->latch);
1684 bt_unlockpage(BtLockRead, prevlatch);
1685 bt_unpinlatch (prevlatch);
1687 bt_lockpage(BtLockRead, set->latch);
1688 bt_unlockpage(BtLockAccess, set->latch);
1690 set->page_no = page_no;
1694 // find unique key or first duplicate key in
1695 // leaf level and return number of value bytes
1696 // or (-1) if not found. Setup key for bt_foundkey
// bt_findkey parameters:
//   bt     - btree handle; bt->key receives a copy of each key examined
//   key    - key bytes to look for
//   keylen - length of key in bytes
//   value  - destination for the stored value bytes
//   valmax - capacity of value; compared with the stored length before
//            the copy
// The search starts on the level-0 page returned by bt_loadpage under
// BtLockRead and continues through bt_findnext; the read lock and pin
// on the final page are released before finishing.
// NOTE(review): bt_atomicinsert and index_file call this with
// value == NULL and valmax == 0, so the memcpy below can be handed a
// null destination with a zero length - confirm that is intended.
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1706 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1708 ptr = keyptr(set->page, slot);
1710 // skip librarian slot place holder
1712 if( slotptr(set->page, slot)->type == Librarian )
1713 ptr = keyptr(set->page, ++slot);
1715 // return actual key found
1717 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// Duplicate-type slots get special handling of the compare length
// (presumably to ignore the sequence suffix bt_insertkey appends - confirm)
1720 if( slotptr(set->page, slot)->type == Duplicate )
1723 // not there if we reach the stopper key
// i.e. the last slot of a page that has no right sibling
1725 if( slot == set->page->cnt )
1726 if( !bt_getid (set->page->right) )
1729 // if key exists, return >= 0 value bytes copied
1730 // otherwise return (-1)
1732 if( slotptr(set->page, slot)->dead )
1736 if( !memcmp (ptr->key, key, len) ) {
1737 val = valptr (set->page,slot);
1738 if( valmax > val->len )
1740 memcpy (value, val->value, valmax);
1746 } while( slot = bt_findnext (bt, set, slot) );
1748 bt_unlockpage (BtLockRead, set->latch);
1749 bt_unpinlatch (set->latch);
1753 // check page for space available,
1754 // clean if necessary and return
1755 // 0 - page needs splitting
1756 // >0 new slot value
// bt_cleanpage parameters:
//   bt     - btree handle; bt->frame is used as scratch space for a
//            copy of the page
//   set    - page set for the page, whose latch is marked dirty when
//            the page is rebuilt
//   keylen - length of the key about to be added
//   slot   - caller's target slot on the page
//   vallen - length of the value about to be added
// The space test requires room below page->min for two more slots, the
// page header, and the new key and value including their BtKey/BtVal
// headers.  Cleanup is attempted only when at least one fifth of the
// page size is recorded as garbage.
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
// enough free space already?
1768 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1771 // skip cleanup and proceed to split
1772 // if there's not enough garbage
1775 if( page->garbage < nxt / 5 )
// work from a snapshot of the page held in bt->frame
1778 memcpy (bt->frame, page, bt->mgr->page_size);
1780 // skip page info and set rest of page to zero
1782 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783 set->latch->dirty = 1;
1787 // clean up page first by
1788 // removing deleted keys
// cnt walks the snapshot's slots 1..max; idx counts slots written back
1790 while( cnt++ < max ) {
// dead slots are singled out here, except the last (fence) slot
1793 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1796 // copy the value across
// data is packed downward from the end of the page; nxt is the lowest
// offset used so far
1798 val = valptr(bt->frame, cnt);
1799 nxt -= val->len + sizeof(BtVal);
1800 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1802 // copy the key across
1804 key = keyptr(bt->frame, cnt);
1805 nxt -= key->len + sizeof(BtKey);
1806 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1808 // make a librarian slot
// a dead placeholder slot that shares the following key's offset
1811 slotptr(page, ++idx)->off = nxt;
1812 slotptr(page, idx)->type = Librarian;
1813 slotptr(page, idx)->dead = 1;
// the real slot: same offset, original type and dead flag
1818 slotptr(page, ++idx)->off = nxt;
1819 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1821 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1828 // see if page has enough space now, or does it need splitting?
1830 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1836 // split the root and raise the height of the btree
// bt_splitroot: give the tree a new top level after the root page has
// been split.
//   bt      - btree handle
//   root    - page set for the root page; its BtLockWrite is released
//             and its latch unpinned at the end
//   leftkey - fence key of the lower half
//   right   - page set of the new page holding the upper half; its
//             left link is set here and its latch is unpinned
// The current root contents are copied to a newly allocated page
// (left).  The root page is then cleared and rebuilt with two slots:
// slot 1 holds leftkey pointing at the left page, slot 2 holds a
// 2-byte key pointing at the right page.
1838 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1840 uint nxt = bt->mgr->page_size;
1841 unsigned char value[BtId];
1846 // Obtain an empty page to use, and copy the current
1847 // root contents into it, e.g. lower keys
1849 if( bt_newpage(bt, left, root->page) )
1852 bt_unpinlatch (left->latch);
1854 // set left from higher to lower page
1856 bt_putid (right->page->left, left->page_no);
1857 bt_unpinlatch (right->latch);
1859 // preserve the page info at the bottom
1860 // of higher keys and set rest to zero
1862 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1864 // insert stopper key at top of newroot page
1865 // and increase the root height
// value for slot 2: page number of the right page
1867 nxt -= BtId + sizeof(BtVal);
1868 bt_putid (value, right->page_no);
1869 val = (BtVal *)((unsigned char *)root->page + nxt);
1870 memcpy (val->value, value, BtId);
// room for a key of 2 bytes plus its BtKey header
1873 nxt -= 2 + sizeof(BtKey);
1874 slotptr(root->page, 2)->off = nxt;
1875 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1880 // insert lower keys page fence key on newroot page as first key
// value for slot 1: page number of the left page
1882 nxt -= BtId + sizeof(BtVal);
1883 bt_putid (value, left->page_no);
1884 val = (BtVal *)((unsigned char *)root->page + nxt);
1885 memcpy (val->value, value, BtId);
1888 nxt -= leftkey->len + sizeof(BtKey);
1889 slotptr(root->page, 1)->off = nxt;
1890 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
// the root has no right sibling
1892 bt_putid(root->page->right, 0);
1893 root->page->min = nxt; // reset lowest used offset and key count
1894 root->page->cnt = 2;
1895 root->page->act = 2;
1898 // release and unpin root pages
1900 bt_unlockpage(BtLockWrite, root->latch);
1901 bt_unpinlatch (root->latch);
1905 // split already locked full node
// bt_splitpage: split a full page into two.
//   bt  - btree handle; bt->frame is used as the staging buffer
//   set - page set of the full page (BtLockWrite is exchanged for
//         BtLockParent before the parent level is updated)
// Visible phases:
//   1. build the upper part of the keys in bt->frame and write it to a
//      page obtained from bt_newpage (right)
//   2. rebuild the lower part of the keys in the original page
//   3. if the page is the root, hand over to bt_splitroot
//   4. otherwise insert fence keys for both pages at lvl+1, then
//      unlock and unpin both pages
// In both rebuilt pages each copied key is preceded by a dead
// Librarian slot that shares its offset.
1908 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1910 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1911 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1912 unsigned char value[BtId];
1913 uint lvl = set->page->lvl;
1920 // split higher half of keys to bt->frame
1922 memset (bt->frame, 0, bt->mgr->page_size);
1923 max = set->page->cnt;
1927 while( cnt++ < max ) {
// dead slots are singled out here, except the last (fence) slot
1928 if( slotptr(set->page, cnt)->dead && cnt < max )
// values and keys are packed downward from the end of the frame
1930 src = valptr(set->page, cnt);
1931 nxt -= src->len + sizeof(BtVal);
1932 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1934 key = keyptr(set->page, cnt);
1935 nxt -= key->len + sizeof(BtKey);
1936 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1937 memcpy (ptr, key, key->len + sizeof(BtKey));
1939 // add librarian slot
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = Librarian;
1944 slotptr(bt->frame, idx)->dead = 1;
1949 slotptr(bt->frame, ++idx)->off = nxt;
1950 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1952 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1956 // remember existing fence key for new page to the right
// key still refers to the last key copied by the loop above
1958 memcpy (rightkey, key, key->len + sizeof(BtKey));
// fill in the header of the new right-hand page image
1960 bt->frame->bits = bt->mgr->page_bits;
1961 bt->frame->min = nxt;
1962 bt->frame->cnt = idx;
1963 bt->frame->lvl = lvl;
// sibling links are set only for pages numbered above ROOT_page
1967 if( set->page_no > ROOT_page ) {
1968 bt_putid (bt->frame->right, bt_getid (set->page->right));
1969 bt_putid(bt->frame->left, set->page_no);
1972 // get new free page and write higher keys to it.
1974 if( bt_newpage(bt, right, bt->frame) )
// on leaf pages, make the old right neighbour's left pointer refer to
// the new page
1979 if( set->page_no > ROOT_page && !lvl )
1980 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1983 // update lower keys to continue in old page
// snapshot the old page into bt->frame, then clear everything after
// the header
1985 memcpy (bt->frame, set->page, bt->mgr->page_size);
1986 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987 set->latch->dirty = 1;
1989 nxt = bt->mgr->page_size;
1990 set->page->garbage = 0;
1996 if( slotptr(bt->frame, max)->type == Librarian )
1999 // assemble page of smaller keys
2001 while( cnt++ < max ) {
2002 if( slotptr(bt->frame, cnt)->dead )
2004 val = valptr(bt->frame, cnt);
2005 nxt -= val->len + sizeof(BtVal);
2006 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2008 key = keyptr(bt->frame, cnt);
2009 nxt -= key->len + sizeof(BtKey);
2010 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2012 // add librarian slot
2015 slotptr(set->page, ++idx)->off = nxt;
2016 slotptr(set->page, idx)->type = Librarian;
2017 slotptr(set->page, idx)->dead = 1;
2022 slotptr(set->page, ++idx)->off = nxt;
2023 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2027 // remember fence key for smaller page
2029 memcpy(fencekey, key, key->len + sizeof(BtKey));
// the old page now points right at the new page
2031 bt_putid(set->page->right, right->page_no);
2032 set->page->min = nxt;
2033 set->page->cnt = idx;
2035 // if current page is the root page, split it
2037 if( set->page_no == ROOT_page )
2038 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2040 // insert new fences in their parent pages
2042 bt_lockpage (BtLockParent, right->latch);
2044 bt_lockpage (BtLockParent, set->latch);
2045 bt_unlockpage (BtLockWrite, set->latch);
2047 // insert new fence for reformulated left block of smaller keys
2049 bt_putid (value, set->page_no);
// the first byte of the cached BtKey is passed as the key length and
// the bytes after it as the key
2051 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2054 // switch fence for right block of larger keys to new right page
2056 bt_putid (value, right->page_no);
2058 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2061 bt_unlockpage (BtLockParent, set->latch);
2062 bt_unpinlatch (set->latch);
2064 bt_unlockpage (BtLockParent, right->latch);
2065 bt_unpinlatch (right->latch);
2069 // install new key and value onto page
2070 // page must already be checked for
// bt_insertslot parameters:
//   bt     - btree handle
//   set    - write-locked page set; the write lock is released and the
//            latch unpinned at the end
//   slot   - position before which the key is to be placed
//   key    - key bytes, keylen bytes long
//   value  - value bytes, vallen bytes long
//   type   - slot type for the new entry
// The value and then the key are carved out below page->min.  The slot
// array is opened up by shifting entries toward the first dead slot at
// or after `slot`; if none exists the array grows by two entries and a
// Librarian slot is written in front of the new key.
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2075 uint idx, librarian;
2080 // if found slot > desired slot and previous slot
2081 // is a librarian slot, use it
2084 if( slotptr(set->page, slot-1)->type == Librarian )
2087 // copy value onto page
2089 set->page->min -= vallen + sizeof(BtVal);
2090 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091 memcpy (val->value, value, vallen);
2094 // copy key onto page
2096 set->page->min -= keylen + sizeof(BtKey);
2097 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098 memcpy (ptr->key, key, keylen);
2101 // find first empty slot
// "empty" here means a slot whose dead flag is set
2103 for( idx = slot; idx < set->page->cnt; idx++ )
2104 if( slotptr(set->page, idx)->dead )
2107 // now insert key into array before slot
// no dead slot found: extend the array by two (new key + librarian)
2109 if( idx == set->page->cnt )
2110 idx += 2, set->page->cnt += 2, librarian = 2;
2114 set->latch->dirty = 1;
// move entries up by `librarian` positions, working downward from idx
2117 while( idx > slot + librarian - 1 )
2118 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2120 // add librarian slot
2122 if( librarian > 1 ) {
2123 node = slotptr(set->page, slot++);
2124 node->off = set->page->min;
2125 node->type = Librarian;
// the real slot points at the key just written at page->min
2131 node = slotptr(set->page, slot);
2132 node->off = set->page->min;
2136 bt_unlockpage (BtLockWrite, set->latch);
2137 bt_unpinlatch (set->latch);
2141 // Insert new key into the btree at given level.
2142 // either add a new key or update/add an existing one
// bt_insertkey parameters:
//   bt     - btree handle (bt->err is set to BTERR_ovflw on one of the
//            failure paths below)
//   key    - key bytes, keylen bytes long
//   lvl    - tree level to insert at (0 from index_file and
//            bt_atomicinsert; lvl+1 from the split/delete code)
//   value  - value bytes, vallen bytes long
//   unique - non-zero: an existing equal key has its value replaced;
//            zero: a new entry is always added
// For non-unique keys a sequence number from bt_newdup is stored
// after the key bytes.  The page is located with bt_loadpage under
// BtLockWrite.  A new entry goes through bt_cleanpage / bt_splitpage
// and then bt_insertslot.  An existing entry is overwritten in place
// when the new value fits in the old value area; otherwise the old
// key/value space is counted as garbage and fresh copies are written
// at page->min.
2144 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2146 unsigned char newkey[BT_keyarray];
2147 uint slot, idx, len;
2154 // set up the key we're working on
// the key is staged in newkey so it can be extended for duplicates
2156 ins = (BtKey*)newkey;
2157 memcpy (ins->key, key, keylen);
2160 // is this a non-unique index value?
2166 sequence = bt_newdup (bt);
2167 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2171 while( 1 ) { // find the page and slot for the current key
2172 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2173 ptr = keyptr(set->page, slot);
2176 bt->err = BTERR_ovflw;
2180 // if librarian slot == found slot, advance to real slot
2182 if( slotptr(set->page, slot)->type == Librarian )
2183 if( !keycmp (ptr, key, keylen) )
2184 ptr = keyptr(set->page, ++slot);
2188 if( slotptr(set->page, slot)->type == Duplicate )
2191 // if inserting a duplicate key or unique key
2192 // check for adequate space on the page
2193 // and insert the new key before slot.
// taken when the key is not already present, or always when !unique
2195 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2196 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2197 if( bt_splitpage (bt, set) )
2202 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2205 // if key already exists, update value and return
2207 val = valptr(set->page, slot);
// the new value fits inside the existing value area
2209 if( val->len >= vallen ) {
2210 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area becomes garbage
2212 set->page->garbage += val->len - vallen;
2213 set->latch->dirty = 1;
2214 slotptr(set->page, slot)->dead = 0;
2216 memcpy (val->value, value, vallen);
2217 bt_unlockpage(BtLockWrite, set->latch);
2218 bt_unpinlatch (set->latch);
2222 // new update value doesn't fit in existing value area
// a live entry's old key and value space is written off as garbage
2224 if( !slotptr(set->page, slot)->dead )
2225 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2227 slotptr(set->page, slot)->dead = 0;
2231 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2232 if( bt_splitpage (bt, set) )
// write fresh copies of the value and key below page->min and point
// the existing slot at them
2237 set->page->min -= vallen + sizeof(BtVal);
2238 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2239 memcpy (val->value, value, vallen);
2242 set->latch->dirty = 1;
2243 set->page->min -= keylen + sizeof(BtKey);
2244 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2245 memcpy (ptr->key, key, keylen);
2248 slotptr(set->page, slot)->off = set->page->min;
2249 bt_unlockpage(BtLockWrite, set->latch);
2250 bt_unpinlatch (set->latch);
2256 // compute hash of string
// bt_hashkey: compute a hash of a key.
//   key - bytes to hash; may start at any alignment
//   len - number of bytes in key
// Returns the hash value (bt_setlock reduces it modulo the size of
// the lock hash table).
//
// Whole machine words are folded in first, then each trailing byte is
// weighted by the count of bytes still remaining.  Words are fetched
// with memcpy instead of dereferencing a cast pointer, so an unaligned
// key is read safely and the unsigned char storage is never accessed
// through an unsigned int lvalue.  The hash values are unchanged.
unsigned int bt_hashkey (unsigned char *key, unsigned int len)
{
unsigned int hash = 0, word;

	while( len >= sizeof(word) ) {
		memcpy (&word, key, sizeof(word));
		hash *= 11, hash += word;
		len -= sizeof(word), key += sizeof(word);
	}

	while( len )
		hash *= 11, hash += *key++ * len--;

	return hash;
}
2271 // add a new lock table entry
// bt_newlock: take an entry from the lock table free chain and link
// it at the head of hash chain hashidx.
//   bt      - btree handle
//   key     - key being locked; copied into the new entry
//   hashidx - index of the hash chain to extend
// The free chain is manipulated under mgr->locklatch.  The hash chain
// is edited after that latch is released; bt_setlock holds
// hashlock[hashidx].latch around its call to this function.
// bt_setlock treats a zero result as "no entry available".
2273 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2275 BtLockSet *lock = bt->mgr->locktable;
2278 // obtain lock manager global lock
2280 bt_spinwritelock (bt->mgr->locklatch);
2282 // return NULL if table is full
// an empty free chain is represented by lockfree == 0
2284 if( !(slot = bt->mgr->lockfree) ) {
2285 bt_spinreleasewrite (bt->mgr->locklatch);
2289 // maintain free chain
2291 bt->mgr->lockfree = lock[slot].next;
2292 bt_spinreleasewrite (bt->mgr->locklatch);
// push the entry on the front of the doubly linked hash chain
2294 if( prev = bt->mgr->hashlock[hashidx].slot )
2295 lock[prev].prev = slot;
2297 bt->mgr->hashlock[hashidx].slot = slot;
2298 lock[slot].hashidx = hashidx;
2299 lock[slot].next = prev;
2300 lock[slot].prev = 0;
2302 // save the key being locked
2304 memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2308 // add key to the lock table
2309 // block until available.
// bt_setlock parameters:
//   bt  - btree handle
//   key - key to lock; hashed with bt_hashkey modulo mgr->lockhash to
//         select the hash chain
// The chain is searched under hashlock[hashidx].latch for an entry
// holding the same key; failing that a new entry is requested from
// bt_newlock.  The outer loop repeats until an entry has been
// obtained.  The hash latch is released before WriteLock is taken on
// the entry's readwr lock, which is where the call can block.
// bt_atomicinsert stores the result and later passes it to bt_lockclr.
2311 uint bt_setlock(BtDb *bt, BtKey *key)
2313 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2314 BtLockSet *lock = NULL;
2318 // find existing lock entry
2319 // or recover from full table
2321 while( lock == NULL ) {
2322 // obtain lock on hash slot
2324 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
// walk the chain starting at its head slot
2326 if( slot = bt->mgr->hashlock[hashidx].slot )
2328 lock = bt->mgr->locktable + slot;
2329 key2 = (BtKey *)lock->key;
2331 if( !keycmp (key, key2->key, key2->len) )
2333 } while( slot = lock->next );
2338 if( slot = bt_newlock (bt, key, hashidx) )
2341 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2349 lock = bt->mgr->locktable + slot;
2352 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2353 WriteLock (lock->readwr);
// bt_lockclr: release the key lock held in lock table entry `slot`.
//   bt   - btree handle
//   slot - lock table index, as produced by bt_setlock
// Runs under the entry's hash chain latch: releases the entry's write
// lock and decrements its pin count.  When the count reaches zero the
// entry is unlinked from its hash chain and pushed onto the free
// chain under mgr->locklatch.
2357 void bt_lockclr (BtDb *bt, uint slot)
2359 BtLockSet *lock = bt->mgr->locktable + slot;
2360 uint hashidx = lock->hashidx;
2363 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2364 WriteRelease (lock->readwr);
2366 // if entry is no longer in use,
2367 // return it to the free chain.
2369 if( !--lock->pin ) {
// unlink from the doubly linked chain; index 0 means "no neighbour"
2370 if( next = lock->next )
2371 bt->mgr->locktable[next].prev = lock->prev;
2373 if( prev = lock->prev )
2374 bt->mgr->locktable[prev].next = lock->next;
// the chain head is advanced to the entry's successor
2376 bt->mgr->hashlock[lock->hashidx].slot = next;
2378 bt_spinwritelock (bt->mgr->locklatch);
2379 lock->next = bt->mgr->lockfree;
2380 bt->mgr->lockfree = slot;
2381 bt_spinreleasewrite (bt->mgr->locklatch);
2384 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2387 // atomic insert of a batch of keys.
2388 // return -1 if BTERR is set
2389 // otherwise return slot number
2390 // causing the key constraint violation
2391 // or zero on successful completion.
// bt_atomicinsert parameters:
//   bt     - btree handle
//   source - page image whose slots 1..cnt describe the batch: each
//            slot supplies a key, a value and a type
// Phases: sort the slots by key, take a key lock for every
// Unique-type slot, look each Unique key up with bt_findkey, insert
// every key at level 0, then release the key locks.
// NOTE(review): locks[] has MAX_atomic elements and is indexed by the
// 1-based slot number up to source->cnt; no check of source->cnt is
// visible here - confirm callers keep cnt below MAX_atomic.
2393 int bt_atomicinsert (BtDb *bt, BtPage source)
2395 uint locks[MAX_atomic];
2403 // first sort the list of keys into order to
2404 // prevent deadlocks between threads.
// insertion sort over the slot array: slot `slot` is moved down while
// its key compares below the key of the slot beneath it
2406 for( slot = 1; slot++ < source->cnt; ) {
2407 *temp = *slotptr(source,slot);
2408 key = keyptr (source,slot);
2409 for( idx = slot; --idx; ) {
2410 key2 = keyptr (source,idx);
2411 if( keycmp (key, key2->key, key2->len) < 0 ) {
2412 *slotptr(source,idx+1) = *slotptr(source,idx);
2413 *slotptr(source,idx) = *temp;
2419 // take each unique-type key and add it to the lock table
2421 for( slot = 0; slot++ < source->cnt; )
2422 if( slotptr(source, slot)->type == Unique )
2423 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2425 // Lookup each unique key and determine constraint violations
// a negative bt_findkey result means the key is not in the tree
2427 for( slot = 0; slot++ < source->cnt; )
2428 if( slotptr(source, slot)->type == Unique ) {
2429 key = keyptr(source, slot);
2430 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2436 // add each key to the btree
2439 for( slot = 0; slot++ < source->cnt; ) {
2440 key = keyptr(source,slot);
2441 val = valptr(source,slot);
2442 type = slotptr(source,slot)->type;
2443 if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2447 // remove each unique-type key from the lock table
2449 for( slot = 0; slot++ < source->cnt; )
2450 if( slotptr(source, slot)->type == Unique )
2451 bt_lockclr (bt, locks[slot]);
2456 // set cursor to highest slot on highest page
// bt_lastkey: load the page recorded in pagezero->alloc->left into
// the cursor.
//   bt - btree handle; bt->cursor receives a full copy of the page
// The page is pinned, mapped and read-locked only while it is copied.
// slot is set to the page's slot count (its highest slot).
2458 uint bt_lastkey (BtDb *bt)
2460 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2464 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2465 set->page = bt_mappage (bt, set->latch);
2469 bt_lockpage(BtLockRead, set->latch);
2471 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2472 slot = set->page->cnt;
2474 bt_unlockpage(BtLockRead, set->latch);
2475 bt_unpinlatch (set->latch);
2479 // return previous slot on cursor page
// bt_prevkey parameters:
//   bt   - btree handle; bt->cursor holds the current page copy
//   slot - current slot on the cursor page
// When the cursor has to move to the page on the left, the left link
// of the cursor page is followed, bt->cursor_page is updated, and the
// left page is copied into bt->cursor under a read lock.  In that case
// the page's slot count (its highest slot) is returned.
2481 uint bt_prevkey (BtDb *bt, uint slot)
2489 if( left = bt_getid(bt->cursor->left) )
2490 bt->cursor_page = left;
2494 if( set->latch = bt_pinlatch (bt, left, 1) )
2495 set->page = bt_mappage (bt, set->latch);
2499 bt_lockpage(BtLockRead, set->latch);
2500 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2501 bt_unlockpage(BtLockRead, set->latch);
2502 bt_unpinlatch (set->latch);
2503 return bt->cursor->cnt;
2506 // return next slot on cursor page
2507 // or slide cursor right into next page
// bt_nextkey parameters:
//   bt   - btree handle; bt->cursor holds the current page copy
//   slot - current slot on the cursor page
// Slots after `slot` are examined on the cursor page; dead slots are
// tested for, and the last slot of a page with no right sibling (the
// stopper key) is not accepted.  When the cursor moves on, the right
// sibling becomes bt->cursor_page and is copied into bt->cursor under
// a read lock.
2509 uint bt_nextkey (BtDb *bt, uint slot)
2515 right = bt_getid(bt->cursor->right);
2517 while( slot++ < bt->cursor->cnt )
2518 if( slotptr(bt->cursor,slot)->dead )
2520 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2528 bt->cursor_page = right;
2530 if( set->latch = bt_pinlatch (bt, right, 1) )
2531 set->page = bt_mappage (bt, set->latch);
2535 bt_lockpage(BtLockRead, set->latch);
2537 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2539 bt_unlockpage(BtLockRead, set->latch);
2540 bt_unpinlatch (set->latch);
2548 // cache page of keys into cursor and return starting slot for given key
// bt_startkey parameters:
//   bt  - btree handle; bt->cursor and bt->cursor_page are updated
//   key - key to position on, len bytes long
// The level-0 page found by bt_loadpage is copied into bt->cursor
// while read-locked, and its page number is recorded as
// bt->cursor_page; the lock and pin are then released.
2550 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2555 // cache page for retrieval
2557 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2558 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2562 bt->cursor_page = set->page_no;
2564 bt_unlockpage(BtLockRead, set->latch);
2565 bt_unpinlatch (set->latch);
// bt_key: return a pointer to the key of `slot` inside the cursor's
// page copy (bt->cursor).
2569 BtKey *bt_key(BtDb *bt, uint slot)
2571 return keyptr(bt->cursor, slot);
// bt_val: return a pointer to the value of `slot` inside the cursor's
// page copy (bt->cursor).
2574 BtVal *bt_val(BtDb *bt, uint slot)
2576 return valptr(bt->cursor,slot);
// getCpuTime (Win32 API variant): return a time reading in seconds.
//   type - selects the reading; presumably wall-clock, user or system
//          time in the order the conversions appear below - TODO
//          confirm against the selecting statements
// Every reading is converted to a SYSTEMTIME and summed from its
// hour, minute, second and millisecond fields.  The system-time
// reading also adds the day of the week in seconds.
2582 double getCpuTime(int type)
2585 FILETIME xittime[1];
2586 FILETIME systime[1];
2587 FILETIME usrtime[1];
2588 SYSTEMTIME timeconv[1];
2591 memset (timeconv, 0, sizeof(SYSTEMTIME));
// current system time
2595 GetSystemTimeAsFileTime (xittime);
2596 FileTimeToSystemTime (xittime, timeconv);
2597 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user-mode time of this process
2600 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2601 FileTimeToSystemTime (usrtime, timeconv);
// the other process time reported by GetProcessTimes (systime)
2604 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2605 FileTimeToSystemTime (systime, timeconv);
2609 ans += (double)timeconv->wHour * 3600;
2610 ans += (double)timeconv->wMinute * 60;
2611 ans += (double)timeconv->wSecond;
2612 ans += (double)timeconv->wMilliseconds / 1000;
2617 #include <sys/resource.h>
// getCpuTime (gettimeofday / getrusage variant): return a time
// reading in seconds, with the microsecond part as a fraction.
//   type - selects the reading; presumably time of day, user CPU time
//          or system CPU time in the order the readings appear below
//          - TODO confirm against the selecting statements
2619 double getCpuTime(int type)
2621 struct rusage used[1];
2622 struct timeval tv[1];
// time of day
2626 gettimeofday(tv, NULL);
2627 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
2630 getrusage(RUSAGE_SELF, used);
2631 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
2634 getrusage(RUSAGE_SELF, used);
2635 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: report latch sets that still look locked or pinned.
//   mgr - buffer pool manager whose latchsets array is scanned, using
//         indexes up to mgr->latchdeployed
// For each latch set the readwr, access and parent locks are tested
// (rin & MASK) and reported on stderr, and the lock structures are
// zeroed with memset.  A pin count with bits other than CLOCK_bit set
// is reported as pinned.
// NOTE(review): "%.8x" expects an unsigned int; page numbers elsewhere
// in this file are of type uid (unsigned long long) - confirm the type
// of latch->page_no matches the format.
2642 void bt_poolaudit (BtMgr *mgr)
2647 while( slot++ < mgr->latchdeployed ) {
2648 latch = mgr->latchsets + slot;
2650 if( *latch->readwr->rin & MASK )
2651 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2652 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2654 if( *latch->access->rin & MASK )
2655 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2656 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2658 if( *latch->parent->rin & MASK )
2659 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2660 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2662 if( latch->pin & ~CLOCK_bit ) {
2663 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// bt_latchaudit: audit latch state, then count the keys on leaf pages.
//   bt - btree handle; bt->frame is used as the page read buffer and
//        bt->err is set to BTERR_map when a page read fails or is short
// Steps:
//   1. report and clear the allocation latch
//   2. for latch sets 1..latchdeployed, report locks that look held
//      and zero the lock structures
//   3. for every hash table entry, report and clear its latch and walk
//      its chain of latch sets
//   4. read each page from LEAF_page up to pagezero->alloc->right and
//      add up `act` for pages that are not free and are at level 0
// index_file prints the result as the number of keys found.
// NOTE(review): idx is a ushort compared against mgr->latchdeployed;
// confirm latchdeployed cannot exceed the range of ushort.
// NOTE(review): "%.8x" expects an unsigned int; confirm the type of
// latch->page_no matches the format.
2669 uint bt_latchaudit (BtDb *bt)
2671 ushort idx, hashidx;
// the latch is tested through a ushort view but cleared through a uint view
2677 if( *(ushort *)(bt->mgr->alloclatch) )
2678 fprintf(stderr, "Alloc page locked\n");
2679 *(uint *)(bt->mgr->alloclatch) = 0;
2681 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2682 latch = bt->mgr->latchsets + idx;
2683 if( *latch->readwr->rin & MASK )
2684 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2685 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2687 if( *latch->access->rin & MASK )
2688 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2689 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2691 if( *latch->parent->rin & MASK )
2692 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2693 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2696 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2701 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2702 if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2703 fprintf(stderr, "hash entry %d locked\n", hashidx);
2705 *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
// follow the chain of latch sets hanging off this hash entry
2707 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2708 latch = bt->mgr->latchsets + idx;
2710 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2711 } while( idx = latch->next );
2714 page_no = LEAF_page;
2716 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the index file
2717 uid off = page_no << bt->mgr->page_bits;
// pread-based read
2719 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// SetFilePointer / ReadFile based read
2723 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2725 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2726 return bt->err = BTERR_map;
2728 if( *amt < bt->mgr->page_size )
2729 return bt->err = BTERR_map;
// only in-use level-0 pages contribute to the key count
2731 if( !bt->frame->free && !bt->frame->lvl )
2732 cnt += bt->frame->act;
2736 cnt--; // remove stopper key
2737 fprintf(stderr, " Total keys read %d\n", cnt);
2750 // standalone program to index file of keys
2751 // then list them onto std-out
// index_file: worker routine that runs one command against the btree.
//   arg - pointer to a ThreadArg supplying mgr, type and infile
// Two alternative signatures follow: a void* returning form and a
// __stdcall form returning uint (presumably selected per platform by
// the preprocessor - confirm).
// args->type[0], folded to lower case, selects the command; the
// progress messages below identify them: latch audit, pennysort load,
// index, delete, find, forward scan, reverse scan and count.
// args->type[1], folded to lower case, sets the flags: unique unless
// it is 'd', atomic when it is 'a'.
// NOTE(review): the calloc result is used without a visible NULL
// check, and the error paths call exit(0), i.e. report success to the
// parent process - confirm both are intended.
2754 void *index_file (void *arg)
2756 uint __stdcall index_file (void *arg)
2759 int line = 0, found = 0, cnt = 0, unique;
2760 uid next, page_no = LEAF_page; // start on first page of leaves
// scratch page image used to hand a single key to bt_atomicinsert
2761 BtPage page = calloc (4096, 1);
2762 unsigned char key[BT_maxkey];
2763 ThreadArg *args = arg;
2764 int ch, len = 0, slot;
2772 bt = bt_open (args->mgr);
2774 unique = (args->type[1] | 0x20) != 'd';
2775 atomic = (args->type[1] | 0x20) == 'a';
2777 switch(args->type[0] | 0x20)
// latch manager audit
2780 fprintf(stderr, "started latch mgr audit\n");
2781 cnt = bt_latchaudit (bt);
2782 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// pennysort load: the first 10 bytes of a line are the key, the
// remainder is the value
2786 fprintf(stderr, "started pennysort for %s\n", args->infile);
2787 if( in = fopen (args->infile, "rb") )
2788 while( ch = getc(in), ch != EOF )
// build a one-slot page image with the key data at offset 2048
// NOTE(review): len - 10 goes negative for lines shorter than 10
// bytes - confirm the input format guarantees longer lines
2794 memset (page, 0, 4096);
2795 slotptr(page, 1)->off = 2048;
2796 slotptr(page, 1)->type = Unique;
2797 ptr = keyptr(page,1);
2799 memcpy(ptr->key, key, 10);
2800 val = valptr(page,1);
2801 val->len = len - 10;
2802 memcpy (val->value, key + 10, len - 10);
2804 if( slot = bt_atomicinsert (bt, page) )
2805 fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2807 else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2808 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// input bytes are accumulated only while they fit in key[]
2811 else if( len < BT_maxkey )
2813 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// index: each input line becomes a key with an empty value
2817 fprintf(stderr, "started indexing for %s\n", args->infile);
2818 if( in = fopen (args->infile, "rb") )
2819 while( ch = getc(in), ch != EOF )
2823 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2824 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2827 else if( len < BT_maxkey )
2829 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// delete: each input line is looked up and the key found is deleted
2833 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2834 if( in = fopen (args->infile, "rb") )
2835 while( ch = getc(in), ch != EOF )
2839 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2840 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
// delete using the key as stored in the tree (bt->key)
2841 ptr = (BtKey*)(bt->key);
2844 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2845 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2848 else if( len < BT_maxkey )
2850 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// find: look up each input line
2854 fprintf(stderr, "started finding keys for %s\n", args->infile);
2855 if( in = fopen (args->infile, "rb") )
2856 while( ch = getc(in), ch != EOF )
// a result of 0 is the success case tested here (a zero-length value)
2860 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2863 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2866 else if( len < BT_maxkey )
2868 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// forward scan: walk the leaf pages through their right links,
// writing every live key and value to stdout
2872 fprintf(stderr, "started scanning\n");
2874 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2875 set->page = bt_mappage (bt, set->latch);
2877 fprintf(stderr, "unable to obtain latch"), exit(1);
2878 bt_lockpage (BtLockRead, set->latch);
2879 next = bt_getid (set->page->right);
// the last slot of the last page (the stopper key) is not printed
2881 for( slot = 0; slot++ < set->page->cnt; )
2882 if( next || slot < set->page->cnt )
2883 if( !slotptr(set->page, slot)->dead ) {
2884 ptr = keyptr(set->page, slot);
2887 if( slotptr(set->page, slot)->type == Duplicate )
2890 fwrite (ptr->key, len, 1, stdout);
2891 val = valptr(set->page, slot);
2892 fwrite (val->value, val->len, 1, stdout);
2893 fputc ('\n', stdout);
2897 bt_unlockpage (BtLockRead, set->latch);
2898 bt_unpinlatch (set->latch);
2899 } while( page_no = next );
2901 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// reverse scan: start from bt_lastkey and step back with bt_prevkey
2905 fprintf(stderr, "started reverse scan\n");
2906 if( slot = bt_lastkey (bt) )
2907 while( slot = bt_prevkey (bt, slot) ) {
2908 if( slotptr(bt->cursor, slot)->dead )
2911 ptr = keyptr(bt->cursor, slot);
2914 if( slotptr(bt->cursor, slot)->type == Duplicate )
2917 fwrite (ptr->key, len, 1, stdout);
2918 val = valptr(bt->cursor, slot);
2919 fwrite (val->value, val->len, 1, stdout);
2920 fputc ('\n', stdout);
2924 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// count: read every page in file order and sum the active keys on
// in-use level-0 pages
2929 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2931 fprintf(stderr, "started counting\n");
2932 page_no = LEAF_page;
2934 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2935 if( bt_readpage (bt->mgr, bt->frame, page_no) )
2938 if( !bt->frame->free && !bt->frame->lvl )
2939 cnt += bt->frame->act;
2945 cnt--; // remove stopper key
2946 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2958 typedef struct timeval timer;	// shorthand type name for struct timeval
2960 int main (int argc, char **argv)
2962 int idx, cnt, len, slot, err;
2963 int segsize, bits = 16;
2980 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2981 fprintf (stderr, " where page_bits is the page size in bits\n");
2982 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
2983 fprintf (stderr, " lock_mgr_size is the maximum number of outstanding key locks\n");
2984 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2988 start = getCpuTime(0);
2991 bits = atoi(argv[3]);
2994 poolsize = atoi(argv[4]);
2997 fprintf (stderr, "Warning: no mapped_pool\n");
3000 locksize = atoi(argv[5]);
3004 threads = malloc (cnt * sizeof(pthread_t));
3006 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3008 args = malloc (cnt * sizeof(ThreadArg));
3010 mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3013 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3019 for( idx = 0; idx < cnt; idx++ ) {
3020 args[idx].infile = argv[idx + 6];
3021 args[idx].type = argv[2];
3022 args[idx].mgr = mgr;
3023 args[idx].idx = idx;
3025 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3026 fprintf(stderr, "Error creating thread %d\n", err);
3028 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3032 // wait for termination
3035 for( idx = 0; idx < cnt; idx++ )
3036 pthread_join (threads[idx], NULL);
3038 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3040 for( idx = 0; idx < cnt; idx++ )
3041 CloseHandle(threads[idx]);
3047 elapsed = getCpuTime(0) - start;
3048 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3049 elapsed = getCpuTime(1);
3050 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3051 elapsed = getCpuTime(2);
3052 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);