1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
129 // definition for spin latch implementation
131 // exclusive is set for write access
132 // share is count of read accessors
133 // grant write lock when share == 0
135 volatile typedef struct {
146 // hash table entries
149 volatile uint slot; // Latch table entry at head of chain
150 BtSpinLatch latch[1];
153 // latch manager table structure
156 uid page_no; // latch set page number
157 RWLock readwr[1]; // read/write page lock
158 RWLock access[1]; // Access Intent/Page delete
159 WOLock parent[1]; // Posting of fence key in parent
160 WOLock atomic[1]; // Atomic update in progress
161 uint split; // right split page atomic insert
162 uint entry; // entry slot in latch table
163 uint next; // next entry in hash table chain
164 uint prev; // prev entry in hash table chain
165 volatile ushort pin; // number of outstanding threads
166 ushort dirty:1; // page in cache is dirty
169 // Define the length of the page record numbers
173 // Page key slot definition.
175 // Keys are marked dead, but remain on the page until
176 // it cleanup is called. The fence key (highest key) for
177 // a leaf page is always present, even after cleanup.
181 // In addition to the Unique keys that occupy slots
182 // there are Librarian and Duplicate key
183 // slots occupying the key slot array.
185 // The Librarian slots are dead keys that
186 // serve as filler, available to add new Unique
187 // or Dup slots that are inserted into the B-tree.
189 // The Duplicate slots have had their key bytes extended
190 // by 6 bytes to contain a binary duplicate key uniqueifier.
201 uint off:BT_maxbits; // page offset for key start
202 uint type:3; // type of slot
203 uint dead:1; // set for deleted slot
206 // The key structure occupies space at the upper end of
207 // each page. It's a length byte followed by the key
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char key[0];
215 // the value structure also occupies space at the upper
216 // end of the page. Each key is immediately followed by a value.
219 unsigned char len; // this can be changed to a ushort or uint
220 unsigned char value[0];
223 #define BT_maxkey 255 // maximum number of bytes in a key
224 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
226 // The first part of an index page.
227 // It is immediately followed
228 // by the BtSlot array of keys.
230 // note that this structure size
231 // must be a multiple of 8 bytes
232 // in order to place dups correctly.
234 typedef struct BtPage_ {
235 uint cnt; // count of keys in page
236 uint act; // count of active keys
237 uint min; // next key offset
238 uint garbage; // page garbage in bytes
239 unsigned char bits:7; // page size in bits
240 unsigned char free:1; // page is on free chain
241 unsigned char lvl:7; // level of page
242 unsigned char kill:1; // page is being deleted
243 unsigned char right[BtId]; // page number to right
244 unsigned char left[BtId]; // page number to left
245 unsigned char filler[2]; // padding to multiple of 8
248 // The loadpage interface object
251 BtPage page; // current page pointer
252 BtLatchSet *latch; // current page latch set
255 // structure for latch manager on ALLOC_page
258 struct BtPage_ alloc[1]; // next page_no in right ptr
259 unsigned long long dups[1]; // global duplicate key uniqueifier
260 unsigned char chain[BtId]; // head of free page_nos chain
263 // The object structure for Btree access
266 uint page_size; // page size
267 uint page_bits; // page size in bits
273 BtPageZero *pagezero; // mapped allocation page
274 BtSpinLatch lock[1]; // allocation area lite latch
275 uint latchdeployed; // highest number of latch entries deployed
276 uint nlatchpage; // number of latch pages at BT_latch
277 uint latchtotal; // number of page latch entries
278 uint latchhash; // number of latch hash table slots
279 uint latchvictim; // next latch entry to examine
280 ushort thread_no[1]; // next thread number
281 BtHashEntry *hashtable; // the buffer pool hash table entries
282 BtLatchSet *latchsets; // mapped latch set from buffer pool
283 unsigned char *pagepool; // mapped to the buffer pool pages
285 HANDLE halloc; // allocation handle
286 HANDLE hpool; // buffer pool handle
291 BtMgr *mgr; // buffer manager for thread
292 BtPage cursor; // cached frame for start/next (never mapped)
293 BtPage frame; // spare frame for the page split (never mapped)
294 uid cursor_page; // current cursor page number
295 unsigned char *mem; // frame, cursor, page memory buffer
296 unsigned char key[BT_keyarray]; // last found complete key
297 int found; // last delete or insert was found
298 int err; // last error
299 int reads, writes; // number of reads and writes from the btree
300 ushort thread_no; // thread number
314 #define CLOCK_bit 0x8000
317 extern void bt_close (BtDb *bt);
318 extern BtDb *bt_open (BtMgr *mgr);
319 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
320 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
321 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
322 extern BtKey *bt_foundkey (BtDb *bt);
323 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
324 extern uint bt_nextkey (BtDb *bt, uint slot);
327 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
328 void bt_mgrclose (BtMgr *mgr);
330 // Helper functions to return slot values
331 // from the cursor page.
333 extern BtKey *bt_key (BtDb *bt, uint slot);
334 extern BtVal *bt_val (BtDb *bt, uint slot);
336 // The page is allocated from low and hi ends.
337 // The key slots are allocated from the bottom,
338 // while the text and value of the key
339 // are allocated from the top. When the two
340 // areas meet, the page is split into two.
342 // A key consists of a length byte, two bytes of
343 // index number (0 - 65535), and up to 253 bytes
346 // Associated with each key is a value byte string
347 // containing any value desired.
349 // The b-tree root is always located at page 1.
350 // The first leaf page of level zero is always
351 // located on page 2.
353 // The b-tree pages are linked with next
354 // pointers to facilitate enumerators,
355 // and provide for concurrency.
357 // When to root page fills, it is split in two and
358 // the tree height is raised by a new root at page
359 // one with two keys.
361 // Deleted keys are marked with a dead bit until
362 // page cleanup. The fence key for a leaf node is
365 // To achieve maximum concurrency one page is locked at a time
366 // as the tree is traversed to find leaf key in question. The right
367 // page numbers are used in cases where the page is being split,
370 // Page 0 is dedicated to lock for new page extensions,
371 // and chains empty pages together for reuse. It also
372 // contains the latch manager hash table.
374 // The ParentModification lock on a node is obtained to serialize posting
375 // or changing the fence key for a node.
377 // Empty pages are chained together through the ALLOC page and reused.
379 // Access macros to address slot and key values from the page
380 // Page slots use 1 based indexing.
382 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
383 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
384 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
386 void bt_putid(unsigned char *dest, uid id)
391 dest[i] = (unsigned char)id, id >>= 8;
394 uid bt_getid(unsigned char *src)
399 for( i = 0; i < BtId; i++ )
400 id <<= 8, id |= *src++;
405 uid bt_newdup (BtDb *bt)
408 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
410 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
414 // Write-Only Queue Lock
416 void WriteOLock (WOLock *lock, ushort tid)
420 if( lock->tid == tid ) {
425 tix = __sync_fetch_and_add (lock->ticket, 1);
427 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
429 // wait for our ticket to come up
431 while( tix != lock->serving[0] )
440 void WriteORelease (WOLock *lock)
451 // Phase-Fair reader/writer lock implementation
453 void WriteLock (RWLock *lock)
458 tix = __sync_fetch_and_add (lock->ticket, 1);
460 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
462 // wait for our ticket to come up
464 while( tix != lock->serving[0] )
471 w = PRES | (tix & PHID);
473 r = __sync_fetch_and_add (lock->rin, w);
475 r = _InterlockedExchangeAdd16 (lock->rin, w);
477 while( r != *lock->rout )
485 void WriteRelease (RWLock *lock)
488 __sync_fetch_and_and (lock->rin, ~MASK);
490 _InterlockedAnd16 (lock->rin, ~MASK);
495 void ReadLock (RWLock *lock)
499 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
501 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
504 while( w == (*lock->rin & MASK) )
512 void ReadRelease (RWLock *lock)
515 __sync_fetch_and_add (lock->rout, RINC);
517 _InterlockedExchangeAdd16 (lock->rout, RINC);
521 // Spin Latch Manager
523 // wait until write lock mode is clear
524 // and add 1 to the share count
526 void bt_spinreadlock(BtSpinLatch *latch)
532 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
534 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
536 // see if exclusive request is granted or pending
541 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
543 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
546 } while( sched_yield(), 1 );
548 } while( SwitchToThread(), 1 );
552 // wait for other read and write latches to relinquish
554 void bt_spinwritelock(BtSpinLatch *latch)
560 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
562 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
565 if( !(prev & ~BOTH) )
569 __sync_fetch_and_and ((ushort *)latch, ~XCL);
571 _InterlockedAnd16((ushort *)latch, ~XCL);
574 } while( sched_yield(), 1 );
576 } while( SwitchToThread(), 1 );
580 // try to obtain write lock
582 // return 1 if obtained,
585 int bt_spinwritetry(BtSpinLatch *latch)
590 prev = __sync_fetch_and_or((ushort *)latch, XCL);
592 prev = _InterlockedOr16((ushort *)latch, XCL);
594 // take write access if all bits are clear
597 if( !(prev & ~BOTH) )
601 __sync_fetch_and_and ((ushort *)latch, ~XCL);
603 _InterlockedAnd16((ushort *)latch, ~XCL);
610 void bt_spinreleasewrite(BtSpinLatch *latch)
613 __sync_fetch_and_and((ushort *)latch, ~BOTH);
615 _InterlockedAnd16((ushort *)latch, ~BOTH);
619 // decrement reader count
621 void bt_spinreleaseread(BtSpinLatch *latch)
624 __sync_fetch_and_add((ushort *)latch, -SHARE);
626 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
630 // read page from permanent location in Btree file
632 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
634 off64_t off = page_no << mgr->page_bits;
637 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
638 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
645 memset (ovl, 0, sizeof(OVERLAPPED));
647 ovl->OffsetHigh = off >> 32;
649 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
650 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
653 if( *amt < mgr->page_size ) {
654 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
661 // write page to permanent location in Btree file
662 // clear the dirty bit
664 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
666 off64_t off = page_no << mgr->page_bits;
669 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
675 memset (ovl, 0, sizeof(OVERLAPPED));
677 ovl->OffsetHigh = off >> 32;
679 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
682 if( *amt < mgr->page_size )
688 // link latch table entry into head of latch hash table
690 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
692 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
693 BtLatchSet *latch = bt->mgr->latchsets + slot;
695 if( latch->next = bt->mgr->hashtable[hashidx].slot )
696 bt->mgr->latchsets[latch->next].prev = slot;
698 bt->mgr->hashtable[hashidx].slot = slot;
699 latch->page_no = page_no;
706 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
714 // set CLOCK bit in latch
715 // decrement pin count
717 void bt_unpinlatch (BtLatchSet *latch)
720 if( ~latch->pin & CLOCK_bit )
721 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
722 __sync_fetch_and_add(&latch->pin, -1);
724 if( ~latch->pin & CLOCK_bit )
725 _InterlockedOr16 (&latch->pin, CLOCK_bit);
726 _InterlockedDecrement16 (&latch->pin);
730 // return the btree cached page address
732 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
734 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
739 // find existing latchset or inspire new one
740 // return with latchset pinned
742 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
744 uint hashidx = page_no % bt->mgr->latchhash;
752 // try to find our entry
754 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
756 if( slot = bt->mgr->hashtable[hashidx].slot ) do
758 latch = bt->mgr->latchsets + slot;
759 if( page_no == latch->page_no )
761 } while( slot = latch->next );
767 latch = bt->mgr->latchsets + slot;
769 __sync_fetch_and_add(&latch->pin, 1);
771 _InterlockedIncrement16 (&latch->pin);
773 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
777 // see if there are any unused pool entries
779 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
781 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
784 if( slot < bt->mgr->latchtotal ) {
785 latch = bt->mgr->latchsets + slot;
786 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
788 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
793 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
795 _InterlockedDecrement (&bt->mgr->latchdeployed);
797 // find and reuse previous entry on victim
801 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
803 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
805 // try to get write lock on hash chain
806 // skip entry if not obtained
807 // or has outstanding pins
809 slot %= bt->mgr->latchtotal;
814 latch = bt->mgr->latchsets + slot;
815 idx = latch->page_no % bt->mgr->latchhash;
817 // see we are on same chain as hashidx
822 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
825 // skip this slot if it is pinned
826 // or the CLOCK bit is set
829 if( latch->pin & CLOCK_bit ) {
831 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
833 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
836 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
840 // update permanent page area in btree from buffer pool
842 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
845 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
848 latch->dirty = 0, bt->writes++;
850 // unlink our available slot from its hash chain
853 bt->mgr->latchsets[latch->prev].next = latch->next;
855 bt->mgr->hashtable[idx].slot = latch->next;
858 bt->mgr->latchsets[latch->next].prev = latch->prev;
860 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
862 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
865 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
870 void bt_mgrclose (BtMgr *mgr)
877 // flush dirty pool pages to the btree
879 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
880 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
881 latch = mgr->latchsets + slot;
884 bt_writepage(mgr, page, latch->page_no);
885 latch->dirty = 0, num++;
887 // madvise (page, mgr->page_size, MADV_DONTNEED);
890 fprintf(stderr, "%d buffer pool pages flushed\n", num);
893 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
894 munmap (mgr->pagezero, mgr->page_size);
896 FlushViewOfFile(mgr->pagezero, 0);
897 UnmapViewOfFile(mgr->pagezero);
898 UnmapViewOfFile(mgr->hashtable);
899 CloseHandle(mgr->halloc);
900 CloseHandle(mgr->hpool);
906 FlushFileBuffers(mgr->idx);
907 CloseHandle(mgr->idx);
912 // close and release memory
914 void bt_close (BtDb *bt)
921 VirtualFree (bt->mem, 0, MEM_RELEASE);
926 // open/create new btree buffer manager
928 // call with file_name, BT_openmode, bits in page size (e.g. 16),
929 // size of page pool (e.g. 262144)
931 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
933 uint lvl, attr, last, slot, idx;
934 uint nlatchpage, latchhash;
935 unsigned char value[BtId];
936 int flag, initit = 0;
937 BtPageZero *pagezero;
944 // determine sanity of page size and buffer pool
946 if( bits > BT_maxbits )
948 else if( bits < BT_minbits )
952 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
957 mgr = calloc (1, sizeof(BtMgr));
959 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
961 if( mgr->idx == -1 ) {
962 fprintf (stderr, "Unable to open btree file\n");
963 return free(mgr), NULL;
966 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
967 attr = FILE_ATTRIBUTE_NORMAL;
968 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
970 if( mgr->idx == INVALID_HANDLE_VALUE )
971 return GlobalFree(mgr), NULL;
975 pagezero = valloc (BT_maxpage);
978 // read minimum page size to get root info
979 // to support raw disk partition files
980 // check if bits == 0 on the disk.
982 if( size = lseek (mgr->idx, 0L, 2) )
983 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
984 if( pagezero->alloc->bits )
985 bits = pagezero->alloc->bits;
989 return free(mgr), free(pagezero), NULL;
993 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
994 size = GetFileSize(mgr->idx, amt);
997 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
998 return bt_mgrclose (mgr), NULL;
999 bits = pagezero->alloc->bits;
1004 mgr->page_size = 1 << bits;
1005 mgr->page_bits = bits;
1007 // calculate number of latch hash table entries
1009 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1010 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1012 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1013 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1014 mgr->latchtotal = nodemax;
1019 // initialize an empty b-tree with latch page, root page, page of leaves
1020 // and page(s) of latches and page pool cache
1022 memset (pagezero, 0, 1 << bits);
1023 pagezero->alloc->bits = mgr->page_bits;
1024 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1026 // initialize left-most LEAF page in
1029 bt_putid (pagezero->alloc->left, LEAF_page);
1031 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1032 fprintf (stderr, "Unable to create btree page zero\n");
1033 return bt_mgrclose (mgr), NULL;
1036 memset (pagezero, 0, 1 << bits);
1037 pagezero->alloc->bits = mgr->page_bits;
1039 for( lvl=MIN_lvl; lvl--; ) {
1040 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1041 key = keyptr(pagezero->alloc, 1);
1042 key->len = 2; // create stopper key
1046 bt_putid(value, MIN_lvl - lvl + 1);
1047 val = valptr(pagezero->alloc, 1);
1048 val->len = lvl ? BtId : 0;
1049 memcpy (val->value, value, val->len);
1051 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1052 pagezero->alloc->lvl = lvl;
1053 pagezero->alloc->cnt = 1;
1054 pagezero->alloc->act = 1;
1056 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1057 fprintf (stderr, "Unable to create btree page zero\n");
1058 return bt_mgrclose (mgr), NULL;
1066 VirtualFree (pagezero, 0, MEM_RELEASE);
1069 // mlock the pagezero page
1071 flag = PROT_READ | PROT_WRITE;
1072 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1073 if( mgr->pagezero == MAP_FAILED ) {
1074 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1075 return bt_mgrclose (mgr), NULL;
1077 mlock (mgr->pagezero, mgr->page_size);
1079 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1080 if( mgr->hashtable == MAP_FAILED ) {
1081 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1082 return bt_mgrclose (mgr), NULL;
1085 flag = PAGE_READWRITE;
1086 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1087 if( !mgr->halloc ) {
1088 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1089 return bt_mgrclose (mgr), NULL;
1092 flag = FILE_MAP_WRITE;
1093 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1094 if( !mgr->pagezero ) {
1095 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1096 return bt_mgrclose (mgr), NULL;
1099 flag = PAGE_READWRITE;
1100 size = (uid)mgr->nlatchpage << mgr->page_bits;
1101 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1103 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1104 return bt_mgrclose (mgr), NULL;
1107 flag = FILE_MAP_WRITE;
1108 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1109 if( !mgr->hashtable ) {
1110 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1111 return bt_mgrclose (mgr), NULL;
1115 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1116 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1121 // open BTree access method
1122 // based on buffer manager
1124 BtDb *bt_open (BtMgr *mgr)
1126 BtDb *bt = malloc (sizeof(*bt));
1128 memset (bt, 0, sizeof(*bt));
1131 bt->mem = valloc (2 *mgr->page_size);
1133 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1135 bt->frame = (BtPage)bt->mem;
1136 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1138 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1140 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1145 // compare two keys, return > 0, = 0, or < 0
1146 // =0: keys are same
1149 // as the comparison value
1151 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1153 uint len1 = key1->len;
1156 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1167 // place write, read, or parent lock on requested page_no.
1169 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1173 ReadLock (latch->readwr);
1176 WriteLock (latch->readwr);
1179 ReadLock (latch->access);
1182 WriteLock (latch->access);
1185 WriteOLock (latch->parent, bt->thread_no);
1188 WriteOLock (latch->atomic, bt->thread_no);
1190 case BtLockAtomic | BtLockRead:
1191 WriteOLock (latch->atomic, bt->thread_no);
1192 ReadLock (latch->readwr);
1197 // remove write, read, or parent lock on requested page
1199 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1203 ReadRelease (latch->readwr);
1206 WriteRelease (latch->readwr);
1209 ReadRelease (latch->access);
1212 WriteRelease (latch->access);
1215 WriteORelease (latch->parent);
1218 WriteORelease (latch->atomic);
1220 case BtLockAtomic | BtLockRead:
1221 WriteORelease (latch->atomic);
1222 ReadRelease (latch->readwr);
1227 // allocate a new page
1228 // return with page latched, but unlocked.
1230 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1235 // lock allocation page
1237 bt_spinwritelock(bt->mgr->lock);
1239 // use empty chain first
1240 // else allocate empty page
1242 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1243 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1244 set->page = bt_mappage (bt, set->latch);
1246 return bt->err = BTERR_struct, -1;
1248 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1249 bt_spinreleasewrite(bt->mgr->lock);
1251 memcpy (set->page, contents, bt->mgr->page_size);
1252 set->latch->dirty = 1;
1256 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1257 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1259 // unlock allocation latch
1261 bt_spinreleasewrite(bt->mgr->lock);
1263 // don't load cache from btree page
1265 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1266 set->page = bt_mappage (bt, set->latch);
1268 return bt->err = BTERR_struct;
1270 memcpy (set->page, contents, bt->mgr->page_size);
1271 set->latch->dirty = 1;
1275 // find slot in page for given key at a given level
1277 int bt_findslot (BtPage page, unsigned char *key, uint len)
1279 uint diff, higher = page->cnt, low = 1, slot;
1282 // make stopper key an infinite fence value
1284 if( bt_getid (page->right) )
1289 // low is the lowest candidate.
1290 // loop ends when they meet
1292 // higher is already
1293 // tested as .ge. the passed key.
1295 while( diff = higher - low ) {
1296 slot = low + ( diff >> 1 );
1297 if( keycmp (keyptr(page, slot), key, len) < 0 )
1300 higher = slot, good++;
1303 // return zero if key is on right link page
1305 return good ? higher : 0;
1308 // find and load page at given level for given key
1309 // leave page rd or wr locked as requested
1311 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1313 uid page_no = ROOT_page, prevpage = 0;
1314 uint drill = 0xff, slot;
1315 BtLatchSet *prevlatch;
1316 uint mode, prevmode;
1318 // start at root of btree and drill down
1321 // determine lock mode of drill level
1322 mode = (drill == lvl) ? lock : BtLockRead;
1324 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1327 // obtain access lock using lock chaining with Access mode
1329 if( page_no > ROOT_page )
1330 bt_lockpage(bt, BtLockAccess, set->latch);
1332 set->page = bt_mappage (bt, set->latch);
1334 // release & unpin parent or left sibling page
1337 bt_unlockpage(bt, prevmode, prevlatch);
1338 bt_unpinlatch (prevlatch);
1342 // obtain mode lock using lock chaining through AccessLock
1344 bt_lockpage(bt, mode, set->latch);
1346 if( set->page->free )
1347 return bt->err = BTERR_struct, 0;
1349 if( page_no > ROOT_page )
1350 bt_unlockpage(bt, BtLockAccess, set->latch);
1352 // re-read and re-lock root after determining actual level of root
1354 if( set->page->lvl != drill) {
1355 if( set->latch->page_no != ROOT_page )
1356 return bt->err = BTERR_struct, 0;
1358 drill = set->page->lvl;
1360 if( lock != BtLockRead && drill == lvl ) {
1361 bt_unlockpage(bt, mode, set->latch);
1362 bt_unpinlatch (set->latch);
1367 prevpage = set->latch->page_no;
1368 prevlatch = set->latch;
1371 // find key on page at this level
1372 // and descend to requested level
1374 if( !set->page->kill )
1375 if( slot = bt_findslot (set->page, key, len) ) {
1379 // find next non-dead slot -- the fence key if nothing else
1381 while( slotptr(set->page, slot)->dead )
1382 if( slot++ < set->page->cnt )
1385 return bt->err = BTERR_struct, 0;
1387 page_no = bt_getid(valptr(set->page, slot)->value);
1392 // or slide right into next page
1394 page_no = bt_getid(set->page->right);
1397 // return error on end of right chain
1399 bt->err = BTERR_struct;
1400 return 0; // return error
1403 // return page to free list
1404 // page must be delete & write locked
1406 void bt_freepage (BtDb *bt, BtPageSet *set)
1408 // lock allocation page
1410 bt_spinwritelock (bt->mgr->lock);
1414 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1415 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1416 set->latch->dirty = 1;
1417 set->page->free = 1;
1419 // unlock released page
1421 bt_unlockpage (bt, BtLockDelete, set->latch);
1422 bt_unlockpage (bt, BtLockWrite, set->latch);
1423 bt_unpinlatch (set->latch);
1425 // unlock allocation page
1427 bt_spinreleasewrite (bt->mgr->lock);
1430 // a fence key was deleted from a page
1431 // push new fence value upwards
1433 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1435 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1436 unsigned char value[BtId];
1440 // remove the old fence value
1442 ptr = keyptr(set->page, set->page->cnt);
1443 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1444 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1445 set->latch->dirty = 1;
1447 // cache new fence value
1449 ptr = keyptr(set->page, set->page->cnt);
1450 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1452 bt_lockpage (bt, BtLockParent, set->latch);
1453 bt_unlockpage (bt, BtLockWrite, set->latch);
1455 // insert new (now smaller) fence key
1457 bt_putid (value, set->latch->page_no);
1458 ptr = (BtKey*)leftkey;
1460 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1463 // now delete old fence key
1465 ptr = (BtKey*)rightkey;
1467 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1470 bt_unlockpage (bt, BtLockParent, set->latch);
1471 bt_unpinlatch(set->latch);
1475 // root has a single child
1476 // collapse a level from the tree
1478 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1484 // find the child entry and promote as new root contents
1487 for( idx = 0; idx++ < root->page->cnt; )
1488 if( !slotptr(root->page, idx)->dead )
1491 page_no = bt_getid (valptr(root->page, idx)->value);
1493 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1494 child->page = bt_mappage (bt, child->latch);
1498 bt_lockpage (bt, BtLockDelete, child->latch);
1499 bt_lockpage (bt, BtLockWrite, child->latch);
1501 memcpy (root->page, child->page, bt->mgr->page_size);
1502 root->latch->dirty = 1;
1504 bt_freepage (bt, child);
1506 } while( root->page->lvl > 1 && root->page->act == 1 );
1508 bt_unlockpage (bt, BtLockWrite, root->latch);
1509 bt_unpinlatch (root->latch);
1513 // delete a page and manage keys
1514 // call with page writelocked
1515 // returns with page unpinned
1517 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1519 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1520 unsigned char value[BtId];
1521 uint lvl = set->page->lvl;
1526 // cache copy of fence key
1527 // to post in parent
1529 ptr = keyptr(set->page, set->page->cnt);
1530 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1532 // obtain lock on right page
1534 page_no = bt_getid(set->page->right);
1536 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1537 right->page = bt_mappage (bt, right->latch);
1541 bt_lockpage (bt, BtLockWrite, right->latch);
1543 // cache copy of key to update
1545 ptr = keyptr(right->page, right->page->cnt);
1546 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1548 if( right->page->kill )
1549 return bt->err = BTERR_struct;
1551 // pull contents of right peer into our empty page
1553 memcpy (set->page, right->page, bt->mgr->page_size);
1554 set->latch->dirty = 1;
1556 // mark right page deleted and point it to left page
1557 // until we can post parent updates that remove access
1558 // to the deleted page.
1560 bt_putid (right->page->right, set->latch->page_no);
1561 right->latch->dirty = 1;
1562 right->page->kill = 1;
1564 bt_lockpage (bt, BtLockParent, right->latch);
1565 bt_unlockpage (bt, BtLockWrite, right->latch);
1567 bt_lockpage (bt, BtLockParent, set->latch);
1568 bt_unlockpage (bt, BtLockWrite, set->latch);
1570 // redirect higher key directly to our new node contents
1572 bt_putid (value, set->latch->page_no);
1573 ptr = (BtKey*)higherfence;
1575 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1578 // delete old lower key to our node
1580 ptr = (BtKey*)lowerfence;
1582 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1585 // obtain delete and write locks to right node
1587 bt_unlockpage (bt, BtLockParent, right->latch);
1588 bt_lockpage (bt, BtLockDelete, right->latch);
1589 bt_lockpage (bt, BtLockWrite, right->latch);
1590 bt_freepage (bt, right);
1592 bt_unlockpage (bt, BtLockParent, set->latch);
1593 bt_unpinlatch (set->latch);
1597 // find and delete key on page by marking delete flag bit
1598 // if page becomes empty, delete it from the btree
1600 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1602 uint slot, idx, found, fence;
1607 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1608 ptr = keyptr(set->page, slot);
1612 // if librarian slot, advance to real slot
1614 if( slotptr(set->page, slot)->type == Librarian )
1615 ptr = keyptr(set->page, ++slot);
1617 fence = slot == set->page->cnt;
1619 // if key is found delete it, otherwise ignore request
1621 if( found = !keycmp (ptr, key, len) )
1622 if( found = slotptr(set->page, slot)->dead == 0 ) {
1623 val = valptr(set->page,slot);
1624 slotptr(set->page, slot)->dead = 1;
1625 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1628 // collapse empty slots beneath the fence
1630 while( idx = set->page->cnt - 1 )
1631 if( slotptr(set->page, idx)->dead ) {
1632 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1633 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1638 // did we delete a fence key in an upper level?
1640 if( found && lvl && set->page->act && fence )
1641 if( bt_fixfence (bt, set, lvl) )
1646 // do we need to collapse root?
1648 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1649 if( bt_collapseroot (bt, set) )
1654 // delete empty page
1656 if( !set->page->act )
1657 return bt_deletepage (bt, set);
1659 set->latch->dirty = 1;
1660 bt_unlockpage(bt, BtLockWrite, set->latch);
1661 bt_unpinlatch (set->latch);
1665 BtKey *bt_foundkey (BtDb *bt)
1667 return (BtKey*)(bt->key);
1670 // advance to next slot
1672 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1674 BtLatchSet *prevlatch;
1677 if( slot < set->page->cnt )
1680 prevlatch = set->latch;
1682 if( page_no = bt_getid(set->page->right) )
1683 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1684 set->page = bt_mappage (bt, set->latch);
1688 return bt->err = BTERR_struct, 0;
1690 // obtain access lock using lock chaining with Access mode
1692 bt_lockpage(bt, BtLockAccess, set->latch);
1694 bt_unlockpage(bt, BtLockRead, prevlatch);
1695 bt_unpinlatch (prevlatch);
1697 bt_lockpage(bt, BtLockRead, set->latch);
1698 bt_unlockpage(bt, BtLockAccess, set->latch);
1702 // find unique key or first duplicate key in
1703 // leaf level and return number of value bytes
1704 // or (-1) if not found. Setup key for bt_foundkey
1706 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1714 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1716 ptr = keyptr(set->page, slot);
1718 // skip librarian slot place holder
1720 if( slotptr(set->page, slot)->type == Librarian )
1721 ptr = keyptr(set->page, ++slot);
1723 // return actual key found
1725 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1728 if( slotptr(set->page, slot)->type == Duplicate )
1731 // not there if we reach the stopper key
1733 if( slot == set->page->cnt )
1734 if( !bt_getid (set->page->right) )
1737 // if key exists, return >= 0 value bytes copied
1738 // otherwise return (-1)
1740 if( slotptr(set->page, slot)->dead )
1744 if( !memcmp (ptr->key, key, len) ) {
1745 val = valptr (set->page,slot);
1746 if( valmax > val->len )
1748 memcpy (value, val->value, valmax);
1754 } while( slot = bt_findnext (bt, set, slot) );
1756 bt_unlockpage (bt, BtLockRead, set->latch);
1757 bt_unpinlatch (set->latch);
1761 // check page for space available,
1762 // clean if necessary and return
1763 // 0 - page needs splitting
1764 // >0 new slot value
1766 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1768 uint nxt = bt->mgr->page_size;
1769 BtPage page = set->page;
1770 uint cnt = 0, idx = 0;
1771 uint max = page->cnt;
1776 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1779 // skip cleanup and proceed to split
1780 // if there's not enough garbage
1783 if( page->garbage < nxt / 5 )
1786 memcpy (bt->frame, page, bt->mgr->page_size);
1788 // skip page info and set rest of page to zero
1790 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1791 set->latch->dirty = 1;
1795 // clean up page first by
1796 // removing deleted keys
1798 while( cnt++ < max ) {
1802 if( cnt < max || bt->frame->lvl )
1803 if( slotptr(bt->frame,cnt)->dead )
1806 // copy the value across
1808 val = valptr(bt->frame, cnt);
1809 nxt -= val->len + sizeof(BtVal);
1810 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1812 // copy the key across
1814 key = keyptr(bt->frame, cnt);
1815 nxt -= key->len + sizeof(BtKey);
1816 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1818 // make a librarian slot
1820 slotptr(page, ++idx)->off = nxt;
1821 slotptr(page, idx)->type = Librarian;
1822 slotptr(page, idx)->dead = 1;
1826 slotptr(page, ++idx)->off = nxt;
1827 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1829 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1836 // see if page has enough space now, or does it need splitting?
1838 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1844 // split the root and raise the height of the btree
1846 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1848 unsigned char leftkey[BT_keyarray];
1849 uint nxt = bt->mgr->page_size;
1850 unsigned char value[BtId];
1856 // save left page fence key for new root
1858 ptr = keyptr(root->page, root->page->cnt);
1859 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1861 // Obtain an empty page to use, and copy the current
1862 // root contents into it, e.g. lower keys
1864 if( bt_newpage(bt, left, root->page) )
1867 left_page_no = left->latch->page_no;
1868 bt_unpinlatch (left->latch);
1870 // preserve the page info at the bottom
1871 // of higher keys and set rest to zero
1873 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1875 // insert stopper key at top of newroot page
1876 // and increase the root height
1878 nxt -= BtId + sizeof(BtVal);
1879 bt_putid (value, right->page_no);
1880 val = (BtVal *)((unsigned char *)root->page + nxt);
1881 memcpy (val->value, value, BtId);
1884 nxt -= 2 + sizeof(BtKey);
1885 slotptr(root->page, 2)->off = nxt;
1886 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1891 // insert lower keys page fence key on newroot page as first key
1893 nxt -= BtId + sizeof(BtVal);
1894 bt_putid (value, left_page_no);
1895 val = (BtVal *)((unsigned char *)root->page + nxt);
1896 memcpy (val->value, value, BtId);
1899 ptr = (BtKey *)leftkey;
1900 nxt -= ptr->len + sizeof(BtKey);
1901 slotptr(root->page, 1)->off = nxt;
1902 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1904 bt_putid(root->page->right, 0);
1905 root->page->min = nxt; // reset lowest used offset and key count
1906 root->page->cnt = 2;
1907 root->page->act = 2;
1910 // release and unpin root pages
1912 bt_unlockpage(bt, BtLockWrite, root->latch);
1913 bt_unpinlatch (root->latch);
1915 bt_unpinlatch (right);
1919 // split already locked full node
1921 // return pool entry for new right
1924 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1926 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1927 uint lvl = set->page->lvl;
1934 // split higher half of keys to bt->frame
1936 memset (bt->frame, 0, bt->mgr->page_size);
1937 max = set->page->cnt;
1941 while( cnt++ < max ) {
1942 if( cnt < max || set->page->lvl )
1943 if( slotptr(set->page, cnt)->dead )
1946 src = valptr(set->page, cnt);
1947 nxt -= src->len + sizeof(BtVal);
1948 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1950 key = keyptr(set->page, cnt);
1951 nxt -= key->len + sizeof(BtKey);
1952 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1953 memcpy (ptr, key, key->len + sizeof(BtKey));
1955 // add librarian slot
1957 slotptr(bt->frame, ++idx)->off = nxt;
1958 slotptr(bt->frame, idx)->type = Librarian;
1959 slotptr(bt->frame, idx)->dead = 1;
1963 slotptr(bt->frame, ++idx)->off = nxt;
1964 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1966 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1970 bt->frame->bits = bt->mgr->page_bits;
1971 bt->frame->min = nxt;
1972 bt->frame->cnt = idx;
1973 bt->frame->lvl = lvl;
1977 if( set->latch->page_no > ROOT_page )
1978 bt_putid (bt->frame->right, bt_getid (set->page->right));
1980 // get new free page and write higher keys to it.
1982 if( bt_newpage(bt, right, bt->frame) )
1985 memcpy (bt->frame, set->page, bt->mgr->page_size);
1986 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987 set->latch->dirty = 1;
1989 nxt = bt->mgr->page_size;
1990 set->page->garbage = 0;
1996 if( slotptr(bt->frame, max)->type == Librarian )
1999 // assemble page of smaller keys
2001 while( cnt++ < max ) {
2002 if( slotptr(bt->frame, cnt)->dead )
2004 val = valptr(bt->frame, cnt);
2005 nxt -= val->len + sizeof(BtVal);
2006 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2008 key = keyptr(bt->frame, cnt);
2009 nxt -= key->len + sizeof(BtKey);
2010 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2012 // add librarian slot
2014 slotptr(set->page, ++idx)->off = nxt;
2015 slotptr(set->page, idx)->type = Librarian;
2016 slotptr(set->page, idx)->dead = 1;
2020 slotptr(set->page, ++idx)->off = nxt;
2021 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2025 bt_putid(set->page->right, right->latch->page_no);
2026 set->page->min = nxt;
2027 set->page->cnt = idx;
2029 return right->latch->entry;
2032 // fix keys for newly split page
2033 // call with page locked, return
2036 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2038 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2039 unsigned char value[BtId];
2040 uint lvl = set->page->lvl;
2044 // if current page is the root page, split it
2046 if( set->latch->page_no == ROOT_page )
2047 return bt_splitroot (bt, set, right);
2049 ptr = keyptr(set->page, set->page->cnt);
2050 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2052 page = bt_mappage (bt, right);
2054 ptr = keyptr(page, page->cnt);
2055 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2057 // insert new fences in their parent pages
2059 bt_lockpage (bt, BtLockParent, right);
2061 bt_lockpage (bt, BtLockParent, set->latch);
2062 bt_unlockpage (bt, BtLockWrite, set->latch);
2064 // insert new fence for reformulated left block of smaller keys
2066 bt_putid (value, set->latch->page_no);
2067 ptr = (BtKey *)leftkey;
2069 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2072 // switch fence for right block of larger keys to new right page
2074 bt_putid (value, right->page_no);
2075 ptr = (BtKey *)rightkey;
2077 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2080 bt_unlockpage (bt, BtLockParent, set->latch);
2081 bt_unpinlatch (set->latch);
2083 bt_unlockpage (bt, BtLockParent, right);
2084 bt_unpinlatch (right);
2088 // install new key and value onto page
2089 // page must already be checked for
2092 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2094 uint idx, librarian;
2099 // if found slot > desired slot and previous slot
2100 // is a librarian slot, use it
2103 if( slotptr(set->page, slot-1)->type == Librarian )
2106 // copy value onto page
2108 set->page->min -= vallen + sizeof(BtVal);
2109 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2110 memcpy (val->value, value, vallen);
2113 // copy key onto page
2115 set->page->min -= keylen + sizeof(BtKey);
2116 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2117 memcpy (ptr->key, key, keylen);
2120 // find first empty slot
2122 for( idx = slot; idx < set->page->cnt; idx++ )
2123 if( slotptr(set->page, idx)->dead )
2126 // now insert key into array before slot
2128 if( idx == set->page->cnt )
2129 idx += 2, set->page->cnt += 2, librarian = 2;
2133 set->latch->dirty = 1;
2136 while( idx > slot + librarian - 1 )
2137 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2139 // add librarian slot
2141 if( librarian > 1 ) {
2142 node = slotptr(set->page, slot++);
2143 node->off = set->page->min;
2144 node->type = Librarian;
2150 node = slotptr(set->page, slot);
2151 node->off = set->page->min;
2156 bt_unlockpage (bt, BtLockWrite, set->latch);
2157 bt_unpinlatch (set->latch);
2163 // Insert new key into the btree at given level.
2164 // either add a new key or update/add an existing one
2166 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2168 unsigned char newkey[BT_keyarray];
2169 uint slot, idx, len, entry;
2176 // set up the key we're working on
2178 ins = (BtKey*)newkey;
2179 memcpy (ins->key, key, keylen);
2182 // is this a non-unique index value?
2188 sequence = bt_newdup (bt);
2189 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2193 while( 1 ) { // find the page and slot for the current key
2194 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2195 ptr = keyptr(set->page, slot);
2198 bt->err = BTERR_ovflw;
2202 // if librarian slot == found slot, advance to real slot
2204 if( slotptr(set->page, slot)->type == Librarian )
2205 if( !keycmp (ptr, key, keylen) )
2206 ptr = keyptr(set->page, ++slot);
2210 if( slotptr(set->page, slot)->type == Duplicate )
2213 // if inserting a duplicate key or unique key
2214 // check for adequate space on the page
2215 // and insert the new key before slot.
2217 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2218 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2219 if( !(entry = bt_splitpage (bt, set)) )
2221 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2226 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2229 // if key already exists, update value and return
2231 val = valptr(set->page, slot);
2233 if( val->len >= vallen ) {
2234 if( slotptr(set->page, slot)->dead )
2236 set->page->garbage += val->len - vallen;
2237 set->latch->dirty = 1;
2238 slotptr(set->page, slot)->dead = 0;
2240 memcpy (val->value, value, vallen);
2241 bt_unlockpage(bt, BtLockWrite, set->latch);
2242 bt_unpinlatch (set->latch);
2246 // new update value doesn't fit in existing value area
2248 if( !slotptr(set->page, slot)->dead )
2249 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2251 slotptr(set->page, slot)->dead = 0;
2255 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2256 if( !(entry = bt_splitpage (bt, set)) )
2258 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2263 set->page->min -= vallen + sizeof(BtVal);
2264 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2265 memcpy (val->value, value, vallen);
2268 set->latch->dirty = 1;
2269 set->page->min -= keylen + sizeof(BtKey);
2270 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2271 memcpy (ptr->key, key, keylen);
2274 slotptr(set->page, slot)->off = set->page->min;
2275 bt_unlockpage(bt, BtLockWrite, set->latch);
2276 bt_unpinlatch (set->latch);
2283 uint entry; // latch table entry number
2284 uint slot:31; // page slot number
2285 uint reuse:1; // reused previous page
2289 uid page_no; // page number for split leaf
2290 void *next; // next key to insert
2291 uint entry:29; // latch table entry number
2292 uint type:2; // 0 == insert, 1 == delete, 2 == free
2293 uint nounlock:1; // don't unlock ParentModification
2294 unsigned char leafkey[BT_keyarray];
2297 // determine actual page where key is located
2298 // return slot number
2300 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2302 BtKey *key = keyptr(source,src);
2303 uint slot = locks[src].slot;
2306 if( src > 1 && locks[src].reuse )
2307 entry = locks[src-1].entry, slot = 0;
2309 entry = locks[src].entry;
2312 set->latch = bt->mgr->latchsets + entry;
2313 set->page = bt_mappage (bt, set->latch);
2317 // is locks->reuse set? or was slot zeroed?
2318 // if so, find where our key is located
2319 // on current page or pages split on
2320 // same page txn operations.
2323 set->latch = bt->mgr->latchsets + entry;
2324 set->page = bt_mappage (bt, set->latch);
2326 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2327 if( slotptr(set->page, slot)->type == Librarian )
2329 if( locks[src].reuse )
2330 locks[src].entry = entry;
2333 } while( entry = set->latch->split );
2335 bt->err = BTERR_atomic;
2339 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2341 BtKey *key = keyptr(source, src);
2342 BtVal *val = valptr(source, src);
2347 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2348 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2349 return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2351 if( entry = bt_splitpage (bt, set) )
2352 latch = bt->mgr->latchsets + entry;
2356 // splice right page into split chain
2357 // and WriteLock it.
2359 bt_lockpage(bt, BtLockWrite, latch);
2360 latch->split = set->latch->split;
2361 set->latch->split = entry;
2362 locks[src].slot = 0;
2365 return bt->err = BTERR_atomic;
2368 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2370 BtKey *key = keyptr(source, src);
2371 uint idx, entry, slot;
2376 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2377 ptr = keyptr(set->page, slot);
2379 return bt->err = BTERR_struct;
2381 if( !keycmp (ptr, key->key, key->len) )
2382 if( !slotptr(set->page, slot)->dead )
2383 slotptr(set->page, slot)->dead = 1;
2389 val = valptr(set->page, slot);
2390 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2391 set->latch->dirty = 1;
2397 // delete an empty master page for a transaction
2399 // note that the far right page never empties because
2400 // it always contains (at least) the infinite stopper key
2401 // and that all pages that don't contain any keys are
2402 // deleted, or are being held under Atomic lock.
2404 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2406 BtPageSet right[1], temp[1];
2407 unsigned char value[BtId];
2411 bt_lockpage(bt, BtLockWrite, prev->latch);
2413 // grab the right sibling
2415 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2416 right->page = bt_mappage (bt, right->latch);
2420 bt_lockpage(bt, BtLockAtomic, right->latch);
2421 bt_lockpage(bt, BtLockWrite, right->latch);
2423 // and pull contents over empty page
2424 // while preserving master's left link
2426 memcpy (right->page->left, prev->page->left, BtId);
2427 memcpy (prev->page, right->page, bt->mgr->page_size);
2429 // forward seekers to old right sibling
2430 // to new page location in set
2432 bt_putid (right->page->right, prev->latch->page_no);
2433 right->latch->dirty = 1;
2434 right->page->kill = 1;
2436 // remove pointer to right page for searchers by
2437 // changing right fence key to point to the master page
2439 ptr = keyptr(right->page,right->page->cnt);
2440 bt_putid (value, prev->latch->page_no);
2442 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2445 // now that master page is in good shape we can
2446 // remove its locks.
2448 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2449 bt_unlockpage (bt, BtLockWrite, prev->latch);
2451 // fix master's right sibling's left pointer
2452 // to remove scanner's poiner to the right page
2454 if( right_page_no = bt_getid (prev->page->right) ) {
2455 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2456 temp->page = bt_mappage (bt, temp->latch);
2458 bt_lockpage (bt, BtLockWrite, temp->latch);
2459 bt_putid (temp->page->left, prev->latch->page_no);
2460 temp->latch->dirty = 1;
2462 bt_unlockpage (bt, BtLockWrite, temp->latch);
2463 bt_unpinlatch (temp->latch);
2464 } else { // master is now the far right page
2465 bt_spinwritelock (bt->mgr->lock);
2466 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2467 bt_spinreleasewrite(bt->mgr->lock);
2470 // now that there are no pointers to the right page
2471 // we can delete it after the last read access occurs
2473 bt_unlockpage (bt, BtLockWrite, right->latch);
2474 bt_unlockpage (bt, BtLockAtomic, right->latch);
2475 bt_lockpage (bt, BtLockDelete, right->latch);
2476 bt_lockpage (bt, BtLockWrite, right->latch);
2477 bt_freepage (bt, right);
2481 // atomic modification of a batch of keys.
2483 // return -1 if BTERR is set
2484 // otherwise return slot number
2485 // causing the key constraint violation
2486 // or zero on successful completion.
2488 int bt_atomictxn (BtDb *bt, BtPage source)
2490 uint src, idx, slot, samepage, entry;
2491 AtomicKey *head, *tail, *leaf;
2492 BtPageSet set[1], prev[1];
2493 unsigned char value[BtId];
2494 BtKey *key, *ptr, *key2;
2504 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2508 // stable sort the list of keys into order to
2509 // prevent deadlocks between threads.
2511 for( src = 1; src++ < source->cnt; ) {
2512 *temp = *slotptr(source,src);
2513 key = keyptr (source,src);
2515 for( idx = src; --idx; ) {
2516 key2 = keyptr (source,idx);
2517 if( keycmp (key, key2->key, key2->len) < 0 ) {
2518 *slotptr(source,idx+1) = *slotptr(source,idx);
2519 *slotptr(source,idx) = *temp;
2525 // Load the leaf page for each key
2526 // group same page references with reuse bit
2527 // and determine any constraint violations
2529 for( src = 0; src++ < source->cnt; ) {
2530 key = keyptr(source, src);
2533 // first determine if this modification falls
2534 // on the same page as the previous modification
2535 // note that the far right leaf page is a special case
2537 if( samepage = src > 1 )
2538 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2539 slot = bt_findslot(set->page, key->key, key->len);
2541 bt_unlockpage(bt, BtLockRead, set->latch);
2544 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2545 set->latch->split = 0;
2549 if( slotptr(set->page, slot)->type == Librarian )
2550 ptr = keyptr(set->page, ++slot);
2552 ptr = keyptr(set->page, slot);
2555 locks[src].entry = set->latch->entry;
2556 locks[src].slot = slot;
2557 locks[src].reuse = 0;
2559 locks[src].entry = 0;
2560 locks[src].slot = 0;
2561 locks[src].reuse = 1;
2564 switch( slotptr(source, src)->type ) {
2567 if( !slotptr(set->page, slot)->dead )
2568 if( slot < set->page->cnt || bt_getid (set->page->right) )
2569 if( !keycmp (ptr, key->key, key->len) ) {
2571 // return constraint violation if key already exists
2573 bt_unlockpage(bt, BtLockRead, set->latch);
2577 if( locks[src].entry ) {
2578 set->latch = bt->mgr->latchsets + locks[src].entry;
2579 bt_unlockpage(bt, BtLockAtomic, set->latch);
2580 bt_unpinlatch (set->latch);
2591 // unlock last loadpage lock
2594 bt_unlockpage(bt, BtLockRead, set->latch);
2596 // obtain write lock for each master page
2598 for( src = 0; src++ < source->cnt; )
2599 if( locks[src].reuse )
2602 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2604 // insert or delete each key
2605 // process any splits or merges
2606 // release Write & Atomic latches
2607 // set ParentModifications and build
2608 // queue of keys to insert for split pages
2609 // or delete for deleted pages.
2611 // run through txn list backwards
2613 samepage = source->cnt + 1;
2615 for( src = source->cnt; src; src-- ) {
2616 if( locks[src].reuse )
2619 // perform the txn operations
2620 // from smaller to larger on
2623 for( idx = src; idx < samepage; idx++ )
2624 switch( slotptr(source,idx)->type ) {
2626 if( bt_atomicdelete (bt, source, locks, idx) )
2632 if( bt_atomicinsert (bt, source, locks, idx) )
2637 // after the same page operations have finished,
2638 // process master page for splits or deletion.
2640 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2641 prev->page = bt_mappage (bt, prev->latch);
2644 // pick-up all splits from master page
2645 // each one is already WriteLocked.
2647 entry = prev->latch->split;
2650 set->latch = bt->mgr->latchsets + entry;
2651 set->page = bt_mappage (bt, set->latch);
2652 entry = set->latch->split;
2654 // delete empty master page by undoing its split
2655 // (this is potentially another empty page)
2656 // note that there are no new left pointers yet
2658 if( !prev->page->act ) {
2659 memcpy (set->page->left, prev->page->left, BtId);
2660 memcpy (prev->page, set->page, bt->mgr->page_size);
2661 bt_lockpage (bt, BtLockDelete, set->latch);
2662 bt_freepage (bt, set);
2664 prev->latch->dirty = 1;
2668 // remove empty page from the split chain
2670 if( !set->page->act ) {
2671 memcpy (prev->page->right, set->page->right, BtId);
2672 prev->latch->split = set->latch->split;
2673 bt_lockpage (bt, BtLockDelete, set->latch);
2674 bt_freepage (bt, set);
2678 // schedule prev fence key update
2680 ptr = keyptr(prev->page,prev->page->cnt);
2681 leaf = calloc (sizeof(AtomicKey), 1);
2683 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2684 leaf->page_no = prev->latch->page_no;
2685 leaf->entry = prev->latch->entry;
2695 // splice in the left link into the split page
2697 bt_putid (set->page->left, prev->latch->page_no);
2698 bt_lockpage(bt, BtLockParent, prev->latch);
2699 bt_unlockpage(bt, BtLockWrite, prev->latch);
2703 // update left pointer in next right page from last split page
2704 // (if all splits were reversed, latch->split == 0)
2706 if( latch->split ) {
2707 // fix left pointer in master's original (now split)
2708 // far right sibling or set rightmost page in page zero
2710 if( right = bt_getid (prev->page->right) ) {
2711 if( set->latch = bt_pinlatch (bt, right, 1) )
2712 set->page = bt_mappage (bt, set->latch);
2716 bt_lockpage (bt, BtLockWrite, set->latch);
2717 bt_putid (set->page->left, prev->latch->page_no);
2718 set->latch->dirty = 1;
2719 bt_unlockpage (bt, BtLockWrite, set->latch);
2720 bt_unpinlatch (set->latch);
2721 } else { // prev is rightmost page
2722 bt_spinwritelock (bt->mgr->lock);
2723 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2724 bt_spinreleasewrite(bt->mgr->lock);
2727 // Process last page split in chain
2729 ptr = keyptr(prev->page,prev->page->cnt);
2730 leaf = calloc (sizeof(AtomicKey), 1);
2732 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2733 leaf->page_no = prev->latch->page_no;
2734 leaf->entry = prev->latch->entry;
2744 bt_lockpage(bt, BtLockParent, prev->latch);
2745 bt_unlockpage(bt, BtLockWrite, prev->latch);
2747 // remove atomic lock on master page
2749 bt_unlockpage(bt, BtLockAtomic, latch);
2753 // finished if prev page occupied (either master or final split)
2755 if( prev->page->act ) {
2756 bt_unlockpage(bt, BtLockWrite, latch);
2757 bt_unlockpage(bt, BtLockAtomic, latch);
2758 bt_unpinlatch(latch);
2762 // any and all splits were reversed, and the
2763 // master page located in prev is empty, delete it
2764 // by pulling over master's right sibling.
2766 // Remove empty master's fence key
2768 ptr = keyptr(prev->page,prev->page->cnt);
2770 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2773 // perform the remainder of the delete
2774 // from the FIFO queue
2776 leaf = calloc (sizeof(AtomicKey), 1);
2778 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2779 leaf->page_no = prev->latch->page_no;
2780 leaf->entry = prev->latch->entry;
2791 // leave atomic lock in place until
2792 // deletion completes in next phase.
2794 bt_unlockpage(bt, BtLockWrite, prev->latch);
2797 // add & delete keys for any pages split or merged during transaction
2801 set->latch = bt->mgr->latchsets + leaf->entry;
2802 set->page = bt_mappage (bt, set->latch);
2804 bt_putid (value, leaf->page_no);
2805 ptr = (BtKey *)leaf->leafkey;
2807 switch( leaf->type ) {
2808 case 0: // insert key
2809 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2814 case 1: // delete key
2815 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2820 case 2: // free page
2821 if( bt_atomicfree (bt, set) )
2827 if( !leaf->nounlock )
2828 bt_unlockpage (bt, BtLockParent, set->latch);
2830 bt_unpinlatch (set->latch);
2833 } while( leaf = tail );
2841 // set cursor to highest slot on highest page
2843 uint bt_lastkey (BtDb *bt)
2845 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2848 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2849 set->page = bt_mappage (bt, set->latch);
2853 bt_lockpage(bt, BtLockRead, set->latch);
2854 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2855 bt_unlockpage(bt, BtLockRead, set->latch);
2856 bt_unpinlatch (set->latch);
2858 bt->cursor_page = page_no;
2859 return bt->cursor->cnt;
2862 // return previous slot on cursor page
2864 uint bt_prevkey (BtDb *bt, uint slot)
2866 uid ourright, next, us = bt->cursor_page;
2872 ourright = bt_getid(bt->cursor->right);
2875 if( !(next = bt_getid(bt->cursor->left)) )
2879 bt->cursor_page = next;
2881 if( set->latch = bt_pinlatch (bt, next, 1) )
2882 set->page = bt_mappage (bt, set->latch);
2886 bt_lockpage(bt, BtLockRead, set->latch);
2887 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2888 bt_unlockpage(bt, BtLockRead, set->latch);
2889 bt_unpinlatch (set->latch);
2891 next = bt_getid (bt->cursor->right);
2893 if( bt->cursor->kill )
2897 if( next == ourright )
2902 return bt->cursor->cnt;
2905 // return next slot on cursor page
2906 // or slide cursor right into next page
2908 uint bt_nextkey (BtDb *bt, uint slot)
2914 right = bt_getid(bt->cursor->right);
2916 while( slot++ < bt->cursor->cnt )
2917 if( slotptr(bt->cursor,slot)->dead )
2919 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2927 bt->cursor_page = right;
2929 if( set->latch = bt_pinlatch (bt, right, 1) )
2930 set->page = bt_mappage (bt, set->latch);
2934 bt_lockpage(bt, BtLockRead, set->latch);
2936 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2938 bt_unlockpage(bt, BtLockRead, set->latch);
2939 bt_unpinlatch (set->latch);
2947 // cache page of keys into cursor and return starting slot for given key
2949 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2954 // cache page for retrieval
2956 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2957 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2961 bt->cursor_page = set->latch->page_no;
2963 bt_unlockpage(bt, BtLockRead, set->latch);
2964 bt_unpinlatch (set->latch);
2968 BtKey *bt_key(BtDb *bt, uint slot)
2970 return keyptr(bt->cursor, slot);
2973 BtVal *bt_val(BtDb *bt, uint slot)
2975 return valptr(bt->cursor,slot);
2981 double getCpuTime(int type)
2984 FILETIME xittime[1];
2985 FILETIME systime[1];
2986 FILETIME usrtime[1];
2987 SYSTEMTIME timeconv[1];
2990 memset (timeconv, 0, sizeof(SYSTEMTIME));
2994 GetSystemTimeAsFileTime (xittime);
2995 FileTimeToSystemTime (xittime, timeconv);
2996 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2999 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3000 FileTimeToSystemTime (usrtime, timeconv);
3003 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3004 FileTimeToSystemTime (systime, timeconv);
3008 ans += (double)timeconv->wHour * 3600;
3009 ans += (double)timeconv->wMinute * 60;
3010 ans += (double)timeconv->wSecond;
3011 ans += (double)timeconv->wMilliseconds / 1000;
3016 #include <sys/resource.h>
3018 double getCpuTime(int type)
3020 struct rusage used[1];
3021 struct timeval tv[1];
3025 gettimeofday(tv, NULL);
3026 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3029 getrusage(RUSAGE_SELF, used);
3030 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3033 getrusage(RUSAGE_SELF, used);
3034 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3041 void bt_poolaudit (BtMgr *mgr)
3046 while( slot++ < mgr->latchdeployed ) {
3047 latch = mgr->latchsets + slot;
3049 if( *latch->readwr->rin & MASK )
3050 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3051 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3053 if( *latch->access->rin & MASK )
3054 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3055 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3057 if( *latch->parent->ticket != *latch->parent->serving )
3058 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3059 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3061 if( latch->pin & ~CLOCK_bit ) {
3062 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3068 uint bt_latchaudit (BtDb *bt)
3070 ushort idx, hashidx;
3076 if( *(ushort *)(bt->mgr->lock) )
3077 fprintf(stderr, "Alloc page locked\n");
3078 *(ushort *)(bt->mgr->lock) = 0;
3080 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3081 latch = bt->mgr->latchsets + idx;
3082 if( *latch->readwr->rin & MASK )
3083 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3084 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3086 if( *latch->access->rin & MASK )
3087 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3088 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3090 if( *latch->parent->ticket != *latch->parent->serving )
3091 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3092 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3095 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3100 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3101 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3102 fprintf(stderr, "hash entry %d locked\n", hashidx);
3104 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3106 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3107 latch = bt->mgr->latchsets + idx;
3109 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3110 } while( idx = latch->next );
3113 page_no = LEAF_page;
3115 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3116 uid off = page_no << bt->mgr->page_bits;
3118 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3122 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3124 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3125 return bt->err = BTERR_map;
3127 if( *amt < bt->mgr->page_size )
3128 return bt->err = BTERR_map;
3130 if( !bt->frame->free && !bt->frame->lvl )
3131 cnt += bt->frame->act;
3135 cnt--; // remove stopper key
3136 fprintf(stderr, " Total keys read %d\n", cnt);
3150 // standalone program to index file of keys
3151 // then list them onto std-out
3154 void *index_file (void *arg)
3156 uint __stdcall index_file (void *arg)
3159 int line = 0, found = 0, cnt = 0, idx;
3160 uid next, page_no = LEAF_page; // start on first page of leaves
3161 int ch, len = 0, slot, type = 0;
3162 unsigned char key[BT_maxkey];
3163 unsigned char txn[65536];
3164 ThreadArg *args = arg;
3173 bt = bt_open (args->mgr);
3176 if( args->idx < strlen (args->type) )
3177 ch = args->type[args->idx];
3179 ch = args->type[strlen(args->type) - 1];
3184 fprintf(stderr, "started latch mgr audit\n");
3185 cnt = bt_latchaudit (bt);
3186 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3197 if( type == Delete )
3198 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3200 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3202 if( type == Delete )
3203 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3205 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3207 if( in = fopen (args->infile, "rb") )
3208 while( ch = getc(in), ch != EOF )
3214 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3215 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3221 memcpy (txn + nxt, key + 10, len - 10);
3223 txn[nxt] = len - 10;
3225 memcpy (txn + nxt, key, 10);
3228 slotptr(page,++cnt)->off = nxt;
3229 slotptr(page,cnt)->type = type;
3232 if( cnt < args->num )
3239 if( bt_atomictxn (bt, page) )
3240 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3245 else if( len < BT_maxkey )
3247 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3251 fprintf(stderr, "started indexing for %s\n", args->infile);
3252 if( in = fopen (args->infile, "r") )
3253 while( ch = getc(in), ch != EOF )
3258 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3259 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3262 else if( len < BT_maxkey )
3264 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3268 fprintf(stderr, "started finding keys for %s\n", args->infile);
3269 if( in = fopen (args->infile, "rb") )
3270 while( ch = getc(in), ch != EOF )
3274 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3277 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3280 else if( len < BT_maxkey )
3282 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3286 fprintf(stderr, "started scanning\n");
3288 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3289 set->page = bt_mappage (bt, set->latch);
3291 fprintf(stderr, "unable to obtain latch"), exit(1);
3292 bt_lockpage (bt, BtLockRead, set->latch);
3293 next = bt_getid (set->page->right);
3295 for( slot = 0; slot++ < set->page->cnt; )
3296 if( next || slot < set->page->cnt )
3297 if( !slotptr(set->page, slot)->dead ) {
3298 ptr = keyptr(set->page, slot);
3301 if( slotptr(set->page, slot)->type == Duplicate )
3304 fwrite (ptr->key, len, 1, stdout);
3305 val = valptr(set->page, slot);
3306 fwrite (val->value, val->len, 1, stdout);
3307 fputc ('\n', stdout);
3311 bt_unlockpage (bt, BtLockRead, set->latch);
3312 bt_unpinlatch (set->latch);
3313 } while( page_no = next );
3315 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3319 fprintf(stderr, "started reverse scan\n");
3320 if( slot = bt_lastkey (bt) )
3321 while( slot = bt_prevkey (bt, slot) ) {
3322 if( slotptr(bt->cursor, slot)->dead )
3325 ptr = keyptr(bt->cursor, slot);
3328 if( slotptr(bt->cursor, slot)->type == Duplicate )
3331 fwrite (ptr->key, len, 1, stdout);
3332 val = valptr(bt->cursor, slot);
3333 fwrite (val->value, val->len, 1, stdout);
3334 fputc ('\n', stdout);
3338 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3343 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3345 fprintf(stderr, "started counting\n");
3346 page_no = LEAF_page;
3348 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3349 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3352 if( !bt->frame->free && !bt->frame->lvl )
3353 cnt += bt->frame->act;
3359 cnt--; // remove stopper key
3360 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3372 typedef struct timeval timer;
3374 int main (int argc, char **argv)
3376 int idx, cnt, len, slot, err;
3377 int segsize, bits = 16;
3394 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3395 fprintf (stderr, " where idx_file is the name of the btree file\n");
3396 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3397 fprintf (stderr, " page_bits is the page size in bits\n");
3398 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3399 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3400 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3404 start = getCpuTime(0);
3407 bits = atoi(argv[3]);
3410 poolsize = atoi(argv[4]);
3413 fprintf (stderr, "Warning: no mapped_pool\n");
3416 num = atoi(argv[5]);
3420 threads = malloc (cnt * sizeof(pthread_t));
3422 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3424 args = malloc (cnt * sizeof(ThreadArg));
3426 mgr = bt_mgr ((argv[1]), bits, poolsize);
3429 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3435 for( idx = 0; idx < cnt; idx++ ) {
3436 args[idx].infile = argv[idx + 6];
3437 args[idx].type = argv[2];
3438 args[idx].mgr = mgr;
3439 args[idx].num = num;
3440 args[idx].idx = idx;
3442 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3443 fprintf(stderr, "Error creating thread %d\n", err);
3445 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3449 // wait for termination
3452 for( idx = 0; idx < cnt; idx++ )
3453 pthread_join (threads[idx], NULL);
3455 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3457 for( idx = 0; idx < cnt; idx++ )
3458 CloseHandle(threads[idx]);
3464 elapsed = getCpuTime(0) - start;
3465 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3466 elapsed = getCpuTime(1);
3467 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3468 elapsed = getCpuTime(2);
3469 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);