1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
120 // definition for spin latch implementation
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
126 volatile typedef struct {
137 // hash table entries
140 volatile uint slot; // Latch table entry at head of chain
141 BtSpinLatch latch[1];
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock atomic[1]; // Atomic update in progress
152 uint split; // right split page atomic insert
153 uint entry; // entry slot in latch table
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of outstanding threads
157 ushort dirty:1; // page in cache is dirty
160 // Define the length of the page record numbers
164 // Page key slot definition.
166 // Keys are marked dead, but remain on the page until
167 // it cleanup is called. The fence key (highest key) for
168 // a leaf page is always present, even after cleanup.
172 // In addition to the Unique keys that occupy slots
173 // there are Librarian and Duplicate key
174 // slots occupying the key slot array.
176 // The Librarian slots are dead keys that
177 // serve as filler, available to add new Unique
178 // or Dup slots that are inserted into the B-tree.
180 // The Duplicate slots have had their key bytes extended
181 // by 6 bytes to contain a binary duplicate key uniqueifier.
191 uint off:BT_maxbits; // page offset for key start
192 uint type:3; // type of slot
193 uint dead:1; // set for deleted slot
196 // The key structure occupies space at the upper end of
197 // each page. It's a length byte followed by the key
201 unsigned char len; // this can be changed to a ushort or uint
202 unsigned char key[0];
205 // the value structure also occupies space at the upper
206 // end of the page. Each key is immediately followed by a value.
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char value[0];
213 #define BT_maxkey 255 // maximum number of bytes in a key
214 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
216 // The first part of an index page.
217 // It is immediately followed
218 // by the BtSlot array of keys.
220 // note that this structure size
221 // must be a multiple of 8 bytes
222 // in order to place dups correctly.
224 typedef struct BtPage_ {
225 uint cnt; // count of keys in page
226 uint act; // count of active keys
227 uint min; // next key offset
228 uint garbage; // page garbage in bytes
229 unsigned char bits:7; // page size in bits
230 unsigned char free:1; // page is on free chain
231 unsigned char lvl:7; // level of page
232 unsigned char kill:1; // page is being deleted
233 unsigned char right[BtId]; // page number to right
234 unsigned char left[BtId]; // page number to left
235 unsigned char filler[2]; // padding to multiple of 8
238 // The loadpage interface object
241 BtPage page; // current page pointer
242 BtLatchSet *latch; // current page latch set
245 // structure for latch manager on ALLOC_page
248 struct BtPage_ alloc[1]; // next page_no in right ptr
249 unsigned long long dups[1]; // global duplicate key uniqueifier
250 unsigned char chain[BtId]; // head of free page_nos chain
253 // The object structure for Btree access
256 uint page_size; // page size
257 uint page_bits; // page size in bits
263 BtPageZero *pagezero; // mapped allocation page
264 BtSpinLatch lock[1]; // allocation area lite latch
265 uint latchdeployed; // highest number of latch entries deployed
266 uint nlatchpage; // number of latch pages at BT_latch
267 uint latchtotal; // number of page latch entries
268 uint latchhash; // number of latch hash table slots
269 uint latchvictim; // next latch entry to examine
270 BtHashEntry *hashtable; // the buffer pool hash table entries
271 BtLatchSet *latchsets; // mapped latch set from buffer pool
272 unsigned char *pagepool; // mapped to the buffer pool pages
274 HANDLE halloc; // allocation handle
275 HANDLE hpool; // buffer pool handle
280 BtMgr *mgr; // buffer manager for thread
281 BtPage cursor; // cached frame for start/next (never mapped)
282 BtPage frame; // spare frame for the page split (never mapped)
283 uid cursor_page; // current cursor page number
284 unsigned char *mem; // frame, cursor, page memory buffer
285 unsigned char key[BT_keyarray]; // last found complete key
286 int found; // last delete or insert was found
287 int err; // last error
288 int reads, writes; // number of reads and writes from the btree
302 #define CLOCK_bit 0x8000
305 extern void bt_close (BtDb *bt);
306 extern BtDb *bt_open (BtMgr *mgr);
307 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
308 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
309 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
310 extern BtKey *bt_foundkey (BtDb *bt);
311 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
312 extern uint bt_nextkey (BtDb *bt, uint slot);
315 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
316 void bt_mgrclose (BtMgr *mgr);
318 // Helper functions to return slot values
319 // from the cursor page.
321 extern BtKey *bt_key (BtDb *bt, uint slot);
322 extern BtVal *bt_val (BtDb *bt, uint slot);
324 // The page is allocated from low and hi ends.
325 // The key slots are allocated from the bottom,
326 // while the text and value of the key
327 // are allocated from the top. When the two
328 // areas meet, the page is split into two.
330 // A key consists of a length byte, two bytes of
331 // index number (0 - 65535), and up to 253 bytes
334 // Associated with each key is a value byte string
335 // containing any value desired.
337 // The b-tree root is always located at page 1.
338 // The first leaf page of level zero is always
339 // located on page 2.
341 // The b-tree pages are linked with next
342 // pointers to facilitate enumerators,
343 // and provide for concurrency.
345 // When to root page fills, it is split in two and
346 // the tree height is raised by a new root at page
347 // one with two keys.
349 // Deleted keys are marked with a dead bit until
350 // page cleanup. The fence key for a leaf node is
353 // To achieve maximum concurrency one page is locked at a time
354 // as the tree is traversed to find leaf key in question. The right
355 // page numbers are used in cases where the page is being split,
358 // Page 0 is dedicated to lock for new page extensions,
359 // and chains empty pages together for reuse. It also
360 // contains the latch manager hash table.
362 // The ParentModification lock on a node is obtained to serialize posting
363 // or changing the fence key for a node.
365 // Empty pages are chained together through the ALLOC page and reused.
367 // Access macros to address slot and key values from the page
368 // Page slots use 1 based indexing.
370 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
371 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
372 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
374 void bt_putid(unsigned char *dest, uid id)
379 dest[i] = (unsigned char)id, id >>= 8;
382 uid bt_getid(unsigned char *src)
387 for( i = 0; i < BtId; i++ )
388 id <<= 8, id |= *src++;
393 uid bt_newdup (BtDb *bt)
396 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
398 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
402 // Phase-Fair reader/writer lock implementation
404 void WriteLock (RWLock *lock)
409 tix = __sync_fetch_and_add (lock->ticket, 1);
411 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
413 // wait for our ticket to come up
415 while( tix != lock->serving[0] )
422 w = PRES | (tix & PHID);
424 r = __sync_fetch_and_add (lock->rin, w);
426 r = _InterlockedExchangeAdd16 (lock->rin, w);
428 while( r != *lock->rout )
436 void WriteRelease (RWLock *lock)
439 __sync_fetch_and_and (lock->rin, ~MASK);
441 _InterlockedAnd16 (lock->rin, ~MASK);
446 void ReadLock (RWLock *lock)
450 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
452 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
455 while( w == (*lock->rin & MASK) )
463 void ReadRelease (RWLock *lock)
466 __sync_fetch_and_add (lock->rout, RINC);
468 _InterlockedExchangeAdd16 (lock->rout, RINC);
472 // Spin Latch Manager
474 // wait until write lock mode is clear
475 // and add 1 to the share count
477 void bt_spinreadlock(BtSpinLatch *latch)
483 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
485 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
487 // see if exclusive request is granted or pending
492 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
494 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
497 } while( sched_yield(), 1 );
499 } while( SwitchToThread(), 1 );
503 // wait for other read and write latches to relinquish
505 void bt_spinwritelock(BtSpinLatch *latch)
511 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
513 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
516 if( !(prev & ~BOTH) )
520 __sync_fetch_and_and ((ushort *)latch, ~XCL);
522 _InterlockedAnd16((ushort *)latch, ~XCL);
525 } while( sched_yield(), 1 );
527 } while( SwitchToThread(), 1 );
531 // try to obtain write lock
533 // return 1 if obtained,
536 int bt_spinwritetry(BtSpinLatch *latch)
541 prev = __sync_fetch_and_or((ushort *)latch, XCL);
543 prev = _InterlockedOr16((ushort *)latch, XCL);
545 // take write access if all bits are clear
548 if( !(prev & ~BOTH) )
552 __sync_fetch_and_and ((ushort *)latch, ~XCL);
554 _InterlockedAnd16((ushort *)latch, ~XCL);
561 void bt_spinreleasewrite(BtSpinLatch *latch)
564 __sync_fetch_and_and((ushort *)latch, ~BOTH);
566 _InterlockedAnd16((ushort *)latch, ~BOTH);
570 // decrement reader count
572 void bt_spinreleaseread(BtSpinLatch *latch)
575 __sync_fetch_and_add((ushort *)latch, -SHARE);
577 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
581 // read page from permanent location in Btree file
583 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
585 off64_t off = page_no << mgr->page_bits;
588 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
589 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
596 memset (ovl, 0, sizeof(OVERLAPPED));
598 ovl->OffsetHigh = off >> 32;
600 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
601 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
604 if( *amt < mgr->page_size ) {
605 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
612 // write page to permanent location in Btree file
613 // clear the dirty bit
615 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
617 off64_t off = page_no << mgr->page_bits;
620 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
626 memset (ovl, 0, sizeof(OVERLAPPED));
628 ovl->OffsetHigh = off >> 32;
630 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
633 if( *amt < mgr->page_size )
639 // link latch table entry into head of latch hash table
641 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
643 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
644 BtLatchSet *latch = bt->mgr->latchsets + slot;
646 if( latch->next = bt->mgr->hashtable[hashidx].slot )
647 bt->mgr->latchsets[latch->next].prev = slot;
649 bt->mgr->hashtable[hashidx].slot = slot;
650 latch->page_no = page_no;
657 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
665 // set CLOCK bit in latch
666 // decrement pin count
668 void bt_unpinlatch (BtLatchSet *latch)
671 if( ~latch->pin & CLOCK_bit )
672 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
673 __sync_fetch_and_add(&latch->pin, -1);
675 if( ~latch->pin & CLOCK_bit )
676 _InterlockedOr16 (&latch->pin, CLOCK_bit);
677 _InterlockedDecrement16 (&latch->pin);
681 // return the btree cached page address
683 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
685 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
690 // find existing latchset or inspire new one
691 // return with latchset pinned
693 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
695 uint hashidx = page_no % bt->mgr->latchhash;
703 // try to find our entry
705 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
707 if( slot = bt->mgr->hashtable[hashidx].slot ) do
709 latch = bt->mgr->latchsets + slot;
710 if( page_no == latch->page_no )
712 } while( slot = latch->next );
718 latch = bt->mgr->latchsets + slot;
720 __sync_fetch_and_add(&latch->pin, 1);
722 _InterlockedIncrement16 (&latch->pin);
724 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
728 // see if there are any unused pool entries
730 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
732 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
735 if( slot < bt->mgr->latchtotal ) {
736 latch = bt->mgr->latchsets + slot;
737 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
739 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
744 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
746 _InterlockedDecrement (&bt->mgr->latchdeployed);
748 // find and reuse previous entry on victim
752 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
754 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
756 // try to get write lock on hash chain
757 // skip entry if not obtained
758 // or has outstanding pins
760 slot %= bt->mgr->latchtotal;
765 latch = bt->mgr->latchsets + slot;
766 idx = latch->page_no % bt->mgr->latchhash;
768 // see we are on same chain as hashidx
773 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
776 // skip this slot if it is pinned
777 // or the CLOCK bit is set
780 if( latch->pin & CLOCK_bit ) {
782 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
784 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
787 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
791 // update permanent page area in btree from buffer pool
793 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
796 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
799 latch->dirty = 0, bt->writes++;
801 // unlink our available slot from its hash chain
804 bt->mgr->latchsets[latch->prev].next = latch->next;
806 bt->mgr->hashtable[idx].slot = latch->next;
809 bt->mgr->latchsets[latch->next].prev = latch->prev;
811 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
813 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
816 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
821 void bt_mgrclose (BtMgr *mgr)
828 // flush dirty pool pages to the btree
830 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
831 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
832 latch = mgr->latchsets + slot;
835 bt_writepage(mgr, page, latch->page_no);
836 latch->dirty = 0, num++;
838 // madvise (page, mgr->page_size, MADV_DONTNEED);
841 fprintf(stderr, "%d buffer pool pages flushed\n", num);
844 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
845 munmap (mgr->pagezero, mgr->page_size);
847 FlushViewOfFile(mgr->pagezero, 0);
848 UnmapViewOfFile(mgr->pagezero);
849 UnmapViewOfFile(mgr->hashtable);
850 CloseHandle(mgr->halloc);
851 CloseHandle(mgr->hpool);
857 FlushFileBuffers(mgr->idx);
858 CloseHandle(mgr->idx);
863 // close and release memory
865 void bt_close (BtDb *bt)
872 VirtualFree (bt->mem, 0, MEM_RELEASE);
877 // open/create new btree buffer manager
879 // call with file_name, BT_openmode, bits in page size (e.g. 16),
880 // size of page pool (e.g. 262144)
882 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
884 uint lvl, attr, last, slot, idx;
885 uint nlatchpage, latchhash;
886 unsigned char value[BtId];
887 int flag, initit = 0;
888 BtPageZero *pagezero;
895 // determine sanity of page size and buffer pool
897 if( bits > BT_maxbits )
899 else if( bits < BT_minbits )
903 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
908 mgr = calloc (1, sizeof(BtMgr));
910 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
912 if( mgr->idx == -1 ) {
913 fprintf (stderr, "Unable to open btree file\n");
914 return free(mgr), NULL;
917 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
918 attr = FILE_ATTRIBUTE_NORMAL;
919 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
921 if( mgr->idx == INVALID_HANDLE_VALUE )
922 return GlobalFree(mgr), NULL;
926 pagezero = valloc (BT_maxpage);
929 // read minimum page size to get root info
930 // to support raw disk partition files
931 // check if bits == 0 on the disk.
933 if( size = lseek (mgr->idx, 0L, 2) )
934 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
935 if( pagezero->alloc->bits )
936 bits = pagezero->alloc->bits;
940 return free(mgr), free(pagezero), NULL;
944 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
945 size = GetFileSize(mgr->idx, amt);
948 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
949 return bt_mgrclose (mgr), NULL;
950 bits = pagezero->alloc->bits;
955 mgr->page_size = 1 << bits;
956 mgr->page_bits = bits;
958 // calculate number of latch hash table entries
960 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
961 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
963 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
964 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
965 mgr->latchtotal = nodemax;
970 // initialize an empty b-tree with latch page, root page, page of leaves
971 // and page(s) of latches and page pool cache
973 memset (pagezero, 0, 1 << bits);
974 pagezero->alloc->bits = mgr->page_bits;
975 bt_putid(pagezero->alloc->right, MIN_lvl+1);
977 // initialize left-most LEAF page in
980 bt_putid (pagezero->alloc->left, LEAF_page);
982 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
983 fprintf (stderr, "Unable to create btree page zero\n");
984 return bt_mgrclose (mgr), NULL;
987 memset (pagezero, 0, 1 << bits);
988 pagezero->alloc->bits = mgr->page_bits;
990 for( lvl=MIN_lvl; lvl--; ) {
991 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
992 key = keyptr(pagezero->alloc, 1);
993 key->len = 2; // create stopper key
997 bt_putid(value, MIN_lvl - lvl + 1);
998 val = valptr(pagezero->alloc, 1);
999 val->len = lvl ? BtId : 0;
1000 memcpy (val->value, value, val->len);
1002 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1003 pagezero->alloc->lvl = lvl;
1004 pagezero->alloc->cnt = 1;
1005 pagezero->alloc->act = 1;
1007 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1008 fprintf (stderr, "Unable to create btree page zero\n");
1009 return bt_mgrclose (mgr), NULL;
1017 VirtualFree (pagezero, 0, MEM_RELEASE);
1020 // mlock the pagezero page
1022 flag = PROT_READ | PROT_WRITE;
1023 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1024 if( mgr->pagezero == MAP_FAILED ) {
1025 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1026 return bt_mgrclose (mgr), NULL;
1028 mlock (mgr->pagezero, mgr->page_size);
1030 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1031 if( mgr->hashtable == MAP_FAILED ) {
1032 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1033 return bt_mgrclose (mgr), NULL;
1036 flag = PAGE_READWRITE;
1037 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1038 if( !mgr->halloc ) {
1039 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1040 return bt_mgrclose (mgr), NULL;
1043 flag = FILE_MAP_WRITE;
1044 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1045 if( !mgr->pagezero ) {
1046 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1047 return bt_mgrclose (mgr), NULL;
1050 flag = PAGE_READWRITE;
1051 size = (uid)mgr->nlatchpage << mgr->page_bits;
1052 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1054 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1055 return bt_mgrclose (mgr), NULL;
1058 flag = FILE_MAP_WRITE;
1059 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1060 if( !mgr->hashtable ) {
1061 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1062 return bt_mgrclose (mgr), NULL;
1066 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1067 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1072 // open BTree access method
1073 // based on buffer manager
1075 BtDb *bt_open (BtMgr *mgr)
1077 BtDb *bt = malloc (sizeof(*bt));
1079 memset (bt, 0, sizeof(*bt));
1082 bt->mem = valloc (2 *mgr->page_size);
1084 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1086 bt->frame = (BtPage)bt->mem;
1087 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1091 // compare two keys, return > 0, = 0, or < 0
1092 // =0: keys are same
1095 // as the comparison value
1097 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1099 uint len1 = key1->len;
1102 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1113 // place write, read, or parent lock on requested page_no.
1115 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1119 ReadLock (latch->readwr);
1122 WriteLock (latch->readwr);
1125 ReadLock (latch->access);
1128 WriteLock (latch->access);
1131 WriteLock (latch->parent);
1134 WriteLock (latch->atomic);
1139 // remove write, read, or parent lock on requested page
1141 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1145 ReadRelease (latch->readwr);
1148 WriteRelease (latch->readwr);
1151 ReadRelease (latch->access);
1154 WriteRelease (latch->access);
1157 WriteRelease (latch->parent);
1160 WriteRelease (latch->atomic);
1165 // allocate a new page
1166 // return with page latched, but unlocked.
1168 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1173 // lock allocation page
1175 bt_spinwritelock(bt->mgr->lock);
1177 // use empty chain first
1178 // else allocate empty page
1180 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1181 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1182 set->page = bt_mappage (bt, set->latch);
1184 return bt->err = BTERR_struct, -1;
1186 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1187 bt_spinreleasewrite(bt->mgr->lock);
1189 memcpy (set->page, contents, bt->mgr->page_size);
1190 set->latch->dirty = 1;
1194 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1195 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1197 // unlock allocation latch
1199 bt_spinreleasewrite(bt->mgr->lock);
1201 // don't load cache from btree page
1203 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1204 set->page = bt_mappage (bt, set->latch);
1206 return bt->err = BTERR_struct;
1208 memcpy (set->page, contents, bt->mgr->page_size);
1209 set->latch->dirty = 1;
1213 // find slot in page for given key at a given level
1215 int bt_findslot (BtPage page, unsigned char *key, uint len)
1217 uint diff, higher = page->cnt, low = 1, slot;
1220 // make stopper key an infinite fence value
1222 if( bt_getid (page->right) )
1227 // low is the lowest candidate.
1228 // loop ends when they meet
1230 // higher is already
1231 // tested as .ge. the passed key.
1233 while( diff = higher - low ) {
1234 slot = low + ( diff >> 1 );
1235 if( keycmp (keyptr(page, slot), key, len) < 0 )
1238 higher = slot, good++;
1241 // return zero if key is on right link page
1243 return good ? higher : 0;
1246 // find and load page at given level for given key
1247 // leave page rd or wr locked as requested
1249 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1251 uid page_no = ROOT_page, prevpage = 0;
1252 uint drill = 0xff, slot;
1253 BtLatchSet *prevlatch;
1254 uint mode, prevmode;
1256 // start at root of btree and drill down
1259 // determine lock mode of drill level
1260 mode = (drill == lvl) ? lock : BtLockRead;
1262 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1265 // obtain access lock using lock chaining with Access mode
1267 if( page_no > ROOT_page )
1268 bt_lockpage(BtLockAccess, set->latch);
1270 set->page = bt_mappage (bt, set->latch);
1272 // release & unpin parent or left sibling page
1275 bt_unlockpage(prevmode, prevlatch);
1276 bt_unpinlatch (prevlatch);
1280 // obtain mode lock using lock chaining through AccessLock
1282 bt_lockpage(mode, set->latch);
1284 if( set->page->free )
1285 return bt->err = BTERR_struct, 0;
1287 if( page_no > ROOT_page )
1288 bt_unlockpage(BtLockAccess, set->latch);
1290 // re-read and re-lock root after determining actual level of root
1292 if( set->page->lvl != drill) {
1293 if( set->latch->page_no != ROOT_page )
1294 return bt->err = BTERR_struct, 0;
1296 drill = set->page->lvl;
1298 if( lock != BtLockRead && drill == lvl ) {
1299 bt_unlockpage(mode, set->latch);
1300 bt_unpinlatch (set->latch);
1305 prevpage = set->latch->page_no;
1306 prevlatch = set->latch;
1309 // find key on page at this level
1310 // and descend to requested level
1312 if( set->page->kill )
1315 if( slot = bt_findslot (set->page, key, len) ) {
1319 // find next non-dead slot -- the fence key if nothing else
1321 while( slotptr(set->page, slot)->dead )
1322 if( slot++ < set->page->cnt )
1325 return bt->err = BTERR_struct, 0;
1327 page_no = bt_getid(valptr(set->page, slot)->value);
1332 // or slide right into next page
1335 page_no = bt_getid(set->page->right);
1339 // return error on end of right chain
1341 bt->err = BTERR_struct;
1342 return 0; // return error
1345 // return page to free list
1346 // page must be delete & write locked
1348 void bt_freepage (BtDb *bt, BtPageSet *set)
1350 // lock allocation page
1352 bt_spinwritelock (bt->mgr->lock);
1356 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1357 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1358 set->latch->dirty = 1;
1359 set->page->free = 1;
1361 // unlock released page
1363 bt_unlockpage (BtLockDelete, set->latch);
1364 bt_unlockpage (BtLockWrite, set->latch);
1366 // unlock allocation page
1368 bt_spinreleasewrite (bt->mgr->lock);
1371 // a fence key was deleted from a page
1372 // push new fence value upwards
1374 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1376 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1377 unsigned char value[BtId];
1381 // remove the old fence value
1383 ptr = keyptr(set->page, set->page->cnt);
1384 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1385 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1386 set->latch->dirty = 1;
1388 // cache new fence value
1390 ptr = keyptr(set->page, set->page->cnt);
1391 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1393 bt_lockpage (BtLockParent, set->latch);
1394 bt_unlockpage (BtLockWrite, set->latch);
1396 // insert new (now smaller) fence key
1398 bt_putid (value, set->latch->page_no);
1399 ptr = (BtKey*)leftkey;
1401 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1404 // now delete old fence key
1406 ptr = (BtKey*)rightkey;
1408 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1411 bt_unlockpage (BtLockParent, set->latch);
1412 bt_unpinlatch(set->latch);
1416 // root has a single child
1417 // collapse a level from the tree
1419 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1425 // find the child entry and promote as new root contents
1428 for( idx = 0; idx++ < root->page->cnt; )
1429 if( !slotptr(root->page, idx)->dead )
1432 page_no = bt_getid (valptr(root->page, idx)->value);
1434 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1435 child->page = bt_mappage (bt, child->latch);
1439 bt_lockpage (BtLockDelete, child->latch);
1440 bt_lockpage (BtLockWrite, child->latch);
1442 memcpy (root->page, child->page, bt->mgr->page_size);
1443 root->latch->dirty = 1;
1445 bt_freepage (bt, child);
1446 bt_unpinlatch (child->latch);
1448 } while( root->page->lvl > 1 && root->page->act == 1 );
1450 bt_unlockpage (BtLockWrite, root->latch);
1451 bt_unpinlatch (root->latch);
1455 // delete a page and manage keys
1456 // call with page writelocked
1457 // returns with page unpinned
1459 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1461 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1462 unsigned char value[BtId];
1463 uint lvl = set->page->lvl;
1468 // cache copy of fence key
1469 // to post in parent
1471 ptr = keyptr(set->page, set->page->cnt);
1472 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1474 // obtain lock on right page
1476 page_no = bt_getid(set->page->right);
1478 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1479 right->page = bt_mappage (bt, right->latch);
1483 bt_lockpage (BtLockWrite, right->latch);
1485 // cache copy of key to update
1487 ptr = keyptr(right->page, right->page->cnt);
1488 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1490 if( right->page->kill )
1491 return bt->err = BTERR_struct;
1493 // pull contents of right peer into our empty page
1495 memcpy (set->page, right->page, bt->mgr->page_size);
1496 set->latch->dirty = 1;
1498 // mark right page deleted and point it to left page
1499 // until we can post parent updates that remove access
1500 // to the deleted page.
1502 bt_putid (right->page->right, set->latch->page_no);
1503 right->latch->dirty = 1;
1504 right->page->kill = 1;
1506 bt_lockpage (BtLockParent, right->latch);
1507 bt_unlockpage (BtLockWrite, right->latch);
1509 bt_lockpage (BtLockParent, set->latch);
1510 bt_unlockpage (BtLockWrite, set->latch);
1512 // redirect higher key directly to our new node contents
1514 bt_putid (value, set->latch->page_no);
1515 ptr = (BtKey*)higherfence;
1517 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1520 // delete old lower key to our node
1522 ptr = (BtKey*)lowerfence;
1524 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1527 // obtain delete and write locks to right node
1529 bt_unlockpage (BtLockParent, right->latch);
1530 bt_lockpage (BtLockDelete, right->latch);
1531 bt_lockpage (BtLockWrite, right->latch);
1532 bt_freepage (bt, right);
1533 bt_unpinlatch (right->latch);
1535 bt_unlockpage (BtLockParent, set->latch);
1536 bt_unpinlatch (set->latch);
1541 // find and delete key on page by marking delete flag bit
1542 // if page becomes empty, delete it from the btree
1544 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1546 uint slot, idx, found, fence;
1551 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1552 ptr = keyptr(set->page, slot);
1556 // if librarian slot, advance to real slot
1558 if( slotptr(set->page, slot)->type == Librarian )
1559 ptr = keyptr(set->page, ++slot);
1561 fence = slot == set->page->cnt;
1563 // if key is found delete it, otherwise ignore request
1565 if( found = !keycmp (ptr, key, len) )
1566 if( found = slotptr(set->page, slot)->dead == 0 ) {
1567 val = valptr(set->page,slot);
1568 slotptr(set->page, slot)->dead = 1;
1569 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1572 // collapse empty slots beneath the fence
1574 while( idx = set->page->cnt - 1 )
1575 if( slotptr(set->page, idx)->dead ) {
1576 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1577 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1582 // did we delete a fence key in an upper level?
1584 if( found && lvl && set->page->act && fence )
1585 if( bt_fixfence (bt, set, lvl) )
1588 return bt->found = found, 0;
1590 // do we need to collapse root?
1592 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1593 if( bt_collapseroot (bt, set) )
1596 return bt->found = found, 0;
1598 // delete empty page
1600 if( !set->page->act )
1601 return bt_deletepage (bt, set);
1603 set->latch->dirty = 1;
1604 bt_unlockpage(BtLockWrite, set->latch);
1605 bt_unpinlatch (set->latch);
1606 return bt->found = found, 0;
1609 BtKey *bt_foundkey (BtDb *bt)
1611 return (BtKey*)(bt->key);
1614 // advance to next slot
1616 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1618 BtLatchSet *prevlatch;
1621 if( slot < set->page->cnt )
1624 prevlatch = set->latch;
1626 if( page_no = bt_getid(set->page->right) )
1627 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1628 set->page = bt_mappage (bt, set->latch);
1632 return bt->err = BTERR_struct, 0;
1634 // obtain access lock using lock chaining with Access mode
1636 bt_lockpage(BtLockAccess, set->latch);
1638 bt_unlockpage(BtLockRead, prevlatch);
1639 bt_unpinlatch (prevlatch);
1641 bt_lockpage(BtLockRead, set->latch);
1642 bt_unlockpage(BtLockAccess, set->latch);
1646 // find unique key or first duplicate key in
1647 // leaf level and return number of value bytes
1648 // or (-1) if not found. Setup key for bt_foundkey
1650 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1658 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1660 ptr = keyptr(set->page, slot);
1662 // skip librarian slot place holder
1664 if( slotptr(set->page, slot)->type == Librarian )
1665 ptr = keyptr(set->page, ++slot);
1667 // return actual key found
1669 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1672 if( slotptr(set->page, slot)->type == Duplicate )
1675 // not there if we reach the stopper key
1677 if( slot == set->page->cnt )
1678 if( !bt_getid (set->page->right) )
1681 // if key exists, return >= 0 value bytes copied
1682 // otherwise return (-1)
1684 if( slotptr(set->page, slot)->dead )
1688 if( !memcmp (ptr->key, key, len) ) {
1689 val = valptr (set->page,slot);
1690 if( valmax > val->len )
1692 memcpy (value, val->value, valmax);
1698 } while( slot = bt_findnext (bt, set, slot) );
1700 bt_unlockpage (BtLockRead, set->latch);
1701 bt_unpinlatch (set->latch);
1705 // check page for space available,
1706 // clean if necessary and return
1707 // 0 - page needs splitting
1708 // >0 new slot value
1710 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1712 uint nxt = bt->mgr->page_size;
1713 BtPage page = set->page;
1714 uint cnt = 0, idx = 0;
1715 uint max = page->cnt;
1720 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1723 // skip cleanup and proceed to split
1724 // if there's not enough garbage
1727 if( page->garbage < nxt / 5 )
1730 memcpy (bt->frame, page, bt->mgr->page_size);
1732 // skip page info and set rest of page to zero
1734 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1735 set->latch->dirty = 1;
1739 // clean up page first by
1740 // removing deleted keys
1742 while( cnt++ < max ) {
1745 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1748 // copy the value across
1750 val = valptr(bt->frame, cnt);
1751 nxt -= val->len + sizeof(BtVal);
1752 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1754 // copy the key across
1756 key = keyptr(bt->frame, cnt);
1757 nxt -= key->len + sizeof(BtKey);
1758 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1760 // make a librarian slot
1763 slotptr(page, ++idx)->off = nxt;
1764 slotptr(page, idx)->type = Librarian;
1765 slotptr(page, idx)->dead = 1;
1770 slotptr(page, ++idx)->off = nxt;
1771 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1773 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1780 // see if page has enough space now, or does it need splitting?
1782 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1788 // split the root and raise the height of the btree
1790 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1792 unsigned char leftkey[BT_keyarray];
1793 uint nxt = bt->mgr->page_size;
1794 unsigned char value[BtId];
1800 // save left page fence key for new root
1802 ptr = keyptr(root->page, root->page->cnt);
1803 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1805 // Obtain an empty page to use, and copy the current
1806 // root contents into it, e.g. lower keys
1808 if( bt_newpage(bt, left, root->page) )
1811 left_page_no = left->latch->page_no;
1812 bt_unpinlatch (left->latch);
1814 // preserve the page info at the bottom
1815 // of higher keys and set rest to zero
1817 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1819 // insert stopper key at top of newroot page
1820 // and increase the root height
1822 nxt -= BtId + sizeof(BtVal);
1823 bt_putid (value, right->page_no);
1824 val = (BtVal *)((unsigned char *)root->page + nxt);
1825 memcpy (val->value, value, BtId);
1828 nxt -= 2 + sizeof(BtKey);
1829 slotptr(root->page, 2)->off = nxt;
1830 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1835 // insert lower keys page fence key on newroot page as first key
1837 nxt -= BtId + sizeof(BtVal);
1838 bt_putid (value, left_page_no);
1839 val = (BtVal *)((unsigned char *)root->page + nxt);
1840 memcpy (val->value, value, BtId);
1843 ptr = (BtKey *)leftkey;
1844 nxt -= ptr->len + sizeof(BtKey);
1845 slotptr(root->page, 1)->off = nxt;
1846 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1848 bt_putid(root->page->right, 0);
1849 root->page->min = nxt; // reset lowest used offset and key count
1850 root->page->cnt = 2;
1851 root->page->act = 2;
1854 // release and unpin root pages
1856 bt_unlockpage(BtLockWrite, root->latch);
1857 bt_unpinlatch (root->latch);
1859 bt_unpinlatch (right);
1863 // split already locked full node
1865 // return pool entry for new right
1868 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1870 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1871 uint lvl = set->page->lvl;
1878 // split higher half of keys to bt->frame
1880 memset (bt->frame, 0, bt->mgr->page_size);
1881 max = set->page->cnt;
1885 while( cnt++ < max ) {
1886 if( slotptr(set->page, cnt)->dead && cnt < max )
1888 src = valptr(set->page, cnt);
1889 nxt -= src->len + sizeof(BtVal);
1890 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1892 key = keyptr(set->page, cnt);
1893 nxt -= key->len + sizeof(BtKey);
1894 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1895 memcpy (ptr, key, key->len + sizeof(BtKey));
1897 // add librarian slot
1900 slotptr(bt->frame, ++idx)->off = nxt;
1901 slotptr(bt->frame, idx)->type = Librarian;
1902 slotptr(bt->frame, idx)->dead = 1;
1907 slotptr(bt->frame, ++idx)->off = nxt;
1908 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1910 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1914 bt->frame->bits = bt->mgr->page_bits;
1915 bt->frame->min = nxt;
1916 bt->frame->cnt = idx;
1917 bt->frame->lvl = lvl;
1921 if( set->latch->page_no > ROOT_page )
1922 bt_putid (bt->frame->right, bt_getid (set->page->right));
1924 // get new free page and write higher keys to it.
1926 if( bt_newpage(bt, right, bt->frame) )
1929 memcpy (bt->frame, set->page, bt->mgr->page_size);
1930 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1931 set->latch->dirty = 1;
1933 nxt = bt->mgr->page_size;
1934 set->page->garbage = 0;
1940 if( slotptr(bt->frame, max)->type == Librarian )
1943 // assemble page of smaller keys
1945 while( cnt++ < max ) {
1946 if( slotptr(bt->frame, cnt)->dead )
1948 val = valptr(bt->frame, cnt);
1949 nxt -= val->len + sizeof(BtVal);
1950 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1952 key = keyptr(bt->frame, cnt);
1953 nxt -= key->len + sizeof(BtKey);
1954 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1956 // add librarian slot
1959 slotptr(set->page, ++idx)->off = nxt;
1960 slotptr(set->page, idx)->type = Librarian;
1961 slotptr(set->page, idx)->dead = 1;
1966 slotptr(set->page, ++idx)->off = nxt;
1967 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1971 bt_putid(set->page->right, right->latch->page_no);
1972 set->page->min = nxt;
1973 set->page->cnt = idx;
1975 return right->latch->entry;
1978 // fix keys for newly split page
1979 // call with page locked, return
1982 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1984 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1985 unsigned char value[BtId];
1986 uint lvl = set->page->lvl;
1990 // if current page is the root page, split it
1992 if( set->latch->page_no == ROOT_page )
1993 return bt_splitroot (bt, set, right);
1995 ptr = keyptr(set->page, set->page->cnt);
1996 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1998 page = bt_mappage (bt, right);
2000 ptr = keyptr(page, page->cnt);
2001 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2003 // insert new fences in their parent pages
2005 bt_lockpage (BtLockParent, right);
2007 bt_lockpage (BtLockParent, set->latch);
2008 bt_unlockpage (BtLockWrite, set->latch);
2010 // insert new fence for reformulated left block of smaller keys
2012 bt_putid (value, set->latch->page_no);
2013 ptr = (BtKey *)leftkey;
2015 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2018 // switch fence for right block of larger keys to new right page
2020 bt_putid (value, right->page_no);
2021 ptr = (BtKey *)rightkey;
2023 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2026 bt_unlockpage (BtLockParent, set->latch);
2027 bt_unpinlatch (set->latch);
2029 bt_unlockpage (BtLockParent, right);
2030 bt_unpinlatch (right);
2034 // install new key and value onto page
2035 // page must already be checked for
2038 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2040 uint idx, librarian;
2045 // if found slot > desired slot and previous slot
2046 // is a librarian slot, use it
2049 if( slotptr(set->page, slot-1)->type == Librarian )
2052 // copy value onto page
2054 set->page->min -= vallen + sizeof(BtVal);
2055 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2056 memcpy (val->value, value, vallen);
2059 // copy key onto page
2061 set->page->min -= keylen + sizeof(BtKey);
2062 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2063 memcpy (ptr->key, key, keylen);
2066 // find first empty slot
2068 for( idx = slot; idx < set->page->cnt; idx++ )
2069 if( slotptr(set->page, idx)->dead )
2072 // now insert key into array before slot
2074 if( idx == set->page->cnt )
2075 idx += 2, set->page->cnt += 2, librarian = 2;
2079 set->latch->dirty = 1;
2082 while( idx > slot + librarian - 1 )
2083 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2085 // add librarian slot
2087 if( librarian > 1 ) {
2088 node = slotptr(set->page, slot++);
2089 node->off = set->page->min;
2090 node->type = Librarian;
2096 node = slotptr(set->page, slot);
2097 node->off = set->page->min;
2102 bt_unlockpage (BtLockWrite, set->latch);
2103 bt_unpinlatch (set->latch);
2109 // Insert new key into the btree at given level.
2110 // either add a new key or update/add an existing one
2112 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2114 unsigned char newkey[BT_keyarray];
2115 uint slot, idx, len, entry;
2122 // set up the key we're working on
2124 ins = (BtKey*)newkey;
2125 memcpy (ins->key, key, keylen);
2128 // is this a non-unique index value?
2134 sequence = bt_newdup (bt);
2135 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2139 while( 1 ) { // find the page and slot for the current key
2140 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2141 ptr = keyptr(set->page, slot);
2144 bt->err = BTERR_ovflw;
2148 // if librarian slot == found slot, advance to real slot
2150 if( slotptr(set->page, slot)->type == Librarian )
2151 if( !keycmp (ptr, key, keylen) )
2152 ptr = keyptr(set->page, ++slot);
2156 if( slotptr(set->page, slot)->type == Duplicate )
2159 // if inserting a duplicate key or unique key
2160 // check for adequate space on the page
2161 // and insert the new key before slot.
2163 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2164 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2165 if( !(entry = bt_splitpage (bt, set)) )
2167 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2172 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2175 // if key already exists, update value and return
2177 val = valptr(set->page, slot);
2179 if( val->len >= vallen ) {
2180 if( slotptr(set->page, slot)->dead )
2182 set->page->garbage += val->len - vallen;
2183 set->latch->dirty = 1;
2184 slotptr(set->page, slot)->dead = 0;
2186 memcpy (val->value, value, vallen);
2187 bt_unlockpage(BtLockWrite, set->latch);
2188 bt_unpinlatch (set->latch);
2192 // new update value doesn't fit in existing value area
2194 if( !slotptr(set->page, slot)->dead )
2195 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2197 slotptr(set->page, slot)->dead = 0;
2201 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2202 if( !(entry = bt_splitpage (bt, set)) )
2204 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2209 set->page->min -= vallen + sizeof(BtVal);
2210 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2211 memcpy (val->value, value, vallen);
2214 set->latch->dirty = 1;
2215 set->page->min -= keylen + sizeof(BtKey);
2216 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2217 memcpy (ptr->key, key, keylen);
2220 slotptr(set->page, slot)->off = set->page->min;
2221 bt_unlockpage(BtLockWrite, set->latch);
2222 bt_unpinlatch (set->latch);
2229 uint entry; // latch table entry number
2230 uint slot:31; // page slot number
2231 uint reuse:1; // reused previous page
2235 uid page_no; // page number for split leaf
2236 void *next; // next key to insert
2237 uint entry:30; // latch table entry number
2238 uint type:2; // 0 == insert, 1 == delete, 2 == free
2239 unsigned char leafkey[BT_keyarray];
2242 // find and load leaf page for given key
2243 // leave page Atomic locked and Read locked.
2245 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2247 BtLatchSet *prevlatch;
2251 // find level zero page
2253 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2256 // find next non-dead entry on this page
2257 // it will be the fence key if nothing else
2259 while( slotptr(set->page, slot)->dead )
2260 if( slot++ < set->page->cnt )
2263 return bt->err = BTERR_struct, 0;
2265 page_no = bt_getid(valptr(set->page, slot)->value);
2266 prevlatch = set->latch;
2269 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2270 set->page = bt_mappage (bt, set->latch);
2274 if( set->page->free || set->page->lvl )
2275 return bt->err = BTERR_struct, 0;
2277 // obtain read lock using lock chaining with Access mode
2278 // release & unpin parent/left sibling page
2280 bt_lockpage(BtLockAccess, set->latch);
2282 bt_unlockpage(BtLockRead, prevlatch);
2283 bt_unpinlatch (prevlatch);
2285 bt_lockpage(BtLockRead, set->latch);
2286 bt_unlockpage(BtLockAccess, set->latch);
2288 // find key on page at this level
2289 // and descend to requested level
2291 if( !set->page->kill )
2292 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2293 bt_unlockpage(BtLockRead, set->latch);
2294 bt_lockpage(BtLockAtomic, set->latch);
2295 bt_lockpage(BtLockAccess, set->latch);
2296 bt_lockpage(BtLockRead, set->latch);
2297 bt_unlockpage(BtLockAccess, set->latch);
2299 if( slot = bt_findslot (set->page, key, len) )
2302 bt_unlockpage(BtLockAtomic, set->latch);
2305 // slide right into next page
2307 page_no = bt_getid(set->page->right);
2308 prevlatch = set->latch;
2311 // return error on end of right chain
2313 bt->err = BTERR_struct;
2314 return 0; // return error
2317 // determine actual page where key is located
2318 // return slot number
2320 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2322 BtKey *key = keyptr(source,src);
2323 uint slot = locks[src].slot;
2326 if( src > 1 && locks[src].reuse )
2327 entry = locks[src-1].entry, slot = 0;
2329 entry = locks[src].entry;
2332 set->latch = bt->mgr->latchsets + entry;
2333 set->page = bt_mappage (bt, set->latch);
2337 // is locks->reuse set?
2338 // if so, find where our key
2339 // is located on previous page or split pages
2342 set->latch = bt->mgr->latchsets + entry;
2343 set->page = bt_mappage (bt, set->latch);
2345 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2346 if( locks[src].reuse )
2347 locks[src].entry = entry;
2350 } while( entry = set->latch->split );
2352 bt->err = BTERR_atomic;
2356 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2358 BtKey *key = keyptr(source, src);
2359 BtVal *val = valptr(source, src);
2364 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2365 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2366 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2368 if( entry = bt_splitpage (bt, set) )
2369 latch = bt->mgr->latchsets + entry;
2373 // splice right page into split chain
2374 // and WriteLock it.
2376 latch->split = set->latch->split;
2377 set->latch->split = entry;
2378 bt_lockpage(BtLockWrite, latch);
2381 return bt->err = BTERR_atomic;
2384 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2386 BtKey *key = keyptr(source, src);
2387 uint idx, entry, slot;
2392 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2393 slotptr(set->page, slot)->dead = 1;
2395 return bt->err = BTERR_struct;
2397 ptr = keyptr(set->page, slot);
2398 val = valptr(set->page, slot);
2400 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2401 set->latch->dirty = 1;
2406 // atomic modification of a batch of keys.
2408 // return -1 if BTERR is set
2409 // otherwise return slot number
2410 // causing the key constraint violation
2411 // or zero on successful completion.
2413 int bt_atomictxn (BtDb *bt, BtPage source)
2415 uint src, idx, slot, samepage, entry;
2416 AtomicKey *head, *tail, *leaf;
2417 BtPageSet set[1], prev[1];
2418 unsigned char value[BtId];
2419 BtKey *key, *ptr, *key2;
2429 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2433 // stable sort the list of keys into order to
2434 // prevent deadlocks between threads.
2436 for( src = 1; src++ < source->cnt; ) {
2437 *temp = *slotptr(source,src);
2438 key = keyptr (source,src);
2440 for( idx = src; --idx; ) {
2441 key2 = keyptr (source,idx);
2442 if( keycmp (key, key2->key, key2->len) < 0 ) {
2443 *slotptr(source,idx+1) = *slotptr(source,idx);
2444 *slotptr(source,idx) = *temp;
2450 // Load the leaf page for each key
2451 // group same page references with reuse bit
2452 // and determine any constraint violations
2454 for( src = 0; src++ < source->cnt; ) {
2455 key = keyptr(source, src);
2458 // first determine if this modification falls
2459 // on the same page as the previous modification
2460 // note that the far right leaf page is a special case
2462 if( samepage = src > 1 )
2463 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2464 slot = bt_findslot(set->page, key->key, key->len);
2465 else // release read on previous page
2466 bt_unlockpage(BtLockRead, set->latch);
2469 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2470 set->latch->split = 0;
2474 if( slotptr(set->page, slot)->type == Librarian )
2475 ptr = keyptr(set->page, ++slot);
2477 ptr = keyptr(set->page, slot);
2480 locks[src].entry = set->latch->entry;
2481 locks[src].slot = slot;
2482 locks[src].reuse = 0;
2484 locks[src].entry = 0;
2485 locks[src].slot = 0;
2486 locks[src].reuse = 1;
2489 switch( slotptr(source, src)->type ) {
2492 if( !slotptr(set->page, slot)->dead )
2493 if( slot < set->page->cnt || bt_getid (set->page->right) )
2494 if( !keycmp (ptr, key->key, key->len) ) {
2496 // return constraint violation if key already exists
2498 bt_unlockpage(BtLockRead, set->latch);
2502 if( locks[src].entry ) {
2503 set->latch = bt->mgr->latchsets + locks[src].entry;
2504 bt_unlockpage(BtLockAtomic, set->latch);
2505 bt_unpinlatch (set->latch);
2516 // unlock last loadpage lock
2518 if( source->cnt > 1 )
2519 bt_unlockpage(BtLockRead, set->latch);
2521 // obtain write lock for each master page
2523 for( src = 0; src++ < source->cnt; )
2524 if( locks[src].reuse )
2527 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2529 // insert or delete each key
2530 // process any splits or merges
2531 // release Write & Atomic latches
2532 // set ParentModifications and build
2533 // queue of keys to insert for split pages
2534 // or delete for deleted pages.
2536 // run through txn list backwards
2538 samepage = source->cnt + 1;
2540 for( src = source->cnt; src; src-- ) {
2541 if( locks[src].reuse )
2544 // perform the txn operations
2545 // from smaller to larger on
2548 for( idx = src; idx < samepage; idx++ )
2549 switch( slotptr(source,idx)->type ) {
2551 if( bt_atomicdelete (bt, source, locks, idx) )
2557 if( bt_atomicinsert (bt, source, locks, idx) )
2562 // after the same page operations have finished,
2563 // process master page for splits or deletion.
2565 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2566 prev->page = bt_mappage (bt, prev->latch);
2569 // pick-up all splits from master page
2571 entry = prev->latch->split;
2574 set->latch = bt->mgr->latchsets + entry;
2575 set->page = bt_mappage (bt, set->latch);
2576 entry = set->latch->split;
2578 // delete empty master page
2580 if( !prev->page->act ) {
2581 memcpy (set->page->left, prev->page->left, BtId);
2582 memcpy (prev->page, set->page, bt->mgr->page_size);
2583 bt_lockpage (BtLockDelete, set->latch);
2584 bt_freepage (bt, set);
2585 bt_unpinlatch (set->latch);
2587 prev->latch->dirty = 1;
2591 // remove empty page from the split chain
2593 if( !set->page->act ) {
2594 memcpy (prev->page->right, set->page->right, BtId);
2595 prev->latch->split = set->latch->split;
2596 bt_lockpage (BtLockDelete, set->latch);
2597 bt_freepage (bt, set);
2598 bt_unpinlatch (set->latch);
2602 // schedule prev fence key update
2604 ptr = keyptr(prev->page,prev->page->cnt);
2605 leaf = malloc (sizeof(AtomicKey));
2607 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2608 leaf->page_no = prev->latch->page_no;
2609 leaf->entry = prev->latch->entry;
2620 bt_putid (set->page->left, prev->latch->page_no);
2621 bt_lockpage(BtLockParent, prev->latch);
2622 bt_unlockpage(BtLockWrite, prev->latch);
2626 // update left pointer in next right page
2627 // if we did any now non-empty splits
2629 if( latch->split ) {
2630 // fix left pointer in master's original right sibling
2631 // or set rightmost page in page zero
2633 if( right = bt_getid (prev->page->right) ) {
2634 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2635 set->page = bt_mappage (bt, set->latch);
2639 bt_lockpage (BtLockWrite, set->latch);
2640 bt_putid (set->page->left, prev->latch->page_no);
2641 set->latch->dirty = 1;
2642 bt_unlockpage (BtLockWrite, set->latch);
2643 bt_unpinlatch (set->latch);
2645 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2647 // Process last page split in chain
2649 ptr = keyptr(prev->page,prev->page->cnt);
2650 leaf = malloc (sizeof(AtomicKey));
2652 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2653 leaf->page_no = prev->latch->page_no;
2654 leaf->entry = prev->latch->entry;
2665 bt_lockpage(BtLockParent, prev->latch);
2666 bt_unlockpage(BtLockWrite, prev->latch);
2667 bt_unlockpage(BtLockAtomic, latch);
2671 // finished if prev page occupied (either master or final split)
2673 if( prev->page->act ) {
2674 bt_unlockpage(BtLockWrite, latch);
2675 bt_unlockpage(BtLockAtomic, latch);
2676 bt_unpinlatch(latch);
2680 // any splits were reversed, and the
2681 // master page located in prev is empty, delete it
2682 // by pulling over master's right sibling.
2683 // Delete empty master's fence
2685 ptr = keyptr(prev->page,prev->page->cnt);
2686 leaf = malloc (sizeof(AtomicKey));
2688 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2689 leaf->page_no = prev->latch->page_no;
2690 leaf->entry = prev->latch->entry;
2701 bt_lockpage(BtLockParent, prev->latch);
2702 bt_unlockpage(BtLockWrite, prev->latch);
2704 if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2705 set->page = bt_mappage (bt, set->latch);
2709 // add page to our transaction
2711 bt_lockpage(BtLockAtomic, set->latch);
2712 bt_lockpage(BtLockWrite, set->latch);
2714 // pull contents over empty page
2716 memcpy (set->page->left, prev->page->left, BtId);
2717 memcpy (prev->page, set->page, bt->mgr->page_size);
2719 bt_putid (set->page->right, prev->latch->page_no);
2720 set->latch->dirty = 1;
2721 set->page->kill = 1;
2723 // add new parent key for new master page contents
2724 // delete it after parent posts the new master fence.
2726 ptr = keyptr(set->page,set->page->cnt);
2727 leaf = malloc (sizeof(AtomicKey));
2729 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2730 leaf->page_no = prev->latch->page_no;
2731 leaf->entry = set->latch->entry;
2742 // bt_lockpage(BtLockParent, set->latch);
2743 bt_unlockpage(BtLockWrite, set->latch);
2745 // fix master's far right sibling's left pointer
2747 if( right = bt_getid (set->page->right) ) {
2748 if( set->latch = bt_pinlatch (bt, right, 1) )
2749 set->page = bt_mappage (bt, set->latch);
2751 bt_lockpage (BtLockWrite, set->latch);
2752 bt_putid (set->page->left, prev->latch->page_no);
2753 set->latch->dirty = 1;
2755 bt_unlockpage (BtLockWrite, set->latch);
2756 bt_unpinlatch (set->latch);
2759 bt_unlockpage(BtLockAtomic, latch);
2762 // add & delete keys for any pages split or merged during transaction
2766 set->latch = bt->mgr->latchsets + leaf->entry;
2767 set->page = bt_mappage (bt, set->latch);
2769 bt_putid (value, leaf->page_no);
2770 ptr = (BtKey *)leaf->leafkey;
2772 switch( leaf->type ) {
2773 case 0: // insert key
2774 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2777 bt_unlockpage (BtLockParent, set->latch);
2780 case 1: // delete key
2781 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2784 bt_unlockpage (BtLockParent, set->latch);
2787 case 2: // insert key & free
2788 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2791 bt_unlockpage (BtLockAtomic, set->latch);
2792 bt_lockpage (BtLockDelete, set->latch);
2793 bt_lockpage (BtLockWrite, set->latch);
2794 bt_freepage (bt, set);
2798 bt_unpinlatch (set->latch);
2801 } while( leaf = tail );
2809 // set cursor to highest slot on highest page
2811 uint bt_lastkey (BtDb *bt)
2813 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2817 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2818 set->page = bt_mappage (bt, set->latch);
2822 bt_lockpage(BtLockRead, set->latch);
2824 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2825 slot = set->page->cnt;
2827 bt_unlockpage(BtLockRead, set->latch);
2828 bt_unpinlatch (set->latch);
2832 // return previous slot on cursor page
2834 uint bt_prevkey (BtDb *bt, uint slot)
2836 uid ourright, next, us = bt->cursor_page;
2842 ourright = bt_getid(bt->cursor->right);
2845 if( !(next = bt_getid(bt->cursor->left)) )
2849 bt->cursor_page = next;
2851 if( set->latch = bt_pinlatch (bt, next, 1) )
2852 set->page = bt_mappage (bt, set->latch);
2856 bt_lockpage(BtLockRead, set->latch);
2857 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2858 bt_unlockpage(BtLockRead, set->latch);
2859 bt_unpinlatch (set->latch);
2861 next = bt_getid (bt->cursor->right);
2864 if( next == ourright )
2869 return bt->cursor->cnt;
2872 // return next slot on cursor page
2873 // or slide cursor right into next page
2875 uint bt_nextkey (BtDb *bt, uint slot)
2881 right = bt_getid(bt->cursor->right);
2883 while( slot++ < bt->cursor->cnt )
2884 if( slotptr(bt->cursor,slot)->dead )
2886 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2894 bt->cursor_page = right;
2896 if( set->latch = bt_pinlatch (bt, right, 1) )
2897 set->page = bt_mappage (bt, set->latch);
2901 bt_lockpage(BtLockRead, set->latch);
2903 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2905 bt_unlockpage(BtLockRead, set->latch);
2906 bt_unpinlatch (set->latch);
2914 // cache page of keys into cursor and return starting slot for given key
2916 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2921 // cache page for retrieval
2923 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2924 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2928 bt->cursor_page = set->latch->page_no;
2930 bt_unlockpage(BtLockRead, set->latch);
2931 bt_unpinlatch (set->latch);
2935 BtKey *bt_key(BtDb *bt, uint slot)
2937 return keyptr(bt->cursor, slot);
2940 BtVal *bt_val(BtDb *bt, uint slot)
2942 return valptr(bt->cursor,slot);
2948 double getCpuTime(int type)
2951 FILETIME xittime[1];
2952 FILETIME systime[1];
2953 FILETIME usrtime[1];
2954 SYSTEMTIME timeconv[1];
2957 memset (timeconv, 0, sizeof(SYSTEMTIME));
2961 GetSystemTimeAsFileTime (xittime);
2962 FileTimeToSystemTime (xittime, timeconv);
2963 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2966 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2967 FileTimeToSystemTime (usrtime, timeconv);
2970 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2971 FileTimeToSystemTime (systime, timeconv);
2975 ans += (double)timeconv->wHour * 3600;
2976 ans += (double)timeconv->wMinute * 60;
2977 ans += (double)timeconv->wSecond;
2978 ans += (double)timeconv->wMilliseconds / 1000;
2983 #include <sys/resource.h>
2985 double getCpuTime(int type)
2987 struct rusage used[1];
2988 struct timeval tv[1];
2992 gettimeofday(tv, NULL);
2993 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2996 getrusage(RUSAGE_SELF, used);
2997 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3000 getrusage(RUSAGE_SELF, used);
3001 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3008 void bt_poolaudit (BtMgr *mgr)
3013 while( slot++ < mgr->latchdeployed ) {
3014 latch = mgr->latchsets + slot;
3016 if( *latch->readwr->rin & MASK )
3017 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3018 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3020 if( *latch->access->rin & MASK )
3021 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3022 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3024 if( *latch->parent->rin & MASK )
3025 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3026 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3028 if( latch->pin & ~CLOCK_bit ) {
3029 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3035 uint bt_latchaudit (BtDb *bt)
3037 ushort idx, hashidx;
3043 if( *(ushort *)(bt->mgr->lock) )
3044 fprintf(stderr, "Alloc page locked\n");
3045 *(ushort *)(bt->mgr->lock) = 0;
3047 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3048 latch = bt->mgr->latchsets + idx;
3049 if( *latch->readwr->rin & MASK )
3050 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3051 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3053 if( *latch->access->rin & MASK )
3054 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3055 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3057 if( *latch->parent->rin & MASK )
3058 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3059 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3062 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3067 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3068 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3069 fprintf(stderr, "hash entry %d locked\n", hashidx);
3071 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3073 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3074 latch = bt->mgr->latchsets + idx;
3076 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3077 } while( idx = latch->next );
3080 page_no = LEAF_page;
3082 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3083 uid off = page_no << bt->mgr->page_bits;
3085 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3089 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3091 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3092 return bt->err = BTERR_map;
3094 if( *amt < bt->mgr->page_size )
3095 return bt->err = BTERR_map;
3097 if( !bt->frame->free && !bt->frame->lvl )
3098 cnt += bt->frame->act;
3102 cnt--; // remove stopper key
3103 fprintf(stderr, " Total keys read %d\n", cnt);
3117 // standalone program to index file of keys
3118 // then list them onto std-out
3121 void *index_file (void *arg)
3123 uint __stdcall index_file (void *arg)
3126 int line = 0, found = 0, cnt = 0, unique;
3127 uid next, page_no = LEAF_page; // start on first page of leaves
3128 unsigned char key[BT_maxkey];
3129 unsigned char txn[65536];
3130 ThreadArg *args = arg;
3131 int ch, len = 0, slot;
3140 bt = bt_open (args->mgr);
3143 unique = (args->type[1] | 0x20) != 'd';
3145 switch(args->type[0] | 0x20)
3148 fprintf(stderr, "started latch mgr audit\n");
3149 cnt = bt_latchaudit (bt);
3150 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3154 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
3155 if( in = fopen (args->infile, "rb") )
3156 while( ch = getc(in), ch != EOF )
3161 memcpy (txn + nxt, key + 10, len - 10);
3163 txn[nxt] = len - 10;
3165 memcpy (txn + nxt, key, 10);
3168 slotptr(page,++cnt)->off = nxt;
3169 slotptr(page,cnt)->type = Unique;
3172 if( cnt < args->num )
3179 if( bt_atomictxn (bt, page) )
3180 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3185 else if( len < BT_maxkey )
3187 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3191 fprintf(stderr, "started pennysort for %s\n", args->infile);
3192 if( in = fopen (args->infile, "rb") )
3193 while( ch = getc(in), ch != EOF )
3198 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3199 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3202 else if( len < BT_maxkey )
3204 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3208 fprintf(stderr, "started indexing for %s\n", args->infile);
3209 if( in = fopen (args->infile, "r") )
3210 while( ch = getc(in), ch != EOF )
3215 if( args->num == 1 )
3216 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3218 else if( args->num )
3219 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3221 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3222 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3225 else if( len < BT_maxkey )
3227 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3231 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3232 if( in = fopen (args->infile, "rb") )
3233 while( ch = getc(in), ch != EOF )
3237 if( args->num == 1 )
3238 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3240 else if( args->num )
3241 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3243 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3244 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3245 ptr = (BtKey*)(bt->key);
3248 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3249 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3252 else if( len < BT_maxkey )
3254 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3258 fprintf(stderr, "started finding keys for %s\n", args->infile);
3259 if( in = fopen (args->infile, "rb") )
3260 while( ch = getc(in), ch != EOF )
3264 if( args->num == 1 )
3265 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3267 else if( args->num )
3268 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3270 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3273 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3276 else if( len < BT_maxkey )
3278 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3282 fprintf(stderr, "started scanning\n");
3284 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3285 set->page = bt_mappage (bt, set->latch);
3287 fprintf(stderr, "unable to obtain latch"), exit(1);
3288 bt_lockpage (BtLockRead, set->latch);
3289 next = bt_getid (set->page->right);
3291 for( slot = 0; slot++ < set->page->cnt; )
3292 if( next || slot < set->page->cnt )
3293 if( !slotptr(set->page, slot)->dead ) {
3294 ptr = keyptr(set->page, slot);
3297 if( slotptr(set->page, slot)->type == Duplicate )
3300 fwrite (ptr->key, len, 1, stdout);
3301 val = valptr(set->page, slot);
3302 fwrite (val->value, val->len, 1, stdout);
3303 fputc ('\n', stdout);
3307 bt_unlockpage (BtLockRead, set->latch);
3308 bt_unpinlatch (set->latch);
3309 } while( page_no = next );
3311 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3316 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3318 fprintf(stderr, "started counting\n");
3319 page_no = LEAF_page;
3321 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3322 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3325 if( !bt->frame->free && !bt->frame->lvl )
3326 cnt += bt->frame->act;
3332 cnt--; // remove stopper key
3333 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3345 typedef struct timeval timer;
3347 int main (int argc, char **argv)
3349 int idx, cnt, len, slot, err;
3350 int segsize, bits = 16;
3367 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3368 fprintf (stderr, " where page_bits is the page size in bits\n");
3369 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3370 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3371 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3375 start = getCpuTime(0);
3378 bits = atoi(argv[3]);
3381 poolsize = atoi(argv[4]);
3384 fprintf (stderr, "Warning: no mapped_pool\n");
3387 num = atoi(argv[5]);
3391 threads = malloc (cnt * sizeof(pthread_t));
3393 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3395 args = malloc (cnt * sizeof(ThreadArg));
3397 mgr = bt_mgr ((argv[1]), bits, poolsize);
3400 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3406 for( idx = 0; idx < cnt; idx++ ) {
3407 args[idx].infile = argv[idx + 6];
3408 args[idx].type = argv[2];
3409 args[idx].mgr = mgr;
3410 args[idx].num = num;
3411 args[idx].idx = idx;
3413 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3414 fprintf(stderr, "Error creating thread %d\n", err);
3416 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3420 // wait for termination
3423 for( idx = 0; idx < cnt; idx++ )
3424 pthread_join (threads[idx], NULL);
3426 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3428 for( idx = 0; idx < cnt; idx++ )
3429 CloseHandle(threads[idx]);
3435 elapsed = getCpuTime(0) - start;
3436 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3437 elapsed = getCpuTime(1);
3438 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3439 elapsed = getCpuTime(2);
3440 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);