1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
127 // definition for spin latch implementation
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
133 volatile typedef struct {
144 // hash table entries
147 volatile uint slot; // Latch table entry at head of chain
148 BtSpinLatch latch[1];
151 // latch manager table structure
154 uid page_no; // latch set page number
155 RWLock readwr[1]; // read/write page lock
156 RWLock access[1]; // Access Intent/Page delete
157 WOLock parent[1]; // Posting of fence key in parent
158 WOLock atomic[1]; // Atomic update in progress
159 uint split; // right split page atomic insert
160 uint entry; // entry slot in latch table
161 uint next; // next entry in hash table chain
162 uint prev; // prev entry in hash table chain
163 volatile ushort pin; // number of outstanding threads
164 ushort dirty:1; // page in cache is dirty
167 // Define the length of the page record numbers
171 // Page key slot definition.
173 // Keys are marked dead, but remain on the page until
174 // it cleanup is called. The fence key (highest key) for
175 // a leaf page is always present, even after cleanup.
179 // In addition to the Unique keys that occupy slots
180 // there are Librarian and Duplicate key
181 // slots occupying the key slot array.
183 // The Librarian slots are dead keys that
184 // serve as filler, available to add new Unique
185 // or Dup slots that are inserted into the B-tree.
187 // The Duplicate slots have had their key bytes extended
188 // by 6 bytes to contain a binary duplicate key uniqueifier.
199 uint off:BT_maxbits; // page offset for key start
200 uint type:3; // type of slot
201 uint dead:1; // set for deleted slot
204 // The key structure occupies space at the upper end of
205 // each page. It's a length byte followed by the key
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char key[0];
213 // the value structure also occupies space at the upper
214 // end of the page. Each key is immediately followed by a value.
217 unsigned char len; // this can be changed to a ushort or uint
218 unsigned char value[0];
221 #define BT_maxkey 255 // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224 // The first part of an index page.
225 // It is immediately followed
226 // by the BtSlot array of keys.
228 // note that this structure size
229 // must be a multiple of 8 bytes
230 // in order to place dups correctly.
232 typedef struct BtPage_ {
233 uint cnt; // count of keys in page
234 uint act; // count of active keys
235 uint min; // next key offset
236 uint garbage; // page garbage in bytes
237 unsigned char bits:7; // page size in bits
238 unsigned char free:1; // page is on free chain
239 unsigned char lvl:7; // level of page
240 unsigned char kill:1; // page is being deleted
241 unsigned char right[BtId]; // page number to right
242 unsigned char left[BtId]; // page number to left
243 unsigned char filler[2]; // padding to multiple of 8
246 // The loadpage interface object
249 BtPage page; // current page pointer
250 BtLatchSet *latch; // current page latch set
253 // structure for latch manager on ALLOC_page
256 struct BtPage_ alloc[1]; // next page_no in right ptr
257 unsigned long long dups[1]; // global duplicate key uniqueifier
258 unsigned char chain[BtId]; // head of free page_nos chain
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
271 BtPageZero *pagezero; // mapped allocation page
272 BtSpinLatch lock[1]; // allocation area lite latch
273 uint latchdeployed; // highest number of latch entries deployed
274 uint nlatchpage; // number of latch pages at BT_latch
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 ushort thread_no[1]; // next thread number
279 BtHashEntry *hashtable; // the buffer pool hash table entries
280 BtLatchSet *latchsets; // mapped latch set from buffer pool
281 unsigned char *pagepool; // mapped to the buffer pool pages
283 HANDLE halloc; // allocation handle
284 HANDLE hpool; // buffer pool handle
289 BtMgr *mgr; // buffer manager for thread
290 BtPage cursor; // cached frame for start/next (never mapped)
291 BtPage frame; // spare frame for the page split (never mapped)
292 uid cursor_page; // current cursor page number
293 unsigned char *mem; // frame, cursor, page memory buffer
294 unsigned char key[BT_keyarray]; // last found complete key
295 int found; // last delete or insert was found
296 int err; // last error
297 int reads, writes; // number of reads and writes from the btree
298 ushort thread_no; // thread number
312 #define CLOCK_bit 0x8000
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey (BtDb *bt, uint slot);
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
328 // Helper functions to return slot values
329 // from the cursor page.
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
334 // The page is allocated from low and hi ends.
335 // The key slots are allocated from the bottom,
336 // while the text and value of the key
337 // are allocated from the top. When the two
338 // areas meet, the page is split into two.
340 // A key consists of a length byte, two bytes of
341 // index number (0 - 65535), and up to 253 bytes
344 // Associated with each key is a value byte string
345 // containing any value desired.
347 // The b-tree root is always located at page 1.
348 // The first leaf page of level zero is always
349 // located on page 2.
351 // The b-tree pages are linked with next
352 // pointers to facilitate enumerators,
353 // and provide for concurrency.
355 // When to root page fills, it is split in two and
356 // the tree height is raised by a new root at page
357 // one with two keys.
359 // Deleted keys are marked with a dead bit until
360 // page cleanup. The fence key for a leaf node is
363 // To achieve maximum concurrency one page is locked at a time
364 // as the tree is traversed to find leaf key in question. The right
365 // page numbers are used in cases where the page is being split,
368 // Page 0 is dedicated to lock for new page extensions,
369 // and chains empty pages together for reuse. It also
370 // contains the latch manager hash table.
372 // The ParentModification lock on a node is obtained to serialize posting
373 // or changing the fence key for a node.
375 // Empty pages are chained together through the ALLOC page and reused.
377 // Access macros to address slot and key values from the page
378 // Page slots use 1 based indexing.
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
384 void bt_putid(unsigned char *dest, uid id)
389 dest[i] = (unsigned char)id, id >>= 8;
392 uid bt_getid(unsigned char *src)
397 for( i = 0; i < BtId; i++ )
398 id <<= 8, id |= *src++;
403 uid bt_newdup (BtDb *bt)
406 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
408 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
412 // Write-Only Queue Lock
414 void WriteOLock (WOLock *lock)
418 tix = __sync_fetch_and_add (lock->ticket, 1);
420 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
422 // wait for our ticket to come up
424 while( tix != lock->serving[0] )
432 void WriteORelease (WOLock *lock)
437 // Phase-Fair reader/writer lock implementation
439 void WriteLock (RWLock *lock)
444 tix = __sync_fetch_and_add (lock->ticket, 1);
446 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
448 // wait for our ticket to come up
450 while( tix != lock->serving[0] )
457 w = PRES | (tix & PHID);
459 r = __sync_fetch_and_add (lock->rin, w);
461 r = _InterlockedExchangeAdd16 (lock->rin, w);
463 while( r != *lock->rout )
471 void WriteRelease (RWLock *lock)
474 __sync_fetch_and_and (lock->rin, ~MASK);
476 _InterlockedAnd16 (lock->rin, ~MASK);
481 void ReadLock (RWLock *lock)
485 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
487 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
490 while( w == (*lock->rin & MASK) )
498 void ReadRelease (RWLock *lock)
501 __sync_fetch_and_add (lock->rout, RINC);
503 _InterlockedExchangeAdd16 (lock->rout, RINC);
507 // Spin Latch Manager
509 // wait until write lock mode is clear
510 // and add 1 to the share count
512 void bt_spinreadlock(BtSpinLatch *latch)
518 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
520 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
522 // see if exclusive request is granted or pending
527 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
529 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
532 } while( sched_yield(), 1 );
534 } while( SwitchToThread(), 1 );
538 // wait for other read and write latches to relinquish
540 void bt_spinwritelock(BtSpinLatch *latch)
546 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
548 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
551 if( !(prev & ~BOTH) )
555 __sync_fetch_and_and ((ushort *)latch, ~XCL);
557 _InterlockedAnd16((ushort *)latch, ~XCL);
560 } while( sched_yield(), 1 );
562 } while( SwitchToThread(), 1 );
566 // try to obtain write lock
568 // return 1 if obtained,
571 int bt_spinwritetry(BtSpinLatch *latch)
576 prev = __sync_fetch_and_or((ushort *)latch, XCL);
578 prev = _InterlockedOr16((ushort *)latch, XCL);
580 // take write access if all bits are clear
583 if( !(prev & ~BOTH) )
587 __sync_fetch_and_and ((ushort *)latch, ~XCL);
589 _InterlockedAnd16((ushort *)latch, ~XCL);
596 void bt_spinreleasewrite(BtSpinLatch *latch)
599 __sync_fetch_and_and((ushort *)latch, ~BOTH);
601 _InterlockedAnd16((ushort *)latch, ~BOTH);
605 // decrement reader count
607 void bt_spinreleaseread(BtSpinLatch *latch)
610 __sync_fetch_and_add((ushort *)latch, -SHARE);
612 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
616 // read page from permanent location in Btree file
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
620 off64_t off = page_no << mgr->page_bits;
623 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
631 memset (ovl, 0, sizeof(OVERLAPPED));
633 ovl->OffsetHigh = off >> 32;
635 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
639 if( *amt < mgr->page_size ) {
640 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
647 // write page to permanent location in Btree file
648 // clear the dirty bit
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
652 off64_t off = page_no << mgr->page_bits;
655 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
661 memset (ovl, 0, sizeof(OVERLAPPED));
663 ovl->OffsetHigh = off >> 32;
665 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
668 if( *amt < mgr->page_size )
674 // link latch table entry into head of latch hash table
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
681 if( latch->next = bt->mgr->hashtable[hashidx].slot )
682 bt->mgr->latchsets[latch->next].prev = slot;
684 bt->mgr->hashtable[hashidx].slot = slot;
685 latch->page_no = page_no;
692 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
700 // set CLOCK bit in latch
701 // decrement pin count
703 void bt_unpinlatch (BtLatchSet *latch)
706 if( ~latch->pin & CLOCK_bit )
707 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708 __sync_fetch_and_add(&latch->pin, -1);
710 if( ~latch->pin & CLOCK_bit )
711 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712 _InterlockedDecrement16 (&latch->pin);
716 // return the btree cached page address
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
725 // find existing latchset or inspire new one
726 // return with latchset pinned
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
730 uint hashidx = page_no % bt->mgr->latchhash;
738 // try to find our entry
740 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
742 if( slot = bt->mgr->hashtable[hashidx].slot ) do
744 latch = bt->mgr->latchsets + slot;
745 if( page_no == latch->page_no )
747 } while( slot = latch->next );
753 latch = bt->mgr->latchsets + slot;
755 __sync_fetch_and_add(&latch->pin, 1);
757 _InterlockedIncrement16 (&latch->pin);
759 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
763 // see if there are any unused pool entries
765 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
767 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
770 if( slot < bt->mgr->latchtotal ) {
771 latch = bt->mgr->latchsets + slot;
772 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
774 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
779 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
781 _InterlockedDecrement (&bt->mgr->latchdeployed);
783 // find and reuse previous entry on victim
787 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
789 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
791 // try to get write lock on hash chain
792 // skip entry if not obtained
793 // or has outstanding pins
795 slot %= bt->mgr->latchtotal;
800 latch = bt->mgr->latchsets + slot;
801 idx = latch->page_no % bt->mgr->latchhash;
803 // see we are on same chain as hashidx
808 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
811 // skip this slot if it is pinned
812 // or the CLOCK bit is set
815 if( latch->pin & CLOCK_bit ) {
817 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
819 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
822 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
826 // update permanent page area in btree from buffer pool
828 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
831 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
834 latch->dirty = 0, bt->writes++;
836 // unlink our available slot from its hash chain
839 bt->mgr->latchsets[latch->prev].next = latch->next;
841 bt->mgr->hashtable[idx].slot = latch->next;
844 bt->mgr->latchsets[latch->next].prev = latch->prev;
846 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
848 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
851 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
856 void bt_mgrclose (BtMgr *mgr)
863 // flush dirty pool pages to the btree
865 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867 latch = mgr->latchsets + slot;
870 bt_writepage(mgr, page, latch->page_no);
871 latch->dirty = 0, num++;
873 // madvise (page, mgr->page_size, MADV_DONTNEED);
876 fprintf(stderr, "%d buffer pool pages flushed\n", num);
879 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880 munmap (mgr->pagezero, mgr->page_size);
882 FlushViewOfFile(mgr->pagezero, 0);
883 UnmapViewOfFile(mgr->pagezero);
884 UnmapViewOfFile(mgr->hashtable);
885 CloseHandle(mgr->halloc);
886 CloseHandle(mgr->hpool);
892 FlushFileBuffers(mgr->idx);
893 CloseHandle(mgr->idx);
898 // close and release memory
900 void bt_close (BtDb *bt)
907 VirtualFree (bt->mem, 0, MEM_RELEASE);
912 // open/create new btree buffer manager
914 // call with file_name, BT_openmode, bits in page size (e.g. 16),
915 // size of page pool (e.g. 262144)
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
930 // determine sanity of page size and buffer pool
932 if( bits > BT_maxbits )
934 else if( bits < BT_minbits )
938 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
943 mgr = calloc (1, sizeof(BtMgr));
945 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
947 if( mgr->idx == -1 ) {
948 fprintf (stderr, "Unable to open btree file\n");
949 return free(mgr), NULL;
952 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953 attr = FILE_ATTRIBUTE_NORMAL;
954 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
956 if( mgr->idx == INVALID_HANDLE_VALUE )
957 return GlobalFree(mgr), NULL;
961 pagezero = valloc (BT_maxpage);
964 // read minimum page size to get root info
965 // to support raw disk partition files
966 // check if bits == 0 on the disk.
968 if( size = lseek (mgr->idx, 0L, 2) )
969 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970 if( pagezero->alloc->bits )
971 bits = pagezero->alloc->bits;
975 return free(mgr), free(pagezero), NULL;
979 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980 size = GetFileSize(mgr->idx, amt);
983 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984 return bt_mgrclose (mgr), NULL;
985 bits = pagezero->alloc->bits;
990 mgr->page_size = 1 << bits;
991 mgr->page_bits = bits;
993 // calculate number of latch hash table entries
995 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
998 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
999 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000 mgr->latchtotal = nodemax;
1005 // initialize an empty b-tree with latch page, root page, page of leaves
1006 // and page(s) of latches and page pool cache
1008 memset (pagezero, 0, 1 << bits);
1009 pagezero->alloc->bits = mgr->page_bits;
1010 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1012 // initialize left-most LEAF page in
1015 bt_putid (pagezero->alloc->left, LEAF_page);
1017 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018 fprintf (stderr, "Unable to create btree page zero\n");
1019 return bt_mgrclose (mgr), NULL;
1022 memset (pagezero, 0, 1 << bits);
1023 pagezero->alloc->bits = mgr->page_bits;
1025 for( lvl=MIN_lvl; lvl--; ) {
1026 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027 key = keyptr(pagezero->alloc, 1);
1028 key->len = 2; // create stopper key
1032 bt_putid(value, MIN_lvl - lvl + 1);
1033 val = valptr(pagezero->alloc, 1);
1034 val->len = lvl ? BtId : 0;
1035 memcpy (val->value, value, val->len);
1037 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038 pagezero->alloc->lvl = lvl;
1039 pagezero->alloc->cnt = 1;
1040 pagezero->alloc->act = 1;
1042 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043 fprintf (stderr, "Unable to create btree page zero\n");
1044 return bt_mgrclose (mgr), NULL;
1052 VirtualFree (pagezero, 0, MEM_RELEASE);
1055 // mlock the pagezero page
1057 flag = PROT_READ | PROT_WRITE;
1058 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059 if( mgr->pagezero == MAP_FAILED ) {
1060 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->pagezero, mgr->page_size);
1065 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066 if( mgr->hashtable == MAP_FAILED ) {
1067 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068 return bt_mgrclose (mgr), NULL;
1071 flag = PAGE_READWRITE;
1072 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073 if( !mgr->halloc ) {
1074 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075 return bt_mgrclose (mgr), NULL;
1078 flag = FILE_MAP_WRITE;
1079 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080 if( !mgr->pagezero ) {
1081 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082 return bt_mgrclose (mgr), NULL;
1085 flag = PAGE_READWRITE;
1086 size = (uid)mgr->nlatchpage << mgr->page_bits;
1087 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1089 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090 return bt_mgrclose (mgr), NULL;
1093 flag = FILE_MAP_WRITE;
1094 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095 if( !mgr->hashtable ) {
1096 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097 return bt_mgrclose (mgr), NULL;
1101 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1107 // open BTree access method
1108 // based on buffer manager
1110 BtDb *bt_open (BtMgr *mgr)
1112 BtDb *bt = malloc (sizeof(*bt));
1114 memset (bt, 0, sizeof(*bt));
1117 bt->mem = valloc (2 *mgr->page_size);
1119 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1121 bt->frame = (BtPage)bt->mem;
1122 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1124 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1126 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1131 // compare two keys, return > 0, = 0, or < 0
1132 // =0: keys are same
1135 // as the comparison value
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1139 uint len1 = key1->len;
1142 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1153 // place write, read, or parent lock on requested page_no.
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1159 ReadLock (latch->readwr);
1162 WriteLock (latch->readwr);
1165 ReadLock (latch->access);
1168 WriteLock (latch->access);
1171 WriteOLock (latch->parent);
1174 WriteOLock (latch->atomic);
1179 // remove write, read, or parent lock on requested page
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1185 ReadRelease (latch->readwr);
1188 WriteRelease (latch->readwr);
1191 ReadRelease (latch->access);
1194 WriteRelease (latch->access);
1197 WriteORelease (latch->parent);
1200 WriteORelease (latch->atomic);
1205 // allocate a new page
1206 // return with page latched, but unlocked.
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1213 // lock allocation page
1215 bt_spinwritelock(bt->mgr->lock);
1217 // use empty chain first
1218 // else allocate empty page
1220 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222 set->page = bt_mappage (bt, set->latch);
1224 return bt->err = BTERR_struct, -1;
1226 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227 bt_spinreleasewrite(bt->mgr->lock);
1229 memcpy (set->page, contents, bt->mgr->page_size);
1230 set->latch->dirty = 1;
1234 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1237 // unlock allocation latch
1239 bt_spinreleasewrite(bt->mgr->lock);
1241 // don't load cache from btree page
1243 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244 set->page = bt_mappage (bt, set->latch);
1246 return bt->err = BTERR_struct;
1248 memcpy (set->page, contents, bt->mgr->page_size);
1249 set->latch->dirty = 1;
1253 // find slot in page for given key at a given level
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1257 uint diff, higher = page->cnt, low = 1, slot;
1260 // make stopper key an infinite fence value
1262 if( bt_getid (page->right) )
1267 // low is the lowest candidate.
1268 // loop ends when they meet
1270 // higher is already
1271 // tested as .ge. the passed key.
1273 while( diff = higher - low ) {
1274 slot = low + ( diff >> 1 );
1275 if( keycmp (keyptr(page, slot), key, len) < 0 )
1278 higher = slot, good++;
1281 // return zero if key is on right link page
1283 return good ? higher : 0;
1286 // find and load page at given level for given key
1287 // leave page rd or wr locked as requested
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1296 // start at root of btree and drill down
1299 // determine lock mode of drill level
1300 mode = (drill == lvl) ? lock : BtLockRead;
1302 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1305 // obtain access lock using lock chaining with Access mode
1307 if( page_no > ROOT_page )
1308 bt_lockpage(bt, BtLockAccess, set->latch);
1310 set->page = bt_mappage (bt, set->latch);
1312 // release & unpin parent or left sibling page
1315 bt_unlockpage(bt, prevmode, prevlatch);
1316 bt_unpinlatch (prevlatch);
1320 // obtain mode lock using lock chaining through AccessLock
1322 bt_lockpage(bt, mode, set->latch);
1324 if( set->page->free )
1325 return bt->err = BTERR_struct, 0;
1327 if( page_no > ROOT_page )
1328 bt_unlockpage(bt, BtLockAccess, set->latch);
1330 // re-read and re-lock root after determining actual level of root
1332 if( set->page->lvl != drill) {
1333 if( set->latch->page_no != ROOT_page )
1334 return bt->err = BTERR_struct, 0;
1336 drill = set->page->lvl;
1338 if( lock != BtLockRead && drill == lvl ) {
1339 bt_unlockpage(bt, mode, set->latch);
1340 bt_unpinlatch (set->latch);
1345 prevpage = set->latch->page_no;
1346 prevlatch = set->latch;
1349 // find key on page at this level
1350 // and descend to requested level
1352 if( !set->page->kill )
1353 if( slot = bt_findslot (set->page, key, len) ) {
1357 // find next non-dead slot -- the fence key if nothing else
1359 while( slotptr(set->page, slot)->dead )
1360 if( slot++ < set->page->cnt )
1363 return bt->err = BTERR_struct, 0;
1365 page_no = bt_getid(valptr(set->page, slot)->value);
1370 // or slide right into next page
1372 page_no = bt_getid(set->page->right);
1375 // return error on end of right chain
1377 bt->err = BTERR_struct;
1378 return 0; // return error
1381 // return page to free list
1382 // page must be delete & write locked
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1386 // lock allocation page
1388 bt_spinwritelock (bt->mgr->lock);
1392 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394 set->latch->dirty = 1;
1395 set->page->free = 1;
1397 // unlock released page
1399 bt_unlockpage (bt, BtLockDelete, set->latch);
1400 bt_unlockpage (bt, BtLockWrite, set->latch);
1401 bt_unpinlatch (set->latch);
1403 // unlock allocation page
1405 bt_spinreleasewrite (bt->mgr->lock);
1408 // a fence key was deleted from a page
1409 // push new fence value upwards
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1418 // remove the old fence value
1420 ptr = keyptr(set->page, set->page->cnt);
1421 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423 set->latch->dirty = 1;
1425 // cache new fence value
1427 ptr = keyptr(set->page, set->page->cnt);
1428 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1430 bt_lockpage (bt, BtLockParent, set->latch);
1431 bt_unlockpage (bt, BtLockWrite, set->latch);
1433 // insert new (now smaller) fence key
1435 bt_putid (value, set->latch->page_no);
1436 ptr = (BtKey*)leftkey;
1438 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1441 // now delete old fence key
1443 ptr = (BtKey*)rightkey;
1445 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1448 bt_unlockpage (bt, BtLockParent, set->latch);
1449 bt_unpinlatch(set->latch);
1453 // root has a single child
1454 // collapse a level from the tree
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1462 // find the child entry and promote as new root contents
1465 for( idx = 0; idx++ < root->page->cnt; )
1466 if( !slotptr(root->page, idx)->dead )
1469 page_no = bt_getid (valptr(root->page, idx)->value);
1471 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472 child->page = bt_mappage (bt, child->latch);
1476 bt_lockpage (bt, BtLockDelete, child->latch);
1477 bt_lockpage (bt, BtLockWrite, child->latch);
1479 memcpy (root->page, child->page, bt->mgr->page_size);
1480 root->latch->dirty = 1;
1482 bt_freepage (bt, child);
1484 } while( root->page->lvl > 1 && root->page->act == 1 );
1486 bt_unlockpage (bt, BtLockWrite, root->latch);
1487 bt_unpinlatch (root->latch);
1491 // delete a page and manage keys
1492 // call with page writelocked
1493 // returns with page unpinned
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1504 // cache copy of fence key
1505 // to post in parent
1507 ptr = keyptr(set->page, set->page->cnt);
1508 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1510 // obtain lock on right page
1512 page_no = bt_getid(set->page->right);
1514 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515 right->page = bt_mappage (bt, right->latch);
1519 bt_lockpage (bt, BtLockWrite, right->latch);
1521 // cache copy of key to update
1523 ptr = keyptr(right->page, right->page->cnt);
1524 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1526 if( right->page->kill )
1527 return bt->err = BTERR_struct;
1529 // pull contents of right peer into our empty page
1531 memcpy (set->page, right->page, bt->mgr->page_size);
1532 set->latch->dirty = 1;
1534 // mark right page deleted and point it to left page
1535 // until we can post parent updates that remove access
1536 // to the deleted page.
1538 bt_putid (right->page->right, set->latch->page_no);
1539 right->latch->dirty = 1;
1540 right->page->kill = 1;
1542 bt_lockpage (bt, BtLockParent, right->latch);
1543 bt_unlockpage (bt, BtLockWrite, right->latch);
1545 bt_lockpage (bt, BtLockParent, set->latch);
1546 bt_unlockpage (bt, BtLockWrite, set->latch);
1548 // redirect higher key directly to our new node contents
1550 bt_putid (value, set->latch->page_no);
1551 ptr = (BtKey*)higherfence;
1553 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1556 // delete old lower key to our node
1558 ptr = (BtKey*)lowerfence;
1560 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1563 // obtain delete and write locks to right node
1565 bt_unlockpage (bt, BtLockParent, right->latch);
1566 bt_lockpage (bt, BtLockDelete, right->latch);
1567 bt_lockpage (bt, BtLockWrite, right->latch);
1568 bt_freepage (bt, right);
1570 bt_unlockpage (bt, BtLockParent, set->latch);
1571 bt_unpinlatch (set->latch);
1576 // find and delete key on page by marking delete flag bit
1577 // if page becomes empty, delete it from the btree
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1581 uint slot, idx, found, fence;
1586 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587 ptr = keyptr(set->page, slot);
1591 // if librarian slot, advance to real slot
1593 if( slotptr(set->page, slot)->type == Librarian )
1594 ptr = keyptr(set->page, ++slot);
1596 fence = slot == set->page->cnt;
1598 // if key is found delete it, otherwise ignore request
1600 if( found = !keycmp (ptr, key, len) )
1601 if( found = slotptr(set->page, slot)->dead == 0 ) {
1602 val = valptr(set->page,slot);
1603 slotptr(set->page, slot)->dead = 1;
1604 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1607 // collapse empty slots beneath the fence
1609 while( idx = set->page->cnt - 1 )
1610 if( slotptr(set->page, idx)->dead ) {
1611 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1617 // did we delete a fence key in an upper level?
1619 if( found && lvl && set->page->act && fence )
1620 if( bt_fixfence (bt, set, lvl) )
1623 return bt->found = found, 0;
1625 // do we need to collapse root?
1627 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628 if( bt_collapseroot (bt, set) )
1631 return bt->found = found, 0;
1633 // delete empty page
1635 if( !set->page->act )
1636 return bt_deletepage (bt, set);
1638 set->latch->dirty = 1;
1639 bt_unlockpage(bt, BtLockWrite, set->latch);
1640 bt_unpinlatch (set->latch);
1641 return bt->found = found, 0;
1644 BtKey *bt_foundkey (BtDb *bt)
1646 return (BtKey*)(bt->key);
1649 // advance to next slot
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1653 BtLatchSet *prevlatch;
1656 if( slot < set->page->cnt )
1659 prevlatch = set->latch;
1661 if( page_no = bt_getid(set->page->right) )
1662 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663 set->page = bt_mappage (bt, set->latch);
1667 return bt->err = BTERR_struct, 0;
1669 // obtain access lock using lock chaining with Access mode
1671 bt_lockpage(bt, BtLockAccess, set->latch);
1673 bt_unlockpage(bt, BtLockRead, prevlatch);
1674 bt_unpinlatch (prevlatch);
1676 bt_lockpage(bt, BtLockRead, set->latch);
1677 bt_unlockpage(bt, BtLockAccess, set->latch);
1681 // find unique key or first duplicate key in
1682 // leaf level and return number of value bytes
1683 // or (-1) if not found. Setup key for bt_foundkey
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1693 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1695 ptr = keyptr(set->page, slot);
1697 // skip librarian slot place holder
1699 if( slotptr(set->page, slot)->type == Librarian )
1700 ptr = keyptr(set->page, ++slot);
1702 // return actual key found
1704 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1707 if( slotptr(set->page, slot)->type == Duplicate )
1710 // not there if we reach the stopper key
1712 if( slot == set->page->cnt )
1713 if( !bt_getid (set->page->right) )
1716 // if key exists, return >= 0 value bytes copied
1717 // otherwise return (-1)
1719 if( slotptr(set->page, slot)->dead )
1723 if( !memcmp (ptr->key, key, len) ) {
1724 val = valptr (set->page,slot);
1725 if( valmax > val->len )
1727 memcpy (value, val->value, valmax);
1733 } while( slot = bt_findnext (bt, set, slot) );
1735 bt_unlockpage (bt, BtLockRead, set->latch);
1736 bt_unpinlatch (set->latch);
1740 // check page for space available,
1741 // clean if necessary and return
1742 // 0 - page needs splitting
1743 // >0 new slot value
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1755 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1758 // skip cleanup and proceed to split
1759 // if there's not enough garbage
1762 if( page->garbage < nxt / 5 )
1765 memcpy (bt->frame, page, bt->mgr->page_size);
1767 // skip page info and set rest of page to zero
1769 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770 set->latch->dirty = 1;
1774 // clean up page first by
1775 // removing deleted keys
1777 while( cnt++ < max ) {
1780 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1783 // copy the value across
1785 val = valptr(bt->frame, cnt);
1786 nxt -= val->len + sizeof(BtVal);
1787 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1789 // copy the key across
1791 key = keyptr(bt->frame, cnt);
1792 nxt -= key->len + sizeof(BtKey);
1793 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1795 // make a librarian slot
1798 slotptr(page, ++idx)->off = nxt;
1799 slotptr(page, idx)->type = Librarian;
1800 slotptr(page, idx)->dead = 1;
1805 slotptr(page, ++idx)->off = nxt;
1806 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1808 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1815 // see if page has enough space now, or does it need splitting?
1817 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1823 // split the root and raise the height of the btree
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1835 // save left page fence key for new root
1837 ptr = keyptr(root->page, root->page->cnt);
1838 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1840 // Obtain an empty page to use, and copy the current
1841 // root contents into it, e.g. lower keys
1843 if( bt_newpage(bt, left, root->page) )
1846 left_page_no = left->latch->page_no;
1847 bt_unpinlatch (left->latch);
1849 // preserve the page info at the bottom
1850 // of higher keys and set rest to zero
1852 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1854 // insert stopper key at top of newroot page
1855 // and increase the root height
1857 nxt -= BtId + sizeof(BtVal);
1858 bt_putid (value, right->page_no);
1859 val = (BtVal *)((unsigned char *)root->page + nxt);
1860 memcpy (val->value, value, BtId);
1863 nxt -= 2 + sizeof(BtKey);
1864 slotptr(root->page, 2)->off = nxt;
1865 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1870 // insert lower keys page fence key on newroot page as first key
1872 nxt -= BtId + sizeof(BtVal);
1873 bt_putid (value, left_page_no);
1874 val = (BtVal *)((unsigned char *)root->page + nxt);
1875 memcpy (val->value, value, BtId);
1878 ptr = (BtKey *)leftkey;
1879 nxt -= ptr->len + sizeof(BtKey);
1880 slotptr(root->page, 1)->off = nxt;
1881 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1883 bt_putid(root->page->right, 0);
1884 root->page->min = nxt; // reset lowest used offset and key count
1885 root->page->cnt = 2;
1886 root->page->act = 2;
1889 // release and unpin root pages
1891 bt_unlockpage(bt, BtLockWrite, root->latch);
1892 bt_unpinlatch (root->latch);
1894 bt_unpinlatch (right);
1898 // split already locked full node
1900 // return pool entry for new right
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1913 // split higher half of keys to bt->frame
1915 memset (bt->frame, 0, bt->mgr->page_size);
1916 max = set->page->cnt;
1920 while( cnt++ < max ) {
1921 if( slotptr(set->page, cnt)->dead && cnt < max )
1923 src = valptr(set->page, cnt);
1924 nxt -= src->len + sizeof(BtVal);
1925 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1927 key = keyptr(set->page, cnt);
1928 nxt -= key->len + sizeof(BtKey);
1929 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1930 memcpy (ptr, key, key->len + sizeof(BtKey));
1932 // add librarian slot
1935 slotptr(bt->frame, ++idx)->off = nxt;
1936 slotptr(bt->frame, idx)->type = Librarian;
1937 slotptr(bt->frame, idx)->dead = 1;
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1945 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1949 bt->frame->bits = bt->mgr->page_bits;
1950 bt->frame->min = nxt;
1951 bt->frame->cnt = idx;
1952 bt->frame->lvl = lvl;
1956 if( set->latch->page_no > ROOT_page )
1957 bt_putid (bt->frame->right, bt_getid (set->page->right));
1959 // get new free page and write higher keys to it.
1961 if( bt_newpage(bt, right, bt->frame) )
1964 memcpy (bt->frame, set->page, bt->mgr->page_size);
1965 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966 set->latch->dirty = 1;
1968 nxt = bt->mgr->page_size;
1969 set->page->garbage = 0;
1975 if( slotptr(bt->frame, max)->type == Librarian )
1978 // assemble page of smaller keys
1980 while( cnt++ < max ) {
1981 if( slotptr(bt->frame, cnt)->dead )
1983 val = valptr(bt->frame, cnt);
1984 nxt -= val->len + sizeof(BtVal);
1985 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1987 key = keyptr(bt->frame, cnt);
1988 nxt -= key->len + sizeof(BtKey);
1989 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1991 // add librarian slot
1994 slotptr(set->page, ++idx)->off = nxt;
1995 slotptr(set->page, idx)->type = Librarian;
1996 slotptr(set->page, idx)->dead = 1;
2001 slotptr(set->page, ++idx)->off = nxt;
2002 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2006 bt_putid(set->page->right, right->latch->page_no);
2007 set->page->min = nxt;
2008 set->page->cnt = idx;
2010 return right->latch->entry;
2013 // fix keys for newly split page
2014 // call with page locked, return
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2025 // if current page is the root page, split it
2027 if( set->latch->page_no == ROOT_page )
2028 return bt_splitroot (bt, set, right);
2030 ptr = keyptr(set->page, set->page->cnt);
2031 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2033 page = bt_mappage (bt, right);
2035 ptr = keyptr(page, page->cnt);
2036 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2038 // insert new fences in their parent pages
2040 bt_lockpage (bt, BtLockParent, right);
2042 bt_lockpage (bt, BtLockParent, set->latch);
2043 bt_unlockpage (bt, BtLockWrite, set->latch);
2045 // insert new fence for reformulated left block of smaller keys
2047 bt_putid (value, set->latch->page_no);
2048 ptr = (BtKey *)leftkey;
2050 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2053 // switch fence for right block of larger keys to new right page
2055 bt_putid (value, right->page_no);
2056 ptr = (BtKey *)rightkey;
2058 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2061 bt_unlockpage (bt, BtLockParent, set->latch);
2062 bt_unpinlatch (set->latch);
2064 bt_unlockpage (bt, BtLockParent, right);
2065 bt_unpinlatch (right);
2069 // install new key and value onto page
2070 // page must already be checked for
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2075 uint idx, librarian;
2080 // if found slot > desired slot and previous slot
2081 // is a librarian slot, use it
2084 if( slotptr(set->page, slot-1)->type == Librarian )
2087 // copy value onto page
2089 set->page->min -= vallen + sizeof(BtVal);
2090 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091 memcpy (val->value, value, vallen);
2094 // copy key onto page
2096 set->page->min -= keylen + sizeof(BtKey);
2097 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098 memcpy (ptr->key, key, keylen);
2101 // find first empty slot
2103 for( idx = slot; idx < set->page->cnt; idx++ )
2104 if( slotptr(set->page, idx)->dead )
2107 // now insert key into array before slot
2109 if( idx == set->page->cnt )
2110 idx += 2, set->page->cnt += 2, librarian = 2;
2114 set->latch->dirty = 1;
2117 while( idx > slot + librarian - 1 )
2118 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2120 // add librarian slot
2122 if( librarian > 1 ) {
2123 node = slotptr(set->page, slot++);
2124 node->off = set->page->min;
2125 node->type = Librarian;
2131 node = slotptr(set->page, slot);
2132 node->off = set->page->min;
2137 bt_unlockpage (bt, BtLockWrite, set->latch);
2138 bt_unpinlatch (set->latch);
2144 // Insert new key into the btree at given level.
2145 // either add a new key or update/add an existing one
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2157 // set up the key we're working on
2159 ins = (BtKey*)newkey;
2160 memcpy (ins->key, key, keylen);
2163 // is this a non-unique index value?
2169 sequence = bt_newdup (bt);
2170 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2174 while( 1 ) { // find the page and slot for the current key
2175 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176 ptr = keyptr(set->page, slot);
2179 bt->err = BTERR_ovflw;
2183 // if librarian slot == found slot, advance to real slot
2185 if( slotptr(set->page, slot)->type == Librarian )
2186 if( !keycmp (ptr, key, keylen) )
2187 ptr = keyptr(set->page, ++slot);
2191 if( slotptr(set->page, slot)->type == Duplicate )
2194 // if inserting a duplicate key or unique key
2195 // check for adequate space on the page
2196 // and insert the new key before slot.
2198 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2199 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200 if( !(entry = bt_splitpage (bt, set)) )
2202 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2207 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2210 // if key already exists, update value and return
2212 val = valptr(set->page, slot);
2214 if( val->len >= vallen ) {
2215 if( slotptr(set->page, slot)->dead )
2217 set->page->garbage += val->len - vallen;
2218 set->latch->dirty = 1;
2219 slotptr(set->page, slot)->dead = 0;
2221 memcpy (val->value, value, vallen);
2222 bt_unlockpage(bt, BtLockWrite, set->latch);
2223 bt_unpinlatch (set->latch);
2227 // new update value doesn't fit in existing value area
2229 if( !slotptr(set->page, slot)->dead )
2230 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2232 slotptr(set->page, slot)->dead = 0;
2236 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237 if( !(entry = bt_splitpage (bt, set)) )
2239 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2244 set->page->min -= vallen + sizeof(BtVal);
2245 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246 memcpy (val->value, value, vallen);
2249 set->latch->dirty = 1;
2250 set->page->min -= keylen + sizeof(BtKey);
2251 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252 memcpy (ptr->key, key, keylen);
2255 slotptr(set->page, slot)->off = set->page->min;
2256 bt_unlockpage(bt, BtLockWrite, set->latch);
2257 bt_unpinlatch (set->latch);
2264 uint entry; // latch table entry number
2265 uint slot:31; // page slot number
2266 uint reuse:1; // reused previous page
2270 uid page_no; // page number for split leaf
2271 void *next; // next key to insert
2272 uint entry:29; // latch table entry number
2273 uint type:2; // 0 == insert, 1 == delete, 2 == free
2274 uint nounlock:1; // don't unlock ParentModification
2275 unsigned char leafkey[BT_keyarray];
2278 // find and load leaf page for given key
2279 // leave page Atomic locked and Read locked.
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2283 BtLatchSet *prevlatch;
2287 // find level one slot
2289 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2292 // find next non-dead entry on this page
2293 // it will be the fence key if nothing else
2295 while( slotptr(set->page, slot)->dead )
2296 if( slot++ < set->page->cnt )
2299 return bt->err = BTERR_struct, 0;
2301 page_no = bt_getid(valptr(set->page, slot)->value);
2302 prevlatch = set->latch;
2305 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306 set->page = bt_mappage (bt, set->latch);
2310 // obtain read lock using lock chaining with Access mode
2311 // release & unpin parent/left sibling page
2313 bt_lockpage(bt, BtLockAccess, set->latch);
2315 bt_unlockpage(bt, BtLockRead, prevlatch);
2316 bt_unpinlatch (prevlatch);
2318 bt_lockpage(bt, BtLockRead, set->latch);
2320 // find key on page at this level
2321 // and descend to requested level
2323 if( !set->page->kill )
2324 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2325 bt_unlockpage(bt, BtLockRead, set->latch);
2326 bt_lockpage(bt, BtLockAtomic, set->latch);
2327 bt_lockpage(bt, BtLockRead, set->latch);
2328 bt_unlockpage(bt, BtLockAccess, set->latch);
2330 if( !set->page->kill )
2331 if( slot = bt_findslot (set->page, key, len) )
2334 bt_unlockpage(bt, BtLockAtomic, set->latch);
2337 // slide right into next page
2339 page_no = bt_getid(set->page->right);
2340 prevlatch = set->latch;
2343 // return error on end of right chain
2345 bt->err = BTERR_struct;
2346 return 0; // return error
2349 // determine actual page where key is located
2350 // return slot number
2352 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2354 BtKey *key = keyptr(source,src);
2355 uint slot = locks[src].slot;
2358 if( src > 1 && locks[src].reuse )
2359 entry = locks[src-1].entry, slot = 0;
2361 entry = locks[src].entry;
2364 set->latch = bt->mgr->latchsets + entry;
2365 set->page = bt_mappage (bt, set->latch);
2369 // is locks->reuse set?
2370 // if so, find where our key
2371 // is located on previous page or split pages
2374 set->latch = bt->mgr->latchsets + entry;
2375 set->page = bt_mappage (bt, set->latch);
2377 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2378 if( locks[src].reuse )
2379 locks[src].entry = entry;
2382 } while( entry = set->latch->split );
2384 bt->err = BTERR_atomic;
2388 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2390 BtKey *key = keyptr(source, src);
2391 BtVal *val = valptr(source, src);
2396 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2397 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2398 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2400 if( entry = bt_splitpage (bt, set) )
2401 latch = bt->mgr->latchsets + entry;
2405 // splice right page into split chain
2406 // and WriteLock it.
2408 latch->split = set->latch->split;
2409 set->latch->split = entry;
2410 bt_lockpage(bt, BtLockWrite, latch);
2413 return bt->err = BTERR_atomic;
2416 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2418 BtKey *key = keyptr(source, src);
2419 uint idx, entry, slot;
2424 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2425 slotptr(set->page, slot)->dead = 1;
2427 return bt->err = BTERR_struct;
2429 ptr = keyptr(set->page, slot);
2430 val = valptr(set->page, slot);
2432 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2433 set->latch->dirty = 1;
2438 // delete an empty master page for a transaction
2440 // note that the far right page never empties because
2441 // it always contains (at least) the infinite stopper key
2442 // and that all pages that don't contain any keys are
2443 // deleted, or are being held under Atomic lock.
2445 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2447 BtPageSet right[1], temp[1];
2448 unsigned char value[BtId];
2452 bt_lockpage(bt, BtLockWrite, prev->latch);
2454 // grab the right sibling
2456 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2457 right->page = bt_mappage (bt, right->latch);
2461 bt_lockpage(bt, BtLockAtomic, right->latch);
2462 bt_lockpage(bt, BtLockWrite, right->latch);
2464 // and pull contents over empty page
2465 // while preserving master's left link
2467 memcpy (right->page->left, prev->page->left, BtId);
2468 memcpy (prev->page, right->page, bt->mgr->page_size);
2470 // forward seekers to old right sibling
2471 // to new page location in set
2473 bt_putid (right->page->right, prev->latch->page_no);
2474 right->latch->dirty = 1;
2475 right->page->kill = 1;
2477 // remove pointer to right page for searchers by
2478 // changing right fence key to point to the master page
2480 ptr = keyptr(right->page,right->page->cnt);
2481 bt_putid (value, prev->latch->page_no);
2483 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2486 // now that master page is in good shape we can
2487 // remove its locks.
2489 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2490 bt_unlockpage (bt, BtLockWrite, prev->latch);
2492 // fix master's right sibling's left pointer
2493 // to remove scanner's poiner to the right page
2495 if( right_page_no = bt_getid (prev->page->right) ) {
2496 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2497 temp->page = bt_mappage (bt, temp->latch);
2499 bt_lockpage (bt, BtLockWrite, temp->latch);
2500 bt_putid (temp->page->left, prev->latch->page_no);
2501 temp->latch->dirty = 1;
2503 bt_unlockpage (bt, BtLockWrite, temp->latch);
2504 bt_unpinlatch (temp->latch);
2505 } else { // master is now the far right page
2506 bt_spinwritelock (bt->mgr->lock);
2507 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2508 bt_spinreleasewrite(bt->mgr->lock);
2511 // now that there are no pointers to the right page
2512 // we can delete it after the last read access occurs
2514 bt_unlockpage (bt, BtLockWrite, right->latch);
2515 bt_unlockpage (bt, BtLockAtomic, right->latch);
2516 bt_lockpage (bt, BtLockDelete, right->latch);
2517 bt_lockpage (bt, BtLockWrite, right->latch);
2518 bt_freepage (bt, right);
2522 // atomic modification of a batch of keys.
2524 // return -1 if BTERR is set
2525 // otherwise return slot number
2526 // causing the key constraint violation
2527 // or zero on successful completion.
2529 int bt_atomictxn (BtDb *bt, BtPage source)
2531 uint src, idx, slot, samepage, entry;
2532 AtomicKey *head, *tail, *leaf;
2533 BtPageSet set[1], prev[1];
2534 unsigned char value[BtId];
2535 BtKey *key, *ptr, *key2;
2545 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2549 // stable sort the list of keys into order to
2550 // prevent deadlocks between threads.
2552 for( src = 1; src++ < source->cnt; ) {
2553 *temp = *slotptr(source,src);
2554 key = keyptr (source,src);
2556 for( idx = src; --idx; ) {
2557 key2 = keyptr (source,idx);
2558 if( keycmp (key, key2->key, key2->len) < 0 ) {
2559 *slotptr(source,idx+1) = *slotptr(source,idx);
2560 *slotptr(source,idx) = *temp;
2566 // Load the leaf page for each key
2567 // group same page references with reuse bit
2568 // and determine any constraint violations
2570 for( src = 0; src++ < source->cnt; ) {
2571 key = keyptr(source, src);
2574 // first determine if this modification falls
2575 // on the same page as the previous modification
2576 // note that the far right leaf page is a special case
2578 if( samepage = src > 1 )
2579 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2580 slot = bt_findslot(set->page, key->key, key->len);
2581 else // release read on previous page
2582 bt_unlockpage(bt, BtLockRead, set->latch);
2585 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2586 set->latch->split = 0;
2590 if( slotptr(set->page, slot)->type == Librarian )
2591 ptr = keyptr(set->page, ++slot);
2593 ptr = keyptr(set->page, slot);
2596 locks[src].entry = set->latch->entry;
2597 locks[src].slot = slot;
2598 locks[src].reuse = 0;
2600 locks[src].entry = 0;
2601 locks[src].slot = 0;
2602 locks[src].reuse = 1;
2605 switch( slotptr(source, src)->type ) {
2608 if( !slotptr(set->page, slot)->dead )
2609 if( slot < set->page->cnt || bt_getid (set->page->right) )
2610 if( !keycmp (ptr, key->key, key->len) ) {
2612 // return constraint violation if key already exists
2614 bt_unlockpage(bt, BtLockRead, set->latch);
2618 if( locks[src].entry ) {
2619 set->latch = bt->mgr->latchsets + locks[src].entry;
2620 bt_unlockpage(bt, BtLockAtomic, set->latch);
2621 bt_unpinlatch (set->latch);
2632 // unlock last loadpage lock
2634 if( source->cnt > 1 )
2635 bt_unlockpage(bt, BtLockRead, set->latch);
2637 // obtain write lock for each master page
2639 for( src = 0; src++ < source->cnt; )
2640 if( locks[src].reuse )
2643 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2645 // insert or delete each key
2646 // process any splits or merges
2647 // release Write & Atomic latches
2648 // set ParentModifications and build
2649 // queue of keys to insert for split pages
2650 // or delete for deleted pages.
2652 // run through txn list backwards
2654 samepage = source->cnt + 1;
2656 for( src = source->cnt; src; src-- ) {
2657 if( locks[src].reuse )
2660 // perform the txn operations
2661 // from smaller to larger on
2664 for( idx = src; idx < samepage; idx++ )
2665 switch( slotptr(source,idx)->type ) {
2667 if( bt_atomicdelete (bt, source, locks, idx) )
2673 if( bt_atomicinsert (bt, source, locks, idx) )
2678 // after the same page operations have finished,
2679 // process master page for splits or deletion.
2681 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2682 prev->page = bt_mappage (bt, prev->latch);
2685 // pick-up all splits from master page
2686 // each one is already WriteLocked.
2688 entry = prev->latch->split;
2691 set->latch = bt->mgr->latchsets + entry;
2692 set->page = bt_mappage (bt, set->latch);
2693 entry = set->latch->split;
2695 // delete empty master page by undoing its split
2696 // (this is potentially another empty page)
2697 // note that there are no new left pointers yet
2699 if( !prev->page->act ) {
2700 memcpy (set->page->left, prev->page->left, BtId);
2701 memcpy (prev->page, set->page, bt->mgr->page_size);
2702 bt_lockpage (bt, BtLockDelete, set->latch);
2703 bt_freepage (bt, set);
2705 prev->latch->dirty = 1;
2709 // remove empty page from the split chain
2711 if( !set->page->act ) {
2712 memcpy (prev->page->right, set->page->right, BtId);
2713 prev->latch->split = set->latch->split;
2714 bt_lockpage (bt, BtLockDelete, set->latch);
2715 bt_freepage (bt, set);
2719 // schedule prev fence key update
2721 ptr = keyptr(prev->page,prev->page->cnt);
2722 leaf = calloc (sizeof(AtomicKey), 1);
2724 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2725 leaf->page_no = prev->latch->page_no;
2726 leaf->entry = prev->latch->entry;
2736 // splice in the left link into the split page
2738 bt_putid (set->page->left, prev->latch->page_no);
2739 bt_lockpage(bt, BtLockParent, prev->latch);
2740 bt_unlockpage(bt, BtLockWrite, prev->latch);
2744 // update left pointer in next right page from last split page
2745 // (if all splits were reversed, latch->split == 0)
2747 if( latch->split ) {
2748 // fix left pointer in master's original (now split)
2749 // far right sibling or set rightmost page in page zero
2751 if( right = bt_getid (prev->page->right) ) {
2752 if( set->latch = bt_pinlatch (bt, right, 1) )
2753 set->page = bt_mappage (bt, set->latch);
2757 bt_lockpage (bt, BtLockWrite, set->latch);
2758 bt_putid (set->page->left, prev->latch->page_no);
2759 set->latch->dirty = 1;
2760 bt_unlockpage (bt, BtLockWrite, set->latch);
2761 bt_unpinlatch (set->latch);
2762 } else { // prev is rightmost page
2763 bt_spinwritelock (bt->mgr->lock);
2764 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2765 bt_spinreleasewrite(bt->mgr->lock);
2768 // Process last page split in chain
2770 ptr = keyptr(prev->page,prev->page->cnt);
2771 leaf = calloc (sizeof(AtomicKey), 1);
2773 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2774 leaf->page_no = prev->latch->page_no;
2775 leaf->entry = prev->latch->entry;
2785 bt_lockpage(bt, BtLockParent, prev->latch);
2786 bt_unlockpage(bt, BtLockWrite, prev->latch);
2788 // remove atomic lock on master page
2790 bt_unlockpage(bt, BtLockAtomic, latch);
2794 // finished if prev page occupied (either master or final split)
2796 if( prev->page->act ) {
2797 bt_unlockpage(bt, BtLockWrite, latch);
2798 bt_unlockpage(bt, BtLockAtomic, latch);
2799 bt_unpinlatch(latch);
2803 // any and all splits were reversed, and the
2804 // master page located in prev is empty, delete it
2805 // by pulling over master's right sibling.
2807 // Remove empty master's fence key
2809 ptr = keyptr(prev->page,prev->page->cnt);
2811 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2814 // perform the remainder of the delete
2815 // from the FIFO queue
2817 leaf = calloc (sizeof(AtomicKey), 1);
2819 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2820 leaf->page_no = prev->latch->page_no;
2821 leaf->entry = prev->latch->entry;
2832 // leave atomic lock in place until
2833 // deletion completes in next phase.
2835 bt_unlockpage(bt, BtLockWrite, prev->latch);
2838 // add & delete keys for any pages split or merged during transaction
2842 set->latch = bt->mgr->latchsets + leaf->entry;
2843 set->page = bt_mappage (bt, set->latch);
2845 bt_putid (value, leaf->page_no);
2846 ptr = (BtKey *)leaf->leafkey;
2848 switch( leaf->type ) {
2849 case 0: // insert key
2850 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2855 case 1: // delete key
2856 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2861 case 2: // free page
2862 if( bt_atomicfree (bt, set) )
2868 if( !leaf->nounlock )
2869 bt_unlockpage (bt, BtLockParent, set->latch);
2871 bt_unpinlatch (set->latch);
2874 } while( leaf = tail );
2882 // set cursor to highest slot on highest page
2884 uint bt_lastkey (BtDb *bt)
2886 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2889 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2890 set->page = bt_mappage (bt, set->latch);
2894 bt_lockpage(bt, BtLockRead, set->latch);
2895 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2896 bt_unlockpage(bt, BtLockRead, set->latch);
2897 bt_unpinlatch (set->latch);
2899 bt->cursor_page = page_no;
2900 return bt->cursor->cnt;
2903 // return previous slot on cursor page
2905 uint bt_prevkey (BtDb *bt, uint slot)
2907 uid ourright, next, us = bt->cursor_page;
2913 ourright = bt_getid(bt->cursor->right);
2916 if( !(next = bt_getid(bt->cursor->left)) )
2920 bt->cursor_page = next;
2922 if( set->latch = bt_pinlatch (bt, next, 1) )
2923 set->page = bt_mappage (bt, set->latch);
2927 bt_lockpage(bt, BtLockRead, set->latch);
2928 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2929 bt_unlockpage(bt, BtLockRead, set->latch);
2930 bt_unpinlatch (set->latch);
2932 next = bt_getid (bt->cursor->right);
2934 if( bt->cursor->kill )
2938 if( next == ourright )
2943 return bt->cursor->cnt;
2946 // return next slot on cursor page
2947 // or slide cursor right into next page
2949 uint bt_nextkey (BtDb *bt, uint slot)
2955 right = bt_getid(bt->cursor->right);
2957 while( slot++ < bt->cursor->cnt )
2958 if( slotptr(bt->cursor,slot)->dead )
2960 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2968 bt->cursor_page = right;
2970 if( set->latch = bt_pinlatch (bt, right, 1) )
2971 set->page = bt_mappage (bt, set->latch);
2975 bt_lockpage(bt, BtLockRead, set->latch);
2977 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2979 bt_unlockpage(bt, BtLockRead, set->latch);
2980 bt_unpinlatch (set->latch);
2988 // cache page of keys into cursor and return starting slot for given key
2990 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2995 // cache page for retrieval
2997 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2998 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3002 bt->cursor_page = set->latch->page_no;
3004 bt_unlockpage(bt, BtLockRead, set->latch);
3005 bt_unpinlatch (set->latch);
3009 BtKey *bt_key(BtDb *bt, uint slot)
3011 return keyptr(bt->cursor, slot);
3014 BtVal *bt_val(BtDb *bt, uint slot)
3016 return valptr(bt->cursor,slot);
3022 double getCpuTime(int type)
3025 FILETIME xittime[1];
3026 FILETIME systime[1];
3027 FILETIME usrtime[1];
3028 SYSTEMTIME timeconv[1];
3031 memset (timeconv, 0, sizeof(SYSTEMTIME));
3035 GetSystemTimeAsFileTime (xittime);
3036 FileTimeToSystemTime (xittime, timeconv);
3037 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3040 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3041 FileTimeToSystemTime (usrtime, timeconv);
3044 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3045 FileTimeToSystemTime (systime, timeconv);
3049 ans += (double)timeconv->wHour * 3600;
3050 ans += (double)timeconv->wMinute * 60;
3051 ans += (double)timeconv->wSecond;
3052 ans += (double)timeconv->wMilliseconds / 1000;
3057 #include <sys/resource.h>
3059 double getCpuTime(int type)
3061 struct rusage used[1];
3062 struct timeval tv[1];
3066 gettimeofday(tv, NULL);
3067 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3070 getrusage(RUSAGE_SELF, used);
3071 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3074 getrusage(RUSAGE_SELF, used);
3075 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3082 void bt_poolaudit (BtMgr *mgr)
3087 while( slot++ < mgr->latchdeployed ) {
3088 latch = mgr->latchsets + slot;
3090 if( *latch->readwr->rin & MASK )
3091 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3092 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3094 if( *latch->access->rin & MASK )
3095 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3096 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3098 if( *latch->parent->ticket != *latch->parent->serving )
3099 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3100 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3102 if( latch->pin & ~CLOCK_bit ) {
3103 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3109 uint bt_latchaudit (BtDb *bt)
3111 ushort idx, hashidx;
3117 if( *(ushort *)(bt->mgr->lock) )
3118 fprintf(stderr, "Alloc page locked\n");
3119 *(ushort *)(bt->mgr->lock) = 0;
3121 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3122 latch = bt->mgr->latchsets + idx;
3123 if( *latch->readwr->rin & MASK )
3124 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3125 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3127 if( *latch->access->rin & MASK )
3128 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3129 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3131 if( *latch->parent->ticket != *latch->parent->serving )
3132 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3133 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3136 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3141 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3142 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3143 fprintf(stderr, "hash entry %d locked\n", hashidx);
3145 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3147 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3148 latch = bt->mgr->latchsets + idx;
3150 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3151 } while( idx = latch->next );
3154 page_no = LEAF_page;
3156 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3157 uid off = page_no << bt->mgr->page_bits;
3159 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3163 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3165 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3166 return bt->err = BTERR_map;
3168 if( *amt < bt->mgr->page_size )
3169 return bt->err = BTERR_map;
3171 if( !bt->frame->free && !bt->frame->lvl )
3172 cnt += bt->frame->act;
3176 cnt--; // remove stopper key
3177 fprintf(stderr, " Total keys read %d\n", cnt);
3191 // standalone program to index file of keys
3192 // then list them onto std-out
3195 void *index_file (void *arg)
3197 uint __stdcall index_file (void *arg)
3200 int line = 0, found = 0, cnt = 0, idx;
3201 uid next, page_no = LEAF_page; // start on first page of leaves
3202 int ch, len = 0, slot, type = 0;
3203 unsigned char key[BT_maxkey];
3204 unsigned char txn[65536];
3205 ThreadArg *args = arg;
3214 bt = bt_open (args->mgr);
3217 if( args->idx < strlen (args->type) )
3218 ch = args->type[args->idx];
3220 ch = args->type[strlen(args->type) - 1];
3225 fprintf(stderr, "started latch mgr audit\n");
3226 cnt = bt_latchaudit (bt);
3227 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3238 if( type == Delete )
3239 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3241 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3243 if( type == Delete )
3244 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3246 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3248 if( in = fopen (args->infile, "rb") )
3249 while( ch = getc(in), ch != EOF )
3255 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3256 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3262 memcpy (txn + nxt, key + 10, len - 10);
3264 txn[nxt] = len - 10;
3266 memcpy (txn + nxt, key, 10);
3269 slotptr(page,++cnt)->off = nxt;
3270 slotptr(page,cnt)->type = type;
3273 if( cnt < args->num )
3280 if( bt_atomictxn (bt, page) )
3281 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3286 else if( len < BT_maxkey )
3288 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3292 fprintf(stderr, "started indexing for %s\n", args->infile);
3293 if( in = fopen (args->infile, "r") )
3294 while( ch = getc(in), ch != EOF )
3299 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3300 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3303 else if( len < BT_maxkey )
3305 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3309 fprintf(stderr, "started finding keys for %s\n", args->infile);
3310 if( in = fopen (args->infile, "rb") )
3311 while( ch = getc(in), ch != EOF )
3315 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3318 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3321 else if( len < BT_maxkey )
3323 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3327 fprintf(stderr, "started scanning\n");
3329 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3330 set->page = bt_mappage (bt, set->latch);
3332 fprintf(stderr, "unable to obtain latch"), exit(1);
3333 bt_lockpage (bt, BtLockRead, set->latch);
3334 next = bt_getid (set->page->right);
3336 for( slot = 0; slot++ < set->page->cnt; )
3337 if( next || slot < set->page->cnt )
3338 if( !slotptr(set->page, slot)->dead ) {
3339 ptr = keyptr(set->page, slot);
3342 if( slotptr(set->page, slot)->type == Duplicate )
3345 fwrite (ptr->key, len, 1, stdout);
3346 val = valptr(set->page, slot);
3347 fwrite (val->value, val->len, 1, stdout);
3348 fputc ('\n', stdout);
3352 bt_unlockpage (bt, BtLockRead, set->latch);
3353 bt_unpinlatch (set->latch);
3354 } while( page_no = next );
3356 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3360 fprintf(stderr, "started reverse scan\n");
3361 if( slot = bt_lastkey (bt) )
3362 while( slot = bt_prevkey (bt, slot) ) {
3363 if( slotptr(bt->cursor, slot)->dead )
3366 ptr = keyptr(bt->cursor, slot);
3369 if( slotptr(bt->cursor, slot)->type == Duplicate )
3372 fwrite (ptr->key, len, 1, stdout);
3373 val = valptr(bt->cursor, slot);
3374 fwrite (val->value, val->len, 1, stdout);
3375 fputc ('\n', stdout);
3379 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3384 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3386 fprintf(stderr, "started counting\n");
3387 page_no = LEAF_page;
3389 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3390 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3393 if( !bt->frame->free && !bt->frame->lvl )
3394 cnt += bt->frame->act;
3400 cnt--; // remove stopper key
3401 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3413 typedef struct timeval timer;
3415 int main (int argc, char **argv)
3417 int idx, cnt, len, slot, err;
3418 int segsize, bits = 16;
3435 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3436 fprintf (stderr, " where idx_file is the name of the btree file\n");
3437 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3438 fprintf (stderr, " page_bits is the page size in bits\n");
3439 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3440 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3441 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3445 start = getCpuTime(0);
3448 bits = atoi(argv[3]);
3451 poolsize = atoi(argv[4]);
3454 fprintf (stderr, "Warning: no mapped_pool\n");
3457 num = atoi(argv[5]);
3461 threads = malloc (cnt * sizeof(pthread_t));
3463 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3465 args = malloc (cnt * sizeof(ThreadArg));
3467 mgr = bt_mgr ((argv[1]), bits, poolsize);
3470 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3476 for( idx = 0; idx < cnt; idx++ ) {
3477 args[idx].infile = argv[idx + 6];
3478 args[idx].type = argv[2];
3479 args[idx].mgr = mgr;
3480 args[idx].num = num;
3481 args[idx].idx = idx;
3483 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3484 fprintf(stderr, "Error creating thread %d\n", err);
3486 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3490 // wait for termination
3493 for( idx = 0; idx < cnt; idx++ )
3494 pthread_join (threads[idx], NULL);
3496 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3498 for( idx = 0; idx < cnt; idx++ )
3499 CloseHandle(threads[idx]);
3505 elapsed = getCpuTime(0) - start;
3506 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3507 elapsed = getCpuTime(1);
3508 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3509 elapsed = getCpuTime(2);
3510 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);