1 // btree version threadskv7 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and atomic non-consistent key insert
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
87 // maximum number of keys to insert atomically in one call
89 #define MAX_atomic 256
91 #define BT_maxkey 255 // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
95 There are five lock types for each node in three independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
111 // definition for phase-fair reader/writer lock implementation
125 // definition for spin latch implementation
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
131 volatile typedef struct {
142 // The key structure occupies space at the upper end of
143 // each page. It's a length byte followed by the key
147 unsigned char len; // this can be changed to a ushort or uint
148 unsigned char key[0];
151 // the value structure also occupies space at the upper
152 // end of the page. Each key is immediately followed by a value.
155 unsigned char len; // this can be changed to a ushort or uint
156 unsigned char value[0];
159 // hash table entries
162 uint slot; // Latch table entry at head of chain
163 BtSpinLatch latch[1];
166 // latch manager table structure
169 uid page_no; // latch set page number
170 RWLock readwr[1]; // read/write page lock
171 RWLock access[1]; // Access Intent/Page delete
172 RWLock parent[1]; // Posting of fence key in parent
173 uint slot; // entry slot in latch table
174 uint next; // next entry in hash table chain
175 uint prev; // prev entry in hash table chain
176 volatile ushort pin; // number of outstanding threads
177 ushort dirty:1; // page in cache is dirty
180 // lock manager table structure
183 RWLock readwr[1]; // read/write key lock
186 uint pin; // count of readers waiting
187 uint hashidx; // hash idx for entry
188 unsigned char key[BT_keyarray];
191 // Define the length of the page record numbers
195 // Page key slot definition.
197 // Keys are marked dead, but remain on the page until
198 // it cleanup is called. The fence key (highest key) for
199 // a leaf page is always present, even after cleanup.
203 // In addition to the Unique keys that occupy slots
204 // there are Librarian and Duplicate key
205 // slots occupying the key slot array.
207 // The Librarian slots are dead keys that
208 // serve as filler, available to add new Unique
209 // or Dup slots that are inserted into the B-tree.
211 // The Duplicate slots have had their key bytes extended
212 // by 6 bytes to contain a binary duplicate key uniqueifier.
221 uint off:BT_maxbits; // page offset for key start
222 uint type:3; // type of slot
223 uint dead:1; // set for deleted slot
226 // The first part of an index page.
227 // It is immediately followed
228 // by the BtSlot array of keys.
230 // note that this structure size
231 // must be a multiple of 8 bytes
232 // in order to place dups correctly.
234 typedef struct BtPage_ {
235 uint cnt; // count of keys in page
236 uint act; // count of active keys
237 uint min; // next key offset
238 uint garbage; // page garbage in bytes
239 unsigned char bits:7; // page size in bits
240 unsigned char free:1; // page is on free chain
241 unsigned char lvl:7; // level of page
242 unsigned char kill:1; // page is being deleted
243 unsigned char left[BtId]; // page number to left
244 unsigned char filler[2]; // padding to multiple of 8
245 unsigned char right[BtId]; // page number to right
248 // The loadpage interface object
251 uid page_no; // current page number
252 BtPage page; // current page pointer
253 BtLatchSet *latch; // current page latch set
256 // structure for latch manager on ALLOC_page
259 struct BtPage_ alloc[1]; // next page_no in right ptr
260 unsigned long long dups[1]; // global duplicate key uniqueifier
261 unsigned char chain[BtId]; // head of free page_nos chain
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
274 BtPageZero *pagezero; // mapped allocation page
275 BtSpinLatch alloclatch[1]; // allocation area lite latch
276 uint latchdeployed; // highest number of pool entries deployed
277 uint nlatchpage; // number of latch & lock & pool pages
278 uint latchtotal; // number of page latch entries
279 uint latchhash; // number of latch hash table slots
280 uint latchvictim; // next latch entry to examine
281 BtHashEntry *hashtable; // the anonymous mapping buffer pool
282 BtLatchSet *latchsets; // mapped latch set from latch pages
283 unsigned char *pagepool; // mapped to the buffer pool pages
284 uint lockhash; // number of lock hash table slots
285 uint lockfree; // next available lock table entry
286 BtSpinLatch locklatch[1]; // lock manager free chain latch
287 BtHashEntry *hashlock; // the lock manager hash table
288 BtLockSet *locktable; // the lock manager key table
290 HANDLE halloc; // allocation handle
291 HANDLE hpool; // buffer pool handle
296 BtMgr *mgr; // buffer manager for thread
297 BtPage cursor; // cached frame for start/next (never mapped)
298 BtPage frame; // spare frame for the page split (never mapped)
299 uid cursor_page; // current cursor page number
300 unsigned char *mem; // frame, cursor, page memory buffer
301 unsigned char key[BT_keyarray]; // last found complete key
302 int found; // last delete or insert was found
303 int err; // last error
304 int reads, writes; // number of reads and writes from the btree
318 #define CLOCK_bit 0x8000
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey (BtDb *bt, uint slot);
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
334 // Helper functions to return slot values
335 // from the cursor page.
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
340 // The page is allocated from low and hi ends.
341 // The key slots are allocated from the bottom,
342 // while the text and value of the key
343 // are allocated from the top. When the two
344 // areas meet, the page is split into two.
346 // A key consists of a length byte, two bytes of
347 // index number (0 - 65535), and up to 253 bytes
350 // Associated with each key is a value byte string
351 // containing any value desired.
353 // The b-tree root is always located at page 1.
354 // The first leaf page of level zero is always
355 // located on page 2.
357 // The b-tree pages are linked with next
358 // pointers to facilitate enumerators,
359 // and provide for concurrency.
361 // When to root page fills, it is split in two and
362 // the tree height is raised by a new root at page
363 // one with two keys.
365 // Deleted keys are marked with a dead bit until
366 // page cleanup. The fence key for a leaf node is
369 // To achieve maximum concurrency one page is locked at a time
370 // as the tree is traversed to find leaf key in question. The right
371 // page numbers are used in cases where the page is being split,
374 // Page 0 is dedicated to lock for new page extensions,
375 // and chains empty pages together for reuse. It also
376 // contains the latch manager hash table.
378 // The ParentModification lock on a node is obtained to serialize posting
379 // or changing the fence key for a node.
381 // Empty pages are chained together through the ALLOC page and reused.
383 // Access macros to address slot and key values from the page
384 // Page slots use 1 based indexing.
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
390 void bt_putid(unsigned char *dest, uid id)
395 dest[i] = (unsigned char)id, id >>= 8;
398 uid bt_getid(unsigned char *src)
403 for( i = 0; i < BtId; i++ )
404 id <<= 8, id |= *src++;
409 uid bt_newdup (BtDb *bt)
412 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
414 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
418 // Phase-Fair reader/writer lock implementation
420 void WriteLock (RWLock *lock)
425 tix = __sync_fetch_and_add (lock->ticket, 1);
427 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
429 // wait for our ticket to come up
431 while( tix != lock->serving[0] )
438 w = PRES | (tix & PHID);
440 r = __sync_fetch_and_add (lock->rin, w);
442 r = _InterlockedExchangeAdd16 (lock->rin, w);
444 while( r != *lock->rout )
452 void WriteRelease (RWLock *lock)
455 __sync_fetch_and_and (lock->rin, ~MASK);
457 _InterlockedAnd16 (lock->rin, ~MASK);
462 void ReadLock (RWLock *lock)
466 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
468 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
471 while( w == (*lock->rin & MASK) )
479 void ReadRelease (RWLock *lock)
482 __sync_fetch_and_add (lock->rout, RINC);
484 _InterlockedExchangeAdd16 (lock->rout, RINC);
488 // Spin Latch Manager
490 // wait until write lock mode is clear
491 // and add 1 to the share count
493 void bt_spinreadlock(BtSpinLatch *latch)
499 prev = __sync_fetch_and_add ((uint *)latch, SHARE);
501 prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
503 // see if exclusive request is granted or pending
508 prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
510 prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
513 } while( sched_yield(), 1 );
515 } while( SwitchToThread(), 1 );
519 // wait for other read and write latches to relinquish
521 void bt_spinwritelock(BtSpinLatch *latch)
527 prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
529 prev = _InterlockedOr((uint *)latch, PEND | XCL);
532 if( !(prev & ~BOTH) )
536 __sync_fetch_and_and ((uint *)latch, ~XCL);
538 _InterlockedAnd((uint *)latch, ~XCL);
541 } while( sched_yield(), 1 );
543 } while( SwitchToThread(), 1 );
547 // try to obtain write lock
549 // return 1 if obtained,
552 int bt_spinwritetry(BtSpinLatch *latch)
557 prev = __sync_fetch_and_or((uint *)latch, XCL);
559 prev = _InterlockedOr((uint *)latch, XCL);
561 // take write access if all bits are clear
564 if( !(prev & ~BOTH) )
568 __sync_fetch_and_and ((uint *)latch, ~XCL);
570 _InterlockedAnd((uint *)latch, ~XCL);
577 void bt_spinreleasewrite(BtSpinLatch *latch)
580 __sync_fetch_and_and((uint *)latch, ~BOTH);
582 _InterlockedAnd((uint *)latch, ~BOTH);
586 // decrement reader count
588 void bt_spinreleaseread(BtSpinLatch *latch)
591 __sync_fetch_and_add((uint *)latch, -SHARE);
593 _InterlockedExchangeAdd((uint *)latch, -SHARE);
597 // read page from permanent location in Btree file
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
601 off64_t off = page_no << mgr->page_bits;
604 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
612 memset (ovl, 0, sizeof(OVERLAPPED));
614 ovl->OffsetHigh = off >> 32;
616 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
620 if( *amt < mgr->page_size ) {
621 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
628 // write page to permanent location in Btree file
629 // clear the dirty bit
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
633 off64_t off = page_no << mgr->page_bits;
636 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
642 memset (ovl, 0, sizeof(OVERLAPPED));
644 ovl->OffsetHigh = off >> 32;
646 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
649 if( *amt < mgr->page_size )
655 // link latch table entry into head of latch hash table
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
662 if( latch->next = bt->mgr->hashtable[hashidx].slot )
663 bt->mgr->latchsets[latch->next].prev = slot;
665 bt->mgr->hashtable[hashidx].slot = slot;
666 latch->page_no = page_no;
672 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
680 // set CLOCK bit in latch
681 // decrement pin count
683 void bt_unpinlatch (BtLatchSet *latch)
686 if( ~latch->pin & CLOCK_bit )
687 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688 __sync_fetch_and_add(&latch->pin, -1);
690 if( ~latch->pin & CLOCK_bit )
691 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692 _InterlockedDecrement16 (&latch->pin);
696 // return the btree cached page address
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
705 // find existing latchset or inspire new one
706 // return with latchset pinned
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
710 uint hashidx = page_no % bt->mgr->latchhash;
718 // try to find our entry
720 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
722 if( slot = bt->mgr->hashtable[hashidx].slot ) do
724 latch = bt->mgr->latchsets + slot;
725 if( page_no == latch->page_no )
727 } while( slot = latch->next );
733 latch = bt->mgr->latchsets + slot;
735 __sync_fetch_and_add(&latch->pin, 1);
737 _InterlockedIncrement16 (&latch->pin);
739 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
743 // see if there are any unused pool entries
745 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
747 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
750 if( slot < bt->mgr->latchtotal ) {
751 latch = bt->mgr->latchsets + slot;
752 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
754 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
759 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
761 _InterlockedDecrement (&bt->mgr->latchdeployed);
763 // find and reuse previous entry on victim
767 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
769 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
771 // try to get write lock on hash chain
772 // skip entry if not obtained
773 // or has outstanding pins
775 slot %= bt->mgr->latchtotal;
780 latch = bt->mgr->latchsets + slot;
781 idx = latch->page_no % bt->mgr->latchhash;
783 // see we are on same chain as hashidx
788 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
791 // skip this slot if it is pinned
792 // or the CLOCK bit is set
795 if( latch->pin & CLOCK_bit ) {
797 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
799 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
802 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
806 // update permanent page area in btree from buffer pool
808 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
811 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
814 latch->dirty = 0, bt->writes++;
816 // unlink our available slot from its hash chain
819 bt->mgr->latchsets[latch->prev].next = latch->next;
821 bt->mgr->hashtable[idx].slot = latch->next;
824 bt->mgr->latchsets[latch->next].prev = latch->prev;
826 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
828 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
831 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
836 void bt_mgrclose (BtMgr *mgr)
843 // flush dirty pool pages to the btree
845 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847 latch = mgr->latchsets + slot;
850 bt_writepage(mgr, page, latch->page_no);
851 latch->dirty = 0, num++;
853 // madvise (page, mgr->page_size, MADV_DONTNEED);
856 fprintf(stderr, "%d buffer pool pages flushed\n", num);
859 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860 munmap (mgr->pagezero, mgr->page_size);
862 FlushViewOfFile(mgr->pagezero, 0);
863 UnmapViewOfFile(mgr->pagezero);
864 UnmapViewOfFile(mgr->hashtable);
865 CloseHandle(mgr->halloc);
866 CloseHandle(mgr->hpool);
872 FlushFileBuffers(mgr->idx);
873 CloseHandle(mgr->idx);
878 // close and release memory
880 void bt_close (BtDb *bt)
887 VirtualFree (bt->mem, 0, MEM_RELEASE);
892 // open/create new btree buffer manager
894 // call with file_name, BT_openmode, bits in page size (e.g. 16),
895 // size of page pool (e.g. 262144) and number of lock table entries.
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
909 // determine sanity of page size and buffer pool
911 if( bits > BT_maxbits )
913 else if( bits < BT_minbits )
917 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
922 mgr = calloc (1, sizeof(BtMgr));
924 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
926 if( mgr->idx == -1 ) {
927 fprintf (stderr, "Unable to open btree file\n");
928 return free(mgr), NULL;
931 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932 attr = FILE_ATTRIBUTE_NORMAL;
933 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
935 if( mgr->idx == INVALID_HANDLE_VALUE )
936 return GlobalFree(mgr), NULL;
940 pagezero = valloc (BT_maxpage);
943 // read minimum page size to get root info
944 // to support raw disk partition files
945 // check if bits == 0 on the disk.
947 if( size = lseek (mgr->idx, 0L, 2) )
948 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949 if( pagezero->alloc->bits )
950 bits = pagezero->alloc->bits;
954 return free(mgr), free(pagezero), NULL;
958 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959 size = GetFileSize(mgr->idx, amt);
962 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963 return bt_mgrclose (mgr), NULL;
964 bits = pagezero->alloc->bits;
969 mgr->page_size = 1 << bits;
970 mgr->page_bits = bits;
972 // calculate number of latch hash table entries
974 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
977 // add on the number of pages in buffer pool
978 // along with the corresponding latch table
980 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
981 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982 mgr->latchtotal = nodemax;
984 // add on the sizeof the lock manager hash table and the lock table
986 mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
988 mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
993 // initialize an empty b-tree with latch page, root page, page of leaves
994 // and page(s) of latches and page pool cache
996 memset (pagezero, 0, 1 << bits);
997 pagezero->alloc->bits = mgr->page_bits;
998 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1000 // initialize left-most LEAF page in
1003 bt_putid (pagezero->alloc->left, LEAF_page);
1005 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006 fprintf (stderr, "Unable to create btree page zero\n");
1007 return bt_mgrclose (mgr), NULL;
1010 memset (pagezero, 0, 1 << bits);
1011 pagezero->alloc->bits = mgr->page_bits;
1013 for( lvl=MIN_lvl; lvl--; ) {
1014 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015 key = keyptr(pagezero->alloc, 1);
1016 key->len = 2; // create stopper key
1020 bt_putid(value, MIN_lvl - lvl + 1);
1021 val = valptr(pagezero->alloc, 1);
1022 val->len = lvl ? BtId : 0;
1023 memcpy (val->value, value, val->len);
1025 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026 pagezero->alloc->lvl = lvl;
1027 pagezero->alloc->cnt = 1;
1028 pagezero->alloc->act = 1;
1030 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031 fprintf (stderr, "Unable to create btree page zero\n");
1032 return bt_mgrclose (mgr), NULL;
1040 VirtualFree (pagezero, 0, MEM_RELEASE);
1043 // mlock the pagezero page
1045 flag = PROT_READ | PROT_WRITE;
1046 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047 if( mgr->pagezero == MAP_FAILED ) {
1048 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049 return bt_mgrclose (mgr), NULL;
1051 mlock (mgr->pagezero, mgr->page_size);
1053 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054 if( mgr->hashtable == MAP_FAILED ) {
1055 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056 return bt_mgrclose (mgr), NULL;
1059 flag = PAGE_READWRITE;
1060 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061 if( !mgr->halloc ) {
1062 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063 return bt_mgrclose (mgr), NULL;
1066 flag = FILE_MAP_WRITE;
1067 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068 if( !mgr->pagezero ) {
1069 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070 return bt_mgrclose (mgr), NULL;
1073 flag = PAGE_READWRITE;
1074 size = (uid)mgr->nlatchpage << mgr->page_bits;
1075 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1077 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078 return bt_mgrclose (mgr), NULL;
1081 flag = FILE_MAP_WRITE;
1082 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083 if( !mgr->hashtable ) {
1084 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085 return bt_mgrclose (mgr), NULL;
1089 size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090 mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091 size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1093 mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094 mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095 mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1097 mgr->lockfree = lockmax - 1;
1098 mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1100 for( idx = 1; idx < lockmax; idx++ )
1101 mgr->locktable[idx].next = idx - 1;
1106 // open BTree access method
1107 // based on buffer manager
1109 BtDb *bt_open (BtMgr *mgr)
1111 BtDb *bt = malloc (sizeof(*bt));
1113 memset (bt, 0, sizeof(*bt));
1116 bt->mem = valloc (2 *mgr->page_size);
1118 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 bt->frame = (BtPage)bt->mem;
1121 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1125 // compare two keys, returning > 0, = 0, or < 0
1126 // as the comparison value
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1130 uint len1 = key1->len;
1133 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1144 // place write, read, or parent lock on requested page_no.
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1150 ReadLock (set->readwr);
1153 WriteLock (set->readwr);
1156 ReadLock (set->access);
1159 WriteLock (set->access);
1162 WriteLock (set->parent);
1167 // remove write, read, or parent lock on requested page
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1173 ReadRelease (set->readwr);
1176 WriteRelease (set->readwr);
1179 ReadRelease (set->access);
1182 WriteRelease (set->access);
1185 WriteRelease (set->parent);
1190 // allocate a new page
1191 // return with page latched.
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1197 // lock allocation page
1199 bt_spinwritelock(bt->mgr->alloclatch);
1201 // use empty chain first
1202 // else allocate empty page
1204 if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206 set->page = bt_mappage (bt, set->latch);
1208 return bt->err = BTERR_struct, -1;
1210 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211 bt_spinreleasewrite(bt->mgr->alloclatch);
1213 memcpy (set->page, contents, bt->mgr->page_size);
1214 set->latch->dirty = 1;
1218 set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219 bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1221 // unlock allocation latch
1223 bt_spinreleasewrite(bt->mgr->alloclatch);
1225 // don't load cache from btree page
1227 if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228 set->page = bt_mappage (bt, set->latch);
1230 return bt->err = BTERR_struct;
1232 memcpy (set->page, contents, bt->mgr->page_size);
1233 set->latch->dirty = 1;
1237 // find slot in page for given key at a given level
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1244 // make stopper key an infinite fence value
1246 if( bt_getid (set->page->right) )
1251 // low is the lowest candidate.
1252 // loop ends when they meet
1254 // higher is already
1255 // tested as .ge. the passed key.
1257 while( diff = higher - low ) {
1258 slot = low + ( diff >> 1 );
1259 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1262 higher = slot, good++;
1265 // return zero if key is on right link page
1267 return good ? higher : 0;
1270 // find and load page at given level for given key
1271 // leave page rd or wr locked as requested
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1280 // start at root of btree and drill down
1283 // determine lock mode of drill level
1284 mode = (drill == lvl) ? lock : BtLockRead;
1286 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287 set->page_no = page_no;
1291 // obtain access lock using lock chaining with Access mode
1293 if( page_no > ROOT_page )
1294 bt_lockpage(BtLockAccess, set->latch);
1296 set->page = bt_mappage (bt, set->latch);
1298 // release & unpin parent page
1301 bt_unlockpage(prevmode, prevlatch);
1302 bt_unpinlatch (prevlatch);
1306 // obtain read lock using lock chaining
1308 bt_lockpage(mode, set->latch);
1310 if( set->page->free )
1311 return bt->err = BTERR_struct, 0;
1313 if( page_no > ROOT_page )
1314 bt_unlockpage(BtLockAccess, set->latch);
1316 // re-read and re-lock root after determining actual level of root
1318 if( set->page->lvl != drill) {
1319 if( set->page_no != ROOT_page )
1320 return bt->err = BTERR_struct, 0;
1322 drill = set->page->lvl;
1324 if( lock != BtLockRead && drill == lvl ) {
1325 bt_unlockpage(mode, set->latch);
1326 bt_unpinlatch (set->latch);
1331 prevpage = set->page_no;
1332 prevlatch = set->latch;
1335 // find key on page at this level
1336 // and descend to requested level
1338 if( set->page->kill )
1341 if( slot = bt_findslot (set, key, len) ) {
1345 while( slotptr(set->page, slot)->dead )
1346 if( slot++ < set->page->cnt )
1351 page_no = bt_getid(valptr(set->page, slot)->value);
1356 // or slide right into next page
1359 page_no = bt_getid(set->page->right);
1363 // return error on end of right chain
1365 bt->err = BTERR_struct;
1366 return 0; // return error
1369 // return page to free list
1370 // page must be delete & write locked
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1374 // lock allocation page
1376 bt_spinwritelock (bt->mgr->alloclatch);
1379 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380 bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381 set->latch->dirty = 1;
1382 set->page->free = 1;
1384 // unlock released page
1386 bt_unlockpage (BtLockDelete, set->latch);
1387 bt_unlockpage (BtLockWrite, set->latch);
1388 bt_unpinlatch (set->latch);
1390 // unlock allocation page
1392 bt_spinreleasewrite (bt->mgr->alloclatch);
1395 // a fence key was deleted from a page
1396 // push new fence value upwards
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1405 // remove the old fence value
1407 ptr = keyptr(set->page, set->page->cnt);
1408 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410 set->latch->dirty = 1;
1412 // cache new fence value
1414 ptr = keyptr(set->page, set->page->cnt);
1415 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1417 bt_lockpage (BtLockParent, set->latch);
1418 bt_unlockpage (BtLockWrite, set->latch);
1420 // insert new (now smaller) fence key
1422 bt_putid (value, set->page_no);
1423 ptr = (BtKey*)leftkey;
1425 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1428 // now delete old fence key
1430 ptr = (BtKey*)rightkey;
1432 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1435 bt_unlockpage (BtLockParent, set->latch);
1436 bt_unpinlatch(set->latch);
1440 // root has a single child
1441 // collapse a level from the tree
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1448 // find the child entry and promote as new root contents
1451 for( idx = 0; idx++ < root->page->cnt; )
1452 if( !slotptr(root->page, idx)->dead )
1455 child->page_no = bt_getid (valptr(root->page, idx)->value);
1457 if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458 child->page = bt_mappage (bt, child->latch);
1462 bt_lockpage (BtLockDelete, child->latch);
1463 bt_lockpage (BtLockWrite, child->latch);
1465 memcpy (root->page, child->page, bt->mgr->page_size);
1466 root->latch->dirty = 1;
1468 bt_freepage (bt, child);
1470 } while( root->page->lvl > 1 && root->page->act == 1 );
1472 bt_unlockpage (BtLockWrite, root->latch);
1473 bt_unpinlatch (root->latch);
1477 // maintain reverse scan pointers by
1478 // linking left pointer of far right node
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1482 BtPageSet right2[1];
1484 // keep track of rightmost leaf page
1486 if( !right_page_no ) {
1487 bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1491 // link right page to left page
1493 if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494 right2->page = bt_mappage (bt, right2->latch);
1498 bt_lockpage (BtLockWrite, right2->latch);
1500 bt_putid(right2->page->left, left_page_no);
1501 bt_unlockpage (BtLockWrite, right2->latch);
1502 bt_unpinlatch (right2->latch);
1506 // find and delete key on page by marking delete flag bit
1507 // if page becomes empty, delete it from the btree
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1518 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519 ptr = keyptr(set->page, slot);
1523 // if librarian slot, advance to real slot
1525 if( slotptr(set->page, slot)->type == Librarian )
1526 ptr = keyptr(set->page, ++slot);
1528 fence = slot == set->page->cnt;
1530 // if key is found delete it, otherwise ignore request
1532 if( found = !keycmp (ptr, key, len) )
1533 if( found = slotptr(set->page, slot)->dead == 0 ) {
1534 val = valptr(set->page,slot);
1535 slotptr(set->page, slot)->dead = 1;
1536 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1539 // collapse empty slots beneath the fence
1541 while( idx = set->page->cnt - 1 )
1542 if( slotptr(set->page, idx)->dead ) {
1543 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1549 // did we delete a fence key in an upper level?
1551 if( found && lvl && set->page->act && fence )
1552 if( bt_fixfence (bt, set, lvl) )
1555 return bt->found = found, 0;
1557 // do we need to collapse root?
1559 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560 if( bt_collapseroot (bt, set) )
1563 return bt->found = found, 0;
1565 // return if page is not empty
1567 if( set->page->act ) {
1568 set->latch->dirty = 1;
1569 bt_unlockpage(BtLockWrite, set->latch);
1570 bt_unpinlatch (set->latch);
1571 return bt->found = found, 0;
1574 // cache copy of fence key
1575 // to post in parent
1577 ptr = keyptr(set->page, set->page->cnt);
1578 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1580 // obtain lock on right page
1582 right->page_no = bt_getid(set->page->right);
1584 if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585 right->page = bt_mappage (bt, right->latch);
1589 bt_lockpage (BtLockWrite, right->latch);
1591 if( right->page->kill )
1592 return bt->err = BTERR_struct;
1594 // transfer left link
1596 memcpy (right->page->left, set->page->left, BtId);
1598 // pull contents of right peer into our empty page
1600 memcpy (set->page, right->page, bt->mgr->page_size);
1601 set->latch->dirty = 1;
1606 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1609 // cache copy of key to update
1611 ptr = keyptr(right->page, right->page->cnt);
1612 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1614 // mark right page deleted and point it to left page
1615 // until we can post parent updates
1617 bt_putid (right->page->right, set->page_no);
1618 right->latch->dirty = 1;
1619 right->page->kill = 1;
1621 bt_lockpage (BtLockParent, right->latch);
1622 bt_unlockpage (BtLockWrite, right->latch);
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // redirect higher key directly to our new node contents
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)higherfence;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // delete old lower key to our node
1637 ptr = (BtKey*)lowerfence;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 // obtain delete and write locks to right node
1644 bt_unlockpage (BtLockParent, right->latch);
1645 bt_lockpage (BtLockDelete, right->latch);
1646 bt_lockpage (BtLockWrite, right->latch);
1647 bt_freepage (bt, right);
1649 bt_unlockpage (BtLockParent, set->latch);
1650 bt_unpinlatch (set->latch);
1655 BtKey *bt_foundkey (BtDb *bt)
1657 return (BtKey*)(bt->key);
1660 // advance to next slot
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1664 BtLatchSet *prevlatch;
1667 if( slot < set->page->cnt )
1670 prevlatch = set->latch;
1672 if( page_no = bt_getid(set->page->right) )
1673 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674 set->page = bt_mappage (bt, set->latch);
1678 return bt->err = BTERR_struct, 0;
1680 // obtain access lock using lock chaining with Access mode
1682 bt_lockpage(BtLockAccess, set->latch);
1684 bt_unlockpage(BtLockRead, prevlatch);
1685 bt_unpinlatch (prevlatch);
1687 bt_lockpage(BtLockRead, set->latch);
1688 bt_unlockpage(BtLockAccess, set->latch);
1690 set->page_no = page_no;
1694 // find unique key or first duplicate key in
1695 // leaf level and return number of value bytes
1696 // or (-1) if not found. Setup key for bt_foundkey
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1706 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1708 ptr = keyptr(set->page, slot);
1710 // skip librarian slot place holder
1712 if( slotptr(set->page, slot)->type == Librarian )
1713 ptr = keyptr(set->page, ++slot);
1715 // return actual key found
1717 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1720 if( slotptr(set->page, slot)->type == Duplicate )
1723 // not there if we reach the stopper key
1725 if( slot == set->page->cnt )
1726 if( !bt_getid (set->page->right) )
1729 // if key exists, return >= 0 value bytes copied
1730 // otherwise return (-1)
1732 if( slotptr(set->page, slot)->dead )
1736 if( !memcmp (ptr->key, key, len) ) {
1737 val = valptr (set->page,slot);
1738 if( valmax > val->len )
1740 memcpy (value, val->value, valmax);
1746 } while( slot = bt_findnext (bt, set, slot) );
1748 bt_unlockpage (BtLockRead, set->latch);
1749 bt_unpinlatch (set->latch);
1753 // check page for space available,
1754 // clean if necessary and return
1755 // 0 - page needs splitting
1756 // >0 new slot value
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
1768 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1771 // skip cleanup and proceed to split
1772 // if there's not enough garbage
1775 if( page->garbage < nxt / 5 )
1778 memcpy (bt->frame, page, bt->mgr->page_size);
1780 // skip page info and set rest of page to zero
1782 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783 set->latch->dirty = 1;
1787 // clean up page first by
1788 // removing deleted keys
1790 while( cnt++ < max ) {
1793 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1796 // copy the value across
1798 val = valptr(bt->frame, cnt);
1799 nxt -= val->len + sizeof(BtVal);
1800 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1802 // copy the key across
1804 key = keyptr(bt->frame, cnt);
1805 nxt -= key->len + sizeof(BtKey);
1806 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1808 // make a librarian slot
1811 slotptr(page, ++idx)->off = nxt;
1812 slotptr(page, idx)->type = Librarian;
1813 slotptr(page, idx)->dead = 1;
1818 slotptr(page, ++idx)->off = nxt;
1819 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1821 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1828 // see if page has enough space now, or does it need splitting?
1830 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1836 // split the root and raise the height of the btree
1838 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1840 uint nxt = bt->mgr->page_size;
1841 unsigned char value[BtId];
1846 // Obtain an empty page to use, and copy the current
1847 // root contents into it, e.g. lower keys
1849 if( bt_newpage(bt, left, root->page) )
1852 bt_unpinlatch (left->latch);
1854 // set left from higher to lower page
1856 bt_putid (right->page->left, left->page_no);
1857 bt_unpinlatch (right->latch);
1859 // preserve the page info at the bottom
1860 // of higher keys and set rest to zero
1862 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1864 // insert stopper key at top of newroot page
1865 // and increase the root height
1867 nxt -= BtId + sizeof(BtVal);
1868 bt_putid (value, right->page_no);
1869 val = (BtVal *)((unsigned char *)root->page + nxt);
1870 memcpy (val->value, value, BtId);
1873 nxt -= 2 + sizeof(BtKey);
1874 slotptr(root->page, 2)->off = nxt;
1875 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1880 // insert lower keys page fence key on newroot page as first key
1882 nxt -= BtId + sizeof(BtVal);
1883 bt_putid (value, left->page_no);
1884 val = (BtVal *)((unsigned char *)root->page + nxt);
1885 memcpy (val->value, value, BtId);
1888 nxt -= leftkey->len + sizeof(BtKey);
1889 slotptr(root->page, 1)->off = nxt;
1890 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1892 bt_putid(root->page->right, 0);
1893 root->page->min = nxt; // reset lowest used offset and key count
1894 root->page->cnt = 2;
1895 root->page->act = 2;
1898 // release and unpin root pages
1900 bt_unlockpage(BtLockWrite, root->latch);
1901 bt_unpinlatch (root->latch);
1905 // split already locked full node
1908 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1910 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1911 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1912 unsigned char value[BtId];
1913 uint lvl = set->page->lvl;
1920 // split higher half of keys to bt->frame
1922 memset (bt->frame, 0, bt->mgr->page_size);
1923 max = set->page->cnt;
1927 while( cnt++ < max ) {
1928 if( slotptr(set->page, cnt)->dead && cnt < max )
1930 src = valptr(set->page, cnt);
1931 nxt -= src->len + sizeof(BtVal);
1932 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1934 key = keyptr(set->page, cnt);
1935 nxt -= key->len + sizeof(BtKey);
1936 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1937 memcpy (ptr, key, key->len + sizeof(BtKey));
1939 // add librarian slot
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = Librarian;
1944 slotptr(bt->frame, idx)->dead = 1;
1949 slotptr(bt->frame, ++idx)->off = nxt;
1950 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1952 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1956 // remember existing fence key for new page to the right
1958 memcpy (rightkey, key, key->len + sizeof(BtKey));
1960 bt->frame->bits = bt->mgr->page_bits;
1961 bt->frame->min = nxt;
1962 bt->frame->cnt = idx;
1963 bt->frame->lvl = lvl;
1967 if( set->page_no > ROOT_page ) {
1968 bt_putid (bt->frame->right, bt_getid (set->page->right));
1969 bt_putid(bt->frame->left, set->page_no);
1972 // get new free page and write higher keys to it.
1974 if( bt_newpage(bt, right, bt->frame) )
1979 if( set->page_no > ROOT_page && !lvl )
1980 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1983 // update lower keys to continue in old page
1985 memcpy (bt->frame, set->page, bt->mgr->page_size);
1986 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987 set->latch->dirty = 1;
1989 nxt = bt->mgr->page_size;
1990 set->page->garbage = 0;
1996 if( slotptr(bt->frame, max)->type == Librarian )
1999 // assemble page of smaller keys
2001 while( cnt++ < max ) {
2002 if( slotptr(bt->frame, cnt)->dead )
2004 val = valptr(bt->frame, cnt);
2005 nxt -= val->len + sizeof(BtVal);
2006 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2008 key = keyptr(bt->frame, cnt);
2009 nxt -= key->len + sizeof(BtKey);
2010 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2012 // add librarian slot
2015 slotptr(set->page, ++idx)->off = nxt;
2016 slotptr(set->page, idx)->type = Librarian;
2017 slotptr(set->page, idx)->dead = 1;
2022 slotptr(set->page, ++idx)->off = nxt;
2023 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2027 // remember fence key for smaller page
2029 memcpy(fencekey, key, key->len + sizeof(BtKey));
2031 bt_putid(set->page->right, right->page_no);
2032 set->page->min = nxt;
2033 set->page->cnt = idx;
2035 // if current page is the root page, split it
2037 if( set->page_no == ROOT_page )
2038 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2040 // insert new fences in their parent pages
2042 bt_lockpage (BtLockParent, right->latch);
2044 bt_lockpage (BtLockParent, set->latch);
2045 bt_unlockpage (BtLockWrite, set->latch);
2047 // insert new fence for reformulated left block of smaller keys
2049 ptr = (BtKey*)fencekey;
2050 bt_putid (value, set->page_no);
2052 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2055 // switch fence for right block of larger keys to new right page
2057 ptr = (BtKey*)rightkey;
2058 bt_putid (value, right->page_no);
2060 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2063 bt_unlockpage (BtLockParent, set->latch);
2064 bt_unpinlatch (set->latch);
2066 bt_unlockpage (BtLockParent, right->latch);
2067 bt_unpinlatch (right->latch);
2071 // install new key and value onto page
2072 // page must already be checked for
2075 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2077 uint idx, librarian;
2082 // if found slot > desired slot and previous slot
2083 // is a librarian slot, use it
2086 if( slotptr(set->page, slot-1)->type == Librarian )
2089 // copy value onto page
2091 set->page->min -= vallen + sizeof(BtVal);
2092 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2093 memcpy (val->value, value, vallen);
2096 // copy key onto page
2098 set->page->min -= keylen + sizeof(BtKey);
2099 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2100 memcpy (ptr->key, key, keylen);
2103 // find first empty slot
2105 for( idx = slot; idx < set->page->cnt; idx++ )
2106 if( slotptr(set->page, idx)->dead )
2109 // now insert key into array before slot
2111 if( idx == set->page->cnt )
2112 idx += 2, set->page->cnt += 2, librarian = 2;
2116 set->latch->dirty = 1;
2119 while( idx > slot + librarian - 1 )
2120 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2122 // add librarian slot
2124 if( librarian > 1 ) {
2125 node = slotptr(set->page, slot++);
2126 node->off = set->page->min;
2127 node->type = Librarian;
2133 node = slotptr(set->page, slot);
2134 node->off = set->page->min;
2138 bt_unlockpage (BtLockWrite, set->latch);
2139 bt_unpinlatch (set->latch);
2143 // Insert new key into the btree at given level.
2144 // either add a new key or update/add an existing one
2146 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2148 unsigned char newkey[BT_keyarray];
2149 uint slot, idx, len;
2156 // set up the key we're working on
2158 ins = (BtKey*)newkey;
2159 memcpy (ins->key, key, keylen);
2162 // is this a non-unique index value?
2168 sequence = bt_newdup (bt);
2169 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2173 while( 1 ) { // find the page and slot for the current key
2174 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2175 ptr = keyptr(set->page, slot);
2178 bt->err = BTERR_ovflw;
2182 // if librarian slot == found slot, advance to real slot
2184 if( slotptr(set->page, slot)->type == Librarian )
2185 if( !keycmp (ptr, key, keylen) )
2186 ptr = keyptr(set->page, ++slot);
2190 if( slotptr(set->page, slot)->type == Duplicate )
2193 // if inserting a duplicate key or unique key
2194 // check for adequate space on the page
2195 // and insert the new key before slot.
2197 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2198 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2199 if( bt_splitpage (bt, set) )
2204 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2207 // if key already exists, update value and return
2209 val = valptr(set->page, slot);
2211 if( val->len >= vallen ) {
2212 if( slotptr(set->page, slot)->dead )
2214 set->page->garbage += val->len - vallen;
2215 set->latch->dirty = 1;
2216 slotptr(set->page, slot)->dead = 0;
2218 memcpy (val->value, value, vallen);
2219 bt_unlockpage(BtLockWrite, set->latch);
2220 bt_unpinlatch (set->latch);
2224 // new update value doesn't fit in existing value area
2226 if( !slotptr(set->page, slot)->dead )
2227 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2229 slotptr(set->page, slot)->dead = 0;
2233 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2234 if( bt_splitpage (bt, set) )
2239 set->page->min -= vallen + sizeof(BtVal);
2240 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2241 memcpy (val->value, value, vallen);
2244 set->latch->dirty = 1;
2245 set->page->min -= keylen + sizeof(BtKey);
2246 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2247 memcpy (ptr->key, key, keylen);
2250 slotptr(set->page, slot)->off = set->page->min;
2251 bt_unlockpage(BtLockWrite, set->latch);
2252 bt_unpinlatch (set->latch);
2258 // compute hash of string
2260 uint bt_hashkey (unsigned char *key, unsigned int len)
2264 while( len >= sizeof(uint) )
2265 hash *= 11, hash += *(uint *)key, len -= sizeof(uint), key += sizeof(uint);
2268 hash *= 11, hash += *key++ * len--;
2273 // add a new lock table entry
2275 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2277 BtLockSet *lock = bt->mgr->locktable;
2280 // obtain lock manager global lock
2282 bt_spinwritelock (bt->mgr->locklatch);
2284 // return NULL if table is full
2286 if( !(slot = bt->mgr->lockfree) ) {
2287 bt_spinreleasewrite (bt->mgr->locklatch);
2291 // maintain free chain
2293 bt->mgr->lockfree = lock[slot].next;
2294 bt_spinreleasewrite (bt->mgr->locklatch);
2296 if( prev = bt->mgr->hashlock[hashidx].slot )
2297 lock[prev].prev = slot;
2299 bt->mgr->hashlock[hashidx].slot = slot;
2300 lock[slot].hashidx = hashidx;
2301 lock[slot].next = prev;
2302 lock[slot].prev = 0;
2304 // save the key being locked
2306 memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2310 // add key to the lock table
2311 // block until available.
2313 uint bt_setlock(BtDb *bt, BtKey *key)
2315 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2316 BtLockSet *lock = NULL;
2320 // find existing lock entry
2321 // or recover from full table
2323 while( lock == NULL ) {
2324 // obtain lock on hash slot
2326 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2328 if( slot = bt->mgr->hashlock[hashidx].slot )
2330 lock = bt->mgr->locktable + slot;
2331 key2 = (BtKey *)lock->key;
2333 if( !keycmp (key, key2->key, key2->len) )
2335 } while( slot = lock->next );
2340 if( slot = bt_newlock (bt, key, hashidx) )
2343 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2351 lock = bt->mgr->locktable + slot;
2354 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2355 WriteLock (lock->readwr);
2359 void bt_lockclr (BtDb *bt, uint slot)
2361 BtLockSet *lock = bt->mgr->locktable + slot;
2362 uint hashidx = lock->hashidx;
2365 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2366 WriteRelease (lock->readwr);
2368 // if entry is no longer in use,
2369 // return it to the free chain.
2371 if( !--lock->pin ) {
2372 if( next = lock->next )
2373 bt->mgr->locktable[next].prev = lock->prev;
2375 if( prev = lock->prev )
2376 bt->mgr->locktable[prev].next = lock->next;
2378 bt->mgr->hashlock[lock->hashidx].slot = next;
2380 bt_spinwritelock (bt->mgr->locklatch);
2381 lock->next = bt->mgr->lockfree;
2382 bt->mgr->lockfree = slot;
2383 bt_spinreleasewrite (bt->mgr->locklatch);
2386 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2389 // atomic insert of a batch of keys.
2390 // return -1 if BTERR is set
2391 // otherwise return slot number
2392 // causing the key constraint violation
2393 // or zero on successful completion.
2395 int bt_atomicinsert (BtDb *bt, BtPage source)
2397 uint locks[MAX_atomic];
2405 // first sort the list of keys into order to
2406 // prevent deadlocks between threads.
2408 for( slot = 1; slot++ < source->cnt; ) {
2409 *temp = *slotptr(source,slot);
2410 key = keyptr (source,slot);
2411 for( idx = slot; --idx; ) {
2412 key2 = keyptr (source,idx);
2413 if( keycmp (key, key2->key, key2->len) < 0 ) {
2414 *slotptr(source,idx+1) = *slotptr(source,idx);
2415 *slotptr(source,idx) = *temp;
2421 // take each unique-type key and add it to the lock table
2423 for( slot = 0; slot++ < source->cnt; )
2424 if( slotptr(source, slot)->type == Unique )
2425 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2427 // Lookup each unique key and determine constraint violations
2429 for( slot = 0; slot++ < source->cnt; )
2430 if( slotptr(source, slot)->type == Unique ) {
2431 key = keyptr(source, slot);
2432 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2438 // add each key to the btree
2441 for( slot = 0; slot++ < source->cnt; ) {
2442 key = keyptr(source,slot);
2443 val = valptr(source,slot);
2444 type = slotptr(source,slot)->type;
2445 if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2449 // remove each unique-type key from the lock table
2451 for( slot = 0; slot++ < source->cnt; )
2452 if( slotptr(source, slot)->type == Unique )
2453 bt_lockclr (bt, locks[slot]);
2458 // set cursor to highest slot on highest page
2460 uint bt_lastkey (BtDb *bt)
2462 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2466 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2467 set->page = bt_mappage (bt, set->latch);
2471 bt_lockpage(BtLockRead, set->latch);
2473 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2474 slot = set->page->cnt;
2476 bt_unlockpage(BtLockRead, set->latch);
2477 bt_unpinlatch (set->latch);
2481 // return previous slot on cursor page
2483 uint bt_prevkey (BtDb *bt, uint slot)
2491 if( left = bt_getid(bt->cursor->left) )
2492 bt->cursor_page = left;
2496 if( set->latch = bt_pinlatch (bt, left, 1) )
2497 set->page = bt_mappage (bt, set->latch);
2501 bt_lockpage(BtLockRead, set->latch);
2502 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2503 bt_unlockpage(BtLockRead, set->latch);
2504 bt_unpinlatch (set->latch);
2505 return bt->cursor->cnt;
2508 // return next slot on cursor page
2509 // or slide cursor right into next page
2511 uint bt_nextkey (BtDb *bt, uint slot)
2517 right = bt_getid(bt->cursor->right);
2519 while( slot++ < bt->cursor->cnt )
2520 if( slotptr(bt->cursor,slot)->dead )
2522 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2530 bt->cursor_page = right;
2532 if( set->latch = bt_pinlatch (bt, right, 1) )
2533 set->page = bt_mappage (bt, set->latch);
2537 bt_lockpage(BtLockRead, set->latch);
2539 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2541 bt_unlockpage(BtLockRead, set->latch);
2542 bt_unpinlatch (set->latch);
2550 // cache page of keys into cursor and return starting slot for given key
2552 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2557 // cache page for retrieval
2559 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2560 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2564 bt->cursor_page = set->page_no;
2566 bt_unlockpage(BtLockRead, set->latch);
2567 bt_unpinlatch (set->latch);
2571 BtKey *bt_key(BtDb *bt, uint slot)
2573 return keyptr(bt->cursor, slot);
2576 BtVal *bt_val(BtDb *bt, uint slot)
2578 return valptr(bt->cursor,slot);
2584 double getCpuTime(int type)
2587 FILETIME xittime[1];
2588 FILETIME systime[1];
2589 FILETIME usrtime[1];
2590 SYSTEMTIME timeconv[1];
2593 memset (timeconv, 0, sizeof(SYSTEMTIME));
2597 GetSystemTimeAsFileTime (xittime);
2598 FileTimeToSystemTime (xittime, timeconv);
2599 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2602 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2603 FileTimeToSystemTime (usrtime, timeconv);
2606 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2607 FileTimeToSystemTime (systime, timeconv);
2611 ans += (double)timeconv->wHour * 3600;
2612 ans += (double)timeconv->wMinute * 60;
2613 ans += (double)timeconv->wSecond;
2614 ans += (double)timeconv->wMilliseconds / 1000;
2619 #include <sys/resource.h>
2621 double getCpuTime(int type)
2623 struct rusage used[1];
2624 struct timeval tv[1];
2628 gettimeofday(tv, NULL);
2629 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2632 getrusage(RUSAGE_SELF, used);
2633 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2636 getrusage(RUSAGE_SELF, used);
2637 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2644 void bt_poolaudit (BtMgr *mgr)
2649 while( slot++ < mgr->latchdeployed ) {
2650 latch = mgr->latchsets + slot;
2652 if( *latch->readwr->rin & MASK )
2653 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2654 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2656 if( *latch->access->rin & MASK )
2657 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2658 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2660 if( *latch->parent->rin & MASK )
2661 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2662 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2664 if( latch->pin & ~CLOCK_bit ) {
2665 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2671 uint bt_latchaudit (BtDb *bt)
2673 ushort idx, hashidx;
2679 if( *(ushort *)(bt->mgr->alloclatch) )
2680 fprintf(stderr, "Alloc page locked\n");
2681 *(uint *)(bt->mgr->alloclatch) = 0;
2683 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2684 latch = bt->mgr->latchsets + idx;
2685 if( *latch->readwr->rin & MASK )
2686 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2687 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2689 if( *latch->access->rin & MASK )
2690 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2691 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2693 if( *latch->parent->rin & MASK )
2694 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2695 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2698 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2703 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2704 if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2705 fprintf(stderr, "hash entry %d locked\n", hashidx);
2707 *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
2709 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2710 latch = bt->mgr->latchsets + idx;
2712 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2713 } while( idx = latch->next );
2716 page_no = LEAF_page;
2718 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2719 uid off = page_no << bt->mgr->page_bits;
2721 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2725 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2727 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2728 return bt->err = BTERR_map;
2730 if( *amt < bt->mgr->page_size )
2731 return bt->err = BTERR_map;
2733 if( !bt->frame->free && !bt->frame->lvl )
2734 cnt += bt->frame->act;
2738 cnt--; // remove stopper key
2739 fprintf(stderr, " Total keys read %d\n", cnt);
2752 // standalone program to index file of keys
2753 // then list them onto std-out
2756 void *index_file (void *arg)
2758 uint __stdcall index_file (void *arg)
2761 int line = 0, found = 0, cnt = 0, unique;
2762 uid next, page_no = LEAF_page; // start on first page of leaves
2763 BtPage page = calloc (4096, 1);
2764 unsigned char key[BT_maxkey];
2765 ThreadArg *args = arg;
2766 int ch, len = 0, slot;
2774 bt = bt_open (args->mgr);
2776 unique = (args->type[1] | 0x20) != 'd';
2777 atomic = (args->type[1] | 0x20) == 'a';
2779 switch(args->type[0] | 0x20)
2782 fprintf(stderr, "started latch mgr audit\n");
2783 cnt = bt_latchaudit (bt);
2784 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2788 fprintf(stderr, "started pennysort for %s\n", args->infile);
2789 if( in = fopen (args->infile, "rb") )
2790 while( ch = getc(in), ch != EOF )
2796 memset (page, 0, 4096);
2797 slotptr(page, 1)->off = 2048;
2798 slotptr(page, 1)->type = Unique;
2799 ptr = keyptr(page,1);
2801 memcpy(ptr->key, key, 10);
2802 val = valptr(page,1);
2803 val->len = len - 10;
2804 memcpy (val->value, key + 10, len - 10);
2806 if( slot = bt_atomicinsert (bt, page) )
2807 fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2809 else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2810 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2813 else if( len < BT_maxkey )
2815 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2819 fprintf(stderr, "started indexing for %s\n", args->infile);
2820 if( in = fopen (args->infile, "rb") )
2821 while( ch = getc(in), ch != EOF )
2825 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2826 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2829 else if( len < BT_maxkey )
2831 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2835 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2836 if( in = fopen (args->infile, "rb") )
2837 while( ch = getc(in), ch != EOF )
2841 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2842 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2843 ptr = (BtKey*)(bt->key);
2846 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2847 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2850 else if( len < BT_maxkey )
2852 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2856 fprintf(stderr, "started finding keys for %s\n", args->infile);
2857 if( in = fopen (args->infile, "rb") )
2858 while( ch = getc(in), ch != EOF )
2862 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2865 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2868 else if( len < BT_maxkey )
2870 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2874 fprintf(stderr, "started scanning\n");
2876 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2877 set->page = bt_mappage (bt, set->latch);
2879 fprintf(stderr, "unable to obtain latch"), exit(1);
2880 bt_lockpage (BtLockRead, set->latch);
2881 next = bt_getid (set->page->right);
2883 for( slot = 0; slot++ < set->page->cnt; )
2884 if( next || slot < set->page->cnt )
2885 if( !slotptr(set->page, slot)->dead ) {
2886 ptr = keyptr(set->page, slot);
2889 if( slotptr(set->page, slot)->type == Duplicate )
2892 fwrite (ptr->key, len, 1, stdout);
2893 val = valptr(set->page, slot);
2894 fwrite (val->value, val->len, 1, stdout);
2895 fputc ('\n', stdout);
2899 bt_unlockpage (BtLockRead, set->latch);
2900 bt_unpinlatch (set->latch);
2901 } while( page_no = next );
2903 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2907 fprintf(stderr, "started reverse scan\n");
2908 if( slot = bt_lastkey (bt) )
2909 while( slot = bt_prevkey (bt, slot) ) {
2910 if( slotptr(bt->cursor, slot)->dead )
2913 ptr = keyptr(bt->cursor, slot);
2916 if( slotptr(bt->cursor, slot)->type == Duplicate )
2919 fwrite (ptr->key, len, 1, stdout);
2920 val = valptr(bt->cursor, slot);
2921 fwrite (val->value, val->len, 1, stdout);
2922 fputc ('\n', stdout);
2926 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2931 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2933 fprintf(stderr, "started counting\n");
2934 page_no = LEAF_page;
2936 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2937 if( bt_readpage (bt->mgr, bt->frame, page_no) )
2940 if( !bt->frame->free && !bt->frame->lvl )
2941 cnt += bt->frame->act;
2947 cnt--; // remove stopper key
2948 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2960 typedef struct timeval timer;
2962 int main (int argc, char **argv)
2964 int idx, cnt, len, slot, err;
2965 int segsize, bits = 16;
2982 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2983 fprintf (stderr, " where page_bits is the page size in bits\n");
2984 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
2985 fprintf (stderr, " lock_mgr_size is the maximum number of outstanding key locks\n");
2986 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2990 start = getCpuTime(0);
2993 bits = atoi(argv[3]);
2996 poolsize = atoi(argv[4]);
2999 fprintf (stderr, "Warning: no mapped_pool\n");
3002 locksize = atoi(argv[5]);
3006 threads = malloc (cnt * sizeof(pthread_t));
3008 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3010 args = malloc (cnt * sizeof(ThreadArg));
3012 mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3015 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3021 for( idx = 0; idx < cnt; idx++ ) {
3022 args[idx].infile = argv[idx + 6];
3023 args[idx].type = argv[2];
3024 args[idx].mgr = mgr;
3025 args[idx].idx = idx;
3027 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3028 fprintf(stderr, "Error creating thread %d\n", err);
3030 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3034 // wait for termination
3037 for( idx = 0; idx < cnt; idx++ )
3038 pthread_join (threads[idx], NULL);
3040 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3042 for( idx = 0; idx < cnt; idx++ )
3043 CloseHandle(threads[idx]);
3049 elapsed = getCpuTime(0) - start;
3050 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3051 elapsed = getCpuTime(1);
3052 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3053 elapsed = getCpuTime(2);
3054 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);