1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The original author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
127 // definition for spin latch implementation
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
133 volatile typedef struct {
144 // hash table entries
147 volatile uint slot; // Latch table entry at head of chain
148 BtSpinLatch latch[1];
151 // latch manager table structure
154 uid page_no; // latch set page number
155 RWLock readwr[1]; // read/write page lock
156 RWLock access[1]; // Access Intent/Page delete
157 WOLock parent[1]; // Posting of fence key in parent
158 WOLock atomic[1]; // Atomic update in progress
159 uint split; // right split page atomic insert
160 uint entry; // entry slot in latch table
161 uint next; // next entry in hash table chain
162 uint prev; // prev entry in hash table chain
163 volatile ushort pin; // number of outstanding threads
164 ushort dirty:1; // page in cache is dirty
167 // Define the length of the page record numbers
171 // Page key slot definition.
173 // Keys are marked dead, but remain on the page until
174 // cleanup is called. The fence key (highest key) for
175 // a leaf page is always present, even after cleanup.
179 // In addition to the Unique keys that occupy slots
180 // there are Librarian and Duplicate key
181 // slots occupying the key slot array.
183 // The Librarian slots are dead keys that
184 // serve as filler, available to add new Unique
185 // or Dup slots that are inserted into the B-tree.
187 // The Duplicate slots have had their key bytes extended
188 // by 6 bytes to contain a binary duplicate key uniqueifier.
199 uint off:BT_maxbits; // page offset for key start
200 uint type:3; // type of slot
201 uint dead:1; // set for deleted slot
204 // The key structure occupies space at the upper end of
205 // each page. It's a length byte followed by the key
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char key[0];
213 // the value structure also occupies space at the upper
214 // end of the page. Each key is immediately followed by a value.
217 unsigned char len; // this can be changed to a ushort or uint
218 unsigned char value[0];
221 #define BT_maxkey 255 // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224 // The first part of an index page.
225 // It is immediately followed
226 // by the BtSlot array of keys.
228 // note that this structure size
229 // must be a multiple of 8 bytes
230 // in order to place dups correctly.
232 typedef struct BtPage_ {
233 uint cnt; // count of keys in page
234 uint act; // count of active keys
235 uint min; // next key offset
236 uint garbage; // page garbage in bytes
237 unsigned char bits:7; // page size in bits
238 unsigned char free:1; // page is on free chain
239 unsigned char lvl:7; // level of page
240 unsigned char kill:1; // page is being deleted
241 unsigned char right[BtId]; // page number to right
242 unsigned char left[BtId]; // page number to left
243 unsigned char filler[2]; // padding to multiple of 8
246 // The loadpage interface object
249 BtPage page; // current page pointer
250 BtLatchSet *latch; // current page latch set
253 // structure for latch manager on ALLOC_page
256 struct BtPage_ alloc[1]; // next page_no in right ptr
257 unsigned long long dups[1]; // global duplicate key uniqueifier
258 unsigned char chain[BtId]; // head of free page_nos chain
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
271 BtPageZero *pagezero; // mapped allocation page
272 BtSpinLatch lock[1]; // allocation area lite latch
273 uint latchdeployed; // highest number of latch entries deployed
274 uint nlatchpage; // number of latch pages at BT_latch
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 ushort thread_no[1]; // next thread number
279 BtHashEntry *hashtable; // the buffer pool hash table entries
280 BtLatchSet *latchsets; // mapped latch set from buffer pool
281 unsigned char *pagepool; // mapped to the buffer pool pages
283 HANDLE halloc; // allocation handle
284 HANDLE hpool; // buffer pool handle
289 BtMgr *mgr; // buffer manager for thread
290 BtPage cursor; // cached frame for start/next (never mapped)
291 BtPage frame; // spare frame for the page split (never mapped)
292 uid cursor_page; // current cursor page number
293 unsigned char *mem; // frame, cursor, page memory buffer
294 unsigned char key[BT_keyarray]; // last found complete key
295 int found; // last delete or insert was found
296 int err; // last error
297 int reads, writes; // number of reads and writes from the btree
298 ushort thread_no; // thread number
312 #define CLOCK_bit 0x8000
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey (BtDb *bt, uint slot);
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
328 // Helper functions to return slot values
329 // from the cursor page.
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
334 // The page is allocated from low and hi ends.
335 // The key slots are allocated from the bottom,
336 // while the text and value of the key
337 // are allocated from the top. When the two
338 // areas meet, the page is split into two.
340 // A key consists of a length byte, two bytes of
341 // index number (0 - 65535), and up to 253 bytes
344 // Associated with each key is a value byte string
345 // containing any value desired.
347 // The b-tree root is always located at page 1.
348 // The first leaf page of level zero is always
349 // located on page 2.
351 // The b-tree pages are linked with next
352 // pointers to facilitate enumerators,
353 // and provide for concurrency.
355 // When the root page fills, it is split in two and
356 // the tree height is raised by a new root at page
357 // one with two keys.
359 // Deleted keys are marked with a dead bit until
360 // page cleanup. The fence key for a leaf node is
363 // To achieve maximum concurrency one page is locked at a time
364 // as the tree is traversed to find leaf key in question. The right
365 // page numbers are used in cases where the page is being split,
368 // Page 0 is dedicated to lock for new page extensions,
369 // and chains empty pages together for reuse. It also
370 // contains the latch manager hash table.
372 // The ParentModification lock on a node is obtained to serialize posting
373 // or changing the fence key for a node.
375 // Empty pages are chained together through the ALLOC page and reused.
377 // Access macros to address slot and key values from the page
378 // Page slots use 1 based indexing.
//
// slotptr: address of the BtSlot for 1-based `slot`; the slot array starts
//          immediately after the page header (page+1).
// keyptr:  address of the BtKey found slotptr(...)->off bytes from the
//          start of the page.
// valptr:  address of the BtVal that starts right after the key bytes
//          (key + len).
// NOTE(review): `page` and `slot` are not parenthesized inside slotptr, and
// keyptr/valptr expand their arguments more than once, so pass only simple,
// side-effect-free expressions.
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// Store the page id `id` into the byte array `dest`.
// Each step writes the low 8 bits of `id` to dest[i] and shifts `id`
// right by 8 bits.
// NOTE(review): the loop header controlling `i` is not visible in this
// listing; presumably i counts down from BtId so the most significant
// byte lands first -- confirm against the full file.
384 void bt_putid(unsigned char *dest, uid id)
389 dest[i] = (unsigned char)id, id >>= 8;
// Decode a page id from the BtId bytes at `src`, most significant byte
// first: each iteration shifts the accumulator left by 8 bits and ORs in
// the next source byte.
// NOTE(review): the initialization of `id` and the return statement are
// not visible in this listing.
392 uid bt_getid(unsigned char *src)
397 for( i = 0; i < BtId; i++ )
398 id <<= 8, id |= *src++;
// Return a new duplicate-key uniqueifier by atomically incrementing the
// counter at bt->mgr->pagezero->dups.
// The two return statements are presumably alternatives selected by a
// platform #ifdef that is not visible in this listing.
403 uid bt_newdup (BtDb *bt)
// GCC builtin returns the old value, hence the + 1 to yield the new one.
406 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// NOTE(review): _InterlockedIncrement64 is documented as taking a single
// pointer argument; the trailing `, 1` looks like a mistake -- confirm.
408 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
412 // Write-Only Queue Lock
//
// Acquire: atomically take the next ticket from lock->ticket, then wait
// until lock->serving[0] equals that ticket.
// The two fetch-and-add lines are presumably platform alternatives under
// an #ifdef not visible here; the loop body is also not visible.
414 void WriteOLock (WOLock *lock)
418 tix = __sync_fetch_and_add (lock->ticket, 1);
420 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
422 // wait for our ticket to come up
424 while( tix != lock->serving[0] )
// Release a write-only queue lock taken with WriteOLock.
// NOTE(review): the body is not visible in this listing; presumably it
// advances lock->serving so the next ticket holder proceeds -- confirm.
432 void WriteORelease (WOLock *lock)
437 // Phase-Fair reader/writer lock implementation
//
// Acquire the lock for writing. Visible steps:
//   1. take a ticket and wait until lock->serving[0] reaches it
//   2. add PRES plus the ticket's PHID bits to lock->rin, capturing the
//      previous rin value in `r`
//   3. wait until *lock->rout equals `r`
//      (presumably: until every reader counted in rin has left -- confirm)
// Paired __sync_* / _Interlocked* lines are presumably platform
// alternatives; the wait-loop bodies are not visible in this listing.
439 void WriteLock (RWLock *lock)
444 tix = __sync_fetch_and_add (lock->ticket, 1);
446 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
448 // wait for our ticket to come up
450 while( tix != lock->serving[0] )
457 w = PRES | (tix & PHID);
459 r = __sync_fetch_and_add (lock->rin, w);
461 r = _InterlockedExchangeAdd16 (lock->rin, w);
463 while( r != *lock->rout )
// Release a write lock: atomically clear the MASK bits in lock->rin.
// NOTE(review): any further step (e.g. advancing the serving ticket) is
// not visible in this listing.
471 void WriteRelease (RWLock *lock)
474 __sync_fetch_and_and (lock->rin, ~MASK);
476 _InterlockedAnd16 (lock->rin, ~MASK);
// Acquire the lock for reading: atomically add RINC to lock->rin and keep
// the MASK bits of the previous value in `w`, then wait while the MASK
// bits of rin still equal `w`.
// NOTE(review): as listed, the wait loop would also spin when w == 0; a
// guard on `w` is presumably among the omitted lines -- confirm.
481 void ReadLock (RWLock *lock)
485 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
487 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
490 while( w == (*lock->rin & MASK) )
// Release a read lock by atomically adding RINC to lock->rout.
498 void ReadRelease (RWLock *lock)
501 __sync_fetch_and_add (lock->rout, RINC);
503 _InterlockedExchangeAdd16 (lock->rout, RINC);
507 // Spin Latch Manager
509 // wait until write lock mode is clear
510 // and add 1 to the share count
//
// Each attempt adds SHARE to the latch word and captures the prior value.
// A later line subtracts SHARE again (presumably when the attempt is
// rejected), and the loop yields the CPU before retrying.
// NOTE(review): the test on `prev` that separates success from retry is
// not visible in this listing.
512 void bt_spinreadlock(BtSpinLatch *latch)
518 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
520 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
522 // see if exclusive request is granted or pending
527 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
529 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
// comma expression: yield, then loop unconditionally
532 } while( sched_yield(), 1 );
534 } while( SwitchToThread(), 1 );
538 // wait for other read and write latches to relinquish
//
// Each attempt ORs PEND | XCL into the latch word and captures the prior
// value. The visible test checks that no bits outside BOTH were set
// before the OR; a later line clears XCL again (presumably when the
// attempt fails), and the loop yields before retrying.
// NOTE(review): the success return and any additional test on `prev` are
// not visible in this listing.
540 void bt_spinwritelock(BtSpinLatch *latch)
546 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
548 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
551 if( !(prev & ~BOTH) )
555 __sync_fetch_and_and ((ushort *)latch, ~XCL);
557 _InterlockedAnd16((ushort *)latch, ~XCL);
560 } while( sched_yield(), 1 );
562 } while( SwitchToThread(), 1 );
566 // try to obtain write lock
568 // return 1 if obtained,
// (the rest of this sentence is missing from this listing; presumably 0
// is returned otherwise)
//
// Single, non-blocking attempt: OR XCL into the latch word, inspect the
// prior value, and clear XCL again on a later line (presumably only when
// the attempt failed -- confirm).
571 int bt_spinwritetry(BtSpinLatch *latch)
576 prev = __sync_fetch_and_or((ushort *)latch, XCL);
578 prev = _InterlockedOr16((ushort *)latch, XCL);
580 // take write access if all bits are clear
583 if( !(prev & ~BOTH) )
587 __sync_fetch_and_and ((ushort *)latch, ~XCL);
589 _InterlockedAnd16((ushort *)latch, ~XCL);
// Release a spin latch held for writing: atomically clear the BOTH bits
// in the latch word.
596 void bt_spinreleasewrite(BtSpinLatch *latch)
599 __sync_fetch_and_and((ushort *)latch, ~BOTH);
601 _InterlockedAnd16((ushort *)latch, ~BOTH);
605 // decrement reader count
// Done by atomically adding -SHARE to the latch word.
607 void bt_spinreleaseread(BtSpinLatch *latch)
610 __sync_fetch_and_add((ushort *)latch, -SHARE);
612 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
616 // read page from permanent location in Btree file
//
// Reads mgr->page_size bytes for `page_no` into `page` from file offset
// page_no << mgr->page_bits. A pread()-based path and an OVERLAPPED
// ReadFile() path are both listed (presumably selected by a platform
// #ifdef not visible here). A short or failed read is reported on stderr.
// NOTE(review): the return statements are not visible in this listing.
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
620 off64_t off = page_no << mgr->page_bits;
// NOTE(review): the offset is recomputed here instead of using `off`.
// NOTE(review): pread returns a signed ssize_t which is compared with the
// unsigned page_size; where ssize_t is no wider than uint a -1 result
// would convert to a large unsigned value and slip past this test.
623 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
// NOTE(review): page_no is a uid (unsigned long long) but is printed with
// %.8x, which expects unsigned int -- format/argument mismatch here and
// in the two messages below.
624 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
631 memset (ovl, 0, sizeof(OVERLAPPED));
633 ovl->OffsetHigh = off >> 32;
635 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
639 if( *amt < mgr->page_size ) {
640 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
647 // write page to permanent location in Btree file
648 // clear the dirty bit
// NOTE(review): no visible line in this function clears a dirty bit;
// the visible callers reset latch->dirty themselves after calling it.
//
// Writes mgr->page_size bytes from `page` to file offset
// page_no << mgr->page_bits, via pwrite() or an OVERLAPPED WriteFile()
// (presumably platform alternatives). A short or failed write is
// detected; the return statements are not visible in this listing.
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
652 off64_t off = page_no << mgr->page_bits;
655 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
661 memset (ovl, 0, sizeof(OVERLAPPED));
663 ovl->OffsetHigh = off >> 32;
665 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
668 if( *amt < mgr->page_size )
674 // link latch table entry into head of latch hash table
//
// bt      - thread's btree handle (for mgr tables and bt->err)
// hashidx - hash table chain to link into
// slot    - latch table entry being linked; also selects the pool page
// page_no - page number the entry will now represent
// loadit  - presumably non-zero to read the page from disk (the test on
//           it is not visible in this listing)
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
// pool page belonging to this slot: pagepool + slot * page_size
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
// assignment intended: the old chain head becomes our successor, and if
// there was one its back-pointer is set to us
681 if( latch->next = bt->mgr->hashtable[hashidx].slot )
682 bt->mgr->latchsets[latch->next].prev = slot;
684 bt->mgr->hashtable[hashidx].slot = slot;
685 latch->page_no = page_no;
// assignment intended: record the read result in bt->err and test it
692 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
700 // set CLOCK bit in latch
701 // decrement pin count
//
// The CLOCK bit is ORed in only when it is not already set, then the pin
// word is decremented by one. Both steps are individually atomic; the
// pairs of lines are presumably GCC / Windows alternatives.
703 void bt_unpinlatch (BtLatchSet *latch)
706 if( ~latch->pin & CLOCK_bit )
707 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708 __sync_fetch_and_add(&latch->pin, -1);
710 if( ~latch->pin & CLOCK_bit )
711 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712 _InterlockedDecrement16 (&latch->pin);
716 // return the btree cached page address
//
// The address is pagepool + (latch->entry << page_bits), i.e. the pool
// frame with the same index as the latch set's entry number.
// NOTE(review): the return statement is not visible in this listing.
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
725 // find existing latchset or create a new one
726 // return with latchset pinned
//
// bt      - thread's btree handle
// page_no - page whose latch set is wanted
// loadit  - passed through to bt_latchlink
//
// Visible flow (NOTE(review): this listing omits braces, #ifdef guards,
// loop headers and several statements, so the outline below is partly
// inferred -- confirm against the full file):
//   1. lock the hash chain for page_no and search it; on a match bump
//      the pin count, unlock the chain and return the entry
//   2. otherwise try to claim a never-used entry by advancing
//      latchdeployed and linking it under page_no
//   3. otherwise undo that advance and walk entries from latchvictim,
//      skipping pinned or CLOCK-marked ones; write the victim's page
//      back, unlink it from its old chain and relink it under page_no
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
730 uint hashidx = page_no % bt->mgr->latchhash;
738 // try to find our entry
740 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
// assignment intended: slot 0 means "empty chain / end of chain"
742 if( slot = bt->mgr->hashtable[hashidx].slot ) do
744 latch = bt->mgr->latchsets + slot;
745 if( page_no == latch->page_no )
747 } while( slot = latch->next );
753 latch = bt->mgr->latchsets + slot;
755 __sync_fetch_and_add(&latch->pin, 1);
757 _InterlockedIncrement16 (&latch->pin);
759 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
763 // see if there are any unused pool entries
// both variants yield the post-increment value of latchdeployed
765 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
767 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
770 if( slot < bt->mgr->latchtotal ) {
771 latch = bt->mgr->latchsets + slot;
772 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
774 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
// pool exhausted: take back the increment made above
779 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
781 _InterlockedDecrement (&bt->mgr->latchdeployed);
783 // find and reuse previous entry on victim
// both variants yield the pre-increment value of latchvictim
787 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
789 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
791 // try to get write lock on hash chain
792 // skip entry if not obtained
793 // or has outstanding pins
795 slot %= bt->mgr->latchtotal;
800 latch = bt->mgr->latchsets + slot;
801 idx = latch->page_no % bt->mgr->latchhash;
803 // see if we are on same chain as hashidx
808 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
811 // skip this slot if it is pinned
812 // or the CLOCK bit is set
// a set CLOCK bit is cleared here so the entry can be chosen on a later pass
815 if( latch->pin & CLOCK_bit ) {
817 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
819 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
822 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
826 // update permanent page area in btree from buffer pool
828 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
// NOTE(review): presumably guarded by a test of latch->dirty that is not
// visible in this listing
831 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
834 latch->dirty = 0, bt->writes++;
836 // unlink our available slot from its hash chain
839 bt->mgr->latchsets[latch->prev].next = latch->next;
841 bt->mgr->hashtable[idx].slot = latch->next;
844 bt->mgr->latchsets[latch->next].prev = latch->prev;
846 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
848 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
851 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
// Shut down a buffer manager: write pool pages back to the btree file,
// report how many were flushed, then release the mappings and handles.
// The munmap lines and the Windows Unmap/CloseHandle lines are presumably
// platform alternatives under an #ifdef not visible in this listing.
856 void bt_mgrclose (BtMgr *mgr)
863 // flush dirty pool pages to the btree
// entries are numbered from 1; latchdeployed is the highest one in use
865 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867 latch = mgr->latchsets + slot;
// NOTE(review): presumably guarded by a latch->dirty test not visible
// here. The result of bt_writepage is ignored, so a failed flush is
// still counted and the dirty flag is still cleared.
870 bt_writepage(mgr, page, latch->page_no);
871 latch->dirty = 0, num++;
873 // madvise (page, mgr->page_size, MADV_DONTNEED);
876 fprintf(stderr, "%d buffer pool pages flushed\n", num);
879 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880 munmap (mgr->pagezero, mgr->page_size);
882 FlushViewOfFile(mgr->pagezero, 0);
883 UnmapViewOfFile(mgr->pagezero);
884 UnmapViewOfFile(mgr->hashtable);
885 CloseHandle(mgr->halloc);
886 CloseHandle(mgr->hpool);
892 FlushFileBuffers(mgr->idx);
893 CloseHandle(mgr->idx);
898 // close and release memory
// NOTE(review): only the Windows release of bt->mem is visible in this
// listing; the non-Windows path and the release of `bt` itself are not
// shown.
900 void bt_close (BtDb *bt)
907 VirtualFree (bt->mem, 0, MEM_RELEASE);
912 // open/create new btree buffer manager
914 // call with file_name, bits in page size (e.g. 16),
915 // size of page pool (e.g. 262144)
//
// name    - path of the btree file; created when it does not exist
// bits    - requested page size as a power of two; a value stored in an
//           existing file's page zero takes precedence
// nodemax - number of buffer pool pages, also the number of latch sets
//
// Visible phases:
//   1. sanity-check bits / nodemax and open the file
//   2. read page zero of an existing file to pick up its page size
//   3. size the hash table, latch sets and page pool
//   4. for a new file, write the allocation page and MIN_lvl initial pages
//   5. map page zero, then map one anonymous region that holds the hash
//      table, the latch sets and the page pool
// Failures return NULL.
// NOTE(review): this listing omits braces, #ifdef guards and several
// statements (including the final return), so the unix and Windows paths
// appear interleaved.
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
930 // determine sanity of page size and buffer pool
932 if( bits > BT_maxbits )
934 else if( bits < BT_minbits )
938 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
// NOTE(review): the calloc result is dereferenced on the next line with
// no visible NULL check.
943 mgr = calloc (1, sizeof(BtMgr));
945 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
947 if( mgr->idx == -1 ) {
948 fprintf (stderr, "Unable to open btree file\n");
949 return free(mgr), NULL;
952 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953 attr = FILE_ATTRIBUTE_NORMAL;
954 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
956 if( mgr->idx == INVALID_HANDLE_VALUE )
957 return GlobalFree(mgr), NULL;
// temporary buffer, large enough for the biggest legal page
// NOTE(review): no visible NULL check before it is read into and
// dereferenced below.
961 pagezero = valloc (BT_maxpage);
964 // read minimum page size to get root info
965 // to support raw disk partition files
966 // check if bits == 0 on the disk.
// whence 2 is SEEK_END: a non-zero result means the file already has data
968 if( size = lseek (mgr->idx, 0L, 2) )
969 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970 if( pagezero->alloc->bits )
971 bits = pagezero->alloc->bits;
975 return free(mgr), free(pagezero), NULL;
979 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980 size = GetFileSize(mgr->idx, amt);
983 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
// NOTE(review): bt_mgrclose is called here and on later error paths
// before the mappings exist, and the temporary pagezero buffer is not
// visibly released on these paths -- confirm.
984 return bt_mgrclose (mgr), NULL;
985 bits = pagezero->alloc->bits;
990 mgr->page_size = 1 << bits;
991 mgr->page_bits = bits;
993 // calculate number of latch hash table entries
// one hash slot per 16 pool pages, rounded up to whole pages
995 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
998 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
// plus the pages needed to hold nodemax latch sets, rounded up
999 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000 mgr->latchtotal = nodemax;
1005 // initialize an empty b-tree with latch page, root page, page of leaves
1006 // and page(s) of latches and page pool cache
1008 memset (pagezero, 0, 1 << bits);
1009 pagezero->alloc->bits = mgr->page_bits;
// the allocation page's right pointer holds the next page number to hand out
1010 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1012 // initialize left-most LEAF page in
1015 bt_putid (pagezero->alloc->left, LEAF_page);
1017 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018 fprintf (stderr, "Unable to create btree page zero\n");
1019 return bt_mgrclose (mgr), NULL;
1022 memset (pagezero, 0, 1 << bits);
1023 pagezero->alloc->bits = mgr->page_bits;
// write one page per level, highest level first; lvl counts down to 0
1025 for( lvl=MIN_lvl; lvl--; ) {
// non-leaf levels reserve BtId extra bytes for the child page number
1026 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027 key = keyptr(pagezero->alloc, 1);
1028 key->len = 2; // create stopper key
1032 bt_putid(value, MIN_lvl - lvl + 1);
1033 val = valptr(pagezero->alloc, 1);
1034 val->len = lvl ? BtId : 0;
1035 memcpy (val->value, value, val->len);
1037 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038 pagezero->alloc->lvl = lvl;
1039 pagezero->alloc->cnt = 1;
1040 pagezero->alloc->act = 1;
// NOTE(review): the message below says "page zero" although this write
// targets page MIN_lvl - lvl.
1042 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043 fprintf (stderr, "Unable to create btree page zero\n");
1044 return bt_mgrclose (mgr), NULL;
1052 VirtualFree (pagezero, 0, MEM_RELEASE);
1055 // mlock the pagezero page
1057 flag = PROT_READ | PROT_WRITE;
1058 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059 if( mgr->pagezero == MAP_FAILED ) {
1060 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->pagezero, mgr->page_size);
1065 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066 if( mgr->hashtable == MAP_FAILED ) {
1067 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068 return bt_mgrclose (mgr), NULL;
1071 flag = PAGE_READWRITE;
1072 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073 if( !mgr->halloc ) {
1074 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075 return bt_mgrclose (mgr), NULL;
1078 flag = FILE_MAP_WRITE;
1079 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080 if( !mgr->pagezero ) {
1081 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082 return bt_mgrclose (mgr), NULL;
1085 flag = PAGE_READWRITE;
1086 size = (uid)mgr->nlatchpage << mgr->page_bits;
1087 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1089 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090 return bt_mgrclose (mgr), NULL;
1093 flag = FILE_MAP_WRITE;
// NOTE(review): the mapping handle was stored in mgr->hpool just above
// and bt_mgrclose closes mgr->hpool; `mgr->pool` looks like a typo for
// `mgr->hpool` -- confirm against the BtMgr definition.
1094 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095 if( !mgr->hashtable ) {
1096 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097 return bt_mgrclose (mgr), NULL;
// layout of the region: hash table first, page pool in the last
// latchtotal pages, latch sets directly below the page pool
1101 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1107 // open BTree access method
1108 // based on buffer manager
//
// Allocates a zeroed BtDb, gives it a two-page scratch buffer (first
// page: split frame, second page: cursor) and assigns the next thread
// number from mgr->thread_no.
// NOTE(review): the assignment of bt->mgr and the return statement are
// not visible in this listing.
1110 BtDb *bt_open (BtMgr *mgr)
// NOTE(review): the malloc result is passed to memset with no visible
// NULL check.
1112 BtDb *bt = malloc (sizeof(*bt));
1114 memset (bt, 0, sizeof(*bt));
// NOTE(review): the buffer allocation below is likewise used unchecked.
1117 bt->mem = valloc (2 *mgr->page_size);
1119 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1121 bt->frame = (BtPage)bt->mem;
1122 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1124 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// NOTE(review): _InterlockedIncrement16 is documented as taking a single
// pointer argument; the trailing `, 1` looks like a mistake -- confirm.
1126 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1131 // compare two keys, return > 0, = 0, or < 0
1132 // =0: keys are same
1135 // as the comparison value
//
// key1 - stored key (length-prefixed BtKey)
// key2 - raw key bytes to compare against
// len2 - number of bytes in key2
// The common prefix (the shorter of the two lengths) is compared with
// memcmp first; a non-zero result decides the comparison.
// NOTE(review): the tie-break on length when the prefixes match is not
// visible in this listing.
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1139 uint len1 = key1->len;
// assignment intended: keep memcmp's result and test it for non-zero
1142 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1153 // place write, read, or parent lock on requested page_no.
//
// Acquires one of six locks on the latch set: shared or exclusive on
// `readwr`, shared or exclusive on `access`, or the queue locks `parent`
// and `atomic`.
// NOTE(review): the switch on `mode` and its case labels are not visible
// in this listing; confirm which BtLock value selects which call. `bt`
// is not used by any visible line.
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1159 ReadLock (latch->readwr);
1162 WriteLock (latch->readwr);
1165 ReadLock (latch->access);
1168 WriteLock (latch->access);
1171 WriteOLock (latch->parent);
1174 WriteOLock (latch->atomic);
1179 // remove write, read, or parent lock on requested page
//
// Releases one of six locks on the latch set, listed in the same order
// as the acquisitions in bt_lockpage: `readwr` shared/exclusive, `access`
// shared/exclusive, then the `parent` and `atomic` queue locks.
// NOTE(review): the switch on `mode` and its case labels are not visible
// in this listing. `bt` is not used by any visible line.
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1185 ReadRelease (latch->readwr);
1188 WriteRelease (latch->readwr);
1191 ReadRelease (latch->access);
1194 WriteRelease (latch->access);
1197 WriteORelease (latch->parent);
1200 WriteORelease (latch->atomic);
1205 // allocate a new page
1206 // return with page latched, but unlocked.
//
// bt       - thread's btree handle
// set      - receives the pinned latch set and mapped page
// contents - page image copied into the new page (page_size bytes)
// A page is taken from the free chain in page zero when one exists,
// otherwise the next page number is taken from the allocation page's
// right pointer. Either way the new page is filled from `contents` and
// marked dirty.
// NOTE(review): the success return statements are not visible in this
// listing.
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1213 // lock allocation page
1215 bt_spinwritelock(bt->mgr->lock);
1217 // use empty chain first
1218 // else allocate empty page
// assignment intended: a non-zero chain head is a reusable page
1220 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222 set->page = bt_mappage (bt, set->latch);
// NOTE(review): this failure path returns -1 while the matching failure
// further down returns BTERR_struct itself. It also returns with no
// visible release of bt->mgr->lock taken above -- confirm.
1224 return bt->err = BTERR_struct, -1;
// pop the page: the freed page's right pointer is the next chain entry
1226 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227 bt_spinreleasewrite(bt->mgr->lock);
1229 memcpy (set->page, contents, bt->mgr->page_size);
1230 set->latch->dirty = 1;
1234 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1237 // unlock allocation latch
1239 bt_spinreleasewrite(bt->mgr->lock);
1241 // don't load cache from btree page
1243 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244 set->page = bt_mappage (bt, set->latch);
1246 return bt->err = BTERR_struct;
1248 memcpy (set->page, contents, bt->mgr->page_size);
1249 set->latch->dirty = 1;
1253 // find slot in page for given key at a given level
//
// page - page to search
// key  - key bytes to look for
// len  - length of key in bytes
// Binary search over the 1-based slots between `low` and `higher`.
// Returns `higher` when `good` is non-zero, otherwise 0 so the caller
// moves on to the right sibling page.
// NOTE(review): the statements that follow the right-link test and the
// "key is smaller" branch are not visible in this listing.
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1257 uint diff, higher = page->cnt, low = 1, slot;
1260 // make stopper key an infinite fence value
1262 if( bt_getid (page->right) )
1267 // low is the lowest candidate.
1268 // loop ends when they meet
1270 // higher is already
1271 // tested as .ge. the passed key.
// assignment intended: loop while the candidate range is non-empty
1273 while( diff = higher - low ) {
1274 slot = low + ( diff >> 1 );
1275 if( keycmp (keyptr(page, slot), key, len) < 0 )
1278 higher = slot, good++;
1281 // return zero if key is on right link page
1283 return good ? higher : 0;
1286 // find and load page at given level for given key
1287 // leave page rd or wr locked as requested
//
// bt   - thread's btree handle
// set  - receives the pinned latch set and mapped page of the result
// key  - key bytes being located
// len  - length of key in bytes
// lvl  - tree level to stop at
// lock - lock mode to hold on the page at level `lvl`; levels above it
//        are read-locked
// Returns 0 with bt->err set on the visible error paths.
// NOTE(review): the enclosing loop, the success return and several
// branches are not visible in this listing.
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1291 uid page_no = ROOT_page, prevpage = 0;
// 0xff is a placeholder until the root has been read and reports its level
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1296 // start at root of btree and drill down
1299 // determine lock mode of drill level
1300 mode = (drill == lvl) ? lock : BtLockRead;
1302 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1305 // obtain access lock using lock chaining with Access mode
1307 if( page_no > ROOT_page )
1308 bt_lockpage(bt, BtLockAccess, set->latch);
1310 set->page = bt_mappage (bt, set->latch);
1312 // release & unpin parent or left sibling page
1315 bt_unlockpage(bt, prevmode, prevlatch);
1316 bt_unpinlatch (prevlatch);
1320 // obtain mode lock using lock chaining through AccessLock
1322 bt_lockpage(bt, mode, set->latch);
// a page on the free chain must never be reached through the tree
1324 if( set->page->free )
1325 return bt->err = BTERR_struct, 0;
1327 if( page_no > ROOT_page )
1328 bt_unlockpage(bt, BtLockAccess, set->latch);
1330 // re-read and re-lock root after determining actual level of root
1332 if( set->page->lvl != drill) {
// only the root may report a level different from the expected one
1333 if( set->latch->page_no != ROOT_page )
1334 return bt->err = BTERR_struct, 0;
1336 drill = set->page->lvl;
// the root was read-locked under the placeholder level; when it is the
// target level and a stronger lock was requested, drop it
// (presumably to retry with the right mode -- retry not visible here)
1338 if( lock != BtLockRead && drill == lvl ) {
1339 bt_unlockpage(bt, mode, set->latch);
1340 bt_unpinlatch (set->latch);
1345 prevpage = set->latch->page_no;
1346 prevlatch = set->latch;
1349 // find key on page at this level
1350 // and descend to requested level
1352 if( !set->page->kill )
1353 if( slot = bt_findslot (set->page, key, len) ) {
1357 // find next non-dead slot -- the fence key if nothing else
1359 while( slotptr(set->page, slot)->dead )
1360 if( slot++ < set->page->cnt )
1363 return bt->err = BTERR_struct, 0;
// the slot's value holds the child page number
1365 page_no = bt_getid(valptr(set->page, slot)->value);
1370 // or slide right into next page
1372 page_no = bt_getid(set->page->right);
1375 // return error on end of right chain
1377 bt->err = BTERR_struct;
1378 return 0; // return error
1381 // return page to free list
1382 // page must be delete & write locked
//
// Pushes the page onto the free chain kept in page zero: the page's right
// pointer takes the old chain head and the chain head becomes this page.
// The page is flagged free and dirty, its delete and write locks are
// released and its latch set is unpinned, all while the allocation latch
// is held.
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1386 // lock allocation page
1388 bt_spinwritelock (bt->mgr->lock);
1392 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394 set->latch->dirty = 1;
1395 set->page->free = 1;
1397 // unlock released page
1399 bt_unlockpage (bt, BtLockDelete, set->latch);
1400 bt_unlockpage (bt, BtLockWrite, set->latch);
1401 bt_unpinlatch (set->latch);
1403 // unlock allocation page
1405 bt_spinreleasewrite (bt->mgr->lock);
1408 // a fence key was deleted from a page
1409 // push new fence value upwards
//
// bt  - thread's btree handle
// set - page whose last slot (the old fence) is removed; the write lock
//       it holds on entry is swapped for the parent lock here
// lvl - level of `set`; the parent is updated at lvl+1
// The page is unpinned after the parent updates.
// NOTE(review): the statements taken when the insert or delete in the
// parent fails, and the final return, are not visible in this listing;
// confirm the parent lock and pin are dropped on those paths.
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1418 // remove the old fence value
1420 ptr = keyptr(set->page, set->page->cnt);
1421 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
// zero the last slot and shrink the slot count in one step
1422 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423 set->latch->dirty = 1;
1425 // cache new fence value
1427 ptr = keyptr(set->page, set->page->cnt);
1428 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the parent lock before giving up the write lock
1430 bt_lockpage (bt, BtLockParent, set->latch);
1431 bt_unlockpage (bt, BtLockWrite, set->latch);
1433 // insert new (now smaller) fence key
1435 bt_putid (value, set->latch->page_no);
1436 ptr = (BtKey*)leftkey;
1438 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1441 // now delete old fence key
1443 ptr = (BtKey*)rightkey;
1445 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1448 bt_unlockpage (bt, BtLockParent, set->latch);
1449 bt_unpinlatch(set->latch);
1453 // root has a single child
1454 // collapse a level from the tree
//
// bt   - thread's btree handle
// root - root page set; its write lock and pin are released at the end
// Repeatedly copies the only live child over the root and frees the
// child, for as long as the root is above level 1 and has exactly one
// active key.
// NOTE(review): the `do` line, the loop exit after the first live slot
// and the failure path of bt_pinlatch are not visible in this listing.
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1462 // find the child entry and promote as new root contents
// slots are 1-based: idx is incremented before each use
1465 for( idx = 0; idx++ < root->page->cnt; )
1466 if( !slotptr(root->page, idx)->dead )
1469 page_no = bt_getid (valptr(root->page, idx)->value);
1471 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472 child->page = bt_mappage (bt, child->latch);
// bt_freepage below expects the page delete- and write-locked
1476 bt_lockpage (bt, BtLockDelete, child->latch);
1477 bt_lockpage (bt, BtLockWrite, child->latch);
1479 memcpy (root->page, child->page, bt->mgr->page_size);
1480 root->latch->dirty = 1;
1482 bt_freepage (bt, child);
1484 } while( root->page->lvl > 1 && root->page->act == 1 );
1486 bt_unlockpage (bt, BtLockWrite, root->latch);
1487 bt_unpinlatch (root->latch);
1491 // delete a page and manage keys
1492 // call with page writelocked
1493 // returns with page unpinned
//
// The empty page `set` takes over the contents of its right sibling. The
// sibling is marked killed and pointed back at `set`, the parent level is
// updated (the sibling's fence key is redirected to `set`, the old fence
// key of `set` is deleted), and finally the sibling is freed.
// NOTE(review): the failure paths after bt_pinlatch, bt_insertkey and
// bt_deletekey and the final return are not visible in this listing.
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1504 // cache copy of fence key
1505 // to post in parent
1507 ptr = keyptr(set->page, set->page->cnt);
1508 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1510 // obtain lock on right page
1512 page_no = bt_getid(set->page->right);
1514 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515 right->page = bt_mappage (bt, right->latch);
1519 bt_lockpage (bt, BtLockWrite, right->latch);
1521 // cache copy of key to update
1523 ptr = keyptr(right->page, right->page->cnt);
1524 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): this early return shows no release of the write lock or
// pin taken on the right page above, nor of the caller's lock and pin on
// `set` -- confirm.
1526 if( right->page->kill )
1527 return bt->err = BTERR_struct;
1529 // pull contents of right peer into our empty page
1531 memcpy (set->page, right->page, bt->mgr->page_size);
1532 set->latch->dirty = 1;
1534 // mark right page deleted and point it to left page
1535 // until we can post parent updates that remove access
1536 // to the deleted page.
1538 bt_putid (right->page->right, set->latch->page_no);
1539 right->latch->dirty = 1;
1540 right->page->kill = 1;
// on both pages, take the parent lock before dropping the write lock
1542 bt_lockpage (bt, BtLockParent, right->latch);
1543 bt_unlockpage (bt, BtLockWrite, right->latch);
1545 bt_lockpage (bt, BtLockParent, set->latch);
1546 bt_unlockpage (bt, BtLockWrite, set->latch);
1548 // redirect higher key directly to our new node contents
1550 bt_putid (value, set->latch->page_no);
1551 ptr = (BtKey*)higherfence;
1553 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1556 // delete old lower key to our node
1558 ptr = (BtKey*)lowerfence;
1560 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1563 // obtain delete and write locks to right node
1565 bt_unlockpage (bt, BtLockParent, right->latch);
1566 bt_lockpage (bt, BtLockDelete, right->latch);
1567 bt_lockpage (bt, BtLockWrite, right->latch);
1568 bt_freepage (bt, right);
1570 bt_unlockpage (bt, BtLockParent, set->latch);
1571 bt_unpinlatch (set->latch);
1576 // find and delete key on page by marking delete flag bit
1577 // if page becomes empty, delete it from the btree
// key/len give the key bytes to remove and lvl the tree level to search.
// The page is loaded with a write lock. bt->found records whether a
// live matching key was found; the success returns visible here yield 0.
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1581 uint slot, idx, found, fence;
1586 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587 ptr = keyptr(set->page, slot);
1591 // if librarian slot, advance to real slot
1593 if( slotptr(set->page, slot)->type == Librarian )
1594 ptr = keyptr(set->page, ++slot);
// remember whether the target is the page's last slot (its fence key)
1596 fence = slot == set->page->cnt;
1598 // if key is found delete it, otherwise ignore request
// found stays set only when the key compares equal and the slot was
// not already dead
1600 if( found = !keycmp (ptr, key, len) )
1601 if( found = slotptr(set->page, slot)->dead == 0 ) {
1602 val = valptr(set->page,slot);
1603 slotptr(set->page, slot)->dead = 1;
// account the key and value bytes (with their headers) as reclaimable
1604 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1607 // collapse empty slots beneath the fence
// a dead slot just below the last slot is overwritten by the last
// slot, and the vacated last slot is zeroed while cnt is decremented
1609 while( idx = set->page->cnt - 1 )
1610 if( slotptr(set->page, idx)->dead ) {
1611 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1617 // did we delete a fence key in an upper level?
1619 if( found && lvl && set->page->act && fence )
1620 if( bt_fixfence (bt, set, lvl) )
1623 return bt->found = found, 0;
1625 // do we need to collapse root?
1627 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628 if( bt_collapseroot (bt, set) )
1631 return bt->found = found, 0;
1633 // delete empty page
1635 if( !set->page->act )
1636 return bt_deletepage (bt, set);
// ordinary case: mark the page dirty, then release the write lock and pin
1638 set->latch->dirty = 1;
1639 bt_unlockpage(bt, BtLockWrite, set->latch);
1640 bt_unpinlatch (set->latch);
1641 return bt->found = found, 0;
// Return the key buffer cached in bt->key, viewed as a BtKey.
// bt_findkey copies the key it located into that buffer.
1644 BtKey *bt_foundkey (BtDb *bt)
1646 return (BtKey*)(bt->key);
1649 // advance to next slot
// Called with set's page read-locked (that read lock is released below
// once the right sibling is latched). When slot is the last slot on
// the page, the search moves to the right sibling and set is updated
// to refer to it.
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1653 BtLatchSet *prevlatch;
// still inside the current page
1656 if( slot < set->page->cnt )
// remember the current latch so it can be released after chaining
1659 prevlatch = set->latch;
1661 if( page_no = bt_getid(set->page->right) )
1662 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663 set->page = bt_mappage (bt, set->latch);
// NOTE(review): presumably the no-right-sibling case; confirm which
// branch this return belongs to
1667 return bt->err = BTERR_struct, 0;
1669 // obtain access lock using lock chaining with Access mode
1671 bt_lockpage(bt, BtLockAccess, set->latch);
// the new page is access-locked; the previous page can now be let go
1673 bt_unlockpage(bt, BtLockRead, prevlatch);
1674 bt_unpinlatch (prevlatch);
// convert the access lock on the new page into a read lock
1676 bt_lockpage(bt, BtLockRead, set->latch);
1677 bt_unlockpage(bt, BtLockAccess, set->latch);
1681 // find unique key or first duplicate key in
1682 // leaf level and return number of value bytes
1683 // or (-1) if not found. Setup key for bt_foundkey
// key/keylen: key to look up at level 0 (leaf).
// value/valmax: caller's output buffer and its capacity in bytes.
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1693 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1695 ptr = keyptr(set->page, slot);
1697 // skip librarian slot place holder
1699 if( slotptr(set->page, slot)->type == Librarian )
1700 ptr = keyptr(set->page, ++slot);
1702 // return actual key found
// the key header and bytes are cached in bt->key for bt_foundkey
1704 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): duplicate-type keys presumably carry a trailing
// sequence id that is excluded from the comparison length -- confirm
1707 if( slotptr(set->page, slot)->type == Duplicate )
1710 // not there if we reach the stopper key
// the last slot of a page with no right sibling is the stopper
1712 if( slot == set->page->cnt )
1713 if( !bt_getid (set->page->right) )
1716 // if key exists, return >= 0 value bytes copied
1717 // otherwise return (-1)
1719 if( slotptr(set->page, slot)->dead )
1723 if( !memcmp (ptr->key, key, len) ) {
1724 val = valptr (set->page,slot);
// NOTE(review): valmax is presumably clamped to val->len here so the
// copy below never reads past the stored value -- confirm
1725 if( valmax > val->len )
1727 memcpy (value, val->value, valmax);
// keep going while bt_findnext yields another slot
1733 } while( slot = bt_findnext (bt, set, slot) );
1735 bt_unlockpage (bt, BtLockRead, set->latch);
1736 bt_unpinlatch (set->latch);
1740 // check page for space available,
1741 // clean if necessary and return
1742 // 0 - page needs splitting
1743 // >0 new slot value
// keylen/vallen: sizes of the key and value about to be inserted.
// slot: the caller's current slot number on the page.
// Cleaning rebuilds the page from a copy held in bt->frame.
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
// nxt starts at the end of the page; data is packed downward from there
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
// room check: the slot array grown by two entries, the page header,
// and the new key and value with their headers must all fit below
// the lowest used offset (page->min)
1755 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1758 // skip cleanup and proceed to split
1759 // if there's not enough garbage
// threshold: less than one fifth of the page size is garbage
1762 if( page->garbage < nxt / 5 )
// work from a snapshot of the page in bt->frame
1765 memcpy (bt->frame, page, bt->mgr->page_size);
1767 // skip page info and set rest of page to zero
1769 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770 set->latch->dirty = 1;
1774 // clean up page first by
1775 // removing deleted keys
1777 while( cnt++ < max ) {
// dead slots other than the last one are candidates for removal
1780 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1783 // copy the value across
1785 val = valptr(bt->frame, cnt);
1786 nxt -= val->len + sizeof(BtVal);
1787 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1789 // copy the key across
1791 key = keyptr(bt->frame, cnt);
1792 nxt -= key->len + sizeof(BtKey);
1793 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1795 // make a librarian slot
// the librarian slot is a dead placeholder pointing at the same key
1798 slotptr(page, ++idx)->off = nxt;
1799 slotptr(page, idx)->type = Librarian;
1800 slotptr(page, idx)->dead = 1;
// the real slot follows, with type and dead flag carried over
1805 slotptr(page, ++idx)->off = nxt;
1806 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1808 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1815 // see if page has enough space now, or does it need splitting?
1817 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1823 // split the root and raise the height of the btree
// root: the root page set, whose write lock and pin are released here.
// right: latch of the page holding the upper half of the keys; it is
// unpinned here.
// The current root contents move to a newly allocated left page and
// the root is rebuilt with two entries: the left page's fence key and
// a stopper entry whose value is the right page number.
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1835 // save left page fence key for new root
1837 ptr = keyptr(root->page, root->page->cnt);
1838 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1840 // Obtain an empty page to use, and copy the current
1841 // root contents into it, e.g. lower keys
1843 if( bt_newpage(bt, left, root->page) )
// only the page number of the new left page is needed from here on
1846 left_page_no = left->latch->page_no;
1847 bt_unpinlatch (left->latch);
1849 // preserve the page info at the bottom
1850 // of higher keys and set rest to zero
1852 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1854 // insert stopper key at top of newroot page
1855 // and increase the root height
// value area for slot 2: the right page's id
1857 nxt -= BtId + sizeof(BtVal);
1858 bt_putid (value, right->page_no);
1859 val = (BtVal *)((unsigned char *)root->page + nxt);
1860 memcpy (val->value, value, BtId);
// key area for slot 2: the stopper key takes 2 key bytes plus header
1863 nxt -= 2 + sizeof(BtKey);
1864 slotptr(root->page, 2)->off = nxt;
1865 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1870 // insert lower keys page fence key on newroot page as first key
1872 nxt -= BtId + sizeof(BtVal);
1873 bt_putid (value, left_page_no);
1874 val = (BtVal *)((unsigned char *)root->page + nxt);
1875 memcpy (val->value, value, BtId);
1878 ptr = (BtKey *)leftkey;
1879 nxt -= ptr->len + sizeof(BtKey);
1880 slotptr(root->page, 1)->off = nxt;
1881 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
1883 bt_putid(root->page->right, 0);
1884 root->page->min = nxt; // reset lowest used offset and key count
1885 root->page->cnt = 2;
1886 root->page->act = 2;
1889 // release and unpin root pages
1891 bt_unlockpage(bt, BtLockWrite, root->latch);
1892 bt_unpinlatch (root->latch);
1894 bt_unpinlatch (right);
1898 // split already locked full node
1900 // return pool entry for new right
// The upper keys are assembled in bt->frame and written to a newly
// allocated right page; the original page is then rebuilt in place
// from the lower keys and its right link is pointed at the new page.
// The value returned at the end is the new right page's latch entry.
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1913 // split higher half of keys to bt->frame
1915 memset (bt->frame, 0, bt->mgr->page_size);
1916 max = set->page->cnt;
1920 while( cnt++ < max ) {
// dead slots other than the last (fence) slot are tested here
1921 if( slotptr(set->page, cnt)->dead && cnt < max )
// values and keys are packed downward from the end of the frame
1923 src = valptr(set->page, cnt);
1924 nxt -= src->len + sizeof(BtVal);
1925 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1927 key = keyptr(set->page, cnt);
1928 nxt -= key->len + sizeof(BtKey);
1929 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1930 memcpy (ptr, key, key->len + sizeof(BtKey));
1932 // add librarian slot
1935 slotptr(bt->frame, ++idx)->off = nxt;
1936 slotptr(bt->frame, idx)->type = Librarian;
1937 slotptr(bt->frame, idx)->dead = 1;
// the real slot follows its librarian placeholder
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1945 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// finish the header of the new right page image
1949 bt->frame->bits = bt->mgr->page_bits;
1950 bt->frame->min = nxt;
1951 bt->frame->cnt = idx;
1952 bt->frame->lvl = lvl;
// for pages other than the root, the new right page inherits the old
// right link
1956 if( set->latch->page_no > ROOT_page )
1957 bt_putid (bt->frame->right, bt_getid (set->page->right));
1959 // get new free page and write higher keys to it.
1961 if( bt_newpage(bt, right, bt->frame) )
// reuse bt->frame as a snapshot of the original page, then clear the
// original below its header so the lower keys can be repacked
1964 memcpy (bt->frame, set->page, bt->mgr->page_size);
1965 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966 set->latch->dirty = 1;
1968 nxt = bt->mgr->page_size;
1969 set->page->garbage = 0;
// NOTE(review): presumably trims a trailing librarian slot off the
// lower half -- confirm
1975 if( slotptr(bt->frame, max)->type == Librarian )
1978 // assemble page of smaller keys
1980 while( cnt++ < max ) {
1981 if( slotptr(bt->frame, cnt)->dead )
1983 val = valptr(bt->frame, cnt);
1984 nxt -= val->len + sizeof(BtVal);
1985 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1987 key = keyptr(bt->frame, cnt);
1988 nxt -= key->len + sizeof(BtKey);
1989 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1991 // add librarian slot
1994 slotptr(set->page, ++idx)->off = nxt;
1995 slotptr(set->page, idx)->type = Librarian;
1996 slotptr(set->page, idx)->dead = 1;
2001 slotptr(set->page, ++idx)->off = nxt;
2002 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the rebuilt left page to the new right page
2006 bt_putid(set->page->right, right->latch->page_no);
2007 set->page->min = nxt;
2008 set->page->cnt = idx;
2010 return right->latch->entry;
2013 // fix keys for newly split page
2014 // call with page locked, return
// set: the left (original) page of the split; its write lock is
// traded for a parent lock here and it is unpinned at the end.
// right: latch of the new right page; it is unpinned at the end.
// A root split is delegated to bt_splitroot. Otherwise both fence
// keys are posted into level lvl+1.
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2025 // if current page is the root page, split it
2027 if( set->latch->page_no == ROOT_page )
2028 return bt_splitroot (bt, set, right);
// copy both fence keys while the pages are still locked
2030 ptr = keyptr(set->page, set->page->cnt);
2031 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2033 page = bt_mappage (bt, right);
2035 ptr = keyptr(page, page->cnt);
2036 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2038 // insert new fences in their parent pages
2040 bt_lockpage (bt, BtLockParent, right);
2042 bt_lockpage (bt, BtLockParent, set->latch);
2043 bt_unlockpage (bt, BtLockWrite, set->latch);
2045 // insert new fence for reformulated left block of smaller keys
2047 bt_putid (value, set->latch->page_no);
2048 ptr = (BtKey *)leftkey;
2050 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2053 // switch fence for right block of larger keys to new right page
2055 bt_putid (value, right->page_no);
2056 ptr = (BtKey *)rightkey;
2058 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
// parent entries are posted; release parent locks and pins on both pages
2061 bt_unlockpage (bt, BtLockParent, set->latch);
2062 bt_unpinlatch (set->latch);
2064 bt_unlockpage (bt, BtLockParent, right);
2065 bt_unpinlatch (right);
2069 // install new key and value onto page
2070 // page must already be checked for
// slot: position before which the new entry is placed.
// key/keylen, value/vallen: bytes to store; type: slot type to record.
// release: NOTE(review) presumably selects whether the write lock and
// pin are dropped at the end -- confirm.
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2075 uint idx, librarian;
2080 // if found slot > desired slot and previous slot
2081 // is a librarian slot, use it
2084 if( slotptr(set->page, slot-1)->type == Librarian )
2087 // copy value onto page
// data grows downward: lower page->min, then write at the new offset
2089 set->page->min -= vallen + sizeof(BtVal);
2090 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091 memcpy (val->value, value, vallen);
2094 // copy key onto page
2096 set->page->min -= keylen + sizeof(BtKey);
2097 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098 memcpy (ptr->key, key, keylen);
2101 // find first empty slot
2103 for( idx = slot; idx < set->page->cnt; idx++ )
2104 if( slotptr(set->page, idx)->dead )
2107 // now insert key into array before slot
// no dead slot was found: grow the slot array by two so a librarian
// slot can accompany the new entry
2109 if( idx == set->page->cnt )
2110 idx += 2, set->page->cnt += 2, librarian = 2;
2114 set->latch->dirty = 1;
// shift slots up by `librarian` positions to open the gap
2117 while( idx > slot + librarian - 1 )
2118 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2120 // add librarian slot
2122 if( librarian > 1 ) {
2123 node = slotptr(set->page, slot++);
2124 node->off = set->page->min;
2125 node->type = Librarian;
// fill in the real slot; it points at the key just written
2131 node = slotptr(set->page, slot);
2132 node->off = set->page->min;
2137 bt_unlockpage (bt, BtLockWrite, set->latch);
2138 bt_unpinlatch (set->latch);
2144 // Insert new key into the btree at given level.
2145 // either add a new key or update/add an existing one
// key/keylen: key bytes; lvl: tree level to insert at.
// value/vallen: value bytes to store with the key.
// unique: non-zero to treat the key as unique, so an existing match
// has its value updated; zero to always add a new entry.
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2157 // set up the key we're working on
2159 ins = (BtKey*)newkey;
2160 memcpy (ins->key, key, keylen);
2163 // is this a non-unique index value?
// a fresh sequence number from bt_newdup is written into the key buffer
2169 sequence = bt_newdup (bt);
2170 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2174 while( 1 ) { // find the page and slot for the current key
2175 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176 ptr = keyptr(set->page, slot);
2179 bt->err = BTERR_ovflw;
2183 // if librarian slot == found slot, advance to real slot
2185 if( slotptr(set->page, slot)->type == Librarian )
2186 if( !keycmp (ptr, key, keylen) )
2187 ptr = keyptr(set->page, ++slot);
2191 if( slotptr(set->page, slot)->type == Duplicate )
2194 // if inserting a duplicate key or unique key
2195 // check for adequate space on the page
2196 // and insert the new key before slot.
// taken when the key is non-unique, or unique but not equal to the
// key found at slot
2198 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// a zero from bt_cleanpage means the page must be split first
2199 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200 if( !(entry = bt_splitpage (bt, set)) )
2202 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2207 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2210 // if key already exists, update value and return
2212 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2214 if( val->len >= vallen ) {
2215 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area becomes garbage
2217 set->page->garbage += val->len - vallen;
2218 set->latch->dirty = 1;
2219 slotptr(set->page, slot)->dead = 0;
2221 memcpy (val->value, value, vallen);
2222 bt_unlockpage(bt, BtLockWrite, set->latch);
2223 bt_unpinlatch (set->latch);
2227 // new update value doesn't fit in existing value area
// the old key and value storage is written off as garbage if the slot
// was live
2229 if( !slotptr(set->page, slot)->dead )
2230 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2232 slotptr(set->page, slot)->dead = 0;
2236 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237 if( !(entry = bt_splitpage (bt, set)) )
2239 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write a fresh copy of the value and key at the new low-water mark
2244 set->page->min -= vallen + sizeof(BtVal);
2245 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246 memcpy (val->value, value, vallen);
2249 set->latch->dirty = 1;
2250 set->page->min -= keylen + sizeof(BtKey);
2251 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the new copy
2255 slotptr(set->page, slot)->off = set->page->min;
2256 bt_unlockpage(bt, BtLockWrite, set->latch);
2257 bt_unpinlatch (set->latch);
// Per-key bookkeeping for an atomic transaction; the bt_atomic*
// routines access these fields as locks[src].entry, .slot and .reuse.
2264 uint entry; // latch table entry number
2265 uint slot:31; // page slot number
2266 uint reuse:1; // reused previous page
// Queue node for parent-level work deferred by an atomic transaction;
// bt_atomictxn fills in leafkey, page_no and entry, then switches on
// type when the queue is processed.
2270 uid page_no; // page number for split leaf
2271 void *next; // next key to insert
2272 uint entry:29; // latch table entry number
2273 uint type:2; // 0 == insert, 1 == delete, 2 == free
2274 uint nounlock:1; // don't unlock ParentModification
2275 unsigned char leafkey[BT_keyarray];
2278 // find and load leaf page for given key
2279 // leave page Atomic locked and Read locked.
// key/len: key to locate. The search starts at level 1 and then
// follows the child pointer down to the leaf, sliding right along
// sibling links when the key lies beyond a page's fence.
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2283 BtLatchSet *prevlatch;
2287 // find level one slot
2289 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2292 // find next non-dead entry on this page
2293 // it will be the fence key if nothing else
2295 while( slotptr(set->page, slot)->dead )
2296 if( slot++ < set->page->cnt )
2299 return bt->err = BTERR_struct, 0;
// the live slot's value is the page number of the leaf to visit
2301 page_no = bt_getid(valptr(set->page, slot)->value);
2302 prevlatch = set->latch;
2305 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306 set->page = bt_mappage (bt, set->latch);
2310 // obtain read lock using lock chaining with Access mode
2311 // release & unpin parent/left sibling page
2313 bt_lockpage(bt, BtLockAccess, set->latch);
2315 bt_unlockpage(bt, BtLockRead, prevlatch);
2316 bt_unpinlatch (prevlatch);
2318 bt_lockpage(bt, BtLockRead, set->latch);
2320 // find key on page at this level
2321 // and descend to requested level
// the key belongs here if the page is not killed and either has no
// right sibling or its fence key is >= the search key
2323 if( !set->page->kill )
2324 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
// the read lock is dropped so the atomic lock can be taken first, then
// the read lock is re-taken; kill is re-checked afterwards because
// the page was unlocked in between
2325 bt_unlockpage(bt, BtLockRead, set->latch);
2326 bt_lockpage(bt, BtLockAtomic, set->latch);
2327 bt_lockpage(bt, BtLockRead, set->latch);
2329 if( !set->page->kill )
2330 if( slot = bt_findslot (set->page, key, len) ) {
2331 bt_unlockpage(bt, BtLockAccess, set->latch);
// this page turned out not to be usable: give back the atomic lock
2335 bt_unlockpage(bt, BtLockAtomic, set->latch);
2338 // slide right into next page
2340 bt_unlockpage(bt, BtLockAccess, set->latch);
2341 page_no = bt_getid(set->page->right);
2342 prevlatch = set->latch;
2345 // return error on end of right chain
2347 bt->err = BTERR_struct;
2348 return 0; // return error
2351 // determine actual page where key is located
2352 // return slot number
// source/src: transaction page and the index of the key within it.
// locks: per-key lock records; locks[src] may be updated with the
// latch entry that was actually used.
// set: filled in with the latch and mapped page that hold the key.
// bt->err is set to BTERR_atomic if the key is not found on the page
// or anywhere along its split chain.
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
// a reused entry borrows the previous key's latch entry and forces a
// fresh slot search
2360 if( src > 1 && locks[src].reuse )
2361 entry = locks[src-1].entry, slot = 0;
2363 entry = locks[src].entry;
2366 set->latch = bt->mgr->latchsets + entry;
2367 set->page = bt_mappage (bt, set->latch);
2371 // is locks->reuse set?
2372 // if so, find where our key
2373 // is located on previous page or split pages
2376 set->latch = bt->mgr->latchsets + entry;
2377 set->page = bt_mappage (bt, set->latch);
2379 if( slot = bt_findslot(set->page, key->key, key->len) ) {
// remember which page of the chain held the key
2380 if( locks[src].reuse )
2381 locks[src].entry = entry;
// follow the split chain until it ends (split == 0)
2384 } while( entry = set->latch->split );
2386 bt->err = BTERR_atomic;
// Insert transaction key src from the source page into the leaf found
// by bt_atomicpage. If the page has no room it is split, the new
// right page is spliced into the master page's split chain and
// write-locked, and the lookup is retried.
// Sets BTERR_atomic when the loop ends without an insert.
2390 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2392 BtKey *key = keyptr(source, src);
2393 BtVal *val = valptr(source, src);
2398 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
// room is available (possibly after cleaning): insert, passing 0 as
// the final release argument of bt_insertslot
2399 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2400 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2402 if( entry = bt_splitpage (bt, set) )
2403 latch = bt->mgr->latchsets + entry;
2407 // splice right page into split chain
2408 // and WriteLock it.
2410 latch->split = set->latch->split;
2411 set->latch->split = entry;
2412 bt_lockpage(bt, BtLockWrite, latch);
2415 return bt->err = BTERR_atomic;
// Mark transaction key src as deleted on the leaf located by
// bt_atomicpage, add its key and value bytes to the page's garbage
// count, and mark the latch dirty. No lock is released in the code
// visible here.
2418 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2420 BtKey *key = keyptr(source, src);
2421 uint idx, entry, slot;
2426 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2427 slotptr(set->page, slot)->dead = 1;
2429 return bt->err = BTERR_struct;
2431 ptr = keyptr(set->page, slot);
2432 val = valptr(set->page, slot);
2434 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2435 set->latch->dirty = 1;
2440 // delete an empty master page for a transaction
2442 // note that the far right page never empties because
2443 // it always contains (at least) the infinite stopper key
2444 // and that all pages that don't contain any keys are
2445 // deleted, or are being held under Atomic lock.
// prev: the empty master page. Its atomic lock is released here, so
// the caller presumably enters holding it -- confirm against callers.
2447 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2449 BtPageSet right[1], temp[1];
2450 unsigned char value[BtId];
2454 bt_lockpage(bt, BtLockWrite, prev->latch);
2456 // grab the right sibling
2458 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2459 right->page = bt_mappage (bt, right->latch);
2463 bt_lockpage(bt, BtLockAtomic, right->latch);
2464 bt_lockpage(bt, BtLockWrite, right->latch);
2466 // and pull contents over empty page
2467 // while preserving master's left link
// the master's left link is first written into the right page so that
// it survives the whole-page copy
2469 memcpy (right->page->left, prev->page->left, BtId);
2470 memcpy (prev->page, right->page, bt->mgr->page_size);
2472 // forward seekers to old right sibling
2473 // to new page location in set
2475 bt_putid (right->page->right, prev->latch->page_no);
2476 right->latch->dirty = 1;
2477 right->page->kill = 1;
2479 // remove pointer to right page for searchers by
2480 // changing right fence key to point to the master page
2482 ptr = keyptr(right->page,right->page->cnt);
2483 bt_putid (value, prev->latch->page_no);
2485 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2488 // now that master page is in good shape we can
2489 // remove its locks.
2491 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2492 bt_unlockpage (bt, BtLockWrite, prev->latch);
2494 // fix master's right sibling's left pointer
2495 // to remove scanner's pointer to the right page
2497 if( right_page_no = bt_getid (prev->page->right) ) {
2498 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2499 temp->page = bt_mappage (bt, temp->latch);
2501 bt_lockpage (bt, BtLockWrite, temp->latch);
2502 bt_putid (temp->page->left, prev->latch->page_no);
2503 temp->latch->dirty = 1;
2505 bt_unlockpage (bt, BtLockWrite, temp->latch);
2506 bt_unpinlatch (temp->latch);
2507 } else { // master is now the far right page
// page zero's alloc->left records the far right page number; it is
// updated under the manager's spin lock
2508 bt_spinwritelock (bt->mgr->lock);
2509 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2510 bt_spinreleasewrite(bt->mgr->lock);
2513 // now that there are no pointers to the right page
2514 // we can delete it after the last read access occurs
// the write and atomic locks are released first, then the delete and
// write locks are taken before the page goes to bt_freepage
2516 bt_unlockpage (bt, BtLockWrite, right->latch);
2517 bt_unlockpage (bt, BtLockAtomic, right->latch);
2518 bt_lockpage (bt, BtLockDelete, right->latch);
2519 bt_lockpage (bt, BtLockWrite, right->latch);
2520 bt_freepage (bt, right);
2524 // atomic modification of a batch of keys.
2526 // return -1 if BTERR is set
2527 // otherwise return slot number
2528 // causing the key constraint violation
2529 // or zero on successful completion.
// source: a page whose slots list the keys to insert or delete; each
// slot's type selects the operation.
// Phases, in order: sort the keys; load and lock the leaf page for
// each key; write-lock each master page; apply the operations; then
// post the queued parent-level inserts, deletes and page frees.
2531 int bt_atomictxn (BtDb *bt, BtPage source)
2533 uint src, idx, slot, samepage, entry;
2534 AtomicKey *head, *tail, *leaf;
2535 BtPageSet set[1], prev[1];
2536 unsigned char value[BtId];
2537 BtKey *key, *ptr, *key2;
// one lock record per source slot; index 0 is unused because slots
// are numbered from 1.
// NOTE(review): no NULL check on this allocation is visible here --
// confirm
2547 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2551 // stable sort the list of keys into order to
2552 // prevent deadlocks between threads.
// insertion sort over the slot array: each slot is moved down while
// its key compares less than the key below it
2554 for( src = 1; src++ < source->cnt; ) {
2555 *temp = *slotptr(source,src);
2556 key = keyptr (source,src);
2558 for( idx = src; --idx; ) {
2559 key2 = keyptr (source,idx);
2560 if( keycmp (key, key2->key, key2->len) < 0 ) {
2561 *slotptr(source,idx+1) = *slotptr(source,idx);
2562 *slotptr(source,idx) = *temp;
2568 // Load the leaf page for each key
2569 // group same page references with reuse bit
2570 // and determine any constraint violations
2572 for( src = 0; src++ < source->cnt; ) {
2573 key = keyptr(source, src);
2576 // first determine if this modification falls
2577 // on the same page as the previous modification
2578 // note that the far right leaf page is a special case
// same page when the previous page has no right sibling or its fence
// key is >= this key
2580 if( samepage = src > 1 )
2581 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2582 slot = bt_findslot(set->page, key->key, key->len);
2583 else // release read on previous page
2584 bt_unlockpage(bt, BtLockRead, set->latch);
// a freshly loaded master page starts with an empty split chain
2587 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2588 set->latch->split = 0;
2592 if( slotptr(set->page, slot)->type == Librarian )
2593 ptr = keyptr(set->page, ++slot);
2595 ptr = keyptr(set->page, slot);
// record for a key that loaded its own page
2598 locks[src].entry = set->latch->entry;
2599 locks[src].slot = slot;
2600 locks[src].reuse = 0;
// record for a key that shares the previous key's page
2602 locks[src].entry = 0;
2603 locks[src].slot = 0;
2604 locks[src].reuse = 1;
2607 switch( slotptr(source, src)->type ) {
// a live slot with an equal key, other than the stopper on the far
// right page, is an existing key
2610 if( !slotptr(set->page, slot)->dead )
2611 if( slot < set->page->cnt || bt_getid (set->page->right) )
2612 if( !keycmp (ptr, key->key, key->len) ) {
2614 // return constraint violation if key already exists
2616 bt_unlockpage(bt, BtLockRead, set->latch);
// undo the atomic lock and pin on a page recorded in the lock table
2620 if( locks[src].entry ) {
2621 set->latch = bt->mgr->latchsets + locks[src].entry;
2622 bt_unlockpage(bt, BtLockAtomic, set->latch);
2623 bt_unpinlatch (set->latch);
2634 // unlock last loadpage lock
2636 if( source->cnt > 1 )
2637 bt_unlockpage(bt, BtLockRead, set->latch);
2639 // obtain write lock for each master page
2641 for( src = 0; src++ < source->cnt; )
2642 if( locks[src].reuse )
2645 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2647 // insert or delete each key
2648 // process any splits or merges
2649 // release Write & Atomic latches
2650 // set ParentModifications and build
2651 // queue of keys to insert for split pages
2652 // or delete for deleted pages.
2654 // run through txn list backwards
// samepage marks one past the last key of the group being processed
2656 samepage = source->cnt + 1;
2658 for( src = source->cnt; src; src-- ) {
2659 if( locks[src].reuse )
2662 // perform the txn operations
2663 // from smaller to larger on
2666 for( idx = src; idx < samepage; idx++ )
2667 switch( slotptr(source,idx)->type ) {
2669 if( bt_atomicdelete (bt, source, locks, idx) )
2675 if( bt_atomicinsert (bt, source, locks, idx) )
2680 // after the same page operations have finished,
2681 // process master page for splits or deletion.
2683 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2684 prev->page = bt_mappage (bt, prev->latch);
2687 // pick-up all splits from master page
2688 // each one is already WriteLocked.
2690 entry = prev->latch->split;
2693 set->latch = bt->mgr->latchsets + entry;
2694 set->page = bt_mappage (bt, set->latch);
2695 entry = set->latch->split;
2697 // delete empty master page by undoing its split
2698 // (this is potentially another empty page)
2699 // note that there are no new left pointers yet
2701 if( !prev->page->act ) {
2702 memcpy (set->page->left, prev->page->left, BtId);
2703 memcpy (prev->page, set->page, bt->mgr->page_size);
2704 bt_lockpage (bt, BtLockDelete, set->latch);
2705 bt_freepage (bt, set);
2707 prev->latch->dirty = 1;
2711 // remove empty page from the split chain
2713 if( !set->page->act ) {
2714 memcpy (prev->page->right, set->page->right, BtId);
2715 prev->latch->split = set->latch->split;
2716 bt_lockpage (bt, BtLockDelete, set->latch);
2717 bt_freepage (bt, set);
2721 // schedule prev fence key update
// NOTE(review): calloc is called as (size, count) here; the byte
// count is the same either way. No NULL check is visible -- confirm
2723 ptr = keyptr(prev->page,prev->page->cnt);
2724 leaf = calloc (sizeof(AtomicKey), 1);
2726 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2727 leaf->page_no = prev->latch->page_no;
2728 leaf->entry = prev->latch->entry;
2738 // splice in the left link into the split page
2740 bt_putid (set->page->left, prev->latch->page_no);
2741 bt_lockpage(bt, BtLockParent, prev->latch);
2742 bt_unlockpage(bt, BtLockWrite, prev->latch);
2746 // update left pointer in next right page from last split page
2747 // (if all splits were reversed, latch->split == 0)
2749 if( latch->split ) {
2750 // fix left pointer in master's original (now split)
2751 // far right sibling or set rightmost page in page zero
2753 if( right = bt_getid (prev->page->right) ) {
2754 if( set->latch = bt_pinlatch (bt, right, 1) )
2755 set->page = bt_mappage (bt, set->latch);
2759 bt_lockpage (bt, BtLockWrite, set->latch);
2760 bt_putid (set->page->left, prev->latch->page_no);
2761 set->latch->dirty = 1;
2762 bt_unlockpage (bt, BtLockWrite, set->latch);
2763 bt_unpinlatch (set->latch);
2764 } else { // prev is rightmost page
2765 bt_spinwritelock (bt->mgr->lock);
2766 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2767 bt_spinreleasewrite(bt->mgr->lock);
2770 // Process last page split in chain
2772 ptr = keyptr(prev->page,prev->page->cnt);
2773 leaf = calloc (sizeof(AtomicKey), 1);
2775 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2776 leaf->page_no = prev->latch->page_no;
2777 leaf->entry = prev->latch->entry;
2787 bt_lockpage(bt, BtLockParent, prev->latch);
2788 bt_unlockpage(bt, BtLockWrite, prev->latch);
2790 // remove atomic lock on master page
2792 bt_unlockpage(bt, BtLockAtomic, latch);
2796 // finished if prev page occupied (either master or final split)
2798 if( prev->page->act ) {
2799 bt_unlockpage(bt, BtLockWrite, latch);
2800 bt_unlockpage(bt, BtLockAtomic, latch);
2801 bt_unpinlatch(latch);
2805 // any and all splits were reversed, and the
2806 // master page located in prev is empty, delete it
2807 // by pulling over master's right sibling.
2809 // Remove empty master's fence key
2811 ptr = keyptr(prev->page,prev->page->cnt);
2813 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2816 // perform the remainder of the delete
2817 // from the FIFO queue
2819 leaf = calloc (sizeof(AtomicKey), 1);
2821 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2822 leaf->page_no = prev->latch->page_no;
2823 leaf->entry = prev->latch->entry;
2834 // leave atomic lock in place until
2835 // deletion completes in next phase.
2837 bt_unlockpage(bt, BtLockWrite, prev->latch);
2840 // add & delete keys for any pages split or merged during transaction
// each queued node names the latch entry and fence key to post at
// level 1; value carries the queued page number
2844 set->latch = bt->mgr->latchsets + leaf->entry;
2845 set->page = bt_mappage (bt, set->latch);
2847 bt_putid (value, leaf->page_no);
2848 ptr = (BtKey *)leaf->leafkey;
2850 switch( leaf->type ) {
2851 case 0: // insert key
2852 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2857 case 1: // delete key
2858 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2863 case 2: // free page
2864 if( bt_atomicfree (bt, set) )
// the parent lock is released unless the node was queued with nounlock
2870 if( !leaf->nounlock )
2871 bt_unlockpage (bt, BtLockParent, set->latch);
2873 bt_unpinlatch (set->latch);
2876 } while( leaf = tail );
2884 // set cursor to highest slot on highest page
// bt_lastkey: copy one page into the cursor buffer and report its slot count.
// The page id is read from pagezero->alloc->left; the file's own comment
// describes the effect as "set cursor to highest slot on highest page".
// bt - handle whose cursor buffer and cursor_page are overwritten
// Returns bt->cursor->cnt, the slot count of the page just copied.
// NOTE(review): this listing skips the lines between the map call and the
// read lock (presumably the branch taken when bt_pinlatch yields no latch)
// as well as the declaration of `set` -- confirm against the complete source.
2886 uint bt_lastkey (BtDb *bt)
2888 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
// obtain a latch for that page and map the page it guards
2891 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2892 set->page = bt_mappage (bt, set->latch);
// snapshot the whole page (page_size bytes) into the cursor under a read lock,
// then drop the lock and the pin so the cursor works on a private copy
2896 bt_lockpage(bt, BtLockRead, set->latch);
2897 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2898 bt_unlockpage(bt, BtLockRead, set->latch);
2899 bt_unpinlatch (set->latch);
// remember which page the cursor copy came from
2901 bt->cursor_page = page_no;
2902 return bt->cursor->cnt;
2905 // return previous slot on cursor page
// bt_prevkey: cursor step towards lower keys; the visible lines handle the
// move from the current cursor page to a sibling reached through the `left`
// link.
// bt   - handle whose cursor buffer and cursor_page are read and updated
// slot - current slot on the cursor page
// Returns bt->cursor->cnt once a sibling page has been copied into the
// cursor; the other return paths are not visible in this listing.
// NOTE(review): the statements controlled by the `if` tests below, any
// labels/jumps, the declaration of `set` and the handling of `slot` are
// elided from this listing, so the comments describe the visible lines only.
2907 uint bt_prevkey (BtDb *bt, uint slot)
// `us` records the page the cursor holds on entry
2909 uid ourright, next, us = bt->cursor_page;
// right-sibling id of the entry page, kept for the comparison further down
2915 ourright = bt_getid(bt->cursor->right);
// read the left-sibling id; a zero id is tested for here (action elided)
2918 if( !(next = bt_getid(bt->cursor->left)) )
2922 bt->cursor_page = next;
// latch and map the candidate page
2924 if( set->latch = bt_pinlatch (bt, next, 1) )
2925 set->page = bt_mappage (bt, set->latch);
// copy the candidate page into the cursor under a read lock, then release it
2929 bt_lockpage(bt, BtLockRead, set->latch);
2930 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2931 bt_unlockpage(bt, BtLockRead, set->latch);
2932 bt_unpinlatch (set->latch);
// right link of the page just loaded
2934 next = bt_getid (bt->cursor->right);
// the loaded page's kill flag is checked (action elided)
2936 if( bt->cursor->kill )
// the loaded page's right link is compared with the entry page's right link
// (presumably to detect that the entry page was split or merged meanwhile --
// TODO confirm; the action is elided)
2940 if( next == ourright )
2945 return bt->cursor->cnt;
2948 // return next slot on cursor page
2949 // or slide cursor right into next page
// bt_nextkey: cursor step towards higher keys. Per the file's own comment it
// returns the next slot on the cursor page or slides the cursor right into
// the next page.
// bt   - handle whose cursor buffer and cursor_page are read and updated
// slot - current slot on the cursor page
// NOTE(review): the declarations, the statements controlled by the tests
// below and every return statement are elided from this listing.
2951 uint bt_nextkey (BtDb *bt, uint slot)
// id of the right sibling of the page currently held in the cursor
2957 right = bt_getid(bt->cursor->right);
// walk the slots after `slot`, up to and including slot number cnt
2959 while( slot++ < bt->cursor->cnt )
// slots flagged dead are tested for first (action elided)
2960 if( slotptr(bt->cursor,slot)->dead )
// a live slot qualifies unless it is the last slot of a page that has no
// right sibling
2962 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// the cursor now targets the right sibling
2970 bt->cursor_page = right;
// latch and map the right sibling
2972 if( set->latch = bt_pinlatch (bt, right, 1) )
2973 set->page = bt_mappage (bt, set->latch);
// copy it into the cursor under a read lock, then release lock and pin
2977 bt_lockpage(bt, BtLockRead, set->latch);
2979 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2981 bt_unlockpage(bt, BtLockRead, set->latch);
2982 bt_unpinlatch (set->latch);
2990 // cache page of keys into cursor and return starting slot for given key
// bt_startkey: position the cursor for `key`. Per the file's own comment it
// caches a page of keys into the cursor and returns the starting slot for
// the given key.
// bt  - handle whose cursor buffer and cursor_page are overwritten
// key - key bytes to position on
// len - length of key in bytes
// NOTE(review): the declarations, the branch taken when bt_loadpage yields
// slot 0, and the return statement are elided from this listing.
2992 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2997 // cache page for retrieval
// bt_loadpage is called with BtLockRead; on a non-zero slot the located page
// is copied whole into the cursor buffer
2999 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3000 memcpy (bt->cursor, set->page, bt->mgr->page_size);
// remember which page the cursor copy came from
3004 bt->cursor_page = set->latch->page_no;
// release the read lock and the pin on the located page
3006 bt_unlockpage(bt, BtLockRead, set->latch);
3007 bt_unpinlatch (set->latch);
// bt_key: return a pointer to the key stored at `slot` of the cursor page.
// The pointer refers into bt->cursor, so it describes the cursor's copy of
// the page and changes meaning when the cursor buffer is overwritten.
3011 BtKey *bt_key(BtDb *bt, uint slot)
3013 return keyptr(bt->cursor, slot);
// bt_val: return a pointer to the value stored at `slot` of the cursor page.
// The pointer refers into bt->cursor, so it describes the cursor's copy of
// the page and changes meaning when the cursor buffer is overwritten.
3016 BtVal *bt_val(BtDb *bt, uint slot)
3018 return valptr(bt->cursor,slot);
// getCpuTime (Win32 API variant): return a time reading in seconds.
// type - main calls this with 0, 1 and 2 and labels the results "real",
//        "user" and "sys"; the visible lines show a wall-clock reading, a
//        usrtime reading and a systime reading, presumably chosen by `type`
//        through case labels elided from this listing.
// The result is accumulated in `ans` from the SYSTEMTIME fields below;
// presumably `ans` is what gets returned (the return is elided).
// NOTE(review): the wall-clock value is built from wDayOfWeek plus time of
// day, so it wraps once a week; a difference of two readings (as main
// computes for "real") would go negative across that wrap -- confirm this is
// acceptable.
3024 double getCpuTime(int type)
3027 FILETIME xittime[1];
3028 FILETIME systime[1];
3029 FILETIME usrtime[1];
3030 SYSTEMTIME timeconv[1];
// start from an all-zero broken-down time
3033 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, seeded with whole days from wDayOfWeek
3037 GetSystemTimeAsFileTime (xittime);
3038 FileTimeToSystemTime (xittime, timeconv);
3039 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user time of the current process, converted to broken-down form
3042 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3043 FileTimeToSystemTime (usrtime, timeconv);
// system (kernel) time of the current process, converted the same way
3046 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3047 FileTimeToSystemTime (systime, timeconv);
// fold hours, minutes, seconds and milliseconds into seconds
3051 ans += (double)timeconv->wHour * 3600;
3052 ans += (double)timeconv->wMinute * 60;
3053 ans += (double)timeconv->wSecond;
3054 ans += (double)timeconv->wMilliseconds / 1000;
3059 #include <sys/resource.h>
// getCpuTime (gettimeofday/getrusage variant): return a time reading in
// seconds as a double (whole seconds plus microseconds / 1000000).
// type - main calls this with 0, 1 and 2 and labels the results "real",
//        "user" and "sys"; the three returns below match that order, with the
//        selecting case labels presumably elided from this listing.
// NOTE(review): the results of gettimeofday and getrusage are not checked in
// the visible lines, so a failing call would leave tv/used uninitialised.
3061 double getCpuTime(int type)
3063 struct rusage used[1];
3064 struct timeval tv[1];
// wall-clock time of day
3068 gettimeofday(tv, NULL);
3069 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by this process
3072 getrusage(RUSAGE_SELF, used);
3073 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by this process
3076 getrusage(RUSAGE_SELF, used);
3077 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: walk the deployed latch sets of a manager, report on stderr
// any lock that still looks held or any entry that is still pinned, and zero
// the lock words as it goes.
// mgr - buffer/latch manager whose latchsets[] array is inspected and reset
// NOTE(review): page ids elsewhere in this file are `uid` (unsigned long
// long); if latch->page_no is also a uid, the `%.8x` conversions below do not
// match their argument type -- confirm and widen the format if so.
// NOTE(review): `parent` is inspected through ticket/serving while readwr and
// access are inspected through rin, yet all three are cleared with
// sizeof(RWLock) -- confirm that parent really is an RWLock-sized object.
// NOTE(review): the declarations and the tail of the pinned branch are elided
// from this listing.
3084 void bt_poolaudit (BtMgr *mgr)
// visit slot numbers up to and including latchdeployed
3089 while( slot++ < mgr->latchdeployed ) {
3090 latch = mgr->latchsets + slot;
// read/write lock: report if any MASK bit of rin is set, then clear the lock
3092 if( *latch->readwr->rin & MASK )
3093 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3094 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
// access lock: same test and reset
3096 if( *latch->access->rin & MASK )
3097 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3098 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// parent lock: ticket and serving differ while someone holds or waits
3100 if( *latch->parent->ticket != *latch->parent->serving )
3101 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3102 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits other than CLOCK_bit mean the entry is still pinned
3104 if( latch->pin & ~CLOCK_bit ) {
3105 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// bt_latchaudit: consistency sweep over the latch manager followed by a raw
// key count. In order, the visible lines:
//   1. report and clear the manager lock word,
//   2. report and clear the locks of every deployed latch set,
//   3. report and clear every hash-table bucket latch and walk its chain,
//   4. read every page from LEAF_page up to pagezero->alloc->right straight
//      from the index file and total the `act` counts of non-free pages
//      whose lvl is zero.
// bt - handle supplying the manager, the frame buffer and the err field
// Returns bt->err (set to BTERR_map) on the visible read-failure paths; the
// normal return is elided (index_file prints the result as a key count).
// NOTE(review): idx and hashidx are ushort; confirm latchdeployed and
// latchhash can never exceed 65535, otherwise these loops cannot terminate.
// NOTE(review): if latch->page_no is a uid (unsigned long long) the `%.8x`
// conversions do not match their argument -- confirm.
// NOTE(review): `parent` is tested via ticket/serving but cleared with
// sizeof(RWLock) -- confirm the sizes agree.
// NOTE(review): preprocessor conditionals, several `if` lines and the
// declarations are elided from this listing.
3111 uint bt_latchaudit (BtDb *bt)
3113 ushort idx, hashidx;
// step 1: manager-level lock word
3119 if( *(ushort *)(bt->mgr->lock) )
3120 fprintf(stderr, "Alloc page locked\n");
3121 *(ushort *)(bt->mgr->lock) = 0;
// step 2: every deployed latch set, starting at index 1
3123 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3124 latch = bt->mgr->latchsets + idx;
3125 if( *latch->readwr->rin & MASK )
3126 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3127 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3129 if( *latch->access->rin & MASK )
3130 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3131 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3133 if( *latch->parent->ticket != *latch->parent->serving )
3134 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3135 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// pinned report (its governing test is elided from this listing)
3138 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// step 3: hash buckets -- report and clear the bucket latch, then follow the
// chain of latch sets through latch->next until it yields zero
3143 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3144 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3145 fprintf(stderr, "hash entry %d locked\n", hashidx);
3147 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3149 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3150 latch = bt->mgr->latchsets + idx;
3152 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3153 } while( idx = latch->next );
// step 4: sequential pass over the index file
3156 page_no = LEAF_page;
3158 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the index file
3159 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is discarded, so a short or failed read goes
// unnoticed here, unlike the ReadFile path below which returns BTERR_map
3161 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// Win32 path: seek using the low and high halves of `off`, then read
3165 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3167 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3168 return bt->err = BTERR_map;
3170 if( *amt < bt->mgr->page_size )
3171 return bt->err = BTERR_map;
// count active keys on pages that are in use and have lvl == 0
3173 if( !bt->frame->free && !bt->frame->lvl )
3174 cnt += bt->frame->act;
3178 cnt--; // remove stopper key
3179 fprintf(stderr, " Total keys read %d\n", cnt);
3193 // standalone program to index file of keys
3194 // then list them onto std-out
// index_file: worker-thread entry point of the standalone driver. It opens a
// BtDb handle on the shared manager in args->mgr, picks one command character
// from args->type and runs that command, most of which read args->infile.
// arg - pointer to this thread's ThreadArg (mgr, type, idx, infile and num
//       are read in the visible lines)
// Two signatures follow: main hands this routine to pthread_create and to
// _beginthreadex, so presumably a preprocessor conditional (elided from this
// listing) selects one of them.
// NOTE(review): this listing omits many lines (case labels, braces, `else`
// keywords, short statements, several declarations); the comments below
// describe the visible lines only.
// NOTE(review): the insert/transaction/find error paths print a message and
// then call exit(0), i.e. report success to the caller's shell, while the
// scan path uses exit(1) -- confirm which status is intended.
3197 void *index_file (void *arg)
3199 uint __stdcall index_file (void *arg)
3202 int line = 0, found = 0, cnt = 0, idx;
3203 uid next, page_no = LEAF_page; // start on first page of leaves
3204 int ch, len = 0, slot, type = 0;
3205 unsigned char key[BT_maxkey];
3206 unsigned char txn[65536];
3207 ThreadArg *args = arg;
// per-thread handle onto the shared manager
3216 bt = bt_open (args->mgr);
// choose this thread's command: the character at position args->idx of the
// command string, or the string's last character (presumably the `else`
// branch) when the string is too short
3219 if( args->idx < strlen (args->type) )
3220 ch = args->type[args->idx];
3222 ch = args->type[strlen(args->type) - 1];
// ---- latch manager audit: the bt_latchaudit result is printed as a key count
3227 fprintf(stderr, "started latch mgr audit\n");
3228 cnt = bt_latchaudit (bt);
3229 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// ---- pennysort load: the banner distinguishes transactional from plain
// operation and delete from insert (the test choosing between the two pairs
// is elided)
3240 if( type == Delete )
3241 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3243 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3245 if( type == Delete )
3246 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3248 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
// read the input file one character at a time, in binary mode
3250 if( in = fopen (args->infile, "rb") )
3251 while( ch = getc(in), ch != EOF )
// direct insert: the first 10 bytes of the line are the key, the remaining
// len - 10 bytes are the value
3257 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3258 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// batched path: the value bytes, a one-byte value length and the 10-byte key
// are written into the txn buffer at offset nxt (the updates of nxt between
// these lines are elided)
3264 memcpy (txn + nxt, key + 10, len - 10);
3266 txn[nxt] = len - 10;
3268 memcpy (txn + nxt, key, 10);
// record the entry's offset and operation type in the next slot of `page`
3271 slotptr(page,++cnt)->off = nxt;
3272 slotptr(page,cnt)->type = type;
// batch size test against args->num (action elided)
3275 if( cnt < args->num )
// hand the accumulated batch to bt_atomictxn
3282 if( bt_atomictxn (bt, page) )
3283 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// bound check on the line buffer (action elided)
3288 else if( len < BT_maxkey )
3290 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- plain indexing: each input line is inserted as a key with no value;
// note the file is opened in text mode here ("r"), unlike the other readers
3294 fprintf(stderr, "started indexing for %s\n", args->infile);
3295 if( in = fopen (args->infile, "r") )
3296 while( ch = getc(in), ch != EOF )
3301 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3302 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3305 else if( len < BT_maxkey )
3307 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- key lookup: each input line is looked up with bt_findkey; a zero
// result is tested for (action elided), and the closing message reports the
// number found
3311 fprintf(stderr, "started finding keys for %s\n", args->infile);
3312 if( in = fopen (args->infile, "rb") )
3313 while( ch = getc(in), ch != EOF )
3317 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3320 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3323 else if( len < BT_maxkey )
3325 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- forward scan: starting at page_no, follow `right` links page by page
// and write every live key/value pair to stdout
3329 fprintf(stderr, "started scanning\n");
3331 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3332 set->page = bt_mappage (bt, set->latch);
3334 fprintf(stderr, "unable to obtain latch"), exit(1);
3335 bt_lockpage (bt, BtLockRead, set->latch);
3336 next = bt_getid (set->page->right);
// visit slots 1..cnt; the last slot is skipped on a page with no right
// sibling, and slots flagged dead are skipped
3338 for( slot = 0; slot++ < set->page->cnt; )
3339 if( next || slot < set->page->cnt )
3340 if( !slotptr(set->page, slot)->dead ) {
3341 ptr = keyptr(set->page, slot);
// slots of type Duplicate get special handling (elided)
3344 if( slotptr(set->page, slot)->type == Duplicate )
// output format: key bytes, value bytes, newline
3347 fwrite (ptr->key, len, 1, stdout);
3348 val = valptr(set->page, slot);
3349 fwrite (val->value, val->len, 1, stdout);
3350 fputc ('\n', stdout);
3354 bt_unlockpage (bt, BtLockRead, set->latch);
3355 bt_unpinlatch (set->latch);
// continue with the right sibling until the link is zero
3356 } while( page_no = next );
3358 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// ---- reverse scan: position with bt_lastkey, then step with bt_prevkey
// until it yields zero, writing key/value pairs from the cursor copy
3362 fprintf(stderr, "started reverse scan\n");
3363 if( slot = bt_lastkey (bt) )
3364 while( slot = bt_prevkey (bt, slot) ) {
3365 if( slotptr(bt->cursor, slot)->dead )
3368 ptr = keyptr(bt->cursor, slot);
3371 if( slotptr(bt->cursor, slot)->type == Duplicate )
3374 fwrite (ptr->key, len, 1, stdout);
3375 val = valptr(bt->cursor, slot);
3376 fwrite (val->value, val->len, 1, stdout);
3377 fputc ('\n', stdout);
3381 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// ---- counting: advise sequential access, then read every page from
// LEAF_page up to pagezero->alloc->right and total the `act` counts of
// non-free pages whose lvl is zero
3386 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3388 fprintf(stderr, "started counting\n");
3389 page_no = LEAF_page;
3391 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3392 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3395 if( !bt->frame->free && !bt->frame->lvl )
3396 cnt += bt->frame->act;
3402 cnt--; // remove stopper key
3403 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3415 typedef struct timeval timer;
// main: command-line driver. Parses the arguments described in the usage
// text, opens the index through bt_mgr, starts one index_file worker per
// source file, waits for all of them and prints real/user/sys timings.
// argv[1] - index file name          argv[2] - command string
// argv[3] - page size in bits        argv[4] - buffer pool size in pages
// argv[5] - transaction batch size   argv[6..] - source files, one per thread
// NOTE(review): the argument-count tests, the computation of `cnt`, several
// declarations and the preprocessor conditionals separating the pthread and
// Win32 paths are elided from this listing.
// NOTE(review): atoi gives no error indication, so malformed numeric
// arguments are silently read as 0; no check of the malloc results is visible.
// NOTE(review): a failed pthread_create is only reported; the later
// pthread_join still runs on that thread slot -- confirm this is safe.
3417 int main (int argc, char **argv)
3419 int idx, cnt, len, slot, err;
3420 int segsize, bits = 16;
// usage text (its governing argument-count test is elided)
3437 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3438 fprintf (stderr, " where idx_file is the name of the btree file\n");
3439 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3440 fprintf (stderr, " page_bits is the page size in bits\n");
3441 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3442 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3443 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock reading used as the baseline for the "real" figure below
3447 start = getCpuTime(0);
// optional numeric arguments; bits keeps its default of 16 when argv[3] is
// not consumed (the tests guarding these reads are elided)
3450 bits = atoi(argv[3]);
3453 poolsize = atoi(argv[4]);
3456 fprintf (stderr, "Warning: no mapped_pool\n");
3459 num = atoi(argv[5]);
// one thread handle and one ThreadArg per worker
3463 threads = malloc (cnt * sizeof(pthread_t));
3465 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3467 args = malloc (cnt * sizeof(ThreadArg));
// open the shared manager for the index file
3469 mgr = bt_mgr ((argv[1]), bits, poolsize);
3472 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// fill in each worker's arguments and start it; every worker shares the same
// command string and manager and gets its own source file and index
3478 for( idx = 0; idx < cnt; idx++ ) {
3479 args[idx].infile = argv[idx + 6];
3480 args[idx].type = argv[2];
3481 args[idx].mgr = mgr;
3482 args[idx].num = num;
3483 args[idx].idx = idx;
3485 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3486 fprintf(stderr, "Error creating thread %d\n", err);
3488 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3492 // wait for termination
3495 for( idx = 0; idx < cnt; idx++ )
3496 pthread_join (threads[idx], NULL);
3498 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3500 for( idx = 0; idx < cnt; idx++ )
3501 CloseHandle(threads[idx]);
// timings are printed as whole minutes plus the remaining seconds
3507 elapsed = getCpuTime(0) - start;
3508 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3509 elapsed = getCpuTime(1);
3510 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3511 elapsed = getCpuTime(2);
3512 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);