1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
127 // definition for spin latch implementation
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
133 volatile typedef struct {
144 // hash table entries
147 volatile uint slot; // Latch table entry at head of chain
148 BtSpinLatch latch[1];
151 // latch manager table structure
154 uid page_no; // latch set page number
155 RWLock readwr[1]; // read/write page lock
156 RWLock access[1]; // Access Intent/Page delete
157 WOLock parent[1]; // Posting of fence key in parent
158 WOLock atomic[1]; // Atomic update in progress
159 uint split; // right split page atomic insert
160 uint entry; // entry slot in latch table
161 uint next; // next entry in hash table chain
162 uint prev; // prev entry in hash table chain
163 volatile ushort pin; // number of outstanding threads
164 ushort dirty:1; // page in cache is dirty
167 // Define the length of the page record numbers
171 // Page key slot definition.
173 // Keys are marked dead, but remain on the page until
174 // it cleanup is called. The fence key (highest key) for
175 // a leaf page is always present, even after cleanup.
179 // In addition to the Unique keys that occupy slots
180 // there are Librarian and Duplicate key
181 // slots occupying the key slot array.
183 // The Librarian slots are dead keys that
184 // serve as filler, available to add new Unique
185 // or Dup slots that are inserted into the B-tree.
187 // The Duplicate slots have had their key bytes extended
188 // by 6 bytes to contain a binary duplicate key uniqueifier.
199 uint off:BT_maxbits; // page offset for key start
200 uint type:3; // type of slot
201 uint dead:1; // set for deleted slot
204 // The key structure occupies space at the upper end of
205 // each page. It's a length byte followed by the key
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char key[0];
213 // the value structure also occupies space at the upper
214 // end of the page. Each key is immediately followed by a value.
217 unsigned char len; // this can be changed to a ushort or uint
218 unsigned char value[0];
221 #define BT_maxkey 255 // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224 // The first part of an index page.
225 // It is immediately followed
226 // by the BtSlot array of keys.
228 // note that this structure size
229 // must be a multiple of 8 bytes
230 // in order to place dups correctly.
232 typedef struct BtPage_ {
233 uint cnt; // count of keys in page
234 uint act; // count of active keys
235 uint min; // next key offset
236 uint garbage; // page garbage in bytes
237 unsigned char bits:7; // page size in bits
238 unsigned char free:1; // page is on free chain
239 unsigned char lvl:7; // level of page
240 unsigned char kill:1; // page is being deleted
241 unsigned char right[BtId]; // page number to right
242 unsigned char left[BtId]; // page number to left
243 unsigned char filler[2]; // padding to multiple of 8
246 // The loadpage interface object
249 BtPage page; // current page pointer
250 BtLatchSet *latch; // current page latch set
253 // structure for latch manager on ALLOC_page
256 struct BtPage_ alloc[1]; // next page_no in right ptr
257 unsigned long long dups[1]; // global duplicate key uniqueifier
258 unsigned char chain[BtId]; // head of free page_nos chain
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
271 BtPageZero *pagezero; // mapped allocation page
272 BtSpinLatch lock[1]; // allocation area lite latch
273 uint latchdeployed; // highest number of latch entries deployed
274 uint nlatchpage; // number of latch pages at BT_latch
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 ushort thread_no[1]; // next thread number
279 BtHashEntry *hashtable; // the buffer pool hash table entries
280 BtLatchSet *latchsets; // mapped latch set from buffer pool
281 unsigned char *pagepool; // mapped to the buffer pool pages
283 HANDLE halloc; // allocation handle
284 HANDLE hpool; // buffer pool handle
289 BtMgr *mgr; // buffer manager for thread
290 BtPage cursor; // cached frame for start/next (never mapped)
291 BtPage frame; // spare frame for the page split (never mapped)
292 uid cursor_page; // current cursor page number
293 unsigned char *mem; // frame, cursor, page memory buffer
294 unsigned char key[BT_keyarray]; // last found complete key
295 int found; // last delete or insert was found
296 int err; // last error
297 int reads, writes; // number of reads and writes from the btree
298 ushort thread_no; // thread number
312 #define CLOCK_bit 0x8000
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey (BtDb *bt, uint slot);
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
328 // Helper functions to return slot values
329 // from the cursor page.
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
334 // The page is allocated from low and hi ends.
335 // The key slots are allocated from the bottom,
336 // while the text and value of the key
337 // are allocated from the top. When the two
338 // areas meet, the page is split into two.
340 // A key consists of a length byte, two bytes of
341 // index number (0 - 65535), and up to 253 bytes
344 // Associated with each key is a value byte string
345 // containing any value desired.
347 // The b-tree root is always located at page 1.
348 // The first leaf page of level zero is always
349 // located on page 2.
351 // The b-tree pages are linked with next
352 // pointers to facilitate enumerators,
353 // and provide for concurrency.
355 // When to root page fills, it is split in two and
356 // the tree height is raised by a new root at page
357 // one with two keys.
359 // Deleted keys are marked with a dead bit until
360 // page cleanup. The fence key for a leaf node is
363 // To achieve maximum concurrency one page is locked at a time
364 // as the tree is traversed to find leaf key in question. The right
365 // page numbers are used in cases where the page is being split,
368 // Page 0 is dedicated to lock for new page extensions,
369 // and chains empty pages together for reuse. It also
370 // contains the latch manager hash table.
372 // The ParentModification lock on a node is obtained to serialize posting
373 // or changing the fence key for a node.
375 // Empty pages are chained together through the ALLOC page and reused.
377 // Access macros to address slot and key values from the page
378 // Page slots use 1 based indexing.
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
384 void bt_putid(unsigned char *dest, uid id)
389 dest[i] = (unsigned char)id, id >>= 8;
392 uid bt_getid(unsigned char *src)
397 for( i = 0; i < BtId; i++ )
398 id <<= 8, id |= *src++;
403 uid bt_newdup (BtDb *bt)
406 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
408 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
412 // Write-Only Queue Lock
414 void WriteOLock (WOLock *lock)
418 tix = __sync_fetch_and_add (lock->ticket, 1);
420 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
422 // wait for our ticket to come up
424 while( tix != lock->serving[0] )
432 void WriteORelease (WOLock *lock)
437 // Phase-Fair reader/writer lock implementation
439 void WriteLock (RWLock *lock)
444 tix = __sync_fetch_and_add (lock->ticket, 1);
446 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
448 // wait for our ticket to come up
450 while( tix != lock->serving[0] )
457 w = PRES | (tix & PHID);
459 r = __sync_fetch_and_add (lock->rin, w);
461 r = _InterlockedExchangeAdd16 (lock->rin, w);
463 while( r != *lock->rout )
471 void WriteRelease (RWLock *lock)
474 __sync_fetch_and_and (lock->rin, ~MASK);
476 _InterlockedAnd16 (lock->rin, ~MASK);
481 void ReadLock (RWLock *lock)
485 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
487 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
490 while( w == (*lock->rin & MASK) )
498 void ReadRelease (RWLock *lock)
501 __sync_fetch_and_add (lock->rout, RINC);
503 _InterlockedExchangeAdd16 (lock->rout, RINC);
507 // Spin Latch Manager
509 // wait until write lock mode is clear
510 // and add 1 to the share count
512 void bt_spinreadlock(BtSpinLatch *latch)
518 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
520 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
522 // see if exclusive request is granted or pending
527 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
529 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
532 } while( sched_yield(), 1 );
534 } while( SwitchToThread(), 1 );
538 // wait for other read and write latches to relinquish
540 void bt_spinwritelock(BtSpinLatch *latch)
546 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
548 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
551 if( !(prev & ~BOTH) )
555 __sync_fetch_and_and ((ushort *)latch, ~XCL);
557 _InterlockedAnd16((ushort *)latch, ~XCL);
560 } while( sched_yield(), 1 );
562 } while( SwitchToThread(), 1 );
566 // try to obtain write lock
568 // return 1 if obtained,
571 int bt_spinwritetry(BtSpinLatch *latch)
576 prev = __sync_fetch_and_or((ushort *)latch, XCL);
578 prev = _InterlockedOr16((ushort *)latch, XCL);
580 // take write access if all bits are clear
583 if( !(prev & ~BOTH) )
587 __sync_fetch_and_and ((ushort *)latch, ~XCL);
589 _InterlockedAnd16((ushort *)latch, ~XCL);
596 void bt_spinreleasewrite(BtSpinLatch *latch)
599 __sync_fetch_and_and((ushort *)latch, ~BOTH);
601 _InterlockedAnd16((ushort *)latch, ~BOTH);
605 // decrement reader count
607 void bt_spinreleaseread(BtSpinLatch *latch)
610 __sync_fetch_and_add((ushort *)latch, -SHARE);
612 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
616 // read page from permanent location in Btree file
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
620 off64_t off = page_no << mgr->page_bits;
623 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
631 memset (ovl, 0, sizeof(OVERLAPPED));
633 ovl->OffsetHigh = off >> 32;
635 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
639 if( *amt < mgr->page_size ) {
640 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
647 // write page to permanent location in Btree file
648 // clear the dirty bit
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
652 off64_t off = page_no << mgr->page_bits;
655 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
661 memset (ovl, 0, sizeof(OVERLAPPED));
663 ovl->OffsetHigh = off >> 32;
665 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
668 if( *amt < mgr->page_size )
674 // link latch table entry into head of latch hash table
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
681 if( latch->next = bt->mgr->hashtable[hashidx].slot )
682 bt->mgr->latchsets[latch->next].prev = slot;
684 bt->mgr->hashtable[hashidx].slot = slot;
685 latch->page_no = page_no;
692 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
700 // set CLOCK bit in latch
701 // decrement pin count
703 void bt_unpinlatch (BtLatchSet *latch)
706 if( ~latch->pin & CLOCK_bit )
707 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708 __sync_fetch_and_add(&latch->pin, -1);
710 if( ~latch->pin & CLOCK_bit )
711 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712 _InterlockedDecrement16 (&latch->pin);
716 // return the btree cached page address
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
725 // find existing latchset or inspire new one
726 // return with latchset pinned
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
730 uint hashidx = page_no % bt->mgr->latchhash;
738 // try to find our entry
740 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
742 if( slot = bt->mgr->hashtable[hashidx].slot ) do
744 latch = bt->mgr->latchsets + slot;
745 if( page_no == latch->page_no )
747 } while( slot = latch->next );
753 latch = bt->mgr->latchsets + slot;
755 __sync_fetch_and_add(&latch->pin, 1);
757 _InterlockedIncrement16 (&latch->pin);
759 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
763 // see if there are any unused pool entries
765 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
767 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
770 if( slot < bt->mgr->latchtotal ) {
771 latch = bt->mgr->latchsets + slot;
772 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
774 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
779 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
781 _InterlockedDecrement (&bt->mgr->latchdeployed);
783 // find and reuse previous entry on victim
787 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
789 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
791 // try to get write lock on hash chain
792 // skip entry if not obtained
793 // or has outstanding pins
795 slot %= bt->mgr->latchtotal;
800 latch = bt->mgr->latchsets + slot;
801 idx = latch->page_no % bt->mgr->latchhash;
803 // see we are on same chain as hashidx
808 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
811 // skip this slot if it is pinned
812 // or the CLOCK bit is set
815 if( latch->pin & CLOCK_bit ) {
817 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
819 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
822 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
826 // update permanent page area in btree from buffer pool
828 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
831 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
834 latch->dirty = 0, bt->writes++;
836 // unlink our available slot from its hash chain
839 bt->mgr->latchsets[latch->prev].next = latch->next;
841 bt->mgr->hashtable[idx].slot = latch->next;
844 bt->mgr->latchsets[latch->next].prev = latch->prev;
846 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
848 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
851 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
856 void bt_mgrclose (BtMgr *mgr)
863 // flush dirty pool pages to the btree
865 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867 latch = mgr->latchsets + slot;
870 bt_writepage(mgr, page, latch->page_no);
871 latch->dirty = 0, num++;
873 // madvise (page, mgr->page_size, MADV_DONTNEED);
876 fprintf(stderr, "%d buffer pool pages flushed\n", num);
879 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880 munmap (mgr->pagezero, mgr->page_size);
882 FlushViewOfFile(mgr->pagezero, 0);
883 UnmapViewOfFile(mgr->pagezero);
884 UnmapViewOfFile(mgr->hashtable);
885 CloseHandle(mgr->halloc);
886 CloseHandle(mgr->hpool);
892 FlushFileBuffers(mgr->idx);
893 CloseHandle(mgr->idx);
898 // close and release memory
900 void bt_close (BtDb *bt)
907 VirtualFree (bt->mem, 0, MEM_RELEASE);
912 // open/create new btree buffer manager
914 // call with file_name, BT_openmode, bits in page size (e.g. 16),
915 // size of page pool (e.g. 262144)
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
930 // determine sanity of page size and buffer pool
932 if( bits > BT_maxbits )
934 else if( bits < BT_minbits )
938 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
943 mgr = calloc (1, sizeof(BtMgr));
945 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
947 if( mgr->idx == -1 ) {
948 fprintf (stderr, "Unable to open btree file\n");
949 return free(mgr), NULL;
952 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953 attr = FILE_ATTRIBUTE_NORMAL;
954 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
956 if( mgr->idx == INVALID_HANDLE_VALUE )
957 return GlobalFree(mgr), NULL;
961 pagezero = valloc (BT_maxpage);
964 // read minimum page size to get root info
965 // to support raw disk partition files
966 // check if bits == 0 on the disk.
968 if( size = lseek (mgr->idx, 0L, 2) )
969 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970 if( pagezero->alloc->bits )
971 bits = pagezero->alloc->bits;
975 return free(mgr), free(pagezero), NULL;
979 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980 size = GetFileSize(mgr->idx, amt);
983 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984 return bt_mgrclose (mgr), NULL;
985 bits = pagezero->alloc->bits;
990 mgr->page_size = 1 << bits;
991 mgr->page_bits = bits;
993 // calculate number of latch hash table entries
995 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
998 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
999 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000 mgr->latchtotal = nodemax;
1005 // initialize an empty b-tree with latch page, root page, page of leaves
1006 // and page(s) of latches and page pool cache
1008 memset (pagezero, 0, 1 << bits);
1009 pagezero->alloc->bits = mgr->page_bits;
1010 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1012 // initialize left-most LEAF page in
1015 bt_putid (pagezero->alloc->left, LEAF_page);
1017 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018 fprintf (stderr, "Unable to create btree page zero\n");
1019 return bt_mgrclose (mgr), NULL;
1022 memset (pagezero, 0, 1 << bits);
1023 pagezero->alloc->bits = mgr->page_bits;
1025 for( lvl=MIN_lvl; lvl--; ) {
1026 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027 key = keyptr(pagezero->alloc, 1);
1028 key->len = 2; // create stopper key
1032 bt_putid(value, MIN_lvl - lvl + 1);
1033 val = valptr(pagezero->alloc, 1);
1034 val->len = lvl ? BtId : 0;
1035 memcpy (val->value, value, val->len);
1037 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038 pagezero->alloc->lvl = lvl;
1039 pagezero->alloc->cnt = 1;
1040 pagezero->alloc->act = 1;
1042 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043 fprintf (stderr, "Unable to create btree page zero\n");
1044 return bt_mgrclose (mgr), NULL;
1052 VirtualFree (pagezero, 0, MEM_RELEASE);
1055 // mlock the pagezero page
1057 flag = PROT_READ | PROT_WRITE;
1058 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059 if( mgr->pagezero == MAP_FAILED ) {
1060 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->pagezero, mgr->page_size);
1065 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066 if( mgr->hashtable == MAP_FAILED ) {
1067 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068 return bt_mgrclose (mgr), NULL;
1071 flag = PAGE_READWRITE;
1072 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073 if( !mgr->halloc ) {
1074 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075 return bt_mgrclose (mgr), NULL;
1078 flag = FILE_MAP_WRITE;
1079 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080 if( !mgr->pagezero ) {
1081 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082 return bt_mgrclose (mgr), NULL;
1085 flag = PAGE_READWRITE;
1086 size = (uid)mgr->nlatchpage << mgr->page_bits;
1087 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1089 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090 return bt_mgrclose (mgr), NULL;
1093 flag = FILE_MAP_WRITE;
1094 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095 if( !mgr->hashtable ) {
1096 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097 return bt_mgrclose (mgr), NULL;
1101 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1107 // open BTree access method
1108 // based on buffer manager
1110 BtDb *bt_open (BtMgr *mgr)
1112 BtDb *bt = malloc (sizeof(*bt));
1114 memset (bt, 0, sizeof(*bt));
1117 bt->mem = valloc (2 *mgr->page_size);
1119 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1121 bt->frame = (BtPage)bt->mem;
1122 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1124 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1126 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1131 // compare two keys, return > 0, = 0, or < 0
1132 // =0: keys are same
1135 // as the comparison value
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1139 uint len1 = key1->len;
1142 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1153 // place write, read, or parent lock on requested page_no.
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1159 ReadLock (latch->readwr);
1162 WriteLock (latch->readwr);
1165 ReadLock (latch->access);
1168 WriteLock (latch->access);
1171 WriteOLock (latch->parent);
1174 WriteOLock (latch->atomic);
1179 // remove write, read, or parent lock on requested page
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1185 ReadRelease (latch->readwr);
1188 WriteRelease (latch->readwr);
1191 ReadRelease (latch->access);
1194 WriteRelease (latch->access);
1197 WriteORelease (latch->parent);
1200 WriteORelease (latch->atomic);
1205 // allocate a new page
1206 // return with page latched, but unlocked.
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1213 // lock allocation page
1215 bt_spinwritelock(bt->mgr->lock);
1217 // use empty chain first
1218 // else allocate empty page
1220 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222 set->page = bt_mappage (bt, set->latch);
1224 return bt->err = BTERR_struct, -1;
1226 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227 bt_spinreleasewrite(bt->mgr->lock);
1229 memcpy (set->page, contents, bt->mgr->page_size);
1230 set->latch->dirty = 1;
1234 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1237 // unlock allocation latch
1239 bt_spinreleasewrite(bt->mgr->lock);
1241 // don't load cache from btree page
1243 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244 set->page = bt_mappage (bt, set->latch);
1246 return bt->err = BTERR_struct;
1248 memcpy (set->page, contents, bt->mgr->page_size);
1249 set->latch->dirty = 1;
1253 // find slot in page for given key at a given level
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1257 uint diff, higher = page->cnt, low = 1, slot;
1260 // make stopper key an infinite fence value
1262 if( bt_getid (page->right) )
1267 // low is the lowest candidate.
1268 // loop ends when they meet
1270 // higher is already
1271 // tested as .ge. the passed key.
1273 while( diff = higher - low ) {
1274 slot = low + ( diff >> 1 );
1275 if( keycmp (keyptr(page, slot), key, len) < 0 )
1278 higher = slot, good++;
1281 // return zero if key is on right link page
1283 return good ? higher : 0;
1286 // find and load page at given level for given key
1287 // leave page rd or wr locked as requested
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1296 // start at root of btree and drill down
1299 // determine lock mode of drill level
1300 mode = (drill == lvl) ? lock : BtLockRead;
1302 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1305 // obtain access lock using lock chaining with Access mode
1307 if( page_no > ROOT_page )
1308 bt_lockpage(bt, BtLockAccess, set->latch);
1310 set->page = bt_mappage (bt, set->latch);
1312 // release & unpin parent or left sibling page
1315 bt_unlockpage(bt, prevmode, prevlatch);
1316 bt_unpinlatch (prevlatch);
1320 // obtain mode lock using lock chaining through AccessLock
1322 bt_lockpage(bt, mode, set->latch);
1324 if( set->page->free )
1325 return bt->err = BTERR_struct, 0;
1327 if( page_no > ROOT_page )
1328 bt_unlockpage(bt, BtLockAccess, set->latch);
1330 // re-read and re-lock root after determining actual level of root
1332 if( set->page->lvl != drill) {
1333 if( set->latch->page_no != ROOT_page )
1334 return bt->err = BTERR_struct, 0;
1336 drill = set->page->lvl;
1338 if( lock != BtLockRead && drill == lvl ) {
1339 bt_unlockpage(bt, mode, set->latch);
1340 bt_unpinlatch (set->latch);
1345 prevpage = set->latch->page_no;
1346 prevlatch = set->latch;
1349 // find key on page at this level
1350 // and descend to requested level
1352 if( !set->page->kill )
1353 if( slot = bt_findslot (set->page, key, len) ) {
1357 // find next non-dead slot -- the fence key if nothing else
1359 while( slotptr(set->page, slot)->dead )
1360 if( slot++ < set->page->cnt )
1363 return bt->err = BTERR_struct, 0;
1365 page_no = bt_getid(valptr(set->page, slot)->value);
1370 // or slide right into next page
1372 page_no = bt_getid(set->page->right);
1375 // return error on end of right chain
1377 bt->err = BTERR_struct;
1378 return 0; // return error
1381 // return page to free list
1382 // page must be delete & write locked
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1386 // lock allocation page
1388 bt_spinwritelock (bt->mgr->lock);
1392 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394 set->latch->dirty = 1;
1395 set->page->free = 1;
1397 // unlock released page
1399 bt_unlockpage (bt, BtLockDelete, set->latch);
1400 bt_unlockpage (bt, BtLockWrite, set->latch);
1401 bt_unpinlatch (set->latch);
1403 // unlock allocation page
1405 bt_spinreleasewrite (bt->mgr->lock);
1408 // a fence key was deleted from a page
1409 // push new fence value upwards
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1418 // remove the old fence value
1420 ptr = keyptr(set->page, set->page->cnt);
1421 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423 set->latch->dirty = 1;
1425 // cache new fence value
1427 ptr = keyptr(set->page, set->page->cnt);
1428 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1430 bt_lockpage (bt, BtLockParent, set->latch);
1431 bt_unlockpage (bt, BtLockWrite, set->latch);
1433 // insert new (now smaller) fence key
1435 bt_putid (value, set->latch->page_no);
1436 ptr = (BtKey*)leftkey;
1438 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1441 // now delete old fence key
1443 ptr = (BtKey*)rightkey;
1445 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1448 bt_unlockpage (bt, BtLockParent, set->latch);
1449 bt_unpinlatch(set->latch);
1453 // root has a single child
1454 // collapse a level from the tree
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1462 // find the child entry and promote as new root contents
1465 for( idx = 0; idx++ < root->page->cnt; )
1466 if( !slotptr(root->page, idx)->dead )
1469 page_no = bt_getid (valptr(root->page, idx)->value);
1471 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472 child->page = bt_mappage (bt, child->latch);
1476 bt_lockpage (bt, BtLockDelete, child->latch);
1477 bt_lockpage (bt, BtLockWrite, child->latch);
1479 memcpy (root->page, child->page, bt->mgr->page_size);
1480 root->latch->dirty = 1;
1482 bt_freepage (bt, child);
1484 } while( root->page->lvl > 1 && root->page->act == 1 );
1486 bt_unlockpage (bt, BtLockWrite, root->latch);
1487 bt_unpinlatch (root->latch);
1491 // delete a page and manage keys
1492 // call with page writelocked
1493 // returns with page unpinned
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1504 // cache copy of fence key
1505 // to post in parent
1507 ptr = keyptr(set->page, set->page->cnt);
1508 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1510 // obtain lock on right page
1512 page_no = bt_getid(set->page->right);
1514 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515 right->page = bt_mappage (bt, right->latch);
1519 bt_lockpage (bt, BtLockWrite, right->latch);
1521 // cache copy of key to update
1523 ptr = keyptr(right->page, right->page->cnt);
1524 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1526 if( right->page->kill )
1527 return bt->err = BTERR_struct;
1529 // pull contents of right peer into our empty page
1531 memcpy (set->page, right->page, bt->mgr->page_size);
1532 set->latch->dirty = 1;
1534 // mark right page deleted and point it to left page
1535 // until we can post parent updates that remove access
1536 // to the deleted page.
1538 bt_putid (right->page->right, set->latch->page_no);
1539 right->latch->dirty = 1;
1540 right->page->kill = 1;
1542 bt_lockpage (bt, BtLockParent, right->latch);
1543 bt_unlockpage (bt, BtLockWrite, right->latch);
1545 bt_lockpage (bt, BtLockParent, set->latch);
1546 bt_unlockpage (bt, BtLockWrite, set->latch);
1548 // redirect higher key directly to our new node contents
1550 bt_putid (value, set->latch->page_no);
1551 ptr = (BtKey*)higherfence;
1553 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1556 // delete old lower key to our node
1558 ptr = (BtKey*)lowerfence;
1560 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1563 // obtain delete and write locks to right node
1565 bt_unlockpage (bt, BtLockParent, right->latch);
1566 bt_lockpage (bt, BtLockDelete, right->latch);
1567 bt_lockpage (bt, BtLockWrite, right->latch);
1568 bt_freepage (bt, right);
1570 bt_unlockpage (bt, BtLockParent, set->latch);
1571 bt_unpinlatch (set->latch);
1576 // find and delete key on page by marking delete flag bit
1577 // if page becomes empty, delete it from the btree
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1581 uint slot, idx, found, fence;
1586 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587 ptr = keyptr(set->page, slot);
1591 // if librarian slot, advance to real slot
1593 if( slotptr(set->page, slot)->type == Librarian )
1594 ptr = keyptr(set->page, ++slot);
1596 fence = slot == set->page->cnt;
1598 // if key is found delete it, otherwise ignore request
1600 if( found = !keycmp (ptr, key, len) )
1601 if( found = slotptr(set->page, slot)->dead == 0 ) {
1602 val = valptr(set->page,slot);
1603 slotptr(set->page, slot)->dead = 1;
1604 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1607 // collapse empty slots beneath the fence
1609 while( idx = set->page->cnt - 1 )
1610 if( slotptr(set->page, idx)->dead ) {
1611 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1617 // did we delete a fence key in an upper level?
1619 if( found && lvl && set->page->act && fence )
1620 if( bt_fixfence (bt, set, lvl) )
1623 return bt->found = found, 0;
1625 // do we need to collapse root?
1627 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628 if( bt_collapseroot (bt, set) )
1631 return bt->found = found, 0;
1633 // delete empty page
1635 if( !set->page->act )
1636 return bt_deletepage (bt, set);
1638 set->latch->dirty = 1;
1639 bt_unlockpage(bt, BtLockWrite, set->latch);
1640 bt_unpinlatch (set->latch);
1641 return bt->found = found, 0;
1644 BtKey *bt_foundkey (BtDb *bt)
1646 return (BtKey*)(bt->key);
1649 // advance to next slot
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1653 BtLatchSet *prevlatch;
1656 if( slot < set->page->cnt )
1659 prevlatch = set->latch;
1661 if( page_no = bt_getid(set->page->right) )
1662 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663 set->page = bt_mappage (bt, set->latch);
1667 return bt->err = BTERR_struct, 0;
1669 // obtain access lock using lock chaining with Access mode
1671 bt_lockpage(bt, BtLockAccess, set->latch);
1673 bt_unlockpage(bt, BtLockRead, prevlatch);
1674 bt_unpinlatch (prevlatch);
1676 bt_lockpage(bt, BtLockRead, set->latch);
1677 bt_unlockpage(bt, BtLockAccess, set->latch);
1681 // find unique key or first duplicate key in
1682 // leaf level and return number of value bytes
1683 // or (-1) if not found. Setup key for bt_foundkey
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1693 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1695 ptr = keyptr(set->page, slot);
1697 // skip librarian slot place holder
1699 if( slotptr(set->page, slot)->type == Librarian )
1700 ptr = keyptr(set->page, ++slot);
1702 // return actual key found
1704 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1707 if( slotptr(set->page, slot)->type == Duplicate )
1710 // not there if we reach the stopper key
1712 if( slot == set->page->cnt )
1713 if( !bt_getid (set->page->right) )
1716 // if key exists, return >= 0 value bytes copied
1717 // otherwise return (-1)
1719 if( slotptr(set->page, slot)->dead )
1723 if( !memcmp (ptr->key, key, len) ) {
1724 val = valptr (set->page,slot);
1725 if( valmax > val->len )
1727 memcpy (value, val->value, valmax);
1733 } while( slot = bt_findnext (bt, set, slot) );
1735 bt_unlockpage (bt, BtLockRead, set->latch);
1736 bt_unpinlatch (set->latch);
1740 // check page for space available,
1741 // clean if necessary and return
1742 // 0 - page needs splitting
1743 // >0 new slot value
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1755 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1758 // skip cleanup and proceed to split
1759 // if there's not enough garbage
1762 if( page->garbage < nxt / 5 )
1765 memcpy (bt->frame, page, bt->mgr->page_size);
1767 // skip page info and set rest of page to zero
1769 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770 set->latch->dirty = 1;
1774 // clean up page first by
1775 // removing deleted keys
1777 while( cnt++ < max ) {
1781 if( cnt < max || bt->frame->lvl )
1782 if( slotptr(bt->frame,cnt)->dead )
1785 // copy the value across
1787 val = valptr(bt->frame, cnt);
1788 nxt -= val->len + sizeof(BtVal);
1789 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1791 // copy the key across
1793 key = keyptr(bt->frame, cnt);
1794 nxt -= key->len + sizeof(BtKey);
1795 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1797 // make a librarian slot
1799 slotptr(page, ++idx)->off = nxt;
1800 slotptr(page, idx)->type = Librarian;
1801 slotptr(page, idx)->dead = 1;
1805 slotptr(page, ++idx)->off = nxt;
1806 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1808 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1815 // see if page has enough space now, or does it need splitting?
1817 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1823 // split the root and raise the height of the btree
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1835 // save left page fence key for new root
1837 ptr = keyptr(root->page, root->page->cnt);
1838 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1840 // Obtain an empty page to use, and copy the current
1841 // root contents into it, e.g. lower keys
1843 if( bt_newpage(bt, left, root->page) )
1846 left_page_no = left->latch->page_no;
1847 bt_unpinlatch (left->latch);
1849 // preserve the page info at the bottom
1850 // of higher keys and set rest to zero
1852 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1854 // insert stopper key at top of newroot page
1855 // and increase the root height
1857 nxt -= BtId + sizeof(BtVal);
1858 bt_putid (value, right->page_no);
1859 val = (BtVal *)((unsigned char *)root->page + nxt);
1860 memcpy (val->value, value, BtId);
1863 nxt -= 2 + sizeof(BtKey);
1864 slotptr(root->page, 2)->off = nxt;
1865 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1870 // insert lower keys page fence key on newroot page as first key
1872 nxt -= BtId + sizeof(BtVal);
1873 bt_putid (value, left_page_no);
1874 val = (BtVal *)((unsigned char *)root->page + nxt);
1875 memcpy (val->value, value, BtId);
1878 ptr = (BtKey *)leftkey;
1879 nxt -= ptr->len + sizeof(BtKey);
1880 slotptr(root->page, 1)->off = nxt;
1881 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1883 bt_putid(root->page->right, 0);
1884 root->page->min = nxt; // reset lowest used offset and key count
1885 root->page->cnt = 2;
1886 root->page->act = 2;
1889 // release and unpin root pages
1891 bt_unlockpage(bt, BtLockWrite, root->latch);
1892 bt_unpinlatch (root->latch);
1894 bt_unpinlatch (right);
1898 // split already locked full node
1900 // return pool entry for new right
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1913 // split higher half of keys to bt->frame
1915 memset (bt->frame, 0, bt->mgr->page_size);
1916 max = set->page->cnt;
1920 while( cnt++ < max ) {
1921 if( cnt < max || set->page->lvl )
1922 if( slotptr(set->page, cnt)->dead )
1925 src = valptr(set->page, cnt);
1926 nxt -= src->len + sizeof(BtVal);
1927 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1929 key = keyptr(set->page, cnt);
1930 nxt -= key->len + sizeof(BtKey);
1931 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1932 memcpy (ptr, key, key->len + sizeof(BtKey));
1934 // add librarian slot
1936 slotptr(bt->frame, ++idx)->off = nxt;
1937 slotptr(bt->frame, idx)->type = Librarian;
1938 slotptr(bt->frame, idx)->dead = 1;
1942 slotptr(bt->frame, ++idx)->off = nxt;
1943 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1945 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1949 bt->frame->bits = bt->mgr->page_bits;
1950 bt->frame->min = nxt;
1951 bt->frame->cnt = idx;
1952 bt->frame->lvl = lvl;
1956 if( set->latch->page_no > ROOT_page )
1957 bt_putid (bt->frame->right, bt_getid (set->page->right));
1959 // get new free page and write higher keys to it.
1961 if( bt_newpage(bt, right, bt->frame) )
1964 memcpy (bt->frame, set->page, bt->mgr->page_size);
1965 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966 set->latch->dirty = 1;
1968 nxt = bt->mgr->page_size;
1969 set->page->garbage = 0;
1975 if( slotptr(bt->frame, max)->type == Librarian )
1978 // assemble page of smaller keys
1980 while( cnt++ < max ) {
1981 if( slotptr(bt->frame, cnt)->dead )
1983 val = valptr(bt->frame, cnt);
1984 nxt -= val->len + sizeof(BtVal);
1985 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1987 key = keyptr(bt->frame, cnt);
1988 nxt -= key->len + sizeof(BtKey);
1989 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1991 // add librarian slot
1994 slotptr(set->page, ++idx)->off = nxt;
1995 slotptr(set->page, idx)->type = Librarian;
1996 slotptr(set->page, idx)->dead = 1;
2001 slotptr(set->page, ++idx)->off = nxt;
2002 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2006 bt_putid(set->page->right, right->latch->page_no);
2007 set->page->min = nxt;
2008 set->page->cnt = idx;
2010 return right->latch->entry;
2013 // fix keys for newly split page
2014 // call with page locked, return
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2025 // if current page is the root page, split it
2027 if( set->latch->page_no == ROOT_page )
2028 return bt_splitroot (bt, set, right);
2030 ptr = keyptr(set->page, set->page->cnt);
2031 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2033 page = bt_mappage (bt, right);
2035 ptr = keyptr(page, page->cnt);
2036 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2038 // insert new fences in their parent pages
2040 bt_lockpage (bt, BtLockParent, right);
2042 bt_lockpage (bt, BtLockParent, set->latch);
2043 bt_unlockpage (bt, BtLockWrite, set->latch);
2045 // insert new fence for reformulated left block of smaller keys
2047 bt_putid (value, set->latch->page_no);
2048 ptr = (BtKey *)leftkey;
2050 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2053 // switch fence for right block of larger keys to new right page
2055 bt_putid (value, right->page_no);
2056 ptr = (BtKey *)rightkey;
2058 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2061 bt_unlockpage (bt, BtLockParent, set->latch);
2062 bt_unpinlatch (set->latch);
2064 bt_unlockpage (bt, BtLockParent, right);
2065 bt_unpinlatch (right);
2069 // install new key and value onto page
2070 // page must already be checked for
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2075 uint idx, librarian;
2080 // if found slot > desired slot and previous slot
2081 // is a librarian slot, use it
2084 if( slotptr(set->page, slot-1)->type == Librarian )
2087 // copy value onto page
2089 set->page->min -= vallen + sizeof(BtVal);
2090 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091 memcpy (val->value, value, vallen);
2094 // copy key onto page
2096 set->page->min -= keylen + sizeof(BtKey);
2097 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098 memcpy (ptr->key, key, keylen);
2101 // find first empty slot
2103 for( idx = slot; idx < set->page->cnt; idx++ )
2104 if( slotptr(set->page, idx)->dead )
2107 // now insert key into array before slot
2109 if( idx == set->page->cnt )
2110 idx += 2, set->page->cnt += 2, librarian = 2;
2114 set->latch->dirty = 1;
2117 while( idx > slot + librarian - 1 )
2118 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2120 // add librarian slot
2122 if( librarian > 1 ) {
2123 node = slotptr(set->page, slot++);
2124 node->off = set->page->min;
2125 node->type = Librarian;
2131 node = slotptr(set->page, slot);
2132 node->off = set->page->min;
2137 bt_unlockpage (bt, BtLockWrite, set->latch);
2138 bt_unpinlatch (set->latch);
2144 // Insert new key into the btree at given level.
2145 // either add a new key or update/add an existing one
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2157 // set up the key we're working on
2159 ins = (BtKey*)newkey;
2160 memcpy (ins->key, key, keylen);
2163 // is this a non-unique index value?
2169 sequence = bt_newdup (bt);
2170 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2174 while( 1 ) { // find the page and slot for the current key
2175 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176 ptr = keyptr(set->page, slot);
2179 bt->err = BTERR_ovflw;
2183 // if librarian slot == found slot, advance to real slot
2185 if( slotptr(set->page, slot)->type == Librarian )
2186 if( !keycmp (ptr, key, keylen) )
2187 ptr = keyptr(set->page, ++slot);
2191 if( slotptr(set->page, slot)->type == Duplicate )
2194 // if inserting a duplicate key or unique key
2195 // check for adequate space on the page
2196 // and insert the new key before slot.
2198 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2199 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200 if( !(entry = bt_splitpage (bt, set)) )
2202 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2207 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2210 // if key already exists, update value and return
2212 val = valptr(set->page, slot);
2214 if( val->len >= vallen ) {
2215 if( slotptr(set->page, slot)->dead )
2217 set->page->garbage += val->len - vallen;
2218 set->latch->dirty = 1;
2219 slotptr(set->page, slot)->dead = 0;
2221 memcpy (val->value, value, vallen);
2222 bt_unlockpage(bt, BtLockWrite, set->latch);
2223 bt_unpinlatch (set->latch);
2227 // new update value doesn't fit in existing value area
2229 if( !slotptr(set->page, slot)->dead )
2230 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2232 slotptr(set->page, slot)->dead = 0;
2236 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237 if( !(entry = bt_splitpage (bt, set)) )
2239 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2244 set->page->min -= vallen + sizeof(BtVal);
2245 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246 memcpy (val->value, value, vallen);
2249 set->latch->dirty = 1;
2250 set->page->min -= keylen + sizeof(BtKey);
2251 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252 memcpy (ptr->key, key, keylen);
2255 slotptr(set->page, slot)->off = set->page->min;
2256 bt_unlockpage(bt, BtLockWrite, set->latch);
2257 bt_unpinlatch (set->latch);
2264 uint entry; // latch table entry number
2265 uint slot:31; // page slot number
2266 uint reuse:1; // reused previous page
2270 uid page_no; // page number for split leaf
2271 void *next; // next key to insert
2272 uint entry:29; // latch table entry number
2273 uint type:2; // 0 == insert, 1 == delete, 2 == free
2274 uint nounlock:1; // don't unlock ParentModification
2275 unsigned char leafkey[BT_keyarray];
2278 // find and load leaf page for given key
2279 // leave page Atomic locked and Read locked.
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2283 BtLatchSet *prevlatch;
2287 // find level one slot
2289 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2292 // find next non-dead entry on this page
2293 // it will be the fence key if nothing else
2295 while( slotptr(set->page, slot)->dead )
2296 if( slot++ < set->page->cnt )
2299 return bt->err = BTERR_struct, 0;
2301 page_no = bt_getid(valptr(set->page, slot)->value);
2302 prevlatch = set->latch;
2305 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306 set->page = bt_mappage (bt, set->latch);
2310 // obtain read lock using lock chaining with Access mode
2311 // release & unpin parent/left sibling page
2313 bt_lockpage(bt, BtLockAccess, set->latch);
2315 bt_unlockpage(bt, BtLockRead, prevlatch);
2316 bt_unpinlatch (prevlatch);
2318 bt_lockpage(bt, BtLockRead, set->latch);
2320 // find key on page at this level
2321 // and descend to requested level
2323 if( !set->page->kill )
2324 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2325 bt_unlockpage(bt, BtLockRead, set->latch);
2326 bt_lockpage(bt, BtLockAtomic, set->latch);
2327 bt_lockpage(bt, BtLockRead, set->latch);
2329 if( !set->page->kill )
2330 if( slot = bt_findslot (set->page, key, len) ) {
2331 bt_unlockpage(bt, BtLockAccess, set->latch);
2335 bt_unlockpage(bt, BtLockAtomic, set->latch);
2338 // slide right into next page
2340 bt_unlockpage(bt, BtLockAccess, set->latch);
2341 page_no = bt_getid(set->page->right);
2342 prevlatch = set->latch;
2345 // return error on end of right chain
2347 bt->err = BTERR_struct;
2348 return 0; // return error
2351 // determine actual page where key is located
2352 // return slot number
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
2360 if( src > 1 && locks[src].reuse )
2361 entry = locks[src-1].entry, slot = 0;
2363 entry = locks[src].entry;
2366 set->latch = bt->mgr->latchsets + entry;
2367 set->page = bt_mappage (bt, set->latch);
2371 // is locks->reuse set? or was slot zeroed?
2372 // if so, find where our key is located
2373 // on current page or pages split on
2374 // same page txn operations.
2377 set->latch = bt->mgr->latchsets + entry;
2378 set->page = bt_mappage (bt, set->latch);
2380 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2381 if( slotptr(set->page, slot)->type == Librarian )
2383 if( locks[src].reuse )
2384 locks[src].entry = entry;
2387 } while( entry = set->latch->split );
2389 bt->err = BTERR_atomic;
2393 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2395 BtKey *key = keyptr(source, src);
2396 BtVal *val = valptr(source, src);
2401 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2402 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2403 return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2405 if( entry = bt_splitpage (bt, set) )
2406 latch = bt->mgr->latchsets + entry;
2410 // splice right page into split chain
2411 // and WriteLock it.
2413 bt_lockpage(bt, BtLockWrite, latch);
2414 latch->split = set->latch->split;
2415 set->latch->split = entry;
2416 locks[src].slot = 0;
2419 return bt->err = BTERR_atomic;
2422 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2424 BtKey *key = keyptr(source, src);
2425 uint idx, entry, slot;
2430 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2431 ptr = keyptr(set->page, slot);
2433 return bt->err = BTERR_struct;
2435 if( !keycmp (ptr, key->key, key->len) )
2436 if( !slotptr(set->page, slot)->dead )
2437 slotptr(set->page, slot)->dead = 1;
2443 val = valptr(set->page, slot);
2444 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2445 set->latch->dirty = 1;
2451 // delete an empty master page for a transaction
2453 // note that the far right page never empties because
2454 // it always contains (at least) the infinite stopper key
2455 // and that all pages that don't contain any keys are
2456 // deleted, or are being held under Atomic lock.
2458 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2460 BtPageSet right[1], temp[1];
2461 unsigned char value[BtId];
2465 bt_lockpage(bt, BtLockWrite, prev->latch);
2467 // grab the right sibling
2469 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2470 right->page = bt_mappage (bt, right->latch);
2474 bt_lockpage(bt, BtLockAtomic, right->latch);
2475 bt_lockpage(bt, BtLockWrite, right->latch);
2477 // and pull contents over empty page
2478 // while preserving master's left link
2480 memcpy (right->page->left, prev->page->left, BtId);
2481 memcpy (prev->page, right->page, bt->mgr->page_size);
2483 // forward seekers to old right sibling
2484 // to new page location in set
2486 bt_putid (right->page->right, prev->latch->page_no);
2487 right->latch->dirty = 1;
2488 right->page->kill = 1;
2490 // remove pointer to right page for searchers by
2491 // changing right fence key to point to the master page
2493 ptr = keyptr(right->page,right->page->cnt);
2494 bt_putid (value, prev->latch->page_no);
2496 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2499 // now that master page is in good shape we can
2500 // remove its locks.
2502 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2503 bt_unlockpage (bt, BtLockWrite, prev->latch);
2505 // fix master's right sibling's left pointer
2506 // to remove scanner's poiner to the right page
2508 if( right_page_no = bt_getid (prev->page->right) ) {
2509 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2510 temp->page = bt_mappage (bt, temp->latch);
2512 bt_lockpage (bt, BtLockWrite, temp->latch);
2513 bt_putid (temp->page->left, prev->latch->page_no);
2514 temp->latch->dirty = 1;
2516 bt_unlockpage (bt, BtLockWrite, temp->latch);
2517 bt_unpinlatch (temp->latch);
2518 } else { // master is now the far right page
2519 bt_spinwritelock (bt->mgr->lock);
2520 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2521 bt_spinreleasewrite(bt->mgr->lock);
2524 // now that there are no pointers to the right page
2525 // we can delete it after the last read access occurs
2527 bt_unlockpage (bt, BtLockWrite, right->latch);
2528 bt_unlockpage (bt, BtLockAtomic, right->latch);
2529 bt_lockpage (bt, BtLockDelete, right->latch);
2530 bt_lockpage (bt, BtLockWrite, right->latch);
2531 bt_freepage (bt, right);
2535 // atomic modification of a batch of keys.
2537 // return -1 if BTERR is set
2538 // otherwise return slot number
2539 // causing the key constraint violation
2540 // or zero on successful completion.
2542 int bt_atomictxn (BtDb *bt, BtPage source)
2544 uint src, idx, slot, samepage, entry;
2545 AtomicKey *head, *tail, *leaf;
2546 BtPageSet set[1], prev[1];
2547 unsigned char value[BtId];
2548 BtKey *key, *ptr, *key2;
2558 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2562 // stable sort the list of keys into order to
2563 // prevent deadlocks between threads.
2565 for( src = 1; src++ < source->cnt; ) {
2566 *temp = *slotptr(source,src);
2567 key = keyptr (source,src);
2569 for( idx = src; --idx; ) {
2570 key2 = keyptr (source,idx);
2571 if( keycmp (key, key2->key, key2->len) < 0 ) {
2572 *slotptr(source,idx+1) = *slotptr(source,idx);
2573 *slotptr(source,idx) = *temp;
2579 // Load the leaf page for each key
2580 // group same page references with reuse bit
2581 // and determine any constraint violations
2583 for( src = 0; src++ < source->cnt; ) {
2584 key = keyptr(source, src);
2587 // first determine if this modification falls
2588 // on the same page as the previous modification
2589 // note that the far right leaf page is a special case
2591 if( samepage = src > 1 )
2592 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2593 slot = bt_findslot(set->page, key->key, key->len);
2594 else // release read on previous page
2595 bt_unlockpage(bt, BtLockRead, set->latch);
2598 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2599 set->latch->split = 0;
2603 if( slotptr(set->page, slot)->type == Librarian )
2604 ptr = keyptr(set->page, ++slot);
2606 ptr = keyptr(set->page, slot);
2609 locks[src].entry = set->latch->entry;
2610 locks[src].slot = slot;
2611 locks[src].reuse = 0;
2613 locks[src].entry = 0;
2614 locks[src].slot = 0;
2615 locks[src].reuse = 1;
2618 switch( slotptr(source, src)->type ) {
2621 if( !slotptr(set->page, slot)->dead )
2622 if( slot < set->page->cnt || bt_getid (set->page->right) )
2623 if( !keycmp (ptr, key->key, key->len) ) {
2625 // return constraint violation if key already exists
2627 bt_unlockpage(bt, BtLockRead, set->latch);
2631 if( locks[src].entry ) {
2632 set->latch = bt->mgr->latchsets + locks[src].entry;
2633 bt_unlockpage(bt, BtLockAtomic, set->latch);
2634 bt_unpinlatch (set->latch);
2645 // unlock last loadpage lock
2647 if( source->cnt > 1 )
2648 bt_unlockpage(bt, BtLockRead, set->latch);
2650 // obtain write lock for each master page
2652 for( src = 0; src++ < source->cnt; )
2653 if( locks[src].reuse )
2656 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2658 // insert or delete each key
2659 // process any splits or merges
2660 // release Write & Atomic latches
2661 // set ParentModifications and build
2662 // queue of keys to insert for split pages
2663 // or delete for deleted pages.
2665 // run through txn list backwards
2667 samepage = source->cnt + 1;
2669 for( src = source->cnt; src; src-- ) {
2670 if( locks[src].reuse )
2673 // perform the txn operations
2674 // from smaller to larger on
2677 for( idx = src; idx < samepage; idx++ )
2678 switch( slotptr(source,idx)->type ) {
2680 if( bt_atomicdelete (bt, source, locks, idx) )
2686 if( bt_atomicinsert (bt, source, locks, idx) )
2691 // after the same page operations have finished,
2692 // process master page for splits or deletion.
2694 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2695 prev->page = bt_mappage (bt, prev->latch);
2698 // pick-up all splits from master page
2699 // each one is already WriteLocked.
2701 entry = prev->latch->split;
2704 set->latch = bt->mgr->latchsets + entry;
2705 set->page = bt_mappage (bt, set->latch);
2706 entry = set->latch->split;
2708 // delete empty master page by undoing its split
2709 // (this is potentially another empty page)
2710 // note that there are no new left pointers yet
2712 if( !prev->page->act ) {
2713 memcpy (set->page->left, prev->page->left, BtId);
2714 memcpy (prev->page, set->page, bt->mgr->page_size);
2715 bt_lockpage (bt, BtLockDelete, set->latch);
2716 bt_freepage (bt, set);
2718 prev->latch->dirty = 1;
2722 // remove empty page from the split chain
2724 if( !set->page->act ) {
2725 memcpy (prev->page->right, set->page->right, BtId);
2726 prev->latch->split = set->latch->split;
2727 bt_lockpage (bt, BtLockDelete, set->latch);
2728 bt_freepage (bt, set);
2732 // schedule prev fence key update
2734 ptr = keyptr(prev->page,prev->page->cnt);
2735 leaf = calloc (sizeof(AtomicKey), 1);
2737 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2738 leaf->page_no = prev->latch->page_no;
2739 leaf->entry = prev->latch->entry;
2749 // splice in the left link into the split page
2751 bt_putid (set->page->left, prev->latch->page_no);
2752 bt_lockpage(bt, BtLockParent, prev->latch);
2753 bt_unlockpage(bt, BtLockWrite, prev->latch);
2757 // update left pointer in next right page from last split page
2758 // (if all splits were reversed, latch->split == 0)
2760 if( latch->split ) {
2761 // fix left pointer in master's original (now split)
2762 // far right sibling or set rightmost page in page zero
2764 if( right = bt_getid (prev->page->right) ) {
2765 if( set->latch = bt_pinlatch (bt, right, 1) )
2766 set->page = bt_mappage (bt, set->latch);
2770 bt_lockpage (bt, BtLockWrite, set->latch);
2771 bt_putid (set->page->left, prev->latch->page_no);
2772 set->latch->dirty = 1;
2773 bt_unlockpage (bt, BtLockWrite, set->latch);
2774 bt_unpinlatch (set->latch);
2775 } else { // prev is rightmost page
2776 bt_spinwritelock (bt->mgr->lock);
2777 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2778 bt_spinreleasewrite(bt->mgr->lock);
2781 // Process last page split in chain
2783 ptr = keyptr(prev->page,prev->page->cnt);
2784 leaf = calloc (sizeof(AtomicKey), 1);
2786 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2787 leaf->page_no = prev->latch->page_no;
2788 leaf->entry = prev->latch->entry;
2798 bt_lockpage(bt, BtLockParent, prev->latch);
2799 bt_unlockpage(bt, BtLockWrite, prev->latch);
2801 // remove atomic lock on master page
2803 bt_unlockpage(bt, BtLockAtomic, latch);
2807 // finished if prev page occupied (either master or final split)
2809 if( prev->page->act ) {
2810 bt_unlockpage(bt, BtLockWrite, latch);
2811 bt_unlockpage(bt, BtLockAtomic, latch);
2812 bt_unpinlatch(latch);
2816 // any and all splits were reversed, and the
2817 // master page located in prev is empty, delete it
2818 // by pulling over master's right sibling.
2820 // Remove empty master's fence key
2822 ptr = keyptr(prev->page,prev->page->cnt);
2824 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2827 // perform the remainder of the delete
2828 // from the FIFO queue
2830 leaf = calloc (sizeof(AtomicKey), 1);
2832 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2833 leaf->page_no = prev->latch->page_no;
2834 leaf->entry = prev->latch->entry;
2845 // leave atomic lock in place until
2846 // deletion completes in next phase.
2848 bt_unlockpage(bt, BtLockWrite, prev->latch);
2851 // add & delete keys for any pages split or merged during transaction
2855 set->latch = bt->mgr->latchsets + leaf->entry;
2856 set->page = bt_mappage (bt, set->latch);
2858 bt_putid (value, leaf->page_no);
2859 ptr = (BtKey *)leaf->leafkey;
2861 switch( leaf->type ) {
2862 case 0: // insert key
2863 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2868 case 1: // delete key
2869 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2874 case 2: // free page
2875 if( bt_atomicfree (bt, set) )
2881 if( !leaf->nounlock )
2882 bt_unlockpage (bt, BtLockParent, set->latch);
2884 bt_unpinlatch (set->latch);
2887 } while( leaf = tail );
2895 // set cursor to highest slot on highest page
2897 uint bt_lastkey (BtDb *bt)
2899 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2902 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2903 set->page = bt_mappage (bt, set->latch);
2907 bt_lockpage(bt, BtLockRead, set->latch);
2908 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2909 bt_unlockpage(bt, BtLockRead, set->latch);
2910 bt_unpinlatch (set->latch);
2912 bt->cursor_page = page_no;
2913 return bt->cursor->cnt;
2916 // return previous slot on cursor page
2918 uint bt_prevkey (BtDb *bt, uint slot)
2920 uid ourright, next, us = bt->cursor_page;
2926 ourright = bt_getid(bt->cursor->right);
2929 if( !(next = bt_getid(bt->cursor->left)) )
2933 bt->cursor_page = next;
2935 if( set->latch = bt_pinlatch (bt, next, 1) )
2936 set->page = bt_mappage (bt, set->latch);
2940 bt_lockpage(bt, BtLockRead, set->latch);
2941 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2942 bt_unlockpage(bt, BtLockRead, set->latch);
2943 bt_unpinlatch (set->latch);
2945 next = bt_getid (bt->cursor->right);
2947 if( bt->cursor->kill )
2951 if( next == ourright )
2956 return bt->cursor->cnt;
2959 // return next slot on cursor page
2960 // or slide cursor right into next page
2962 uint bt_nextkey (BtDb *bt, uint slot)
2968 right = bt_getid(bt->cursor->right);
2970 while( slot++ < bt->cursor->cnt )
2971 if( slotptr(bt->cursor,slot)->dead )
2973 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2981 bt->cursor_page = right;
2983 if( set->latch = bt_pinlatch (bt, right, 1) )
2984 set->page = bt_mappage (bt, set->latch);
2988 bt_lockpage(bt, BtLockRead, set->latch);
2990 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2992 bt_unlockpage(bt, BtLockRead, set->latch);
2993 bt_unpinlatch (set->latch);
3001 // cache page of keys into cursor and return starting slot for given key
3003 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3008 // cache page for retrieval
3010 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3011 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3015 bt->cursor_page = set->latch->page_no;
3017 bt_unlockpage(bt, BtLockRead, set->latch);
3018 bt_unpinlatch (set->latch);
3022 BtKey *bt_key(BtDb *bt, uint slot)
3024 return keyptr(bt->cursor, slot);
3027 BtVal *bt_val(BtDb *bt, uint slot)
3029 return valptr(bt->cursor,slot);
3035 double getCpuTime(int type)
3038 FILETIME xittime[1];
3039 FILETIME systime[1];
3040 FILETIME usrtime[1];
3041 SYSTEMTIME timeconv[1];
3044 memset (timeconv, 0, sizeof(SYSTEMTIME));
3048 GetSystemTimeAsFileTime (xittime);
3049 FileTimeToSystemTime (xittime, timeconv);
3050 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3053 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3054 FileTimeToSystemTime (usrtime, timeconv);
3057 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3058 FileTimeToSystemTime (systime, timeconv);
3062 ans += (double)timeconv->wHour * 3600;
3063 ans += (double)timeconv->wMinute * 60;
3064 ans += (double)timeconv->wSecond;
3065 ans += (double)timeconv->wMilliseconds / 1000;
3070 #include <sys/resource.h>
3072 double getCpuTime(int type)
3074 struct rusage used[1];
3075 struct timeval tv[1];
3079 gettimeofday(tv, NULL);
3080 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3083 getrusage(RUSAGE_SELF, used);
3084 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3087 getrusage(RUSAGE_SELF, used);
3088 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3095 void bt_poolaudit (BtMgr *mgr)
3100 while( slot++ < mgr->latchdeployed ) {
3101 latch = mgr->latchsets + slot;
3103 if( *latch->readwr->rin & MASK )
3104 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3105 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3107 if( *latch->access->rin & MASK )
3108 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3109 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3111 if( *latch->parent->ticket != *latch->parent->serving )
3112 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3113 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3115 if( latch->pin & ~CLOCK_bit ) {
3116 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3122 uint bt_latchaudit (BtDb *bt)
3124 ushort idx, hashidx;
3130 if( *(ushort *)(bt->mgr->lock) )
3131 fprintf(stderr, "Alloc page locked\n");
3132 *(ushort *)(bt->mgr->lock) = 0;
3134 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3135 latch = bt->mgr->latchsets + idx;
3136 if( *latch->readwr->rin & MASK )
3137 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3138 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3140 if( *latch->access->rin & MASK )
3141 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3142 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3144 if( *latch->parent->ticket != *latch->parent->serving )
3145 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3146 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3149 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3154 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3155 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3156 fprintf(stderr, "hash entry %d locked\n", hashidx);
3158 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3160 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3161 latch = bt->mgr->latchsets + idx;
3163 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3164 } while( idx = latch->next );
3167 page_no = LEAF_page;
3169 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3170 uid off = page_no << bt->mgr->page_bits;
3172 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3176 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3178 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3179 return bt->err = BTERR_map;
3181 if( *amt < bt->mgr->page_size )
3182 return bt->err = BTERR_map;
3184 if( !bt->frame->free && !bt->frame->lvl )
3185 cnt += bt->frame->act;
3189 cnt--; // remove stopper key
3190 fprintf(stderr, " Total keys read %d\n", cnt);
3204 // standalone program to index file of keys
3205 // then list them onto std-out
3208 void *index_file (void *arg)
3210 uint __stdcall index_file (void *arg)
3213 int line = 0, found = 0, cnt = 0, idx;
3214 uid next, page_no = LEAF_page; // start on first page of leaves
3215 int ch, len = 0, slot, type = 0;
3216 unsigned char key[BT_maxkey];
3217 unsigned char txn[65536];
3218 ThreadArg *args = arg;
3227 bt = bt_open (args->mgr);
3230 if( args->idx < strlen (args->type) )
3231 ch = args->type[args->idx];
3233 ch = args->type[strlen(args->type) - 1];
3238 fprintf(stderr, "started latch mgr audit\n");
3239 cnt = bt_latchaudit (bt);
3240 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3251 if( type == Delete )
3252 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3254 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3256 if( type == Delete )
3257 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3259 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3261 if( in = fopen (args->infile, "rb") )
3262 while( ch = getc(in), ch != EOF )
3268 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3269 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3275 memcpy (txn + nxt, key + 10, len - 10);
3277 txn[nxt] = len - 10;
3279 memcpy (txn + nxt, key, 10);
3282 slotptr(page,++cnt)->off = nxt;
3283 slotptr(page,cnt)->type = type;
3286 if( cnt < args->num )
3293 if( bt_atomictxn (bt, page) )
3294 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3299 else if( len < BT_maxkey )
3301 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3305 fprintf(stderr, "started indexing for %s\n", args->infile);
3306 if( in = fopen (args->infile, "r") )
3307 while( ch = getc(in), ch != EOF )
3312 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3313 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3316 else if( len < BT_maxkey )
3318 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3322 fprintf(stderr, "started finding keys for %s\n", args->infile);
3323 if( in = fopen (args->infile, "rb") )
3324 while( ch = getc(in), ch != EOF )
3328 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3331 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3334 else if( len < BT_maxkey )
3336 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3340 fprintf(stderr, "started scanning\n");
3342 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3343 set->page = bt_mappage (bt, set->latch);
3345 fprintf(stderr, "unable to obtain latch"), exit(1);
3346 bt_lockpage (bt, BtLockRead, set->latch);
3347 next = bt_getid (set->page->right);
3349 for( slot = 0; slot++ < set->page->cnt; )
3350 if( next || slot < set->page->cnt )
3351 if( !slotptr(set->page, slot)->dead ) {
3352 ptr = keyptr(set->page, slot);
3355 if( slotptr(set->page, slot)->type == Duplicate )
3358 fwrite (ptr->key, len, 1, stdout);
3359 val = valptr(set->page, slot);
3360 fwrite (val->value, val->len, 1, stdout);
3361 fputc ('\n', stdout);
3365 bt_unlockpage (bt, BtLockRead, set->latch);
3366 bt_unpinlatch (set->latch);
3367 } while( page_no = next );
3369 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3373 fprintf(stderr, "started reverse scan\n");
3374 if( slot = bt_lastkey (bt) )
3375 while( slot = bt_prevkey (bt, slot) ) {
3376 if( slotptr(bt->cursor, slot)->dead )
3379 ptr = keyptr(bt->cursor, slot);
3382 if( slotptr(bt->cursor, slot)->type == Duplicate )
3385 fwrite (ptr->key, len, 1, stdout);
3386 val = valptr(bt->cursor, slot);
3387 fwrite (val->value, val->len, 1, stdout);
3388 fputc ('\n', stdout);
3392 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3397 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3399 fprintf(stderr, "started counting\n");
3400 page_no = LEAF_page;
3402 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3403 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3406 if( !bt->frame->free && !bt->frame->lvl )
3407 cnt += bt->frame->act;
3413 cnt--; // remove stopper key
3414 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3426 typedef struct timeval timer;
3428 int main (int argc, char **argv)
3430 int idx, cnt, len, slot, err;
3431 int segsize, bits = 16;
3448 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3449 fprintf (stderr, " where idx_file is the name of the btree file\n");
3450 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3451 fprintf (stderr, " page_bits is the page size in bits\n");
3452 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3453 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3454 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3458 start = getCpuTime(0);
3461 bits = atoi(argv[3]);
3464 poolsize = atoi(argv[4]);
3467 fprintf (stderr, "Warning: no mapped_pool\n");
3470 num = atoi(argv[5]);
3474 threads = malloc (cnt * sizeof(pthread_t));
3476 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3478 args = malloc (cnt * sizeof(ThreadArg));
3480 mgr = bt_mgr ((argv[1]), bits, poolsize);
3483 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3489 for( idx = 0; idx < cnt; idx++ ) {
3490 args[idx].infile = argv[idx + 6];
3491 args[idx].type = argv[2];
3492 args[idx].mgr = mgr;
3493 args[idx].num = num;
3494 args[idx].idx = idx;
3496 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3497 fprintf(stderr, "Error creating thread %d\n", err);
3499 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3503 // wait for termination
3506 for( idx = 0; idx < cnt; idx++ )
3507 pthread_join (threads[idx], NULL);
3509 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3511 for( idx = 0; idx < cnt; idx++ )
3512 CloseHandle(threads[idx]);
3518 elapsed = getCpuTime(0) - start;
3519 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3520 elapsed = getCpuTime(1);
3521 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3522 elapsed = getCpuTime(2);
3523 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);