1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // traditional buffer pool manager
7 // and ACID batched key updates
11 // author: karl malbrain, malbrain@cal.berkeley.edu
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
17 The original author is Karl Malbrain.
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
47 #define WIN32_LEAN_AND_MEAN
61 typedef unsigned long long uid;
64 typedef unsigned long long off64_t;
65 typedef unsigned short ushort;
66 typedef unsigned int uint;
69 #define BT_ro 0x6f72 // ro
70 #define BT_rw 0x7772 // rw
72 #define BT_maxbits 24 // maximum page size in bits
73 #define BT_minbits 9 // minimum page size in bits
74 #define BT_minpage (1 << BT_minbits) // minimum page size
75 #define BT_maxpage (1 << BT_maxbits) // maximum page size
77 // BTree page number constants
78 #define ALLOC_page 0 // allocation page
79 #define ROOT_page 1 // root of the btree
80 #define LEAF_page 2 // first page of leaves
82 // Number of levels to create in a new BTree
87 There are six lock types for each node in four independent sets:
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
93 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
105 // definition for phase-fair reader/writer lock implementation
119 // definition for spin latch implementation
121 // exclusive is set for write access
122 // share is count of read accessors
123 // grant write lock when share == 0
125 volatile typedef struct {
136 // hash table entries
139 volatile uint slot; // Latch table entry at head of chain
140 BtSpinLatch latch[1];
143 // latch manager table structure
146 uid page_no; // latch set page number
147 RWLock readwr[1]; // read/write page lock
148 RWLock access[1]; // Access Intent/Page delete
149 RWLock parent[1]; // Posting of fence key in parent
150 RWLock atomic[1]; // Atomic update in progress
151 uint split; // right split page atomic insert
152 uint entry; // entry slot in latch table
153 uint next; // next entry in hash table chain
154 uint prev; // prev entry in hash table chain
155 volatile ushort pin; // number of outstanding threads
156 ushort dirty:1; // page in cache is dirty
158 pthread_t atomictid; // pid holding atomic lock
160 uint atomictid; // pid holding atomic lock
164 // Define the length of the page record numbers
168 // Page key slot definition.
170 // Keys are marked dead, but remain on the page until
171 // cleanup is called. The fence key (highest key) for
172 // a leaf page is always present, even after cleanup.
176 // In addition to the Unique keys that occupy slots
177 // there are Librarian and Duplicate key
178 // slots occupying the key slot array.
180 // The Librarian slots are dead keys that
181 // serve as filler, available to add new Unique
182 // or Dup slots that are inserted into the B-tree.
184 // The Duplicate slots have had their key bytes extended
185 // by 6 bytes to contain a binary duplicate key uniqueifier.
195 uint off:BT_maxbits; // page offset for key start
196 uint type:3; // type of slot
197 uint dead:1; // set for deleted slot
200 // The key structure occupies space at the upper end of
201 // each page. It's a length byte followed by the key
205 unsigned char len; // this can be changed to a ushort or uint
206 unsigned char key[0];
209 // the value structure also occupies space at the upper
210 // end of the page. Each key is immediately followed by a value.
213 unsigned char len; // this can be changed to a ushort or uint
214 unsigned char value[0];
217 #define BT_maxkey 255 // maximum number of bytes in a key
218 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
220 // The first part of an index page.
221 // It is immediately followed
222 // by the BtSlot array of keys.
224 // note that this structure size
225 // must be a multiple of 8 bytes
226 // in order to place dups correctly.
228 typedef struct BtPage_ {
229 uint cnt; // count of keys in page
230 uint act; // count of active keys
231 uint min; // next key offset
232 uint garbage; // page garbage in bytes
233 unsigned char bits:7; // page size in bits
234 unsigned char free:1; // page is on free chain
235 unsigned char lvl:7; // level of page
236 unsigned char kill:1; // page is being deleted
237 unsigned char right[BtId]; // page number to right
240 // The loadpage interface object
243 BtPage page; // current page pointer
244 BtLatchSet *latch; // current page latch set
247 // structure for latch manager on ALLOC_page
250 struct BtPage_ alloc[1]; // next page_no in right ptr
251 unsigned long long dups[1]; // global duplicate key uniqueifier
252 unsigned char chain[BtId]; // head of free page_nos chain
255 // The object structure for Btree access
258 uint page_size; // page size
259 uint page_bits; // page size in bits
265 BtPageZero *pagezero; // mapped allocation page
266 BtSpinLatch lock[1]; // allocation area lite latch
267 uint latchdeployed; // highest number of latch entries deployed
268 uint nlatchpage; // number of latch pages at BT_latch
269 uint latchtotal; // number of page latch entries
270 uint latchhash; // number of latch hash table slots
271 uint latchvictim; // next latch entry to examine
272 BtHashEntry *hashtable; // the buffer pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 unsigned char *pagepool; // mapped to the buffer pool pages
276 HANDLE halloc; // allocation handle
277 HANDLE hpool; // buffer pool handle
282 BtMgr *mgr; // buffer manager for thread
283 BtPage cursor; // cached frame for start/next (never mapped)
284 BtPage frame; // spare frame for the page split (never mapped)
285 uid cursor_page; // current cursor page number
286 unsigned char *mem; // frame, cursor, page memory buffer
287 unsigned char key[BT_keyarray]; // last found complete key
288 int found; // last delete or insert was found
289 int err; // last error
290 int reads, writes; // number of reads and writes from the btree
304 #define CLOCK_bit 0x8000
307 extern void bt_close (BtDb *bt);
308 extern BtDb *bt_open (BtMgr *mgr);
309 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
310 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
311 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
312 extern BtKey *bt_foundkey (BtDb *bt);
313 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
314 extern uint bt_nextkey (BtDb *bt, uint slot);
317 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
318 void bt_mgrclose (BtMgr *mgr);
320 // Helper functions to return slot values
321 // from the cursor page.
323 extern BtKey *bt_key (BtDb *bt, uint slot);
324 extern BtVal *bt_val (BtDb *bt, uint slot);
326 // The page is allocated from low and hi ends.
327 // The key slots are allocated from the bottom,
328 // while the text and value of the key
329 // are allocated from the top. When the two
330 // areas meet, the page is split into two.
332 // A key consists of a length byte, two bytes of
333 // index number (0 - 65535), and up to 253 bytes
336 // Associated with each key is a value byte string
337 // containing any value desired.
339 // The b-tree root is always located at page 1.
340 // The first leaf page of level zero is always
341 // located on page 2.
343 // The b-tree pages are linked with next
344 // pointers to facilitate enumerators,
345 // and provide for concurrency.
347 // When the root page fills, it is split in two and
348 // the tree height is raised by a new root at page
349 // one with two keys.
351 // Deleted keys are marked with a dead bit until
352 // page cleanup. The fence key for a leaf node is
355 // To achieve maximum concurrency one page is locked at a time
356 // as the tree is traversed to find leaf key in question. The right
357 // page numbers are used in cases where the page is being split,
360 // Page 0 is dedicated to lock for new page extensions,
361 // and chains empty pages together for reuse. It also
362 // contains the latch manager hash table.
364 // The ParentModification lock on a node is obtained to serialize posting
365 // or changing the fence key for a node.
367 // Empty pages are chained together through the ALLOC page and reused.
369 // Access macros to address slot and key values from the page
370 // Page slots use 1 based indexing.
372 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
373 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
374 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
376 void bt_putid(unsigned char *dest, uid id)
381 dest[i] = (unsigned char)id, id >>= 8;
384 uid bt_getid(unsigned char *src)
389 for( i = 0; i < BtId; i++ )
390 id <<= 8, id |= *src++;
395 uid bt_newdup (BtDb *bt)
398 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
400 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
404 // Phase-Fair reader/writer lock implementation
406 void WriteLock (RWLock *lock)
411 tix = __sync_fetch_and_add (lock->ticket, 1);
413 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
415 // wait for our ticket to come up
417 while( tix != lock->serving[0] )
424 w = PRES | (tix & PHID);
426 r = __sync_fetch_and_add (lock->rin, w);
428 r = _InterlockedExchangeAdd16 (lock->rin, w);
430 while( r != *lock->rout )
438 void WriteRelease (RWLock *lock)
441 __sync_fetch_and_and (lock->rin, ~MASK);
443 _InterlockedAnd16 (lock->rin, ~MASK);
448 void ReadLock (RWLock *lock)
452 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
454 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
457 while( w == (*lock->rin & MASK) )
465 void ReadRelease (RWLock *lock)
468 __sync_fetch_and_add (lock->rout, RINC);
470 _InterlockedExchangeAdd16 (lock->rout, RINC);
474 // Spin Latch Manager
476 // wait until write lock mode is clear
477 // and add 1 to the share count
479 void bt_spinreadlock(BtSpinLatch *latch)
485 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
487 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
489 // see if exclusive request is granted or pending
494 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
496 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
499 } while( sched_yield(), 1 );
501 } while( SwitchToThread(), 1 );
505 // wait for other read and write latches to relinquish
507 void bt_spinwritelock(BtSpinLatch *latch)
513 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
515 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
518 if( !(prev & ~BOTH) )
522 __sync_fetch_and_and ((ushort *)latch, ~XCL);
524 _InterlockedAnd16((ushort *)latch, ~XCL);
527 } while( sched_yield(), 1 );
529 } while( SwitchToThread(), 1 );
533 // try to obtain write lock
535 // return 1 if obtained,
538 int bt_spinwritetry(BtSpinLatch *latch)
543 prev = __sync_fetch_and_or((ushort *)latch, XCL);
545 prev = _InterlockedOr16((ushort *)latch, XCL);
547 // take write access if all bits are clear
550 if( !(prev & ~BOTH) )
554 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 _InterlockedAnd16((ushort *)latch, ~XCL);
563 void bt_spinreleasewrite(BtSpinLatch *latch)
566 __sync_fetch_and_and((ushort *)latch, ~BOTH);
568 _InterlockedAnd16((ushort *)latch, ~BOTH);
572 // decrement reader count
574 void bt_spinreleaseread(BtSpinLatch *latch)
577 __sync_fetch_and_add((ushort *)latch, -SHARE);
579 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
583 // read page from permanent location in Btree file
585 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
587 off64_t off = page_no << mgr->page_bits;
590 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
591 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
598 memset (ovl, 0, sizeof(OVERLAPPED));
600 ovl->OffsetHigh = off >> 32;
602 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
603 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
606 if( *amt < mgr->page_size ) {
607 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
614 // write page to permanent location in Btree file
615 // clear the dirty bit
617 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
619 off64_t off = page_no << mgr->page_bits;
622 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
628 memset (ovl, 0, sizeof(OVERLAPPED));
630 ovl->OffsetHigh = off >> 32;
632 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
635 if( *amt < mgr->page_size )
641 // link latch table entry into head of latch hash table
643 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
645 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
646 BtLatchSet *latch = bt->mgr->latchsets + slot;
648 if( latch->next = bt->mgr->hashtable[hashidx].slot )
649 bt->mgr->latchsets[latch->next].prev = slot;
651 bt->mgr->hashtable[hashidx].slot = slot;
652 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
653 latch->page_no = page_no;
659 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
667 // set CLOCK bit in latch
668 // decrement pin count
670 void bt_unpinlatch (BtLatchSet *latch)
673 if( ~latch->pin & CLOCK_bit )
674 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
675 __sync_fetch_and_add(&latch->pin, -1);
677 if( ~latch->pin & CLOCK_bit )
678 _InterlockedOr16 (&latch->pin, CLOCK_bit);
679 _InterlockedDecrement16 (&latch->pin);
683 // return the btree cached page address
685 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
687 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
692 // find existing latchset or inspire new one
693 // return with latchset pinned
695 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
697 uint hashidx = page_no % bt->mgr->latchhash;
705 // try to find our entry
707 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
709 if( slot = bt->mgr->hashtable[hashidx].slot ) do
711 latch = bt->mgr->latchsets + slot;
712 if( page_no == latch->page_no )
714 } while( slot = latch->next );
720 latch = bt->mgr->latchsets + slot;
722 __sync_fetch_and_add(&latch->pin, 1);
724 _InterlockedIncrement16 (&latch->pin);
726 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
730 // see if there are any unused pool entries
732 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
734 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
737 if( slot < bt->mgr->latchtotal ) {
738 latch = bt->mgr->latchsets + slot;
739 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
741 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
746 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
748 _InterlockedDecrement (&bt->mgr->latchdeployed);
750 // find and reuse previous entry on victim
754 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
756 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
758 // try to get write lock on hash chain
759 // skip entry if not obtained
760 // or has outstanding pins
762 slot %= bt->mgr->latchtotal;
767 latch = bt->mgr->latchsets + slot;
768 idx = latch->page_no % bt->mgr->latchhash;
770 // see we are on same chain as hashidx
775 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
778 // skip this slot if it is pinned
779 // or the CLOCK bit is set
782 if( latch->pin & CLOCK_bit ) {
784 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
786 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
789 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
793 // update permanent page area in btree from buffer pool
795 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
798 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
801 latch->dirty = 0, bt->writes++;
803 // unlink our available slot from its hash chain
806 bt->mgr->latchsets[latch->prev].next = latch->next;
808 bt->mgr->hashtable[idx].slot = latch->next;
811 bt->mgr->latchsets[latch->next].prev = latch->prev;
813 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
815 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
818 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
823 void bt_mgrclose (BtMgr *mgr)
830 // flush dirty pool pages to the btree
832 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
833 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
834 latch = mgr->latchsets + slot;
837 bt_writepage(mgr, page, latch->page_no);
838 latch->dirty = 0, num++;
840 // madvise (page, mgr->page_size, MADV_DONTNEED);
843 fprintf(stderr, "%d buffer pool pages flushed\n", num);
846 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
847 munmap (mgr->pagezero, mgr->page_size);
849 FlushViewOfFile(mgr->pagezero, 0);
850 UnmapViewOfFile(mgr->pagezero);
851 UnmapViewOfFile(mgr->hashtable);
852 CloseHandle(mgr->halloc);
853 CloseHandle(mgr->hpool);
859 FlushFileBuffers(mgr->idx);
860 CloseHandle(mgr->idx);
865 // close and release memory
867 void bt_close (BtDb *bt)
874 VirtualFree (bt->mem, 0, MEM_RELEASE);
879 // open/create new btree buffer manager
881 // call with file_name, bits in page size (e.g. 16),
882 // size of page pool (e.g. 262144)
884 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
886 uint lvl, attr, last, slot, idx;
887 uint nlatchpage, latchhash;
888 unsigned char value[BtId];
889 int flag, initit = 0;
890 BtPageZero *pagezero;
897 // determine sanity of page size and buffer pool
899 if( bits > BT_maxbits )
901 else if( bits < BT_minbits )
905 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
910 mgr = calloc (1, sizeof(BtMgr));
912 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
914 if( mgr->idx == -1 ) {
915 fprintf (stderr, "Unable to open btree file\n");
916 return free(mgr), NULL;
919 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
920 attr = FILE_ATTRIBUTE_NORMAL;
921 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
923 if( mgr->idx == INVALID_HANDLE_VALUE )
924 return GlobalFree(mgr), NULL;
928 pagezero = valloc (BT_maxpage);
931 // read minimum page size to get root info
932 // to support raw disk partition files
933 // check if bits == 0 on the disk.
935 if( size = lseek (mgr->idx, 0L, 2) )
936 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
937 if( pagezero->alloc->bits )
938 bits = pagezero->alloc->bits;
942 return free(mgr), free(pagezero), NULL;
946 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
947 size = GetFileSize(mgr->idx, amt);
950 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
951 return bt_mgrclose (mgr), NULL;
952 bits = pagezero->alloc->bits;
957 mgr->page_size = 1 << bits;
958 mgr->page_bits = bits;
960 // calculate number of latch hash table entries
962 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
963 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
965 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
966 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
967 mgr->latchtotal = nodemax;
972 // initialize an empty b-tree with latch page, root page, page of leaves
973 // and page(s) of latches and page pool cache
975 memset (pagezero, 0, 1 << bits);
976 pagezero->alloc->bits = mgr->page_bits;
977 bt_putid(pagezero->alloc->right, MIN_lvl+1);
979 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
980 fprintf (stderr, "Unable to create btree page zero\n");
981 return bt_mgrclose (mgr), NULL;
984 memset (pagezero, 0, 1 << bits);
985 pagezero->alloc->bits = mgr->page_bits;
987 for( lvl=MIN_lvl; lvl--; ) {
988 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
989 key = keyptr(pagezero->alloc, 1);
990 key->len = 2; // create stopper key
994 bt_putid(value, MIN_lvl - lvl + 1);
995 val = valptr(pagezero->alloc, 1);
996 val->len = lvl ? BtId : 0;
997 memcpy (val->value, value, val->len);
999 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1000 pagezero->alloc->lvl = lvl;
1001 pagezero->alloc->cnt = 1;
1002 pagezero->alloc->act = 1;
1004 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1005 fprintf (stderr, "Unable to create btree page zero\n");
1006 return bt_mgrclose (mgr), NULL;
1014 VirtualFree (pagezero, 0, MEM_RELEASE);
1017 // mlock the pagezero page
1019 flag = PROT_READ | PROT_WRITE;
1020 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1021 if( mgr->pagezero == MAP_FAILED ) {
1022 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1023 return bt_mgrclose (mgr), NULL;
1025 mlock (mgr->pagezero, mgr->page_size);
1027 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1028 if( mgr->hashtable == MAP_FAILED ) {
1029 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1030 return bt_mgrclose (mgr), NULL;
1033 flag = PAGE_READWRITE;
1034 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1035 if( !mgr->halloc ) {
1036 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1037 return bt_mgrclose (mgr), NULL;
1040 flag = FILE_MAP_WRITE;
1041 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1042 if( !mgr->pagezero ) {
1043 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1044 return bt_mgrclose (mgr), NULL;
1047 flag = PAGE_READWRITE;
1048 size = (uid)mgr->nlatchpage << mgr->page_bits;
1049 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1051 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1052 return bt_mgrclose (mgr), NULL;
1055 flag = FILE_MAP_WRITE;
1056 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1057 if( !mgr->hashtable ) {
1058 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1059 return bt_mgrclose (mgr), NULL;
1063 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1064 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1069 // open BTree access method
1070 // based on buffer manager
1072 BtDb *bt_open (BtMgr *mgr)
1074 BtDb *bt = malloc (sizeof(*bt));
1076 memset (bt, 0, sizeof(*bt));
1079 bt->mem = valloc (2 *mgr->page_size);
1081 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1083 bt->frame = (BtPage)bt->mem;
1084 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1088 // compare two keys, return > 0, = 0, or < 0
1089 // =0: keys are same
1092 // as the comparison value
1094 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1096 uint len1 = key1->len;
1099 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1110 // place write, read, or parent lock on requested page_no.
1112 void bt_lockpage(BtLock mode, BtLatchSet *set)
1116 ReadLock (set->readwr);
1119 WriteLock (set->readwr);
1122 ReadLock (set->access);
1125 WriteLock (set->access);
1128 WriteLock (set->parent);
1131 WriteLock (set->atomic);
1133 case BtLockAtomic | BtLockRead:
1134 WriteLock (set->atomic);
1135 ReadLock (set->readwr);
1140 // remove write, read, or parent lock on requested page
1142 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1146 ReadRelease (set->readwr);
1149 WriteRelease (set->readwr);
1152 ReadRelease (set->access);
1155 WriteRelease (set->access);
1158 WriteRelease (set->parent);
1161 memset (&set->atomictid, 0, sizeof(set->atomictid));
1162 WriteRelease (set->atomic);
1164 case BtLockAtomic | BtLockRead:
1165 ReadRelease (set->readwr);
1166 memset (&set->atomictid, 0, sizeof(set->atomictid));
1167 WriteRelease (set->atomic);
1172 // allocate a new page
1173 // return with page latched, but unlocked.
1175 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1180 // lock allocation page
1182 bt_spinwritelock(bt->mgr->lock);
1184 // use empty chain first
1185 // else allocate empty page
1187 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1188 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1189 set->page = bt_mappage (bt, set->latch);
1191 return bt->err = BTERR_struct, -1;
1193 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1194 bt_spinreleasewrite(bt->mgr->lock);
1196 memcpy (set->page, contents, bt->mgr->page_size);
1197 set->latch->dirty = 1;
1201 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1202 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1204 // unlock allocation latch
1206 bt_spinreleasewrite(bt->mgr->lock);
1208 // don't load cache from btree page
1210 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1211 set->page = bt_mappage (bt, set->latch);
1213 return bt->err = BTERR_struct;
1215 memcpy (set->page, contents, bt->mgr->page_size);
1216 set->latch->dirty = 1;
1220 // find slot in page for given key at a given level
1222 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1224 uint diff, higher = set->page->cnt, low = 1, slot;
1227 // make stopper key an infinite fence value
1229 if( bt_getid (set->page->right) )
1234 // low is the lowest candidate.
1235 // loop ends when they meet
1237 // higher is already
1238 // tested as .ge. the passed key.
1240 while( diff = higher - low ) {
1241 slot = low + ( diff >> 1 );
1242 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1245 higher = slot, good++;
1248 // return zero if key is on right link page
1250 return good ? higher : 0;
1253 // find and load page at given level for given key
1254 // leave page rd or wr locked as requested
1256 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1258 uid page_no = ROOT_page, prevpage = 0;
1259 uint drill = 0xff, slot;
1260 BtLatchSet *prevlatch;
1261 uint mode, prevmode;
1263 // start at root of btree and drill down
1266 // determine lock mode of drill level
1267 mode = (drill == lvl) ? lock : BtLockRead;
1269 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1272 // obtain access lock using lock chaining with Access mode
1274 if( page_no > ROOT_page )
1275 bt_lockpage(BtLockAccess, set->latch);
1277 set->page = bt_mappage (bt, set->latch);
1279 // release & unpin parent page
1282 bt_unlockpage(prevmode, prevlatch);
1283 bt_unpinlatch (prevlatch);
1287 // skip Atomic lock on leaf page if already held
1290 if( mode & BtLockAtomic )
1291 if( pthread_equal (set->latch->atomictid, pthread_self()) )
1292 mode &= ~BtLockAtomic;
1294 // obtain mode lock using lock chaining through AccessLock
1296 bt_lockpage(mode, set->latch);
1298 if( mode & BtLockAtomic )
1299 set->latch->atomictid = pthread_self();
1301 if( set->page->free )
1302 return bt->err = BTERR_struct, 0;
1304 if( page_no > ROOT_page )
1305 bt_unlockpage(BtLockAccess, set->latch);
1307 // re-read and re-lock root after determining actual level of root
1309 if( set->page->lvl != drill) {
1310 if( set->latch->page_no != ROOT_page )
1311 return bt->err = BTERR_struct, 0;
1313 drill = set->page->lvl;
1315 if( lock != BtLockRead && drill == lvl ) {
1316 bt_unlockpage(mode, set->latch);
1317 bt_unpinlatch (set->latch);
1322 prevpage = set->latch->page_no;
1323 prevlatch = set->latch;
1326 // find key on page at this level
1327 // and descend to requested level
1329 if( set->page->kill )
1332 if( slot = bt_findslot (set, key, len) ) {
1336 while( slotptr(set->page, slot)->dead )
1337 if( slot++ < set->page->cnt )
1342 page_no = bt_getid(valptr(set->page, slot)->value);
1347 // or slide right into next page
1350 page_no = bt_getid(set->page->right);
1354 // return error on end of right chain
1356 bt->err = BTERR_struct;
1357 return 0; // return error
1360 // return page to free list
1361 // page must be delete & write locked
1363 void bt_freepage (BtDb *bt, BtPageSet *set)
1365 // lock allocation page
1367 bt_spinwritelock (bt->mgr->lock);
1371 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1372 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1373 set->latch->dirty = 1;
1374 set->page->free = 1;
1376 // unlock released page
1378 bt_unlockpage (BtLockDelete, set->latch);
1379 bt_unlockpage (BtLockWrite, set->latch);
1380 bt_unpinlatch (set->latch);
1382 // unlock allocation page
1384 bt_spinreleasewrite (bt->mgr->lock);
1387 // a fence key was deleted from a page
1388 // push new fence value upwards
1390 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1392 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1393 unsigned char value[BtId];
1397 // remove the old fence value
1399 ptr = keyptr(set->page, set->page->cnt);
1400 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1401 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1402 set->latch->dirty = 1;
1404 // cache new fence value
1406 ptr = keyptr(set->page, set->page->cnt);
1407 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1409 bt_lockpage (BtLockParent, set->latch);
1410 bt_unlockpage (BtLockWrite, set->latch);
1412 // insert new (now smaller) fence key
1414 bt_putid (value, set->latch->page_no);
1415 ptr = (BtKey*)leftkey;
1417 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1420 // now delete old fence key
1422 ptr = (BtKey*)rightkey;
1424 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1427 bt_unlockpage (BtLockParent, set->latch);
1428 bt_unpinlatch(set->latch);
1432 // root has a single child
1433 // collapse a level from the tree
1435 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1441 // find the child entry and promote as new root contents
1444 for( idx = 0; idx++ < root->page->cnt; )
1445 if( !slotptr(root->page, idx)->dead )
1448 page_no = bt_getid (valptr(root->page, idx)->value);
1450 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1451 child->page = bt_mappage (bt, child->latch);
1455 bt_lockpage (BtLockDelete, child->latch);
1456 bt_lockpage (BtLockWrite, child->latch);
1458 memcpy (root->page, child->page, bt->mgr->page_size);
1459 root->latch->dirty = 1;
1461 bt_freepage (bt, child);
1463 } while( root->page->lvl > 1 && root->page->act == 1 );
1465 bt_unlockpage (BtLockWrite, root->latch);
1466 bt_unpinlatch (root->latch);
1470 // delete a page and manage keys
1471 // call with page writelocked
1472 // returns with page unpinned
// Removes an emptied page by absorbing its right sibling: the sibling's
// image is copied into this page, the sibling is marked killed and
// pointed back at this page, the parent level (lvl+1) is re-keyed, and
// the sibling page is then freed.
// bt:  btree handle
// set: pinned, write-locked page that takes over the sibling's contents
// Sets bt->err to BTERR_struct and returns it when the right sibling is
// already marked killed.
1474 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1476 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1477 unsigned char value[BtId];
1478 uint lvl = set->page->lvl;
1483 // cache copy of fence key
1484 // to post in parent
1486 ptr = keyptr(set->page, set->page->cnt);
1487 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1489 // obtain lock on right page
1491 page_no = bt_getid(set->page->right);
1493 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1494 right->page = bt_mappage (bt, right->latch);
1498 bt_lockpage (BtLockWrite, right->latch);
1500 // cache copy of key to update
1502 ptr = keyptr(right->page, right->page->cnt);
1503 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): this error return is taken with the right page still
// write-locked and pinned, and set still write-locked -- confirm intent
1505 if( right->page->kill )
1506 return bt->err = BTERR_struct;
1508 // pull contents of right peer into our empty page
1510 memcpy (set->page, right->page, bt->mgr->page_size);
1511 set->latch->dirty = 1;
1513 // mark right page deleted and point it to left page
1514 // until we can post parent updates that remove access
1515 // to the deleted page.
1517 bt_putid (right->page->right, set->latch->page_no);
1518 right->latch->dirty = 1;
1519 right->page->kill = 1;
// on both pages the write lock is traded for a parent lock, which is
// held across the updates to the level above
1521 bt_lockpage (BtLockParent, right->latch);
1522 bt_unlockpage (BtLockWrite, right->latch);
1524 bt_lockpage (BtLockParent, set->latch);
1525 bt_unlockpage (BtLockWrite, set->latch);
1527 // redirect higher key directly to our new node contents
1529 bt_putid (value, set->latch->page_no);
1530 ptr = (BtKey*)higherfence;
1532 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1535 // delete old lower key to our node
1537 ptr = (BtKey*)lowerfence;
1539 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1542 // obtain delete and write locks to right node
1544 bt_unlockpage (BtLockParent, right->latch);
1545 bt_lockpage (BtLockDelete, right->latch);
1546 bt_lockpage (BtLockWrite, right->latch);
1547 bt_freepage (bt, right);
1549 bt_unlockpage (BtLockParent, set->latch);
1550 bt_unpinlatch (set->latch);
1555 // find and delete key on page by marking delete flag bit
1556 // if page becomes empty, delete it from the btree
// bt:  btree handle; bt->found receives whether a live key was deleted
// key: key bytes to remove (len bytes)
// lvl: tree level on which the key is looked up
// The target page is loaded under a write lock.  Depending on the page
// state afterwards the work is handed to bt_fixfence, bt_collapseroot
// or bt_deletepage; otherwise the page is marked dirty, unlocked and
// unpinned here.
1558 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1560 uint slot, idx, found, fence;
1565 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1566 ptr = keyptr(set->page, slot);
1570 // if librarian slot, advance to real slot
1572 if( slotptr(set->page, slot)->type == Librarian )
1573 ptr = keyptr(set->page, ++slot);
// remember whether the target is the page's last (fence) slot
1575 fence = slot == set->page->cnt;
1577 // if key is found delete it, otherwise ignore request
// found stays 1 only when the key matches and the slot was not already dead
1579 if( found = !keycmp (ptr, key, len) )
1580 if( found = slotptr(set->page, slot)->dead == 0 ) {
1581 val = valptr(set->page,slot);
1582 slotptr(set->page, slot)->dead = 1;
// the key and value bytes of the dead slot are now counted as garbage
1583 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1586 // collapse empty slots beneath the fence
// idx is the slot just below the fence; a dead one is overwritten with
// the fence slot and the vacated top slot is zeroed as cnt shrinks
1588 while( idx = set->page->cnt - 1 )
1589 if( slotptr(set->page, idx)->dead ) {
1590 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1591 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1596 // did we delete a fence key in an upper level?
1598 if( found && lvl && set->page->act && fence )
1599 if( bt_fixfence (bt, set, lvl) )
1602 return bt->found = found, 0;
1604 // do we need to collapse root?
1606 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1607 if( bt_collapseroot (bt, set) )
1610 return bt->found = found, 0;
1612 // delete empty page
1614 if( !set->page->act )
1615 return bt_deletepage (bt, set);
// ordinary case: page keeps its keys; flag it dirty and release it
1617 set->latch->dirty = 1;
1618 bt_unlockpage(BtLockWrite, set->latch);
1619 bt_unpinlatch (set->latch);
1620 return bt->found = found, 0;
// Returns the handle's key buffer (bt->key) viewed as a BtKey.
// bt_findkey copies the key it located into that buffer.
1623 BtKey *bt_foundkey (BtDb *bt)
1625 return (BtKey*)(bt->key);
1628 // advance to next slot
// bt:   btree handle (receives the error code on failure)
// set:  current page set; repointed at the right sibling page when the
//       current page has no further slots
// slot: slot just processed
// bt_findkey loops on the return value and stops when it is 0.  One
// failure branch sets bt->err to BTERR_struct and returns 0.
1630 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1632 BtLatchSet *prevlatch;
1635 if( slot < set->page->cnt )
// keep the current latch so it can be released after the new one is held
1638 prevlatch = set->latch;
1640 if( page_no = bt_getid(set->page->right) )
1641 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1642 set->page = bt_mappage (bt, set->latch);
1646 return bt->err = BTERR_struct, 0;
1648 // obtain access lock using lock chaining with Access mode
1650 bt_lockpage(BtLockAccess, set->latch);
// the previous page is released only after the next page's access lock is held
1652 bt_unlockpage(BtLockRead, prevlatch);
1653 bt_unpinlatch (prevlatch);
// swap the access lock for a read lock on the new page
1655 bt_lockpage(BtLockRead, set->latch);
1656 bt_unlockpage(BtLockAccess, set->latch);
1660 // find unique key or first duplicate key in
1661 // leaf level and return number of value bytes
1662 // or (-1) if not found. Setup key for bt_foundkey
// bt:     btree handle; bt->key receives a copy of the key examined
// key:    search key bytes (keylen bytes)
// value:  caller buffer that receives the value bytes
// valmax: capacity of value in bytes
// The leaf page is loaded under a read lock, which is released together
// with the pin before returning.
1664 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1672 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1674 ptr = keyptr(set->page, slot);
1676 // skip librarian slot place holder
1678 if( slotptr(set->page, slot)->type == Librarian )
1679 ptr = keyptr(set->page, ++slot);
1681 // return actual key found
1683 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1686 if( slotptr(set->page, slot)->type == Duplicate )
1689 // not there if we reach the stopper key
// the last slot of a page with no right sibling is the stopper
1691 if( slot == set->page->cnt )
1692 if( !bt_getid (set->page->right) )
1695 // if key exists, return >= 0 value bytes copied
1696 // otherwise return (-1)
1698 if( slotptr(set->page, slot)->dead )
1702 if( !memcmp (ptr->key, key, len) ) {
1703 val = valptr (set->page,slot);
1704 if( valmax > val->len )
// NOTE(review): index_file calls this with value == NULL and valmax == 0;
// memcpy with a null destination is undefined even for a zero length --
// consider guarding the copy
1706 memcpy (value, val->value, valmax);
// move on to the following slot, possibly on the next page
1712 } while( slot = bt_findnext (bt, set, slot) );
1714 bt_unlockpage (BtLockRead, set->latch);
1715 bt_unpinlatch (set->latch);
1719 // check page for space available,
1720 // clean if necessary and return
1721 // 0 - page needs splitting
1722 // >0 new slot value
// bt:     btree handle; bt->frame is used as scratch for the old image
// set:    page set being checked (callers hold its write lock)
// keylen: length of the key about to be inserted
// slot:   slot at which the caller wants to insert
// vallen: length of the value about to be inserted
// The page is rebuilt only when it lacks room and at least a fifth of
// the page size is garbage.
1724 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1726 uint nxt = bt->mgr->page_size;
1727 BtPage page = set->page;
1728 uint cnt = 0, idx = 0;
1729 uint max = page->cnt;
// room test: two more slots plus the new key and value must fit below page->min
1734 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1737 // skip cleanup and proceed to split
1738 // if there's not enough garbage
1741 if( page->garbage < nxt / 5 )
// keep the old image in bt->frame while the page is rebuilt in place
1744 memcpy (bt->frame, page, bt->mgr->page_size);
1746 // skip page info and set rest of page to zero
1748 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1749 set->latch->dirty = 1;
1753 // clean up page first by
1754 // removing deleted keys
1756 while( cnt++ < max ) {
// dead slots are dropped, except the last slot, which is always kept
1759 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1762 // copy the value across
// values and keys are packed downward from the end of the page
1764 val = valptr(bt->frame, cnt);
1765 nxt -= val->len + sizeof(BtVal);
1766 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1768 // copy the key across
1770 key = keyptr(bt->frame, cnt);
1771 nxt -= key->len + sizeof(BtKey);
1772 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1774 // make a librarian slot
// the librarian slot is a dead placeholder sharing the key's offset
1777 slotptr(page, ++idx)->off = nxt;
1778 slotptr(page, idx)->type = Librarian;
1779 slotptr(page, idx)->dead = 1;
// the real slot follows, carrying over the original type and dead flag
1784 slotptr(page, ++idx)->off = nxt;
1785 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1787 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1794 // see if page has enough space now, or does it need splitting?
1796 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1802 // split the root and raise the height of the btree
// bt:    btree handle
// root:  write-locked root page set that has just been split
// right: latch set of the new right-hand page produced by the split
// The current root contents move to a newly allocated left page; the
// root page is then rebuilt with two entries: the left page's fence key
// and a stopper key pointing at the right page.  The root is unlocked
// and unpinned and the right latch is unpinned before returning.
1804 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1806 unsigned char leftkey[BT_keyarray];
1807 uint nxt = bt->mgr->page_size;
1808 unsigned char value[BtId];
1814 // save left page fence key for new root
1816 ptr = keyptr(root->page, root->page->cnt);
1817 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1819 // Obtain an empty page to use, and copy the current
1820 // root contents into it, e.g. lower keys
1822 if( bt_newpage(bt, left, root->page) )
// only the page number of the new left page is needed from here on
1825 left_page_no = left->latch->page_no;
1826 bt_unpinlatch (left->latch);
1828 // preserve the page info at the bottom
1829 // of higher keys and set rest to zero
1831 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1833 // insert stopper key at top of newroot page
1834 // and increase the root height
// value of slot 2: page number of the right page
1836 nxt -= BtId + sizeof(BtVal);
1837 bt_putid (value, right->page_no);
1838 val = (BtVal *)((unsigned char *)root->page + nxt);
1839 memcpy (val->value, value, BtId);
// the stopper key occupies 2 key bytes plus the BtKey header
1842 nxt -= 2 + sizeof(BtKey);
1843 slotptr(root->page, 2)->off = nxt;
1844 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1849 // insert lower keys page fence key on newroot page as first key
1851 nxt -= BtId + sizeof(BtVal);
1852 bt_putid (value, left_page_no);
1853 val = (BtVal *)((unsigned char *)root->page + nxt);
1854 memcpy (val->value, value, BtId);
1857 ptr = (BtKey *)leftkey;
1858 nxt -= ptr->len + sizeof(BtKey);
1859 slotptr(root->page, 1)->off = nxt;
1860 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
1862 bt_putid(root->page->right, 0);
1863 root->page->min = nxt; // reset lowest used offset and key count
1864 root->page->cnt = 2;
1865 root->page->act = 2;
1868 // release and unpin root pages
1870 bt_unlockpage(BtLockWrite, root->latch);
1871 bt_unpinlatch (root->latch);
1873 bt_unpinlatch (right);
1877 // split already locked full node
1879 // return pool entry for new right
// bt:  btree handle; bt->frame is used as scratch twice
// set: write-locked page to split
// First the higher keys are assembled in bt->frame and written to a
// newly allocated right page.  Then the original image is copied to
// bt->frame and the lower keys are rebuilt in place on set->page, whose
// right link is pointed at the new page.
// Returns the latch table entry of the new right page; callers treat 0
// as failure.
1882 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1884 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1885 uint lvl = set->page->lvl;
1892 // split higher half of keys to bt->frame
1894 memset (bt->frame, 0, bt->mgr->page_size);
1895 max = set->page->cnt;
1899 while( cnt++ < max ) {
// dead slots are skipped, except the last slot, which is always copied
1900 if( slotptr(set->page, cnt)->dead && cnt < max )
// values and keys are packed downward from the end of the frame
1902 src = valptr(set->page, cnt);
1903 nxt -= src->len + sizeof(BtVal);
1904 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1906 key = keyptr(set->page, cnt);
1907 nxt -= key->len + sizeof(BtKey);
1908 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1909 memcpy (ptr, key, key->len + sizeof(BtKey));
1911 // add librarian slot
1914 slotptr(bt->frame, ++idx)->off = nxt;
1915 slotptr(bt->frame, idx)->type = Librarian;
1916 slotptr(bt->frame, idx)->dead = 1;
// the real slot follows, carrying over the original type and dead flag
1921 slotptr(bt->frame, ++idx)->off = nxt;
1922 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1924 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// finish the header of the new right page image
1928 bt->frame->bits = bt->mgr->page_bits;
1929 bt->frame->min = nxt;
1930 bt->frame->cnt = idx;
1931 bt->frame->lvl = lvl;
// a non-root page hands its old right link to the new right page
1935 if( set->latch->page_no > ROOT_page )
1936 bt_putid (bt->frame->right, bt_getid (set->page->right));
1938 // get new free page and write higher keys to it.
1940 if( bt_newpage(bt, right, bt->frame) )
// second pass: keep the old image in bt->frame and rebuild set->page in place
1943 memcpy (bt->frame, set->page, bt->mgr->page_size);
1944 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1945 set->latch->dirty = 1;
1947 nxt = bt->mgr->page_size;
1948 set->page->garbage = 0;
1954 if( slotptr(bt->frame, max)->type == Librarian )
1957 // assemble page of smaller keys
1959 while( cnt++ < max ) {
1960 if( slotptr(bt->frame, cnt)->dead )
1962 val = valptr(bt->frame, cnt);
1963 nxt -= val->len + sizeof(BtVal);
1964 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1966 key = keyptr(bt->frame, cnt);
1967 nxt -= key->len + sizeof(BtKey);
1968 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1970 // add librarian slot
1973 slotptr(set->page, ++idx)->off = nxt;
1974 slotptr(set->page, idx)->type = Librarian;
1975 slotptr(set->page, idx)->dead = 1;
1980 slotptr(set->page, ++idx)->off = nxt;
1981 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the left page to the new right page and finish its header
1985 bt_putid(set->page->right, right->latch->page_no);
1986 set->page->min = nxt;
1987 set->page->cnt = idx;
1989 return right->latch->entry;
1992 // fix keys for newly split page
1993 // call with page locked, return
// bt:    btree handle
// set:   left page of the split, write-locked by the caller
// right: latch set of the new right page
// A split of the root is delegated to bt_splitroot.  Otherwise the
// fence keys of both halves are posted to the parent level (lvl+1)
// under parent locks, after which both latches are unlocked and unpinned.
1996 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1998 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1999 unsigned char value[BtId];
2000 uint lvl = set->page->lvl;
2004 // if current page is the root page, split it
2006 if( set->latch->page_no == ROOT_page )
2007 return bt_splitroot (bt, set, right);
// copy the fence (last) key of the left page
2009 ptr = keyptr(set->page, set->page->cnt);
2010 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// copy the fence (last) key of the right page
2012 page = bt_mappage (bt, right);
2014 ptr = keyptr(page, page->cnt);
2015 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2017 // insert new fences in their parent pages
2019 bt_lockpage (BtLockParent, right);
// the left page trades its write lock for a parent lock
2021 bt_lockpage (BtLockParent, set->latch);
2022 bt_unlockpage (BtLockWrite, set->latch);
2024 // insert new fence for reformulated left block of smaller keys
2026 bt_putid (value, set->latch->page_no);
2027 ptr = (BtKey *)leftkey;
2029 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2032 // switch fence for right block of larger keys to new right page
2034 bt_putid (value, right->page_no);
2035 ptr = (BtKey *)rightkey;
2037 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2040 bt_unlockpage (BtLockParent, set->latch);
2041 bt_unpinlatch (set->latch);
2043 bt_unlockpage (BtLockParent, right);
2044 bt_unpinlatch (right);
2048 // install new key and value onto page
2049 // page must already be checked for
// bt:      btree handle
// set:     write-locked page that receives the entry
// slot:    slot before which the new key is placed
// key:     key bytes (keylen bytes)
// value:   value bytes (vallen bytes)
// type:    slot type for the new entry
// release: callers pass 1 (bt_insertkey) or 0 (bt_atomicinsert);
//          presumably selects whether the page is unlocked and unpinned
//          here -- TODO confirm
2052 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2054 uint idx, librarian;
2059 // if found slot > desired slot and previous slot
2060 // is a librarian slot, use it
2063 if( slotptr(set->page, slot-1)->type == Librarian )
2066 // copy value onto page
// page->min moves down to make room; the value is written at the new min
2068 set->page->min -= vallen + sizeof(BtVal);
2069 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2070 memcpy (val->value, value, vallen);
2073 // copy key onto page
2075 set->page->min -= keylen + sizeof(BtKey);
2076 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2077 memcpy (ptr->key, key, keylen);
2080 // find first empty slot
// a dead slot at or after the insertion point can be reused
2082 for( idx = slot; idx < set->page->cnt; idx++ )
2083 if( slotptr(set->page, idx)->dead )
2086 // now insert key into array before slot
// no dead slot found: grow the slot array by two entries
2088 if( idx == set->page->cnt )
2089 idx += 2, set->page->cnt += 2, librarian = 2;
2093 set->latch->dirty = 1;
// shift the slots from the insertion point upward by librarian positions
2096 while( idx > slot + librarian - 1 )
2097 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2099 // add librarian slot
2101 if( librarian > 1 ) {
2102 node = slotptr(set->page, slot++);
2103 node->off = set->page->min;
2104 node->type = Librarian;
// the real slot points at the key just written at page->min
2110 node = slotptr(set->page, slot);
2111 node->off = set->page->min;
2116 bt_unlockpage (BtLockWrite, set->latch);
2117 bt_unpinlatch (set->latch);
2123 // Insert new key into the btree at given level.
2124 // either add a new key or update/add an existing one
// bt:     btree handle
// key:    key bytes (keylen bytes)
// lvl:    tree level that receives the key (callers here pass 0 for
//         leaf inserts and lvl+1 when posting fence keys)
// value:  value bytes (vallen bytes)
// unique: non-zero to update an existing equal key in place; zero to
//         add the key as a duplicate tagged with a sequence number
// The target page is loaded under a write lock.  When the page has no
// room it is split, and after the parent keys are fixed the lookup is
// retried by the enclosing loop.
2126 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2128 unsigned char newkey[BT_keyarray];
2129 uint slot, idx, len, entry;
2136 // set up the key we're working on
// the working copy lives in newkey so it can be extended below
2138 ins = (BtKey*)newkey;
2139 memcpy (ins->key, key, keylen);
2142 // is this a non-unique index value?
// a duplicate sequence number from bt_newdup is stored after the key bytes
2148 sequence = bt_newdup (bt);
2149 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2153 while( 1 ) { // find the page and slot for the current key
2154 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2155 ptr = keyptr(set->page, slot);
2158 bt->err = BTERR_ovflw;
2162 // if librarian slot == found slot, advance to real slot
2164 if( slotptr(set->page, slot)->type == Librarian )
2165 if( !keycmp (ptr, key, keylen) )
2166 ptr = keyptr(set->page, ++slot);
2170 if( slotptr(set->page, slot)->type == Duplicate )
2173 // if inserting a duplicate key or unique key
2174 // check for adequate space on the page
2175 // and insert the new key before slot.
// taken for every non-unique insert, and for a unique insert whose key
// differs from the key found at slot
2177 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2178 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2179 if( !(entry = bt_splitpage (bt, set)) )
2181 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2186 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2189 // if key already exists, update value and return
2191 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2193 if( val->len >= vallen ) {
2194 if( slotptr(set->page, slot)->dead )
// the bytes no longer used by the shorter value become garbage
2196 set->page->garbage += val->len - vallen;
2197 set->latch->dirty = 1;
2198 slotptr(set->page, slot)->dead = 0;
2200 memcpy (val->value, value, vallen);
2201 bt_unlockpage(BtLockWrite, set->latch);
2202 bt_unpinlatch (set->latch);
2206 // new update value doesn't fit in existing value area
// a live slot's old key and value bytes are written off as garbage
2208 if( !slotptr(set->page, slot)->dead )
2209 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2211 slotptr(set->page, slot)->dead = 0;
2215 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2216 if( !(entry = bt_splitpage (bt, set)) )
2218 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write fresh copies of the value and key at the new page->min
2223 set->page->min -= vallen + sizeof(BtVal);
2224 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2225 memcpy (val->value, value, vallen);
2228 set->latch->dirty = 1;
2229 set->page->min -= keylen + sizeof(BtKey);
2230 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2231 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the relocated key
2234 slotptr(set->page, slot)->off = set->page->min;
2235 bt_unlockpage(BtLockWrite, set->latch);
2236 bt_unpinlatch (set->latch);
// Per-key bookkeeping for a batch of atomic modifications; bt_atomicmods
// allocates one AtomicMod per source slot and indexes the array from 1.
2243 uint entry; // latch table entry number
2244 uint slot:30; // page slot number
2245 uint reuse:1; // reused previous page
2246 uint emptied:1; // page was emptied
// Queue node for a fence key that bt_atomicmods posts to the parent
// level after its leaf-level changes are done.
2250 uid page_no; // page number for split leaf
2251 void *next; // next key to insert
2252 uint entry; // latch table entry number
// copy of the page's fence key, stored as a BtKey header plus key bytes
2253 unsigned char leafkey[BT_keyarray];
2256 // determine actual page where key is located
2257 // return slot number with set page locked
// bt:     btree handle
// source: page holding the batch of keys
// locks:  per-key lock records built by bt_atomicmods
// src:    index of the key within source
// set:    receives the latch and page on which the key belongs
// Sets bt->err to BTERR_atomic when no page in the split chain holds a
// slot for the key.
2259 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2261 BtKey *key = keyptr(source,src);
2262 uint slot = locks[src].slot;
// a key that shares the previous key's page inherits that latch entry;
// its slot is cleared so that it is looked up again
2265 if( src > 1 && locks[src].reuse )
2266 entry = locks[src-1].entry, locks[src].entry = entry, slot = 0;
2268 entry = locks[src].entry;
2271 set->latch = bt->mgr->latchsets + entry;
2272 set->page = bt_mappage (bt, set->latch);
2276 // is locks->reuse set?
2277 // if so, find where our key
2278 // is located on previous page or split pages
2281 set->latch = bt->mgr->latchsets + entry;
2282 set->page = bt_mappage (bt, set->latch);
2284 if( slot = bt_findslot(set, key->key, key->len) )
// follow the chain of pages split off from this latch
2287 } while( entry = set->latch->split );
2289 bt->err = BTERR_atomic;
// Inserts key number src of the batch into the page located by
// bt_atomicpage, splitting that page as often as needed to make room.
// bt:     btree handle
// source: page holding the batch of keys and values
// locks:  per-key lock records built by bt_atomicmods
// src:    index of the key within source
// Returns BTERR_atomic (also stored in bt->err) once bt_atomicpage can
// no longer find a page for the key.
2293 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2295 BtKey *key = keyptr(source, src);
2296 BtVal *val = valptr(source, src);
2301 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
// room available: insert with release == 0
2302 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2303 return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2304 if( entry = bt_splitpage (bt, set) )
2305 latch = bt->mgr->latchsets + entry;
// link the new right page into the split chain directly after this page
2308 latch->split = set->latch->split;
2309 set->latch->split = entry;
2310 bt_lockpage(BtLockWrite, latch);
2313 return bt->err = BTERR_atomic;
// Marks key number src of the batch dead on the page located by
// bt_atomicpage and trims dead slots beneath the fence.
// bt:     btree handle
// source: page holding the batch of keys
// locks:  per-key lock records built by bt_atomicmods
// src:    index of the key within source
// Returns BTERR_struct (also stored in bt->err) when bt_atomicpage finds
// no slot for the key.
2316 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2318 BtKey *key = keyptr(source, src);
2319 uint idx, entry, slot;
2324 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2325 slotptr(set->page, slot)->dead = 1;
2327 return bt->err = BTERR_struct;
2329 ptr = keyptr(set->page, slot);
2330 val = valptr(set->page, slot);
// the key and value bytes of the dead slot are now counted as garbage
2332 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2335 // collapse empty slots beneath the fence
// idx is the slot just below the fence; a dead one is overwritten with
// the fence slot and the vacated top slot is zeroed as cnt shrinks
2337 while( idx = set->page->cnt - 1 )
2338 if( slotptr(set->page, idx)->dead ) {
2339 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2340 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2344 set->latch->dirty = 1;
2348 // qsort comparison function
// slot1, slot2: the two slots being ordered
// page:         page that contains the keys the slots refer to;
//               bt_atomicmods supplies it as the qsort_r context argument
// Returns the result of keycmp on the two keys.
2350 int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
// each slot's off field is the byte offset of its key within the page
2352 BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
2353 BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
2355 return keycmp (key1, key2->key, key2->len);
2358 // atomic modification of a batch of keys.
2360 // return -1 if BTERR is set
2361 // otherwise return slot number
2362 // causing the key constraint violation
2363 // or zero on successful completion.
// bt:     btree handle
// source: page-formatted batch; the type of each slot selects the
//         operation (Delete slots are deleted, all others inserted)
// Phases, in order:
//   1. sort the source slots by key
//   2. load the leaf page of every key under Atomic + Read locks and
//      check the key constraints
//   3. write-lock each distinct page
//   4. apply the inserts and deletes
//   5. walk each page's split chain, free emptied pages and queue fence
//      keys for the parent level
//   6. drop the Atomic locks, delete emptied pages, post the queued keys
2365 int bt_atomicmods (BtDb *bt, BtPage source)
2367 uint src, idx, slot, samepage, entry, next, split;
2368 AtomicKey *head, *tail, *leaf;
2369 BtPageSet set[1], prev[1];
2370 unsigned char value[BtId];
2371 BtKey *key, *ptr, *key2;
// one record per source slot; slots are numbered from 1, hence cnt + 1
2380 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2384 // first sort the list of keys into order to
2385 // prevent deadlocks between threads.
// NOTE(review): __compar_d_fn_t and this qsort_r argument order look
// glibc-specific -- confirm portability to other platforms
2386 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
// insertion pass: a slot whose key compares below its left neighbour's
// is moved one position to the left
2388 for( src = 1; src++ < source->cnt; ) {
2389 *temp = *slotptr(source,src);
2390 key = keyptr (source,src);
2392 for( idx = src; --idx; ) {
2393 key2 = keyptr (source,idx);
2394 if( keycmp (key, key2->key, key2->len) < 0 ) {
2395 *slotptr(source,idx+1) = *slotptr(source,idx);
2396 *slotptr(source,idx) = *temp;
2402 // Load the leafpage for each key
2403 // and determine any constraint violations
2405 for( src = 0; src++ < source->cnt; ) {
2406 key = keyptr(source, src);
2407 val = valptr(source, src);
2410 // first determine if this modification falls
2411 // on the same page as the previous modification
2412 // note that the far right leaf page is a special case
// same page when the previous page has no right sibling or its fence
// key compares greater than this key
2414 if( samepage = src > 1 )
2415 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
2416 slot = bt_findslot(set, key->key, key->len);
2417 else // release read on previous page
2418 bt_unlockpage(BtLockRead, set->latch);
2421 slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead);
2422 set->latch->split = 0;
// look for an earlier record that already refers to this latch entry
2429 for(idx = 0; idx++ < source->cnt; )
2430 if( locks[idx].entry == set->latch->entry )
// record for a key that owns its page
2432 locks[src].entry = set->latch->entry;
2433 locks[src].slot = slot;
2434 locks[src].reuse = 0;
// record for a key that shares the page of the previous key
2436 locks[src].entry = 0;
2437 locks[src].slot = 0;
2438 locks[src].reuse = 1;
2441 if( slotptr(set->page, slot)->type == Librarian )
2442 ptr = keyptr(set->page, ++slot);
2444 ptr = keyptr(set->page, slot);
2446 switch( slotptr(source, src)->type ) {
// a live, non-stopper slot holding an equal key violates the constraint
2449 if( !slotptr(set->page, slot)->dead )
2450 if( slot < set->page->cnt || bt_getid (set->page->right) )
2451 if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
2453 // return constraint violation if key already exists
// back out: release the Atomic and Read locks and the pin of a locked page
2458 if( locks[src].entry ) {
2459 set->latch = bt->mgr->latchsets + locks[src].entry;
2460 bt_unlockpage(BtLockAtomic, set->latch);
2461 bt_unlockpage(BtLockRead, set->latch);
2462 bt_unpinlatch (set->latch);
// delete case: the key must be present in a live slot
2473 if( !slotptr(set->page, slot)->dead )
2474 if( ptr->len == key->len )
2475 if( !memcmp (ptr->key, key->key, key->len) )
2478 // Key to delete doesn't exist
2483 if( locks[src].entry ) {
2484 set->latch = bt->mgr->latchsets + locks[src].entry;
2485 bt_unlockpage(BtLockAtomic, set->latch);
2486 bt_unlockpage(BtLockRead, set->latch);
2487 bt_unpinlatch (set->latch);
2496 // unlock last loadpage lock
2498 if( source->cnt > 1 )
2499 bt_unlockpage(BtLockRead, set->latch);
2501 // obtain write lock for each page
// records with entry == 0 share a page that an earlier record locks
2503 for( src = 0; src++ < source->cnt; )
2504 if( locks[src].entry )
2505 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2507 // insert or delete each key
2509 for( src = 0; src++ < source->cnt; )
2510 if( slotptr(source,src)->type == Delete )
2511 if( bt_atomicdelete (bt, source, locks, src) )
2515 else if( bt_atomicinsert (bt, source, locks, src) )
2520 // set ParentModification and release WriteLock
2521 // leave empty pages locked for later processing
2522 // build queue of keys to insert for split pages
2524 for( src = 0; src++ < source->cnt; ) {
2525 if( locks[src].reuse )
2528 prev->latch = bt->mgr->latchsets + locks[src].entry;
2529 prev->page = bt_mappage (bt, prev->latch);
2531 // pick-up all splits from first entry
2532 // save page that points to this entry
2534 split = next = prev->latch->split;
2536 if( !prev->page->act )
2537 locks[src].emptied = 1;
// walk the chain of pages split off from this entry
2539 while( entry = next ) {
2540 set->latch = bt->mgr->latchsets + entry;
2541 set->page = bt_mappage (bt, set->latch);
2542 next = set->latch->split;
2544 // delete empty previous page
// the split page's image replaces the empty page, then the split page is freed
2546 if( !prev->page->act ) {
2547 memcpy (prev->page, set->page, bt->mgr->page_size);
2548 bt_lockpage (BtLockDelete, set->latch);
2549 bt_freepage (bt, set);
2551 prev->latch->dirty = 1;
2553 if( prev->page->act )
2554 locks[src].emptied = 0;
2559 // prev page is not emptied
2561 locks[src].emptied = 0;
2563 // schedule previous fence key update
2565 ptr = keyptr(prev->page,prev->page->cnt);
// NOTE(review): the malloc result is written through below with no NULL
// check visible here -- confirm
2566 leaf = malloc (sizeof(AtomicKey));
2568 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2569 leaf->page_no = prev->latch->page_no;
2570 leaf->entry = prev->latch->entry;
2578 latch = prev->latch;
2581 // remove empty block from the split chain
// the previous page takes over the empty page's right link
2583 if( !set->page->act ) {
2584 memcpy (prev->page->right, set->page->right, BtId);
2585 bt_lockpage (BtLockDelete, set->latch);
2586 bt_freepage (bt, set);
// the write lock is traded for a parent lock until the fence key is posted
2590 bt_lockpage(BtLockParent, latch);
2591 bt_unlockpage(BtLockWrite, latch);
2594 if( !prev->page->act )
2598 bt_unlockpage(BtLockWrite, prev->latch);
2602 // Process last page split in chain
2604 ptr = keyptr(prev->page,prev->page->cnt);
// NOTE(review): same unchecked malloc pattern as above -- confirm
2605 leaf = malloc (sizeof(AtomicKey));
2607 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2608 leaf->page_no = prev->latch->page_no;
2609 leaf->entry = prev->latch->entry;
2619 bt_lockpage(BtLockParent, prev->latch);
2620 bt_unlockpage(BtLockWrite, prev->latch);
2623 // remove Atomic locks and
2624 // process any empty pages
// the records are visited in reverse order, last source slot first
2626 for( src = source->cnt; src; src-- ) {
2627 if( locks[src].reuse )
2630 // delete page emptied by our atomic action
2632 set->latch = bt->mgr->latchsets + locks[src].entry;
2633 set->page = bt_mappage (bt, set->latch);
2635 if( locks[src].emptied ) {
2636 bt_unlockpage (BtLockAtomic, set->latch);
2637 if( bt_deletepage (bt, set) )
2642 bt_unlockpage (BtLockAtomic, set->latch);
// a page with a split chain is unpinned later, when its queued key is posted
2644 if( !set->latch->split )
2645 bt_unpinlatch (set->latch);
2648 // add keys for any pages split during action
// each queued fence key is inserted at level 1 with its page number as value
2652 ptr = (BtKey *)leaf->leafkey;
2653 bt_putid (value, leaf->page_no);
2655 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2658 bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
2659 bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
2662 } while( leaf = tail );
2670 // return next slot on cursor page
2671 // or slide cursor right into next page
// bt:   btree handle; bt->cursor holds a private copy of a leaf page
// slot: slot last returned to the caller
// Dead slots are skipped.  When the cursor page is used up, the right
// sibling is pinned, read-locked, copied into bt->cursor and released.
2673 uint bt_nextkey (BtDb *bt, uint slot)
2679 right = bt_getid(bt->cursor->right);
2681 while( slot++ < bt->cursor->cnt )
2682 if( slotptr(bt->cursor,slot)->dead )
// the last slot of a page with no right sibling is the stopper key
2684 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2692 bt->cursor_page = right;
2694 if( set->latch = bt_pinlatch (bt, right, 1) )
2695 set->page = bt_mappage (bt, set->latch);
// the page is only read-locked for the duration of the copy
2699 bt_lockpage(BtLockRead, set->latch);
2701 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2703 bt_unlockpage(BtLockRead, set->latch);
2704 bt_unpinlatch (set->latch);
2712 // cache page of keys into cursor and return starting slot for given key
// bt:  btree handle; bt->cursor receives a copy of the leaf page and
//      bt->cursor_page its page number
// key: key at which the scan starts (len bytes)
// The leaf page is read-locked only while it is being copied.
2714 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2719 // cache page for retrieval
2721 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2722 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2726 bt->cursor_page = set->latch->page_no;
2728 bt_unlockpage(BtLockRead, set->latch);
2729 bt_unpinlatch (set->latch);
// Returns the key stored at slot on the cached cursor page (bt->cursor).
2733 BtKey *bt_key(BtDb *bt, uint slot)
2735 return keyptr(bt->cursor, slot);
// Returns the value stored at slot on the cached cursor page (bt->cursor).
2738 BtVal *bt_val(BtDb *bt, uint slot)
2740 return valptr(bt->cursor,slot);
// Windows variant: reports a time in seconds.
// type: selects among the current system time, the process's user time
//       and the process's system time
// The chosen FILETIME is converted to a SYSTEMTIME and its fields are
// summed into seconds.
2746 double getCpuTime(int type)
2749 FILETIME xittime[1];
2750 FILETIME systime[1];
2751 FILETIME usrtime[1];
2752 SYSTEMTIME timeconv[1];
2755 memset (timeconv, 0, sizeof(SYSTEMTIME));
// system time: the day of the week contributes whole days of seconds
2759 GetSystemTimeAsFileTime (xittime);
2760 FileTimeToSystemTime (xittime, timeconv);
2761 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user time consumed by the process
2764 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2765 FileTimeToSystemTime (usrtime, timeconv);
// system (kernel) time consumed by the process
2768 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2769 FileTimeToSystemTime (systime, timeconv);
// add hours, minutes, seconds and milliseconds, all expressed in seconds
2773 ans += (double)timeconv->wHour * 3600;
2774 ans += (double)timeconv->wMinute * 60;
2775 ans += (double)timeconv->wSecond;
2776 ans += (double)timeconv->wMilliseconds / 1000;
2781 #include <sys/resource.h>
// Unix variant: reports a time in seconds as a double.
// type: selects among the time of day (gettimeofday), the process's
//       user CPU time and its system CPU time (both from getrusage)
2783 double getCpuTime(int type)
2785 struct rusage used[1];
2786 struct timeval tv[1];
// time of day: seconds plus microseconds
2790 gettimeofday(tv, NULL);
2791 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
2794 getrusage(RUSAGE_SELF, used);
2795 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
2798 getrusage(RUSAGE_SELF, used);
2799 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Walks latch sets 1..latchdeployed, reports to stderr any set that
// still shows a read/write, access or parent lock or a pin, and zeroes
// each of the three lock structures whether or not it was reported.
// mgr: buffer pool manager whose latch sets are audited
// NOTE(review): latch->page_no is assigned to a uid (unsigned long long)
// field elsewhere in this file, yet it is printed with %.8x, which
// expects unsigned int -- confirm the type and adjust the format or cast
2806 void bt_poolaudit (BtMgr *mgr)
2811 while( slot++ < mgr->latchdeployed ) {
2812 latch = mgr->latchsets + slot;
2814 if( *latch->readwr->rin & MASK )
2815 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2816 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2818 if( *latch->access->rin & MASK )
2819 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2820 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2822 if( *latch->parent->rin & MASK )
2823 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2824 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits other than CLOCK_bit mean the latch is still pinned
2826 if( latch->pin & ~CLOCK_bit ) {
2827 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// Audits the latch manager: reports and clears a held allocation-page
// lock, leftover latch-set locks and hash-entry locks, reports pinned
// latch sets, then reads the pages from LEAF_page up to the allocation
// high-water mark and totals the active keys of in-use leaf pages.
// bt: btree handle; bt->frame is used as the read buffer
// Returns bt->err set to BTERR_map when a page read fails or comes up
// short on the ReadFile path.
// NOTE(review): latch->page_no is assigned to a uid (unsigned long long)
// field elsewhere in this file, yet it is printed with %.8x, which
// expects unsigned int -- confirm the type and adjust the format or cast
2833 uint bt_latchaudit (BtDb *bt)
2835 ushort idx, hashidx;
2841 if( *(ushort *)(bt->mgr->lock) )
2842 fprintf(stderr, "Alloc page locked\n");
2843 *(ushort *)(bt->mgr->lock) = 0;
// latch sets are numbered from 1
2845 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2846 latch = bt->mgr->latchsets + idx;
2847 if( *latch->readwr->rin & MASK )
2848 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2849 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2851 if( *latch->access->rin & MASK )
2852 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2853 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2855 if( *latch->parent->rin & MASK )
2856 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2857 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2860 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// hash table pass: clear each entry's latch and walk its chain of latch sets
2865 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2866 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2867 fprintf(stderr, "hash entry %d locked\n", hashidx);
2869 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2871 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2872 latch = bt->mgr->latchsets + idx;
2874 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2875 } while( idx = latch->next );
// page scan starts at the first leaf page
2878 page_no = LEAF_page;
2880 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the index file
2881 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not examined, so a short or failed
// read goes unnoticed on this path -- confirm
2883 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// the 64-bit offset is passed to SetFilePointer as two long halves
2887 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2889 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2890 return bt->err = BTERR_map;
2892 if( *amt < bt->mgr->page_size )
2893 return bt->err = BTERR_map;
// only in-use leaf pages (not free, level 0) contribute to the count
2895 if( !bt->frame->free && !bt->frame->lvl )
2896 cnt += bt->frame->act;
2900 cnt--; // remove stopper key
2901 fprintf(stderr, " Total keys read %d\n", cnt);
2915 // standalone program to index file of keys
2916 // then list them onto std-out
// Worker entry point: runs one operation against the btree for one input file.
//
// arg points to a ThreadArg (see the assignment to `args` below); the fields
// read in this excerpt are mgr, type, infile, num and idx.  main() hands this
// function to pthread_create / _beginthreadex, once per source file.
//
// The operation is selected by switching on args->type[0] | 0x20 (the OR folds
// an ASCII upper-case letter to lower case).  The case labels are not visible
// in this excerpt; going by the progress messages the operations are: latch
// manager audit, TXN pennysort, pennysort, indexing, deleting, finding,
// scanning and counting.
//
// Two signatures follow, a pthread-style one and a __stdcall one; presumably
// they are selected by preprocessor conditionals that are not visible here.
//
// NOTE(review): most error paths below end with exit(0), which reports success
// to the parent process; the scan path uses exit(1).  Confirm whether a
// non-zero status was intended everywhere.
2919 void *index_file (void *arg)
2921 uint __stdcall index_file (void *arg)
2924 int line = 0, found = 0, cnt = 0, unique;
2925 uid next, page_no = LEAF_page; // start on first page of leaves
// buffer for the current input line; presumably filled from getc() on lines not visible here
2926 unsigned char key[BT_maxkey];
// staging area the TXN pennysort mode copies value and key bytes into
2927 unsigned char txn[65536];
2928 ThreadArg *args = arg;
2929 int ch, len = 0, slot;
// each call opens its own handle on the manager supplied by main()
2938 bt = bt_open (args->mgr);
// unique is false only when the second character of the type argument is 'd' or 'D'
2941 unique = (args->type[1] | 0x20) != 'd';
2943 switch(args->type[0] | 0x20)
// ---- latch manager audit: delegate to bt_latchaudit and report its return value ----
2946 fprintf(stderr, "started latch mgr audit\n");
2947 cnt = bt_latchaudit (bt);
2948 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// ---- TXN pennysort: stage records in txn[] and submit them with bt_atomicmods ----
2952 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
// NOTE(review): no fclose for `in` is visible in this excerpt -- confirm the stream is closed
2953 if( in = fopen (args->infile, "rb") )
2954 while( ch = getc(in), ch != EOF )
// copy the bytes that follow the first 10 (len - 10 of them) into txn[]
2959 memcpy (txn + nxt, key + 10, len - 10);
// store that byte count in txn[] too
// NOTE(review): txn[] elements are unsigned char, so a count above 255 would be truncated -- confirm the input bound
2961 txn[nxt] = len - 10;
// copy the first 10 bytes of the line into txn[]
2963 memcpy (txn + nxt, key, 10);
// record the current txn[] offset in the next slot of `page` and mark that slot Unique
2966 slotptr(page,++cnt)->off = nxt;
2967 slotptr(page,cnt)->type = Unique;
// a non-zero return from bt_atomicmods is reported as an error and ends the process
2977 if( bt_atomicmods (bt, page) )
2978 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2983 else if( len < BT_maxkey )
2985 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- pennysort: insert each line, passing its first 10 bytes and the remaining len - 10 bytes separately ----
2989 fprintf(stderr, "started pennysort for %s\n", args->infile);
2990 if( in = fopen (args->infile, "rb") )
2991 while( ch = getc(in), ch != EOF )
2996 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2997 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3000 else if( len < BT_maxkey )
3002 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- indexing: insert each line as a key, passing NULL / 0 where pennysort passes the trailing bytes ----
3006 fprintf(stderr, "started indexing for %s\n", args->infile);
// NOTE(review): this mode opens the file with "r" while the other modes use "rb"
3007 if( in = fopen (args->infile, "r") )
3008 while( ch = getc(in), ch != EOF )
// num == 1: append 1000000000 - line formatted with %.9d (zero padded to at least nine digits); len advances by exactly 9
// NOTE(review): sprintf writes past key + len with no bounds check visible here -- confirm len + 10 <= BT_maxkey
3013 if( args->num == 1 )
3014 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
// any other non-zero num: append line + idx * num in the same format
3016 else if( args->num )
3017 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3019 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3020 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3023 else if( len < BT_maxkey )
3025 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- deleting: look each key up, then delete the key found in bt->key ----
3029 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3030 if( in = fopen (args->infile, "rb") )
3031 while( ch = getc(in), ch != EOF )
// same optional numeric suffix as in the indexing mode
3035 if( args->num == 1 )
3036 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3038 else if( args->num )
3039 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
// a negative return from bt_findkey is treated as "not found" and ends the process
3041 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3042 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
// the delete uses the key bytes and length stored in bt->key, not the local buffer
3043 ptr = (BtKey*)(bt->key);
3046 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3047 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3050 else if( len < BT_maxkey )
3052 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- finding: look each key up; the summary reports how many were found ----
3056 fprintf(stderr, "started finding keys for %s\n", args->infile);
3057 if( in = fopen (args->infile, "rb") )
3058 while( ch = getc(in), ch != EOF )
// same optional numeric suffix as in the indexing mode
3062 if( args->num == 1 )
3063 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3065 else if( args->num )
3066 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
// the statements between this test and the error report below are not visible in this excerpt
3068 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3071 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3074 else if( len < BT_maxkey )
3076 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- scanning: follow the right-sibling links from page_no and write every live key/value to stdout ----
3080 fprintf(stderr, "started scanning\n");
// pin the latch for the page and map the page; failure to pin is fatal
3082 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3083 set->page = bt_mappage (bt, set->latch);
3085 fprintf(stderr, "unable to obtain latch"), exit(1);
// hold a read lock while the page is examined; remember the right sibling before unlocking
3086 bt_lockpage (BtLockRead, set->latch);
3087 next = bt_getid (set->page->right);
// slots are visited as 1..cnt
3089 for( slot = 0; slot++ < set->page->cnt; )
// when there is no right sibling the final slot is skipped (presumably the stopper key -- confirm)
3090 if( next || slot < set->page->cnt )
3091 if( !slotptr(set->page, slot)->dead ) {
3092 ptr = keyptr(set->page, slot);
3095 if( slotptr(set->page, slot)->type == Duplicate )
// output format: key bytes, value bytes, newline
3098 fwrite (ptr->key, len, 1, stdout);
3099 val = valptr(set->page, slot);
3100 fwrite (val->value, val->len, 1, stdout);
3101 fputc ('\n', stdout);
// release the page before moving on; the loop ends when the right-sibling id is zero
3105 bt_unlockpage (BtLockRead, set->latch);
3106 bt_unpinlatch (set->latch);
3107 } while( page_no = next );
3109 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// advise the OS that the index file (offset 0, length 0 = whole file) will be read sequentially
3114 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
// ---- counting: read pages by number and total the `act` field of every in-use level-0 page ----
3116 fprintf(stderr, "started counting\n");
3117 page_no = LEAF_page;
// the upper bound is the id stored in alloc->right of page zero
3119 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3120 if( bt_readpage (bt->mgr, bt->frame, page_no) )
// only pages that are not free and have lvl == 0 contribute
3123 if( !bt->frame->free && !bt->frame->lvl )
3124 cnt += bt->frame->act;
3130 cnt--; // remove stopper key
3131 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// shorthand name for struct timeval; no use of it is visible in this excerpt
3143 typedef struct timeval timer;
// Program entry: parses the command line, opens the btree manager, runs
// index_file on its own thread for each source file, waits for all threads,
// and prints timing figures to stderr.
//
// Argument positions used below: argv[1] index file, argv[2] operation,
// argv[3] page_bits, argv[4] buffer_pool_size, argv[5] line_numbers,
// argv[6] onward the source files.
//
// The tests guarding the optional arguments and the assignment of cnt are not
// visible in this excerpt; presumably cnt is the number of source files.
3145 int main (int argc, char **argv)
3147 int idx, cnt, len, slot, err;
// page size defaults to 16 bits unless argv[3] overrides it
3148 int segsize, bits = 16;
3165 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3166 fprintf (stderr, " where page_bits is the page size in bits\n");
3167 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3168 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3169 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// reference point for the "real" figure printed at the end
3173 start = getCpuTime(0);
// NOTE(review): atoi gives no way to detect malformed numbers; strtol would allow validation
3176 bits = atoi(argv[3]);
3179 poolsize = atoi(argv[4]);
3182 fprintf (stderr, "Warning: no mapped_pool\n");
3185 num = atoi(argv[5]);
// one thread handle and one ThreadArg per worker; two allocation variants are listed for the handles
// (pthread_t via malloc, HANDLE via GlobalAlloc), presumably selected by the preprocessor
// NOTE(review): no check of the allocation results is visible here
3189 threads = malloc (cnt * sizeof(pthread_t));
3191 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3193 args = malloc (cnt * sizeof(ThreadArg));
// a single manager is opened and handed to every worker
3195 mgr = bt_mgr ((argv[1]), bits, poolsize);
3198 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// fill in each worker's arguments and start it
3204 for( idx = 0; idx < cnt; idx++ ) {
3205 args[idx].infile = argv[idx + 6];
3206 args[idx].type = argv[2];
3207 args[idx].mgr = mgr;
3208 args[idx].num = num;
3209 args[idx].idx = idx;
// NOTE(review): a pthread_create failure is only logged; the join loop below still visits that slot -- confirm this is safe
3211 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3212 fprintf(stderr, "Error creating thread %d\n", err);
// NOTE(review): the return value of _beginthreadex is not checked here
3214 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3218 // wait for termination
3221 for( idx = 0; idx < cnt; idx++ )
3222 pthread_join (threads[idx], NULL);
// NOTE(review): WaitForMultipleObjects is documented to accept at most MAXIMUM_WAIT_OBJECTS handles -- confirm cnt stays within that
3224 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3226 for( idx = 0; idx < cnt; idx++ )
3227 CloseHandle(threads[idx]);
// each figure is printed as whole minutes followed by the remaining seconds;
// getCpuTime is called with 0, 1 and 2 for the lines labelled real, user and sys
3233 elapsed = getCpuTime(0) - start;
3234 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3235 elapsed = getCpuTime(1);
3236 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3237 elapsed = getCpuTime(2);
3238 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);