1 // btree version threadskv7 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and atomic non-consistent key insert
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
87 // maximum number of keys to insert atomically in one call
89 #define MAX_atomic 256
91 #define BT_maxkey 255 // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
95 There are five lock types for each node in three independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
111 // definition for phase-fair reader/writer lock implementation
125 // definition for spin latch implementation
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
131 volatile typedef struct {
142 // The key structure occupies space at the upper end of
143 // each page. It's a length byte followed by the key
147 unsigned char len; // this can be changed to a ushort or uint
148 unsigned char key[0];
151 // the value structure also occupies space at the upper
152 // end of the page. Each key is immediately followed by a value.
155 unsigned char len; // this can be changed to a ushort or uint
156 unsigned char value[0];
159 // hash table entries
162 uint slot; // Latch table entry at head of chain
163 BtSpinLatch latch[1];
166 // latch manager table structure
169 uid page_no; // latch set page number
170 RWLock readwr[1]; // read/write page lock
171 RWLock access[1]; // Access Intent/Page delete
172 RWLock parent[1]; // Posting of fence key in parent
173 uint slot; // entry slot in latch table
174 uint next; // next entry in hash table chain
175 uint prev; // prev entry in hash table chain
176 volatile ushort pin; // number of outstanding threads
177 ushort dirty:1; // page in cache is dirty
180 // lock manager table structure
183 RWLock readwr[1]; // read/write key lock
186 uint pin; // count of readers waiting
187 uint hashidx; // hash idx for entry
188 unsigned char key[BT_keyarray];
191 // Define the length of the page record numbers
195 // Page key slot definition.
197 // Keys are marked dead, but remain on the page until
198 // it cleanup is called. The fence key (highest key) for
199 // a leaf page is always present, even after cleanup.
203 // In addition to the Unique keys that occupy slots
204 // there are Librarian and Duplicate key
205 // slots occupying the key slot array.
207 // The Librarian slots are dead keys that
208 // serve as filler, available to add new Unique
209 // or Dup slots that are inserted into the B-tree.
211 // The Duplicate slots have had their key bytes extended
212 // by 6 bytes to contain a binary duplicate key uniqueifier.
221 uint off:BT_maxbits; // page offset for key start
222 uint type:3; // type of slot
223 uint dead:1; // set for deleted slot
226 // The first part of an index page.
227 // It is immediately followed
228 // by the BtSlot array of keys.
230 // note that this structure size
231 // must be a multiple of 8 bytes
232 // in order to place dups correctly.
234 typedef struct BtPage_ {
235 uint cnt; // count of keys in page
236 uint act; // count of active keys
237 uint min; // next key offset
238 uint garbage; // page garbage in bytes
239 unsigned char bits:7; // page size in bits
240 unsigned char free:1; // page is on free chain
241 unsigned char lvl:7; // level of page
242 unsigned char kill:1; // page is being deleted
243 unsigned char left[BtId]; // page number to left
244 unsigned char filler[2]; // padding to multiple of 8
245 unsigned char right[BtId]; // page number to right
248 // The loadpage interface object
251 uid page_no; // current page number
252 BtPage page; // current page pointer
253 BtLatchSet *latch; // current page latch set
256 // structure for latch manager on ALLOC_page
259 struct BtPage_ alloc[1]; // next page_no in right ptr
260 unsigned long long dups[1]; // global duplicate key uniqueifier
261 unsigned char chain[BtId]; // head of free page_nos chain
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
274 BtPageZero *pagezero; // mapped allocation page
275 BtSpinLatch alloclatch[1]; // allocation area lite latch
276 uint latchdeployed; // highest number of pool entries deployed
277 uint nlatchpage; // number of latch & lock & pool pages
278 uint latchtotal; // number of page latch entries
279 uint latchhash; // number of latch hash table slots
280 uint latchvictim; // next latch entry to examine
281 BtHashEntry *hashtable; // the anonymous mapping buffer pool
282 BtLatchSet *latchsets; // mapped latch set from latch pages
283 unsigned char *pagepool; // mapped to the buffer pool pages
284 uint lockhash; // number of lock hash table slots
285 uint lockfree; // next available lock table entry
286 BtSpinLatch locklatch[1]; // lock manager free chain latch
287 BtHashEntry *hashlock; // the lock manager hash table
288 BtLockSet *locktable; // the lock manager key table
290 HANDLE halloc; // allocation handle
291 HANDLE hpool; // buffer pool handle
296 BtMgr *mgr; // buffer manager for thread
297 BtPage cursor; // cached frame for start/next (never mapped)
298 BtPage frame; // spare frame for the page split (never mapped)
299 uid cursor_page; // current cursor page number
300 unsigned char *mem; // frame, cursor, page memory buffer
301 unsigned char key[BT_keyarray]; // last found complete key
302 int found; // last delete or insert was found
303 int err; // last error
304 int reads, writes; // number of reads and writes from the btree
318 #define CLOCK_bit 0x8000
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey (BtDb *bt, uint slot);
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
334 // Helper functions to return slot values
335 // from the cursor page.
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
340 // The page is allocated from low and hi ends.
341 // The key slots are allocated from the bottom,
342 // while the text and value of the key
343 // are allocated from the top. When the two
344 // areas meet, the page is split into two.
346 // A key consists of a length byte, two bytes of
347 // index number (0 - 65535), and up to 253 bytes
350 // Associated with each key is a value byte string
351 // containing any value desired.
353 // The b-tree root is always located at page 1.
354 // The first leaf page of level zero is always
355 // located on page 2.
357 // The b-tree pages are linked with next
358 // pointers to facilitate enumerators,
359 // and provide for concurrency.
361 // When to root page fills, it is split in two and
362 // the tree height is raised by a new root at page
363 // one with two keys.
365 // Deleted keys are marked with a dead bit until
366 // page cleanup. The fence key for a leaf node is
369 // To achieve maximum concurrency one page is locked at a time
370 // as the tree is traversed to find leaf key in question. The right
371 // page numbers are used in cases where the page is being split,
374 // Page 0 is dedicated to lock for new page extensions,
375 // and chains empty pages together for reuse. It also
376 // contains the latch manager hash table.
378 // The ParentModification lock on a node is obtained to serialize posting
379 // or changing the fence key for a node.
381 // Empty pages are chained together through the ALLOC page and reused.
383 // Access macros to address slot and key values from the page
384 // Page slots use 1 based indexing.
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
390 void bt_putid(unsigned char *dest, uid id)
395 dest[i] = (unsigned char)id, id >>= 8;
398 uid bt_getid(unsigned char *src)
403 for( i = 0; i < BtId; i++ )
404 id <<= 8, id |= *src++;
409 uid bt_newdup (BtDb *bt)
412 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
414 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
418 // Phase-Fair reader/writer lock implementation
420 void WriteLock (RWLock *lock)
425 tix = __sync_fetch_and_add (lock->ticket, 1);
427 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
429 // wait for our ticket to come up
431 while( tix != lock->serving[0] )
438 w = PRES | (tix & PHID);
440 r = __sync_fetch_and_add (lock->rin, w);
442 r = _InterlockedExchangeAdd16 (lock->rin, w);
444 while( r != *lock->rout )
452 void WriteRelease (RWLock *lock)
455 __sync_fetch_and_and (lock->rin, ~MASK);
457 _InterlockedAnd16 (lock->rin, ~MASK);
462 void ReadLock (RWLock *lock)
466 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
468 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
471 while( w == (*lock->rin & MASK) )
479 void ReadRelease (RWLock *lock)
482 __sync_fetch_and_add (lock->rout, RINC);
484 _InterlockedExchangeAdd16 (lock->rout, RINC);
488 // Spin Latch Manager
490 // wait until write lock mode is clear
491 // and add 1 to the share count
493 void bt_spinreadlock(BtSpinLatch *latch)
499 prev = __sync_fetch_and_add ((uint *)latch, SHARE);
501 prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
503 // see if exclusive request is granted or pending
508 prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
510 prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
513 } while( sched_yield(), 1 );
515 } while( SwitchToThread(), 1 );
519 // wait for other read and write latches to relinquish
521 void bt_spinwritelock(BtSpinLatch *latch)
527 prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
529 prev = _InterlockedOr((uint *)latch, PEND | XCL);
532 if( !(prev & ~BOTH) )
536 __sync_fetch_and_and ((uint *)latch, ~XCL);
538 _InterlockedAnd((uint *)latch, ~XCL);
541 } while( sched_yield(), 1 );
543 } while( SwitchToThread(), 1 );
547 // try to obtain write lock
549 // return 1 if obtained,
552 int bt_spinwritetry(BtSpinLatch *latch)
557 prev = __sync_fetch_and_or((uint *)latch, XCL);
559 prev = _InterlockedOr((uint *)latch, XCL);
561 // take write access if all bits are clear
564 if( !(prev & ~BOTH) )
568 __sync_fetch_and_and ((uint *)latch, ~XCL);
570 _InterlockedAnd((uint *)latch, ~XCL);
577 void bt_spinreleasewrite(BtSpinLatch *latch)
580 __sync_fetch_and_and((uint *)latch, ~BOTH);
582 _InterlockedAnd((uint *)latch, ~BOTH);
586 // decrement reader count
588 void bt_spinreleaseread(BtSpinLatch *latch)
591 __sync_fetch_and_add((uint *)latch, -SHARE);
593 _InterlockedExchangeAdd((uint *)latch, -SHARE);
597 // read page from permanent location in Btree file
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
601 off64_t off = page_no << mgr->page_bits;
604 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
612 memset (ovl, 0, sizeof(OVERLAPPED));
614 ovl->OffsetHigh = off >> 32;
616 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
620 if( *amt < mgr->page_size ) {
621 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
628 // write page to permanent location in Btree file
629 // clear the dirty bit
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
633 off64_t off = page_no << mgr->page_bits;
636 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
642 memset (ovl, 0, sizeof(OVERLAPPED));
644 ovl->OffsetHigh = off >> 32;
646 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
649 if( *amt < mgr->page_size )
655 // link latch table entry into head of latch hash table
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
662 if( latch->next = bt->mgr->hashtable[hashidx].slot )
663 bt->mgr->latchsets[latch->next].prev = slot;
665 bt->mgr->hashtable[hashidx].slot = slot;
666 latch->page_no = page_no;
672 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
680 // set CLOCK bit in latch
681 // decrement pin count
683 void bt_unpinlatch (BtLatchSet *latch)
686 if( ~latch->pin & CLOCK_bit )
687 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688 __sync_fetch_and_add(&latch->pin, -1);
690 if( ~latch->pin & CLOCK_bit )
691 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692 _InterlockedDecrement16 (&latch->pin);
696 // return the btree cached page address
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
705 // find existing latchset or inspire new one
706 // return with latchset pinned
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
710 uint hashidx = page_no % bt->mgr->latchhash;
718 // try to find our entry
720 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
722 if( slot = bt->mgr->hashtable[hashidx].slot ) do
724 latch = bt->mgr->latchsets + slot;
725 if( page_no == latch->page_no )
727 } while( slot = latch->next );
733 latch = bt->mgr->latchsets + slot;
735 __sync_fetch_and_add(&latch->pin, 1);
737 _InterlockedIncrement16 (&latch->pin);
739 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
743 // see if there are any unused pool entries
745 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
747 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
750 if( slot < bt->mgr->latchtotal ) {
751 latch = bt->mgr->latchsets + slot;
752 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
754 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
759 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
761 _InterlockedDecrement (&bt->mgr->latchdeployed);
763 // find and reuse previous entry on victim
767 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
769 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
771 // try to get write lock on hash chain
772 // skip entry if not obtained
773 // or has outstanding pins
775 slot %= bt->mgr->latchtotal;
780 latch = bt->mgr->latchsets + slot;
781 idx = latch->page_no % bt->mgr->latchhash;
783 // see we are on same chain as hashidx
788 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
791 // skip this slot if it is pinned
792 // or the CLOCK bit is set
795 if( latch->pin & CLOCK_bit ) {
797 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
799 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
802 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
806 // update permanent page area in btree from buffer pool
808 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
811 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
814 latch->dirty = 0, bt->writes++;
816 // unlink our available slot from its hash chain
819 bt->mgr->latchsets[latch->prev].next = latch->next;
821 bt->mgr->hashtable[idx].slot = latch->next;
824 bt->mgr->latchsets[latch->next].prev = latch->prev;
826 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
828 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
831 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
836 void bt_mgrclose (BtMgr *mgr)
843 // flush dirty pool pages to the btree
845 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847 latch = mgr->latchsets + slot;
850 bt_writepage(mgr, page, latch->page_no);
851 latch->dirty = 0, num++;
853 // madvise (page, mgr->page_size, MADV_DONTNEED);
856 fprintf(stderr, "%d buffer pool pages flushed\n", num);
859 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860 munmap (mgr->pagezero, mgr->page_size);
862 FlushViewOfFile(mgr->pagezero, 0);
863 UnmapViewOfFile(mgr->pagezero);
864 UnmapViewOfFile(mgr->hashtable);
865 CloseHandle(mgr->halloc);
866 CloseHandle(mgr->hpool);
872 FlushFileBuffers(mgr->idx);
873 CloseHandle(mgr->idx);
878 // close and release memory
880 void bt_close (BtDb *bt)
887 VirtualFree (bt->mem, 0, MEM_RELEASE);
892 // open/create new btree buffer manager
894 // call with file_name, BT_openmode, bits in page size (e.g. 16),
895 // size of page pool (e.g. 262144) and number of lock table entries.
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
909 // determine sanity of page size and buffer pool
911 if( bits > BT_maxbits )
913 else if( bits < BT_minbits )
917 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
922 mgr = calloc (1, sizeof(BtMgr));
924 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
926 if( mgr->idx == -1 ) {
927 fprintf (stderr, "Unable to open btree file\n");
928 return free(mgr), NULL;
931 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932 attr = FILE_ATTRIBUTE_NORMAL;
933 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
935 if( mgr->idx == INVALID_HANDLE_VALUE )
936 return GlobalFree(mgr), NULL;
940 pagezero = valloc (BT_maxpage);
943 // read minimum page size to get root info
944 // to support raw disk partition files
945 // check if bits == 0 on the disk.
947 if( size = lseek (mgr->idx, 0L, 2) )
948 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949 if( pagezero->alloc->bits )
950 bits = pagezero->alloc->bits;
954 return free(mgr), free(pagezero), NULL;
958 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959 size = GetFileSize(mgr->idx, amt);
962 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963 return bt_mgrclose (mgr), NULL;
964 bits = pagezero->alloc->bits;
969 mgr->page_size = 1 << bits;
970 mgr->page_bits = bits;
972 // calculate number of latch hash table entries
974 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
977 // add on the number of pages in buffer pool
978 // along with the corresponding latch table
980 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
981 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982 mgr->latchtotal = nodemax;
984 // add on the sizeof the lock manager hash table and the lock table
986 mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
988 mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
993 // initialize an empty b-tree with latch page, root page, page of leaves
994 // and page(s) of latches and page pool cache
996 memset (pagezero, 0, 1 << bits);
997 pagezero->alloc->bits = mgr->page_bits;
998 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1000 // initialize left-most LEAF page in
1003 bt_putid (pagezero->alloc->left, LEAF_page);
1005 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006 fprintf (stderr, "Unable to create btree page zero\n");
1007 return bt_mgrclose (mgr), NULL;
1010 memset (pagezero, 0, 1 << bits);
1011 pagezero->alloc->bits = mgr->page_bits;
1013 for( lvl=MIN_lvl; lvl--; ) {
1014 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015 key = keyptr(pagezero->alloc, 1);
1016 key->len = 2; // create stopper key
1020 bt_putid(value, MIN_lvl - lvl + 1);
1021 val = valptr(pagezero->alloc, 1);
1022 val->len = lvl ? BtId : 0;
1023 memcpy (val->value, value, val->len);
1025 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026 pagezero->alloc->lvl = lvl;
1027 pagezero->alloc->cnt = 1;
1028 pagezero->alloc->act = 1;
1030 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031 fprintf (stderr, "Unable to create btree page zero\n");
1032 return bt_mgrclose (mgr), NULL;
1040 VirtualFree (pagezero, 0, MEM_RELEASE);
1043 // mlock the pagezero page
1045 flag = PROT_READ | PROT_WRITE;
1046 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047 if( mgr->pagezero == MAP_FAILED ) {
1048 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049 return bt_mgrclose (mgr), NULL;
1051 mlock (mgr->pagezero, mgr->page_size);
1053 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054 if( mgr->hashtable == MAP_FAILED ) {
1055 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056 return bt_mgrclose (mgr), NULL;
1059 flag = PAGE_READWRITE;
1060 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061 if( !mgr->halloc ) {
1062 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063 return bt_mgrclose (mgr), NULL;
1066 flag = FILE_MAP_WRITE;
1067 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068 if( !mgr->pagezero ) {
1069 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070 return bt_mgrclose (mgr), NULL;
1073 flag = PAGE_READWRITE;
1074 size = (uid)mgr->nlatchpage << mgr->page_bits;
1075 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1077 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078 return bt_mgrclose (mgr), NULL;
1081 flag = FILE_MAP_WRITE;
1082 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083 if( !mgr->hashtable ) {
1084 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085 return bt_mgrclose (mgr), NULL;
1089 size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090 mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091 size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1093 mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094 mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095 mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1097 mgr->lockfree = lockmax - 1;
1098 mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1100 for( idx = 1; idx < lockmax; idx++ )
1101 mgr->locktable[idx].next = idx - 1;
1106 // open BTree access method
1107 // based on buffer manager
1109 BtDb *bt_open (BtMgr *mgr)
1111 BtDb *bt = malloc (sizeof(*bt));
1113 memset (bt, 0, sizeof(*bt));
1116 bt->mem = valloc (2 *mgr->page_size);
1118 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 bt->frame = (BtPage)bt->mem;
1121 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1125 // compare two keys, returning > 0, = 0, or < 0
1126 // as the comparison value
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1130 uint len1 = key1->len;
1133 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1144 // place write, read, or parent lock on requested page_no.
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1150 ReadLock (set->readwr);
1153 WriteLock (set->readwr);
1156 ReadLock (set->access);
1159 WriteLock (set->access);
1162 WriteLock (set->parent);
1167 // remove write, read, or parent lock on requested page
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1173 ReadRelease (set->readwr);
1176 WriteRelease (set->readwr);
1179 ReadRelease (set->access);
1182 WriteRelease (set->access);
1185 WriteRelease (set->parent);
1190 // allocate a new page
1191 // return with page latched.
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1197 // lock allocation page
1199 bt_spinwritelock(bt->mgr->alloclatch);
1201 // use empty chain first
1202 // else allocate empty page
1204 if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206 set->page = bt_mappage (bt, set->latch);
1208 return bt->err = BTERR_struct, -1;
1210 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211 bt_spinreleasewrite(bt->mgr->alloclatch);
1213 memcpy (set->page, contents, bt->mgr->page_size);
1214 set->latch->dirty = 1;
1218 set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219 bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1221 // unlock allocation latch
1223 bt_spinreleasewrite(bt->mgr->alloclatch);
1225 // don't load cache from btree page
1227 if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228 set->page = bt_mappage (bt, set->latch);
1230 return bt->err = BTERR_struct;
1232 memcpy (set->page, contents, bt->mgr->page_size);
1233 set->latch->dirty = 1;
1237 // find slot in page for given key at a given level
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1244 // make stopper key an infinite fence value
1246 if( bt_getid (set->page->right) )
1251 // low is the lowest candidate.
1252 // loop ends when they meet
1254 // higher is already
1255 // tested as .ge. the passed key.
1257 while( diff = higher - low ) {
1258 slot = low + ( diff >> 1 );
1259 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1262 higher = slot, good++;
1265 // return zero if key is on right link page
1267 return good ? higher : 0;
1270 // find and load page at given level for given key
1271 // leave page rd or wr locked as requested
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1280 // start at root of btree and drill down
1283 // determine lock mode of drill level
1284 mode = (drill == lvl) ? lock : BtLockRead;
1286 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287 set->page_no = page_no;
1291 // obtain access lock using lock chaining with Access mode
1293 if( page_no > ROOT_page )
1294 bt_lockpage(BtLockAccess, set->latch);
1296 set->page = bt_mappage (bt, set->latch);
1298 // release & unpin parent page
1301 bt_unlockpage(prevmode, prevlatch);
1302 bt_unpinlatch (prevlatch);
1306 // obtain read lock using lock chaining
1308 bt_lockpage(mode, set->latch);
1310 if( set->page->free )
1311 return bt->err = BTERR_struct, 0;
1313 if( page_no > ROOT_page )
1314 bt_unlockpage(BtLockAccess, set->latch);
1316 // re-read and re-lock root after determining actual level of root
1318 if( set->page->lvl != drill) {
1319 if( set->page_no != ROOT_page )
1320 return bt->err = BTERR_struct, 0;
1322 drill = set->page->lvl;
1324 if( lock != BtLockRead && drill == lvl ) {
1325 bt_unlockpage(mode, set->latch);
1326 bt_unpinlatch (set->latch);
1331 prevpage = set->page_no;
1332 prevlatch = set->latch;
1335 // find key on page at this level
1336 // and descend to requested level
1338 if( set->page->kill )
1341 if( slot = bt_findslot (set, key, len) ) {
1345 while( slotptr(set->page, slot)->dead )
1346 if( slot++ < set->page->cnt )
1351 page_no = bt_getid(valptr(set->page, slot)->value);
1356 // or slide right into next page
1359 page_no = bt_getid(set->page->right);
1363 // return error on end of right chain
1365 bt->err = BTERR_struct;
1366 return 0; // return error
1369 // return page to free list
1370 // page must be delete & write locked
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1374 // lock allocation page
1376 bt_spinwritelock (bt->mgr->alloclatch);
1379 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380 bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381 set->latch->dirty = 1;
1382 set->page->free = 1;
1384 // unlock released page
1386 bt_unlockpage (BtLockDelete, set->latch);
1387 bt_unlockpage (BtLockWrite, set->latch);
1388 bt_unpinlatch (set->latch);
1390 // unlock allocation page
1392 bt_spinreleasewrite (bt->mgr->alloclatch);
1395 // a fence key was deleted from a page
1396 // push new fence value upwards
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1405 // remove the old fence value
1407 ptr = keyptr(set->page, set->page->cnt);
1408 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410 set->latch->dirty = 1;
1412 // cache new fence value
1414 ptr = keyptr(set->page, set->page->cnt);
1415 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1417 bt_lockpage (BtLockParent, set->latch);
1418 bt_unlockpage (BtLockWrite, set->latch);
1420 // insert new (now smaller) fence key
1422 bt_putid (value, set->page_no);
1423 ptr = (BtKey*)leftkey;
1425 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1428 // now delete old fence key
1430 ptr = (BtKey*)rightkey;
1432 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1435 bt_unlockpage (BtLockParent, set->latch);
1436 bt_unpinlatch(set->latch);
1440 // root has a single child
1441 // collapse a level from the tree
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1448 // find the child entry and promote as new root contents
1451 for( idx = 0; idx++ < root->page->cnt; )
1452 if( !slotptr(root->page, idx)->dead )
1455 child->page_no = bt_getid (valptr(root->page, idx)->value);
1457 if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458 child->page = bt_mappage (bt, child->latch);
1462 bt_lockpage (BtLockDelete, child->latch);
1463 bt_lockpage (BtLockWrite, child->latch);
1465 memcpy (root->page, child->page, bt->mgr->page_size);
1466 root->latch->dirty = 1;
1468 bt_freepage (bt, child);
1470 } while( root->page->lvl > 1 && root->page->act == 1 );
1472 bt_unlockpage (BtLockWrite, root->latch);
1473 bt_unpinlatch (root->latch);
1477 // maintain reverse scan pointers by
1478 // linking left pointer of far right node
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1482 BtPageSet right2[1];
1484 // keep track of rightmost leaf page
1486 if( !right_page_no ) {
1487 bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1491 // link right page to left page
1493 if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494 right2->page = bt_mappage (bt, right2->latch);
1498 bt_lockpage (BtLockWrite, right2->latch);
1500 bt_putid(right2->page->left, left_page_no);
1501 bt_unlockpage (BtLockWrite, right2->latch);
1502 bt_unpinlatch (right2->latch);
1506 // find and delete key on page by marking delete flag bit
1507 // if page becomes empty, delete it from the btree
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1518 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519 ptr = keyptr(set->page, slot);
1523 // if librarian slot, advance to real slot
1525 if( slotptr(set->page, slot)->type == Librarian )
1526 ptr = keyptr(set->page, ++slot);
1528 fence = slot == set->page->cnt;
1530 // if key is found delete it, otherwise ignore request
1532 if( found = !keycmp (ptr, key, len) )
1533 if( found = slotptr(set->page, slot)->dead == 0 ) {
1534 val = valptr(set->page,slot);
1535 slotptr(set->page, slot)->dead = 1;
1536 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1539 // collapse empty slots beneath the fence
1541 while( idx = set->page->cnt - 1 )
1542 if( slotptr(set->page, idx)->dead ) {
1543 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1549 // did we delete a fence key in an upper level?
1551 if( found && lvl && set->page->act && fence )
1552 if( bt_fixfence (bt, set, lvl) )
1555 return bt->found = found, 0;
1557 // do we need to collapse root?
1559 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560 if( bt_collapseroot (bt, set) )
1563 return bt->found = found, 0;
1565 // return if page is not empty
1567 if( set->page->act ) {
1568 set->latch->dirty = 1;
1569 bt_unlockpage(BtLockWrite, set->latch);
1570 bt_unpinlatch (set->latch);
1571 return bt->found = found, 0;
1574 // cache copy of fence key
1575 // to post in parent
1577 ptr = keyptr(set->page, set->page->cnt);
1578 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1580 // obtain lock on right page
1582 right->page_no = bt_getid(set->page->right);
1584 if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585 right->page = bt_mappage (bt, right->latch);
1589 bt_lockpage (BtLockWrite, right->latch);
1591 if( right->page->kill )
1592 return bt->err = BTERR_struct;
1594 // transfer left link
1596 memcpy (right->page->left, set->page->left, BtId);
1598 // pull contents of right peer into our empty page
1600 memcpy (set->page, right->page, bt->mgr->page_size);
1601 set->latch->dirty = 1;
1606 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1609 // cache copy of key to update
1611 ptr = keyptr(right->page, right->page->cnt);
1612 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1614 // mark right page deleted and point it to left page
1615 // until we can post parent updates
1617 bt_putid (right->page->right, set->page_no);
1618 right->latch->dirty = 1;
1619 right->page->kill = 1;
1621 bt_lockpage (BtLockParent, right->latch);
1622 bt_unlockpage (BtLockWrite, right->latch);
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // redirect higher key directly to our new node contents
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)higherfence;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // delete old lower key to our node
1637 ptr = (BtKey*)lowerfence;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 // obtain delete and write locks to right node
1644 bt_unlockpage (BtLockParent, right->latch);
1645 bt_lockpage (BtLockDelete, right->latch);
1646 bt_lockpage (BtLockWrite, right->latch);
1647 bt_freepage (bt, right);
1649 bt_unlockpage (BtLockParent, set->latch);
1650 bt_unpinlatch (set->latch);
1655 BtKey *bt_foundkey (BtDb *bt)
1657 return (BtKey*)(bt->key);
1660 // advance to next slot
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1664 BtLatchSet *prevlatch;
1667 if( slot < set->page->cnt )
1670 prevlatch = set->latch;
1672 if( page_no = bt_getid(set->page->right) )
1673 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674 set->page = bt_mappage (bt, set->latch);
1678 return bt->err = BTERR_struct, 0;
1680 // obtain access lock using lock chaining with Access mode
1682 bt_lockpage(BtLockAccess, set->latch);
1684 bt_unlockpage(BtLockRead, prevlatch);
1685 bt_unpinlatch (prevlatch);
1687 bt_lockpage(BtLockRead, set->latch);
1688 bt_unlockpage(BtLockAccess, set->latch);
1690 set->page_no = page_no;
1694 // find unique key or first duplicate key in
1695 // leaf level and return number of value bytes
1696 // or (-1) if not found. Setup key for bt_foundkey
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1706 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1708 ptr = keyptr(set->page, slot);
1710 // skip librarian slot place holder
1712 if( slotptr(set->page, slot)->type == Librarian )
1713 ptr = keyptr(set->page, ++slot);
1715 // return actual key found
1717 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1720 if( slotptr(set->page, slot)->type == Duplicate )
1723 // not there if we reach the stopper key
1725 if( slot == set->page->cnt )
1726 if( !bt_getid (set->page->right) )
1729 // if key exists, return >= 0 value bytes copied
1730 // otherwise return (-1)
1732 if( slotptr(set->page, slot)->dead )
1736 if( !memcmp (ptr->key, key, len) ) {
1737 val = valptr (set->page,slot);
1738 if( valmax > val->len )
1740 memcpy (value, val->value, valmax);
1746 } while( slot = bt_findnext (bt, set, slot) );
1748 bt_unlockpage (BtLockRead, set->latch);
1749 bt_unpinlatch (set->latch);
1753 // check page for space available,
1754 // clean if necessary and return
1755 // 0 - page needs splitting
1756 // >0 new slot value
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
1768 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1771 // skip cleanup and proceed to split
1772 // if there's not enough garbage
1775 if( page->garbage < nxt / 5 )
1778 memcpy (bt->frame, page, bt->mgr->page_size);
1780 // skip page info and set rest of page to zero
1782 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783 set->latch->dirty = 1;
1787 // clean up page first by
1788 // removing deleted keys
1790 while( cnt++ < max ) {
1793 if( cnt < max || page->lvl )
1794 if( slotptr(bt->frame,cnt)->dead )
1797 // copy the value across
1799 val = valptr(bt->frame, cnt);
1800 nxt -= val->len + sizeof(BtVal);
1801 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1803 // copy the key across
1805 key = keyptr(bt->frame, cnt);
1806 nxt -= key->len + sizeof(BtKey);
1807 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1809 // make a librarian slot
1811 slotptr(page, ++idx)->off = nxt;
1812 slotptr(page, idx)->type = Librarian;
1813 slotptr(page, idx)->dead = 1;
1817 slotptr(page, ++idx)->off = nxt;
1818 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1820 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1827 // see if page has enough space now, or does it need splitting?
1829 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1835 // split the root and raise the height of the btree
1837 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1839 uint nxt = bt->mgr->page_size;
1840 unsigned char value[BtId];
1845 // Obtain an empty page to use, and copy the current
1846 // root contents into it, e.g. lower keys
1848 if( bt_newpage(bt, left, root->page) )
1851 bt_unpinlatch (left->latch);
1853 // set left from higher to lower page
1855 bt_putid (right->page->left, left->page_no);
1856 bt_unpinlatch (right->latch);
1858 // preserve the page info at the bottom
1859 // of higher keys and set rest to zero
1861 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1863 // insert stopper key at top of newroot page
1864 // and increase the root height
1866 nxt -= BtId + sizeof(BtVal);
1867 bt_putid (value, right->page_no);
1868 val = (BtVal *)((unsigned char *)root->page + nxt);
1869 memcpy (val->value, value, BtId);
1872 nxt -= 2 + sizeof(BtKey);
1873 slotptr(root->page, 2)->off = nxt;
1874 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1879 // insert lower keys page fence key on newroot page as first key
1881 nxt -= BtId + sizeof(BtVal);
1882 bt_putid (value, left->page_no);
1883 val = (BtVal *)((unsigned char *)root->page + nxt);
1884 memcpy (val->value, value, BtId);
1887 nxt -= leftkey->len + sizeof(BtKey);
1888 slotptr(root->page, 1)->off = nxt;
1889 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1891 bt_putid(root->page->right, 0);
1892 root->page->min = nxt; // reset lowest used offset and key count
1893 root->page->cnt = 2;
1894 root->page->act = 2;
1897 // release and unpin root pages
1899 bt_unlockpage(BtLockWrite, root->latch);
1900 bt_unpinlatch (root->latch);
1904 // split already locked full node
1907 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1909 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1910 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1911 unsigned char value[BtId];
1912 uint lvl = set->page->lvl;
1919 // split higher half of keys to bt->frame
1921 memset (bt->frame, 0, bt->mgr->page_size);
1922 max = set->page->cnt;
1926 while( cnt++ < max ) {
1927 if( cnt < max || set->page->lvl )
1928 if( slotptr(set->page, cnt)->dead && cnt < max )
1931 src = valptr(set->page, cnt);
1932 nxt -= src->len + sizeof(BtVal);
1933 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1935 key = keyptr(set->page, cnt);
1936 nxt -= key->len + sizeof(BtKey);
1937 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1938 memcpy (ptr, key, key->len + sizeof(BtKey));
1940 // add librarian slot
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = Librarian;
1944 slotptr(bt->frame, idx)->dead = 1;
1948 slotptr(bt->frame, ++idx)->off = nxt;
1949 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1951 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1955 // remember existing fence key for new page to the right
1957 memcpy (rightkey, key, key->len + sizeof(BtKey));
1959 bt->frame->bits = bt->mgr->page_bits;
1960 bt->frame->min = nxt;
1961 bt->frame->cnt = idx;
1962 bt->frame->lvl = lvl;
1966 if( set->page_no > ROOT_page ) {
1967 bt_putid (bt->frame->right, bt_getid (set->page->right));
1968 bt_putid(bt->frame->left, set->page_no);
1971 // get new free page and write higher keys to it.
1973 if( bt_newpage(bt, right, bt->frame) )
1978 if( set->page_no > ROOT_page && !lvl )
1979 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1982 // update lower keys to continue in old page
1984 memcpy (bt->frame, set->page, bt->mgr->page_size);
1985 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1986 set->latch->dirty = 1;
1988 nxt = bt->mgr->page_size;
1989 set->page->garbage = 0;
1995 if( slotptr(bt->frame, max)->type == Librarian )
1998 // assemble page of smaller keys
2000 while( cnt++ < max ) {
2001 if( slotptr(bt->frame, cnt)->dead )
2003 val = valptr(bt->frame, cnt);
2004 nxt -= val->len + sizeof(BtVal);
2005 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2007 key = keyptr(bt->frame, cnt);
2008 nxt -= key->len + sizeof(BtKey);
2009 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2011 // add librarian slot
2013 slotptr(set->page, ++idx)->off = nxt;
2014 slotptr(set->page, idx)->type = Librarian;
2015 slotptr(set->page, idx)->dead = 1;
2019 slotptr(set->page, ++idx)->off = nxt;
2020 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2024 // remember fence key for smaller page
2026 memcpy(fencekey, key, key->len + sizeof(BtKey));
2028 bt_putid(set->page->right, right->page_no);
2029 set->page->min = nxt;
2030 set->page->cnt = idx;
2032 // if current page is the root page, split it
2034 if( set->page_no == ROOT_page )
2035 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2037 // insert new fences in their parent pages
2039 bt_lockpage (BtLockParent, right->latch);
2041 bt_lockpage (BtLockParent, set->latch);
2042 bt_unlockpage (BtLockWrite, set->latch);
2044 // insert new fence for reformulated left block of smaller keys
2046 ptr = (BtKey*)fencekey;
2047 bt_putid (value, set->page_no);
2049 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2052 // switch fence for right block of larger keys to new right page
2054 ptr = (BtKey*)rightkey;
2055 bt_putid (value, right->page_no);
2057 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2060 bt_unlockpage (BtLockParent, set->latch);
2061 bt_unpinlatch (set->latch);
2063 bt_unlockpage (BtLockParent, right->latch);
2064 bt_unpinlatch (right->latch);
2068 // install new key and value onto page
2069 // page must already be checked for
2072 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2074 uint idx, librarian;
2079 // if found slot > desired slot and previous slot
2080 // is a librarian slot, use it
2083 if( slotptr(set->page, slot-1)->type == Librarian )
2086 // copy value onto page
2088 set->page->min -= vallen + sizeof(BtVal);
2089 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2090 memcpy (val->value, value, vallen);
2093 // copy key onto page
2095 set->page->min -= keylen + sizeof(BtKey);
2096 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2097 memcpy (ptr->key, key, keylen);
2100 // find first empty slot
2102 for( idx = slot; idx < set->page->cnt; idx++ )
2103 if( slotptr(set->page, idx)->dead )
2106 // now insert key into array before slot
2108 if( idx == set->page->cnt )
2109 idx += 2, set->page->cnt += 2, librarian = 2;
2113 set->latch->dirty = 1;
2116 while( idx > slot + librarian - 1 )
2117 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2119 // add librarian slot
2121 if( librarian > 1 ) {
2122 node = slotptr(set->page, slot++);
2123 node->off = set->page->min;
2124 node->type = Librarian;
2130 node = slotptr(set->page, slot);
2131 node->off = set->page->min;
2135 bt_unlockpage (BtLockWrite, set->latch);
2136 bt_unpinlatch (set->latch);
2140 // Insert new key into the btree at given level.
2141 // either add a new key or update/add an existing one
2143 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2145 unsigned char newkey[BT_keyarray];
2146 uint slot, idx, len;
2153 // set up the key we're working on
2155 ins = (BtKey*)newkey;
2156 memcpy (ins->key, key, keylen);
2159 // is this a non-unique index value?
2165 sequence = bt_newdup (bt);
2166 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2170 while( 1 ) { // find the page and slot for the current key
2171 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2172 ptr = keyptr(set->page, slot);
2175 bt->err = BTERR_ovflw;
2179 // if librarian slot == found slot, advance to real slot
2181 if( slotptr(set->page, slot)->type == Librarian )
2182 if( !keycmp (ptr, key, keylen) )
2183 ptr = keyptr(set->page, ++slot);
2187 if( slotptr(set->page, slot)->type == Duplicate )
2190 // if inserting a duplicate key or unique key
2191 // check for adequate space on the page
2192 // and insert the new key before slot.
2194 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2195 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2196 if( bt_splitpage (bt, set) )
2201 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2204 // if key already exists, update value and return
2206 val = valptr(set->page, slot);
2208 if( val->len >= vallen ) {
2209 if( slotptr(set->page, slot)->dead )
2211 set->page->garbage += val->len - vallen;
2212 set->latch->dirty = 1;
2213 slotptr(set->page, slot)->dead = 0;
2215 memcpy (val->value, value, vallen);
2216 bt_unlockpage(BtLockWrite, set->latch);
2217 bt_unpinlatch (set->latch);
2221 // new update value doesn't fit in existing value area
2223 if( !slotptr(set->page, slot)->dead )
2224 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2226 slotptr(set->page, slot)->dead = 0;
2230 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2231 if( bt_splitpage (bt, set) )
2236 set->page->min -= vallen + sizeof(BtVal);
2237 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2238 memcpy (val->value, value, vallen);
2241 set->latch->dirty = 1;
2242 set->page->min -= keylen + sizeof(BtKey);
2243 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2244 memcpy (ptr->key, key, keylen);
2247 slotptr(set->page, slot)->off = set->page->min;
2248 bt_unlockpage(BtLockWrite, set->latch);
2249 bt_unpinlatch (set->latch);
2255 // compute hash of string
2257 uint bt_hashkey (unsigned char *key, unsigned int len)
2261 while( len >= sizeof(uint) )
2262 hash *= 11, hash += *(uint *)key, len -= sizeof(uint), key += sizeof(uint);
2265 hash *= 11, hash += *key++ * len--;
2270 // add a new lock table entry
2272 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2274 BtLockSet *lock = bt->mgr->locktable;
2277 // obtain lock manager global lock
2279 bt_spinwritelock (bt->mgr->locklatch);
2281 // return NULL if table is full
2283 if( !(slot = bt->mgr->lockfree) ) {
2284 bt_spinreleasewrite (bt->mgr->locklatch);
2288 // maintain free chain
2290 bt->mgr->lockfree = lock[slot].next;
2291 bt_spinreleasewrite (bt->mgr->locklatch);
2293 if( prev = bt->mgr->hashlock[hashidx].slot )
2294 lock[prev].prev = slot;
2296 bt->mgr->hashlock[hashidx].slot = slot;
2297 lock[slot].hashidx = hashidx;
2298 lock[slot].next = prev;
2299 lock[slot].prev = 0;
2301 // save the key being locked
2303 memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2307 // add key to the lock table
2308 // block until available.
2310 uint bt_setlock(BtDb *bt, BtKey *key)
2312 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2313 BtLockSet *lock = NULL;
2317 // find existing lock entry
2318 // or recover from full table
2320 while( lock == NULL ) {
2321 // obtain lock on hash slot
2323 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2325 if( slot = bt->mgr->hashlock[hashidx].slot )
2327 lock = bt->mgr->locktable + slot;
2328 key2 = (BtKey *)lock->key;
2330 if( !keycmp (key, key2->key, key2->len) )
2332 } while( slot = lock->next );
2337 if( slot = bt_newlock (bt, key, hashidx) )
2340 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2348 lock = bt->mgr->locktable + slot;
2351 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2352 WriteLock (lock->readwr);
2356 void bt_lockclr (BtDb *bt, uint slot)
2358 BtLockSet *lock = bt->mgr->locktable + slot;
2359 uint hashidx = lock->hashidx;
2362 bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2363 WriteRelease (lock->readwr);
2365 // if entry is no longer in use,
2366 // return it to the free chain.
2368 if( !--lock->pin ) {
2369 if( next = lock->next )
2370 bt->mgr->locktable[next].prev = lock->prev;
2372 if( prev = lock->prev )
2373 bt->mgr->locktable[prev].next = lock->next;
2375 bt->mgr->hashlock[lock->hashidx].slot = next;
2377 bt_spinwritelock (bt->mgr->locklatch);
2378 lock->next = bt->mgr->lockfree;
2379 bt->mgr->lockfree = slot;
2380 bt_spinreleasewrite (bt->mgr->locklatch);
2383 bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2386 // atomic insert of a batch of keys.
2387 // return -1 if BTERR is set
2388 // otherwise return slot number
2389 // causing the key constraint violation
2390 // or zero on successful completion.
2392 int bt_atomicinsert (BtDb *bt, BtPage source)
2394 uint locks[MAX_atomic];
2402 // first sort the list of keys into order to
2403 // prevent deadlocks between threads.
2405 for( slot = 1; slot++ < source->cnt; ) {
2406 *temp = *slotptr(source,slot);
2407 key = keyptr (source,slot);
2408 for( idx = slot; --idx; ) {
2409 key2 = keyptr (source,idx);
2410 if( keycmp (key, key2->key, key2->len) < 0 ) {
2411 *slotptr(source,idx+1) = *slotptr(source,idx);
2412 *slotptr(source,idx) = *temp;
2418 // take each unique-type key and add it to the lock table
2420 for( slot = 0; slot++ < source->cnt; )
2421 if( slotptr(source, slot)->type == Unique )
2422 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2424 // Lookup each unique key and determine constraint violations
2426 for( slot = 0; slot++ < source->cnt; )
2427 if( slotptr(source, slot)->type == Unique ) {
2428 key = keyptr(source, slot);
2429 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2435 // add each key to the btree
2438 for( slot = 0; slot++ < source->cnt; ) {
2439 key = keyptr(source,slot);
2440 val = valptr(source,slot);
2441 type = slotptr(source,slot)->type;
2442 if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2446 // remove each unique-type key from the lock table
2448 for( slot = 0; slot++ < source->cnt; )
2449 if( slotptr(source, slot)->type == Unique )
2450 bt_lockclr (bt, locks[slot]);
2455 // set cursor to highest slot on highest page
2457 uint bt_lastkey (BtDb *bt)
2459 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2463 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2464 set->page = bt_mappage (bt, set->latch);
2468 bt_lockpage(BtLockRead, set->latch);
2470 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2471 slot = set->page->cnt;
2473 bt_unlockpage(BtLockRead, set->latch);
2474 bt_unpinlatch (set->latch);
2478 // return previous slot on cursor page
2480 uint bt_prevkey (BtDb *bt, uint slot)
2488 if( left = bt_getid(bt->cursor->left) )
2489 bt->cursor_page = left;
2493 if( set->latch = bt_pinlatch (bt, left, 1) )
2494 set->page = bt_mappage (bt, set->latch);
2498 bt_lockpage(BtLockRead, set->latch);
2499 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2500 bt_unlockpage(BtLockRead, set->latch);
2501 bt_unpinlatch (set->latch);
2502 return bt->cursor->cnt;
2505 // return next slot on cursor page
2506 // or slide cursor right into next page
2508 uint bt_nextkey (BtDb *bt, uint slot)
2514 right = bt_getid(bt->cursor->right);
2516 while( slot++ < bt->cursor->cnt )
2517 if( slotptr(bt->cursor,slot)->dead )
2519 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2527 bt->cursor_page = right;
2529 if( set->latch = bt_pinlatch (bt, right, 1) )
2530 set->page = bt_mappage (bt, set->latch);
2534 bt_lockpage(BtLockRead, set->latch);
2536 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2538 bt_unlockpage(BtLockRead, set->latch);
2539 bt_unpinlatch (set->latch);
2547 // cache page of keys into cursor and return starting slot for given key
2549 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2554 // cache page for retrieval
2556 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2557 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2561 bt->cursor_page = set->page_no;
2563 bt_unlockpage(BtLockRead, set->latch);
2564 bt_unpinlatch (set->latch);
2568 BtKey *bt_key(BtDb *bt, uint slot)
2570 return keyptr(bt->cursor, slot);
2573 BtVal *bt_val(BtDb *bt, uint slot)
2575 return valptr(bt->cursor,slot);
2581 double getCpuTime(int type)
2584 FILETIME xittime[1];
2585 FILETIME systime[1];
2586 FILETIME usrtime[1];
2587 SYSTEMTIME timeconv[1];
2590 memset (timeconv, 0, sizeof(SYSTEMTIME));
2594 GetSystemTimeAsFileTime (xittime);
2595 FileTimeToSystemTime (xittime, timeconv);
2596 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2599 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2600 FileTimeToSystemTime (usrtime, timeconv);
2603 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2604 FileTimeToSystemTime (systime, timeconv);
2608 ans += (double)timeconv->wHour * 3600;
2609 ans += (double)timeconv->wMinute * 60;
2610 ans += (double)timeconv->wSecond;
2611 ans += (double)timeconv->wMilliseconds / 1000;
2616 #include <sys/resource.h>
2618 double getCpuTime(int type)
2620 struct rusage used[1];
2621 struct timeval tv[1];
2625 gettimeofday(tv, NULL);
2626 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2629 getrusage(RUSAGE_SELF, used);
2630 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2633 getrusage(RUSAGE_SELF, used);
2634 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2641 void bt_poolaudit (BtMgr *mgr)
2646 while( slot++ < mgr->latchdeployed ) {
2647 latch = mgr->latchsets + slot;
2649 if( *latch->readwr->rin & MASK )
2650 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2651 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2653 if( *latch->access->rin & MASK )
2654 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2655 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2657 if( *latch->parent->rin & MASK )
2658 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2659 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2661 if( latch->pin & ~CLOCK_bit ) {
2662 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2668 uint bt_latchaudit (BtDb *bt)
2670 ushort idx, hashidx;
2676 if( *(ushort *)(bt->mgr->alloclatch) )
2677 fprintf(stderr, "Alloc page locked\n");
2678 *(uint *)(bt->mgr->alloclatch) = 0;
2680 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2681 latch = bt->mgr->latchsets + idx;
2682 if( *latch->readwr->rin & MASK )
2683 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2684 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2686 if( *latch->access->rin & MASK )
2687 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2688 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2690 if( *latch->parent->rin & MASK )
2691 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2692 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2695 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2700 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2701 if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2702 fprintf(stderr, "hash entry %d locked\n", hashidx);
2704 *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
2706 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2707 latch = bt->mgr->latchsets + idx;
2709 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2710 } while( idx = latch->next );
2713 page_no = LEAF_page;
2715 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2716 uid off = page_no << bt->mgr->page_bits;
2718 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2722 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2724 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2725 return bt->err = BTERR_map;
2727 if( *amt < bt->mgr->page_size )
2728 return bt->err = BTERR_map;
2730 if( !bt->frame->free && !bt->frame->lvl )
2731 cnt += bt->frame->act;
2735 cnt--; // remove stopper key
2736 fprintf(stderr, " Total keys read %d\n", cnt);
2749 // standalone program to index file of keys
2750 // then list them onto std-out
2753 void *index_file (void *arg)
2755 uint __stdcall index_file (void *arg)
2758 int line = 0, found = 0, cnt = 0, unique;
2759 uid next, page_no = LEAF_page; // start on first page of leaves
2760 BtPage page = calloc (4096, 1);
2761 unsigned char key[BT_maxkey];
2762 ThreadArg *args = arg;
2763 int ch, len = 0, slot;
2771 bt = bt_open (args->mgr);
2773 unique = (args->type[1] | 0x20) != 'd';
2774 atomic = (args->type[1] | 0x20) == 'a';
2776 switch(args->type[0] | 0x20)
2779 fprintf(stderr, "started latch mgr audit\n");
2780 cnt = bt_latchaudit (bt);
2781 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2785 fprintf(stderr, "started pennysort for %s\n", args->infile);
2786 if( in = fopen (args->infile, "rb") )
2787 while( ch = getc(in), ch != EOF )
2793 memset (page, 0, 4096);
2794 slotptr(page, 1)->off = 2048;
2795 slotptr(page, 1)->type = Unique;
2796 ptr = keyptr(page,1);
2798 memcpy(ptr->key, key, 10);
2799 val = valptr(page,1);
2800 val->len = len - 10;
2801 memcpy (val->value, key + 10, len - 10);
2803 if( slot = bt_atomicinsert (bt, page) )
2804 fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2806 else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2807 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2810 else if( len < BT_maxkey )
2812 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2816 fprintf(stderr, "started indexing for %s\n", args->infile);
2817 if( in = fopen (args->infile, "rb") )
2818 while( ch = getc(in), ch != EOF )
2822 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2823 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2826 else if( len < BT_maxkey )
2828 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2832 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2833 if( in = fopen (args->infile, "rb") )
2834 while( ch = getc(in), ch != EOF )
2838 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2839 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2840 ptr = (BtKey*)(bt->key);
2843 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2844 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2847 else if( len < BT_maxkey )
2849 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2853 fprintf(stderr, "started finding keys for %s\n", args->infile);
2854 if( in = fopen (args->infile, "rb") )
2855 while( ch = getc(in), ch != EOF )
2859 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2862 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2865 else if( len < BT_maxkey )
2867 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2871 fprintf(stderr, "started scanning\n");
2873 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2874 set->page = bt_mappage (bt, set->latch);
2876 fprintf(stderr, "unable to obtain latch"), exit(1);
2877 bt_lockpage (BtLockRead, set->latch);
2878 next = bt_getid (set->page->right);
2880 for( slot = 0; slot++ < set->page->cnt; )
2881 if( next || slot < set->page->cnt )
2882 if( !slotptr(set->page, slot)->dead ) {
2883 ptr = keyptr(set->page, slot);
2886 if( slotptr(set->page, slot)->type == Duplicate )
2889 fwrite (ptr->key, len, 1, stdout);
2890 val = valptr(set->page, slot);
2891 fwrite (val->value, val->len, 1, stdout);
2892 fputc ('\n', stdout);
2896 bt_unlockpage (BtLockRead, set->latch);
2897 bt_unpinlatch (set->latch);
2898 } while( page_no = next );
2900 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2904 fprintf(stderr, "started reverse scan\n");
2905 if( slot = bt_lastkey (bt) )
2906 while( slot = bt_prevkey (bt, slot) ) {
2907 if( slotptr(bt->cursor, slot)->dead )
2910 ptr = keyptr(bt->cursor, slot);
2913 if( slotptr(bt->cursor, slot)->type == Duplicate )
2916 fwrite (ptr->key, len, 1, stdout);
2917 val = valptr(bt->cursor, slot);
2918 fwrite (val->value, val->len, 1, stdout);
2919 fputc ('\n', stdout);
2923 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2928 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2930 fprintf(stderr, "started counting\n");
2931 page_no = LEAF_page;
2933 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2934 if( bt_readpage (bt->mgr, bt->frame, page_no) )
2937 if( !bt->frame->free && !bt->frame->lvl )
2938 cnt += bt->frame->act;
2944 cnt--; // remove stopper key
2945 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2957 typedef struct timeval timer;
2959 int main (int argc, char **argv)
2961 int idx, cnt, len, slot, err;
2962 int segsize, bits = 16;
2979 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2980 fprintf (stderr, " where page_bits is the page size in bits\n");
2981 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
2982 fprintf (stderr, " lock_mgr_size is the maximum number of outstanding key locks\n");
2983 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2987 start = getCpuTime(0);
2990 bits = atoi(argv[3]);
2993 poolsize = atoi(argv[4]);
2996 fprintf (stderr, "Warning: no mapped_pool\n");
2999 locksize = atoi(argv[5]);
3003 threads = malloc (cnt * sizeof(pthread_t));
3005 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3007 args = malloc (cnt * sizeof(ThreadArg));
3009 mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3012 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3018 for( idx = 0; idx < cnt; idx++ ) {
3019 args[idx].infile = argv[idx + 6];
3020 args[idx].type = argv[2];
3021 args[idx].mgr = mgr;
3022 args[idx].idx = idx;
3024 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3025 fprintf(stderr, "Error creating thread %d\n", err);
3027 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3031 // wait for termination
3034 for( idx = 0; idx < cnt; idx++ )
3035 pthread_join (threads[idx], NULL);
3037 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3039 for( idx = 0; idx < cnt; idx++ )
3040 CloseHandle(threads[idx]);
3046 elapsed = getCpuTime(0) - start;
3047 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3048 elapsed = getCpuTime(1);
3049 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3050 elapsed = getCpuTime(2);
3051 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);