1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The original author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
127 // definition for spin latch implementation
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
133 volatile typedef struct {
144 // hash table entries
147 volatile uint slot; // Latch table entry at head of chain
148 BtSpinLatch latch[1];
151 // latch manager table structure
154 uid page_no; // latch set page number
155 RWLock readwr[1]; // read/write page lock
156 RWLock access[1]; // Access Intent/Page delete
157 WOLock parent[1]; // Posting of fence key in parent
158 WOLock atomic[1]; // Atomic update in progress
159 uint split; // right split page atomic insert
160 uint entry; // entry slot in latch table
161 uint next; // next entry in hash table chain
162 uint prev; // prev entry in hash table chain
163 volatile ushort pin; // number of outstanding threads
164 ushort dirty:1; // page in cache is dirty
167 // Define the length of the page record numbers
171 // Page key slot definition.
173 // Keys are marked dead, but remain on the page until
174 // cleanup is called. The fence key (highest key) for
175 // a leaf page is always present, even after cleanup.
179 // In addition to the Unique keys that occupy slots
180 // there are Librarian and Duplicate key
181 // slots occupying the key slot array.
183 // The Librarian slots are dead keys that
184 // serve as filler, available to add new Unique
185 // or Dup slots that are inserted into the B-tree.
187 // The Duplicate slots have had their key bytes extended
188 // by 6 bytes to contain a binary duplicate key uniqueifier.
199 uint off:BT_maxbits; // page offset for key start
200 uint type:3; // type of slot
201 uint dead:1; // set for deleted slot
204 // The key structure occupies space at the upper end of
205 // each page. It's a length byte followed by the key
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char key[0];
213 // the value structure also occupies space at the upper
214 // end of the page. Each key is immediately followed by a value.
217 unsigned char len; // this can be changed to a ushort or uint
218 unsigned char value[0];
221 #define BT_maxkey 255 // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224 // The first part of an index page.
225 // It is immediately followed
226 // by the BtSlot array of keys.
228 // note that this structure size
229 // must be a multiple of 8 bytes
230 // in order to place dups correctly.
232 typedef struct BtPage_ {
233 uint cnt; // count of keys in page
234 uint act; // count of active keys
235 uint min; // next key offset
236 uint garbage; // page garbage in bytes
237 unsigned char bits:7; // page size in bits
238 unsigned char free:1; // page is on free chain
239 unsigned char lvl:7; // level of page
240 unsigned char kill:1; // page is being deleted
241 unsigned char right[BtId]; // page number to right
242 unsigned char left[BtId]; // page number to left
243 unsigned char filler[2]; // padding to multiple of 8
246 // The loadpage interface object
249 BtPage page; // current page pointer
250 BtLatchSet *latch; // current page latch set
253 // structure for latch manager on ALLOC_page
256 struct BtPage_ alloc[1]; // next page_no in right ptr
257 unsigned long long dups[1]; // global duplicate key uniqueifier
258 unsigned char chain[BtId]; // head of free page_nos chain
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
271 BtPageZero *pagezero; // mapped allocation page
272 BtSpinLatch lock[1]; // allocation area lite latch
273 uint latchdeployed; // highest number of latch entries deployed
274 uint nlatchpage; // number of latch pages at BT_latch
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 BtHashEntry *hashtable; // the buffer pool hash table entries
279 BtLatchSet *latchsets; // mapped latch set from buffer pool
280 unsigned char *pagepool; // mapped to the buffer pool pages
282 HANDLE halloc; // allocation handle
283 HANDLE hpool; // buffer pool handle
288 BtMgr *mgr; // buffer manager for thread
289 BtPage cursor; // cached frame for start/next (never mapped)
290 BtPage frame; // spare frame for the page split (never mapped)
291 uid cursor_page; // current cursor page number
292 unsigned char *mem; // frame, cursor, page memory buffer
293 unsigned char key[BT_keyarray]; // last found complete key
294 int found; // last delete or insert was found
295 int err; // last error
296 int reads, writes; // number of reads and writes from the btree
310 #define CLOCK_bit 0x8000
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // The page is allocated from low and hi ends.
333 // The key slots are allocated from the bottom,
334 // while the text and value of the key
335 // are allocated from the top. When the two
336 // areas meet, the page is split into two.
338 // A key consists of a length byte, two bytes of
339 // index number (0 - 65535), and up to 253 bytes
342 // Associated with each key is a value byte string
343 // containing any value desired.
345 // The b-tree root is always located at page 1.
346 // The first leaf page of level zero is always
347 // located on page 2.
349 // The b-tree pages are linked with next
350 // pointers to facilitate enumerators,
351 // and provide for concurrency.
353 // When the root page fills, it is split in two and
354 // the tree height is raised by a new root at page
355 // one with two keys.
357 // Deleted keys are marked with a dead bit until
358 // page cleanup. The fence key for a leaf node is
361 // To achieve maximum concurrency one page is locked at a time
362 // as the tree is traversed to find leaf key in question. The right
363 // page numbers are used in cases where the page is being split,
366 // Page 0 is dedicated to lock for new page extensions,
367 // and chains empty pages together for reuse. It also
368 // contains the latch manager hash table.
370 // The ParentModification lock on a node is obtained to serialize posting
371 // or changing the fence key for a node.
373 // Empty pages are chained together through the ALLOC page and reused.
375 // Access macros to address slot and key values from the page
376 // Page slots use 1 based indexing.
378 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
379 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
380 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// Serialize the page id `id` into the byte array `dest`.
// Each step stores the current low-order byte of `id` and then shifts
// `id` right by 8 bits.
// NOTE(review): bt_getid reads the most significant byte first, so this
// presumably fills dest from its last byte backwards - confirm against
// the loop header.
382 void bt_putid(unsigned char *dest, uid id)
387 dest[i] = (unsigned char)id, id >>= 8;
// Decode a page id from the BtId bytes at `src`.
// Each iteration shifts the accumulator left by 8 bits and ORs in the
// next byte, so src[0] becomes the most significant byte of the result.
390 uid bt_getid(unsigned char *src)
395 for( i = 0; i < BtId; i++ )
396 id <<= 8, id |= *src++;
// Atomically advance the global duplicate-key uniqueifier kept in page
// zero (pagezero->dups) and return the new, post-increment value.
//   bt - btree handle; only bt->mgr->pagezero is used
401 uid bt_newdup (BtDb *bt)
// the GCC builtin yields the value before the add, hence the + 1
404 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// _InterlockedIncrement64 takes only the target address and already
// returns the incremented value, matching the branch above
406 return _InterlockedIncrement64(bt->mgr->pagezero->dups);
410 // Write-Only Queue Lock
// Acquire a write-only queue lock.
// Atomically takes the next ticket number from lock->ticket, then waits
// until lock->serving[0] reaches that ticket.
412 void WriteOLock (WOLock *lock)
416 tix = __sync_fetch_and_add (lock->ticket, 1);
418 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
420 // wait for our ticket to come up
422 while( tix != lock->serving[0] )
430 void WriteORelease (WOLock *lock)
435 // Phase-Fair reader/writer lock implementation
// Acquire the phase-fair reader/writer lock for writing.
//  1. take a ticket from lock->ticket and wait until lock->serving[0]
//     equals it;
//  2. atomically add PRES | (tix & PHID) to lock->rin, keeping the
//     value rin held beforehand in r;
//  3. wait until *lock->rout equals r.
// NOTE(review): step 3 presumably waits for readers that entered before
// this writer to leave - confirm against the reader-count encoding.
437 void WriteLock (RWLock *lock)
442 tix = __sync_fetch_and_add (lock->ticket, 1);
444 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
446 // wait for our ticket to come up
448 while( tix != lock->serving[0] )
455 w = PRES | (tix & PHID);
457 r = __sync_fetch_and_add (lock->rin, w);
459 r = _InterlockedExchangeAdd16 (lock->rin, w);
461 while( r != *lock->rout )
// Release a write hold on the phase-fair lock.
// Atomically clears the MASK bits of lock->rin.
// NOTE(review): assumes MASK covers the PRES/PHID bits that WriteLock
// added to rin - confirm against the MASK definition.
469 void WriteRelease (RWLock *lock)
472 __sync_fetch_and_and (lock->rin, ~MASK);
474 _InterlockedAnd16 (lock->rin, ~MASK);
// Acquire the phase-fair lock for reading.
// Atomically adds RINC to lock->rin and keeps the MASK bits that were set
// beforehand in w. The wait loop spins for as long as the MASK bits of
// rin still equal w.
479 void ReadLock (RWLock *lock)
483 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
485 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 while( w == (*lock->rin & MASK) )
// Release a read hold: atomically adds RINC to lock->rout.
496 void ReadRelease (RWLock *lock)
499 __sync_fetch_and_add (lock->rout, RINC);
501 _InterlockedExchangeAdd16 (lock->rout, RINC);
505 // Spin Latch Manager
507 // wait until write lock mode is clear
508 // and add 1 to the share count
// Take a shared hold on a spin latch.
// Optimistically adds SHARE to the 16-bit latch word. On the path where
// an exclusive request is granted or pending, the SHARE is subtracted
// again and the thread yields (sched_yield / SwitchToThread) before
// retrying.
510 void bt_spinreadlock(BtSpinLatch *latch)
516 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
518 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
520 // see if exclusive request is granted or pending
525 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
527 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 } while( sched_yield(), 1 );
532 } while( SwitchToThread(), 1 );
536 // wait for other read and write latches to relinquish
// Take the exclusive hold on a spin latch, yielding between attempts.
// Each attempt atomically ORs PEND | XCL into the latch word and checks
// whether the previous value had any bits outside BOTH set. On the retry
// path XCL is cleared again before yielding; PEND is not cleared by the
// visible code.
// NOTE(review): bits outside BOTH are presumably the reader count -
// confirm against the BtSpinLatch layout.
538 void bt_spinwritelock(BtSpinLatch *latch)
544 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
546 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 if( !(prev & ~BOTH) )
553 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 _InterlockedAnd16((ushort *)latch, ~XCL);
558 } while( sched_yield(), 1 );
560 } while( SwitchToThread(), 1 );
564 // try to obtain write lock
566 // return 1 if obtained,
// Single, non-waiting attempt at the exclusive hold on a spin latch.
// Atomically ORs XCL into the latch word (PEND is not set here, unlike
// bt_spinwritelock) and tests the previous value for bits outside BOTH.
// An atomic AND with ~XCL withdraws the bit on the path where the latch
// is not taken.
569 int bt_spinwritetry(BtSpinLatch *latch)
574 prev = __sync_fetch_and_or((ushort *)latch, XCL);
576 prev = _InterlockedOr16((ushort *)latch, XCL);
578 // take write access if all bits are clear
581 if( !(prev & ~BOTH) )
585 __sync_fetch_and_and ((ushort *)latch, ~XCL);
587 _InterlockedAnd16((ushort *)latch, ~XCL);
// Release the exclusive hold: atomically clears the BOTH bits of the
// latch word.
594 void bt_spinreleasewrite(BtSpinLatch *latch)
597 __sync_fetch_and_and((ushort *)latch, ~BOTH);
599 _InterlockedAnd16((ushort *)latch, ~BOTH);
603 // decrement reader count
// Release a shared hold: atomically subtracts SHARE from the latch word.
605 void bt_spinreleaseread(BtSpinLatch *latch)
608 __sync_fetch_and_add((ushort *)latch, -SHARE);
610 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
614 // read page from permanent location in Btree file
// Read page `page_no` of the btree file into the buffer `page`.
//   mgr     - buffer manager; supplies the file handle (idx), page_size
//             and page_bits
//   page    - destination buffer; mgr->page_size bytes are requested
//   page_no - page number; the file offset is page_no << page_bits
// A failed or short read is reported on stderr.
// The page number is printed with %llx because uid is unsigned long long;
// the previous %x specifier did not match the argument type.
616 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
618 off64_t off = page_no << mgr->page_bits;
// compare as signed: where ssize_t is no wider than uint, a -1 error
// return would otherwise convert to a huge unsigned value and be missed
621 if( pread (mgr->idx, page, mgr->page_size, off) < (ssize_t)mgr->page_size ) {
622 fprintf (stderr, "Unable to read page %.8llx errno = %d\n", (unsigned long long)page_no, errno);
629 memset (ovl, 0, sizeof(OVERLAPPED));
631 ovl->OffsetHigh = off >> 32;
633 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
634 fprintf (stderr, "Unable to read page %.8llx GetLastError = %d\n", (unsigned long long)page_no, GetLastError());
637 if( *amt < mgr->page_size ) {
638 fprintf (stderr, "Unable to read page %.8llx GetLastError = %d\n", (unsigned long long)page_no, GetLastError());
645 // write page to permanent location in Btree file
646 // clear the dirty bit
// Write the buffer `page` to page `page_no` of the btree file.
//   mgr     - buffer manager; supplies the file handle (idx), page_size
//             and page_bits
//   page    - source buffer; mgr->page_size bytes are written
//   page_no - page number; the file offset is page_no << page_bits
// A failed or short write is detected by comparing the transferred byte
// count with mgr->page_size.
648 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
650 off64_t off = page_no << mgr->page_bits;
// compare as signed: where ssize_t is no wider than uint, a -1 error
// return would otherwise convert to a huge unsigned value and be missed
653 if( pwrite(mgr->idx, page, mgr->page_size, off) < (ssize_t)mgr->page_size )
659 memset (ovl, 0, sizeof(OVERLAPPED));
661 ovl->OffsetHigh = off >> 32;
663 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666 if( *amt < mgr->page_size )
672 // link latch table entry into head of latch hash table
// Put latch-table entry `slot` at the head of hash chain `hashidx` and
// bind it to `page_no`.
//   hashidx - index into bt->mgr->hashtable
//   slot    - index of the latch set and of its buffer-pool frame
//             (frame address = pagepool + (slot << page_bits))
//   page_no - page number recorded in the latch set
//   loadit  - presumably selects whether the page is read from the file
//             into the frame; the test itself should be confirmed
// The old chain head (if any) gets its prev link set to `slot`.
// The result of bt_readpage is stored in bt->err.
674 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
676 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
677 BtLatchSet *latch = bt->mgr->latchsets + slot;
679 if( latch->next = bt->mgr->hashtable[hashidx].slot )
680 bt->mgr->latchsets[latch->next].prev = slot;
682 bt->mgr->hashtable[hashidx].slot = slot;
683 latch->page_no = page_no;
690 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
698 // set CLOCK bit in latch
699 // decrement pin count
// Drop one pin on a latch set.
// If CLOCK_bit is not yet set in latch->pin it is ORed in first; the pin
// count, which shares that word with CLOCK_bit, is then atomically
// decremented.
701 void bt_unpinlatch (BtLatchSet *latch)
704 if( ~latch->pin & CLOCK_bit )
705 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
706 __sync_fetch_and_add(&latch->pin, -1);
708 if( ~latch->pin & CLOCK_bit )
709 _InterlockedOr16 (&latch->pin, CLOCK_bit);
710 _InterlockedDecrement16 (&latch->pin);
714 // return the btree cached page address
// Compute the buffer-pool frame that belongs to a latch set:
// pagepool + (latch->entry << page_bits).
716 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
718 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
723 // find existing latchset or inspire new one
724 // return with latchset pinned
// Return the latch set for `page_no` with its pin count raised, binding a
// pool entry to the page when none exists yet.
//   page_no - page wanted
//   loadit  - forwarded to bt_latchlink
// Visible steps:
//  1. write-lock hash chain (page_no % latchhash) and walk it looking for
//     an entry whose page_no matches; a hit has its pin incremented and
//     the chain latch is released.
//  2. otherwise claim a never-used entry by incrementing latchdeployed;
//     the entry is usable while slot < latchtotal and is linked with
//     bt_latchlink.
//  3. otherwise undo that increment and sweep the table starting from
//     latchvictim. A candidate needs its own chain latch obtainable with
//     bt_spinwritetry; an entry with CLOCK_bit set has the bit cleared
//     and its chain latch released. The chosen victim's frame is written
//     with bt_writepage and its dirty flag cleared (presumably only when
//     dirty - confirm), it is unlinked from its old chain, and then
//     relinked under hashidx by bt_latchlink.
// NOTE(review): the handling of the chain latches on the bt_latchlink and
// bt_writepage failure paths should be checked against the full body.
726 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
728 uint hashidx = page_no % bt->mgr->latchhash;
736 // try to find our entry
738 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
740 if( slot = bt->mgr->hashtable[hashidx].slot ) do
742 latch = bt->mgr->latchsets + slot;
743 if( page_no == latch->page_no )
745 } while( slot = latch->next );
751 latch = bt->mgr->latchsets + slot;
753 __sync_fetch_and_add(&latch->pin, 1);
755 _InterlockedIncrement16 (&latch->pin);
757 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
761 // see if there are any unused pool entries
763 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
765 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 if( slot < bt->mgr->latchtotal ) {
769 latch = bt->mgr->latchsets + slot;
770 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
772 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
777 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
779 _InterlockedDecrement (&bt->mgr->latchdeployed);
781 // find and reuse previous entry on victim
785 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
787 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
789 // try to get write lock on hash chain
790 // skip entry if not obtained
791 // or has outstanding pins
793 slot %= bt->mgr->latchtotal;
798 latch = bt->mgr->latchsets + slot;
799 idx = latch->page_no % bt->mgr->latchhash;
801 // see if we are on the same chain as hashidx
806 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809 // skip this slot if it is pinned
810 // or the CLOCK bit is set
813 if( latch->pin & CLOCK_bit ) {
815 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
817 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
824 // update permanent page area in btree from buffer pool
826 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832 latch->dirty = 0, bt->writes++;
834 // unlink our available slot from its hash chain
837 bt->mgr->latchsets[latch->prev].next = latch->next;
839 bt->mgr->hashtable[idx].slot = latch->next;
842 bt->mgr->latchsets[latch->next].prev = latch->prev;
844 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
846 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
// Shut down a buffer manager.
// Walks latch entries 1..latchdeployed, writes pool frames back to the
// btree file with bt_writepage, clears their dirty flag and counts them,
// then reports the count on stderr. Afterwards the hash-table/pool
// mapping and the page-zero mapping are released, and on Windows the
// mapping handles and the file handle are closed.
// NOTE(review): the return value of bt_writepage is ignored here, so a
// failed flush is not reported.
854 void bt_mgrclose (BtMgr *mgr)
861 // flush dirty pool pages to the btree
863 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
864 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
865 latch = mgr->latchsets + slot;
868 bt_writepage(mgr, page, latch->page_no);
869 latch->dirty = 0, num++;
871 // madvise (page, mgr->page_size, MADV_DONTNEED);
874 fprintf(stderr, "%d buffer pool pages flushed\n", num);
877 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
878 munmap (mgr->pagezero, mgr->page_size);
880 FlushViewOfFile(mgr->pagezero, 0);
881 UnmapViewOfFile(mgr->pagezero);
882 UnmapViewOfFile(mgr->hashtable);
883 CloseHandle(mgr->halloc);
884 CloseHandle(mgr->hpool);
890 FlushFileBuffers(mgr->idx);
891 CloseHandle(mgr->idx);
896 // close and release memory
// Release a BtDb handle's private memory. On Windows bt->mem, the buffer
// bt_open obtains with VirtualAlloc, is returned with VirtualFree.
898 void bt_close (BtDb *bt)
905 VirtualFree (bt->mem, 0, MEM_RELEASE);
910 // open/create new btree buffer manager
912 // call with file_name, bits in page size (e.g. 16),
913 // size of page pool in pages (e.g. 262144)
// Open the btree file `name` (created if absent) and build the buffer
// manager for it.
//   name    - path of the btree file
//   bits    - requested page size in bits, checked against BT_minbits and
//             BT_maxbits; when the file already records a page size in
//             page zero, that value replaces it
//   nodemax - number of pages in the buffer pool (becomes latchtotal)
// Returns NULL on the failure paths shown below; the function's return
// type is BtMgr *.
// A new file is initialized with the allocation page followed by MIN_lvl
// pages, each holding a single stopper key.
915 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
917 uint lvl, attr, last, slot, idx;
918 uint nlatchpage, latchhash;
919 unsigned char value[BtId];
920 int flag, initit = 0;
921 BtPageZero *pagezero;
928 // determine sanity of page size and buffer pool
930 if( bits > BT_maxbits )
932 else if( bits < BT_minbits )
// nodemax is unsigned, so print it with %u
936 fprintf(stderr, "Buffer pool too small: %u\n", nodemax);
941 mgr = calloc (1, sizeof(BtMgr));
943 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
945 if( mgr->idx == -1 ) {
946 fprintf (stderr, "Unable to open btree file\n");
947 return free(mgr), NULL;
950 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
951 attr = FILE_ATTRIBUTE_NORMAL;
952 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
954 if( mgr->idx == INVALID_HANDLE_VALUE )
955 return GlobalFree(mgr), NULL;
959 pagezero = valloc (BT_maxpage);
962 // read minimum page size to get root info
963 // to support raw disk partition files
964 // check if bits == 0 on the disk.
966 if( size = lseek (mgr->idx, 0L, 2) )
967 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
968 if( pagezero->alloc->bits )
969 bits = pagezero->alloc->bits;
// the file was opened successfully above, so close the descriptor
// before discarding the manager that holds it
973 return close(mgr->idx), free(mgr), free(pagezero), NULL;
977 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
978 size = GetFileSize(mgr->idx, amt);
981 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
982 return bt_mgrclose (mgr), NULL;
983 bits = pagezero->alloc->bits;
988 mgr->page_size = 1 << bits;
989 mgr->page_bits = bits;
991 // calculate number of latch hash table entries
// one hash entry per 16 pool pages, rounded up to whole pages
993 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
994 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
996 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
997 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
998 mgr->latchtotal = nodemax;
1003 // initialize an empty b-tree with latch page, root page, page of leaves
1004 // and page(s) of latches and page pool cache
1006 memset (pagezero, 0, 1 << bits);
1007 pagezero->alloc->bits = mgr->page_bits;
1008 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1010 // initialize left-most LEAF page in
1013 bt_putid (pagezero->alloc->left, LEAF_page);
1015 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1016 fprintf (stderr, "Unable to create btree page zero\n");
1017 return bt_mgrclose (mgr), NULL;
1020 memset (pagezero, 0, 1 << bits);
1021 pagezero->alloc->bits = mgr->page_bits;
// one page per level, written at page number MIN_lvl - lvl
1023 for( lvl=MIN_lvl; lvl--; ) {
1024 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1025 key = keyptr(pagezero->alloc, 1);
1026 key->len = 2; // create stopper key
1030 bt_putid(value, MIN_lvl - lvl + 1);
1031 val = valptr(pagezero->alloc, 1);
1032 val->len = lvl ? BtId : 0;
1033 memcpy (val->value, value, val->len);
1035 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1036 pagezero->alloc->lvl = lvl;
1037 pagezero->alloc->cnt = 1;
1038 pagezero->alloc->act = 1;
1040 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1041 fprintf (stderr, "Unable to create btree page zero\n");
1042 return bt_mgrclose (mgr), NULL;
1050 VirtualFree (pagezero, 0, MEM_RELEASE);
1053 // mlock the pagezero page
1055 flag = PROT_READ | PROT_WRITE;
1056 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1057 if( mgr->pagezero == MAP_FAILED ) {
1058 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1059 return bt_mgrclose (mgr), NULL;
1061 mlock (mgr->pagezero, mgr->page_size);
1063 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1064 if( mgr->hashtable == MAP_FAILED ) {
1065 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1066 return bt_mgrclose (mgr), NULL;
1069 flag = PAGE_READWRITE;
1070 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1071 if( !mgr->halloc ) {
1072 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1073 return bt_mgrclose (mgr), NULL;
1076 flag = FILE_MAP_WRITE;
1077 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1078 if( !mgr->pagezero ) {
1079 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1080 return bt_mgrclose (mgr), NULL;
1083 flag = PAGE_READWRITE;
1084 size = (uid)mgr->nlatchpage << mgr->page_bits;
1085 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1087 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1088 return bt_mgrclose (mgr), NULL;
1091 flag = FILE_MAP_WRITE;
// map the pool through hpool, the handle created just above
// (BtMgr has no member named pool)
1092 mgr->hashtable = MapViewOfFile(mgr->hpool, flag, 0, 0, size);
1093 if( !mgr->hashtable ) {
1094 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1095 return bt_mgrclose (mgr), NULL;
// mapping layout: hash table first, latch sets directly below the page
// pool, page pool (latchtotal pages) at the end
1099 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1100 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1105 // open BTree access method
1106 // based on buffer manager
// Create a BtDb access handle on top of buffer manager `mgr`.
// The handle is allocated and zeroed, then a buffer of two pages is
// obtained (valloc on unix, VirtualAlloc on Windows): bt->frame is the
// first page and bt->cursor the second.
// NOTE(review): no NULL check is visible between malloc and the memset
// that dereferences its result, nor on the page buffer before it is
// used - confirm and add checks if absent.
1108 BtDb *bt_open (BtMgr *mgr)
1110 BtDb *bt = malloc (sizeof(*bt));
1112 memset (bt, 0, sizeof(*bt));
1115 bt->mem = valloc (2 *mgr->page_size);
1117 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1119 bt->frame = (BtPage)bt->mem;
1120 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1124 // compare two keys, return > 0, = 0, or < 0
1125 // =0: keys are same
1128 // as the comparison value
// Compare a stored key with a caller-supplied byte string.
//   key1 - key on a page (length byte followed by key bytes)
//   key2 - bytes to compare against
//   len2 - number of bytes at key2
// The common prefix (the shorter of the two lengths) is compared with
// memcmp; a non-zero memcmp result is tested first.
1130 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1132 uint len1 = key1->len;
1135 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1146 // place write, read, or parent lock on requested page_no.
// Acquire the lock selected by `mode` on a latch set.
// The calls below cover, in order: read and write on latch->readwr,
// shared and exclusive on latch->access, and the queue locks
// latch->parent and latch->atomic.
// NOTE(review): the mode-to-call mapping (presumably BtLockRead,
// BtLockWrite, BtLockAccess, BtLockDelete, BtLockParent, then the atomic
// mode) should be confirmed against the case labels.
1148 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1152 ReadLock (latch->readwr);
1155 WriteLock (latch->readwr);
1158 ReadLock (latch->access);
1161 WriteLock (latch->access);
1164 WriteOLock (latch->parent);
1167 WriteOLock (latch->atomic);
1172 // remove write, read, or parent lock on requested page
// Release the lock selected by `mode` on a latch set; the counterpart of
// bt_lockpage. The release calls appear in the same order as the acquire
// calls there: readwr (read, write), access (read, write), parent,
// atomic.
1174 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1178 ReadRelease (latch->readwr);
1181 WriteRelease (latch->readwr);
1184 ReadRelease (latch->access);
1187 WriteRelease (latch->access);
1190 WriteORelease (latch->parent);
1193 WriteORelease (latch->atomic);
1198 // allocate a new page
1199 // return with page latched, but unlocked.
// Allocate a page and fill it with `contents`.
//   set      - receives the pinned latch set and the mapped pool frame
//   contents - page image copied into the new page (page_size bytes)
// A page from the free chain in page zero is reused when one exists;
// otherwise the file is extended by taking the next page number from
// pagezero->alloc->right. Either way the frame is marked dirty.
// Returns non-zero with bt->err = BTERR_struct when the latch set cannot
// be pinned.
1201 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1206 // lock allocation page
1208 bt_spinwritelock(bt->mgr->lock);
1210 // use empty chain first
1211 // else allocate empty page
1213 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1214 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1215 set->page = bt_mappage (bt, set->latch);
// release the allocation latch before bailing out; returning with it
// held would block every later allocation and bt_freepage call
1217 return bt_spinreleasewrite(bt->mgr->lock), bt->err = BTERR_struct, -1;
// pop the page off the free chain: its right link names the next free page
1219 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1220 bt_spinreleasewrite(bt->mgr->lock);
1222 memcpy (set->page, contents, bt->mgr->page_size);
1223 set->latch->dirty = 1;
1227 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1228 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1230 // unlock allocation latch
1232 bt_spinreleasewrite(bt->mgr->lock);
1234 // don't load cache from btree page
1236 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1237 set->page = bt_mappage (bt, set->latch);
1239 return bt->err = BTERR_struct;
1241 memcpy (set->page, contents, bt->mgr->page_size);
1242 set->latch->dirty = 1;
1246 // find slot in page for given key at a given level
// Binary-search a page's 1-based slot array for `key`.
//   page - page to search
//   key  - key bytes
//   len  - key length
// The search range starts as [1, page->cnt]. Whenever the probed key
// compares >= the search key (keycmp not < 0) the upper bound moves down
// to the probe and `good` is bumped.
// Returns the upper bound when `good` is non-zero, otherwise 0, which the
// comment below describes as "key is on right link page".
// A page with a right link (bt_getid(page->right) non-zero) gets special
// treatment of its stopper key; that branch body should be checked
// against the full source.
1248 int bt_findslot (BtPage page, unsigned char *key, uint len)
1250 uint diff, higher = page->cnt, low = 1, slot;
1253 // make stopper key an infinite fence value
1255 if( bt_getid (page->right) )
1260 // low is the lowest candidate.
1261 // loop ends when they meet
1263 // higher is already
1264 // tested as .ge. the passed key.
1266 while( diff = higher - low ) {
1267 slot = low + ( diff >> 1 );
1268 if( keycmp (keyptr(page, slot), key, len) < 0 )
1271 higher = slot, good++;
1274 // return zero if key is on right link page
1276 return good ? higher : 0;
1279 // find and load page at given level for given key
1280 // leave page rd or wr locked as requested
// Walk from the root towards level `lvl` looking for the page that
// covers `key`, and leave that page locked in mode `lock`.
//   set  - receives the pinned latch set and mapped page at each step
//   key  - search key, `len` bytes long
//   lvl  - target level
//   lock - lock mode for the target level; other levels use BtLockRead
// Per page: pin it, take the Access lock (pages other than the root),
// release and unpin the previous page, take the mode lock, then drop the
// Access lock. The first visit to the root sets `drill` to the root's
// actual level; if the target is the root level and a non-read lock was
// requested, the root is unlocked and unpinned so it can be locked again
// in the right mode. Descent follows the value of the first non-dead slot
// at or after the slot bt_findslot returns, or the right link when the
// key lies on a sibling.
// Returns 0 with bt->err = BTERR_struct on a structural error (free page
// reached, level mismatch off the root, only dead slots remaining, end of
// the right chain).
// NOTE(review): bt_deletekey treats a non-zero return as a slot number -
// the success return should be confirmed against the full source.
1282 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1284 uid page_no = ROOT_page, prevpage = 0;
1285 uint drill = 0xff, slot;
1286 BtLatchSet *prevlatch;
1287 uint mode, prevmode;
1289 // start at root of btree and drill down
1292 // determine lock mode of drill level
1293 mode = (drill == lvl) ? lock : BtLockRead;
1295 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1298 // obtain access lock using lock chaining with Access mode
1300 if( page_no > ROOT_page )
1301 bt_lockpage(BtLockAccess, set->latch);
1303 set->page = bt_mappage (bt, set->latch);
1305 // release & unpin parent or left sibling page
1308 bt_unlockpage(prevmode, prevlatch);
1309 bt_unpinlatch (prevlatch);
1313 // obtain mode lock using lock chaining through AccessLock
1315 bt_lockpage(mode, set->latch);
1317 if( set->page->free )
1318 return bt->err = BTERR_struct, 0;
1320 if( page_no > ROOT_page )
1321 bt_unlockpage(BtLockAccess, set->latch);
1323 // re-read and re-lock root after determining actual level of root
1325 if( set->page->lvl != drill) {
1326 if( set->latch->page_no != ROOT_page )
1327 return bt->err = BTERR_struct, 0;
1329 drill = set->page->lvl;
1331 if( lock != BtLockRead && drill == lvl ) {
1332 bt_unlockpage(mode, set->latch);
1333 bt_unpinlatch (set->latch);
1338 prevpage = set->latch->page_no;
1339 prevlatch = set->latch;
1342 // find key on page at this level
1343 // and descend to requested level
1345 if( set->page->kill )
1348 if( slot = bt_findslot (set->page, key, len) ) {
1352 // find next non-dead slot -- the fence key if nothing else
1354 while( slotptr(set->page, slot)->dead )
1355 if( slot++ < set->page->cnt )
1358 return bt->err = BTERR_struct, 0;
1360 page_no = bt_getid(valptr(set->page, slot)->value);
1365 // or slide right into next page
1368 page_no = bt_getid(set->page->right);
1372 // return error on end of right chain
1374 bt->err = BTERR_struct;
1375 return 0; // return error
1378 // return page to free list
1379 // page must be delete & write locked
// Push a page onto the free chain kept in page zero.
// Under the allocation latch (bt->mgr->lock): the page's right link takes
// the current chain head, the chain head becomes this page's number, and
// the page is marked dirty and free. The page's Delete and Write locks
// are then released and its latch set unpinned, so the caller must hold
// both locks on entry.
1381 void bt_freepage (BtDb *bt, BtPageSet *set)
1383 // lock allocation page
1385 bt_spinwritelock (bt->mgr->lock);
1389 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1390 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1391 set->latch->dirty = 1;
1392 set->page->free = 1;
1394 // unlock released page
1396 bt_unlockpage (BtLockDelete, set->latch);
1397 bt_unlockpage (BtLockWrite, set->latch);
1398 bt_unpinlatch (set->latch);
1400 // unlock allocation page
1402 bt_spinreleasewrite (bt->mgr->lock);
1405 // a fence key was deleted from a page
1406 // push new fence value upwards
// Repair the parent after a page's fence (highest) key was deleted.
//   set - the page, write-locked by the caller
//   lvl - the page's level; the parent is updated at lvl+1
// The old fence key is copied to rightkey and its slot zeroed (cnt is
// decremented); the key now in the last slot is copied to leftkey as the
// new fence. The Write lock is swapped for the Parent lock, the new fence
// is inserted at lvl+1 with this page's number as its value, and the old
// fence key is deleted from lvl+1. Finally the Parent lock is released
// and the latch set unpinned.
1408 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1410 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1411 unsigned char value[BtId];
1415 // remove the old fence value
1417 ptr = keyptr(set->page, set->page->cnt);
1418 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1419 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1420 set->latch->dirty = 1;
1422 // cache new fence value
1424 ptr = keyptr(set->page, set->page->cnt);
1425 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1427 bt_lockpage (BtLockParent, set->latch);
1428 bt_unlockpage (BtLockWrite, set->latch);
1430 // insert new (now smaller) fence key
1432 bt_putid (value, set->latch->page_no);
1433 ptr = (BtKey*)leftkey;
1435 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1438 // now delete old fence key
1440 ptr = (BtKey*)rightkey;
1442 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1445 bt_unlockpage (BtLockParent, set->latch);
1446 bt_unpinlatch(set->latch);
1450 // root has a single child
1451 // collapse a level from the tree
// Remove tree levels while the root has only one live child.
//   root - the root page set, write-locked by the caller
// Each round scans the root's slots for a non-dead entry, reads the child
// page number from that slot's value, pins the child and takes its Delete
// and Write locks, copies the whole child page over the root, and hands
// the child to bt_freepage. The loop repeats while the new root contents
// have lvl > 1 and a single active key. The root's Write lock is released
// and its latch set unpinned at the end.
1453 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1459 // find the child entry and promote as new root contents
1462 for( idx = 0; idx++ < root->page->cnt; )
1463 if( !slotptr(root->page, idx)->dead )
1466 page_no = bt_getid (valptr(root->page, idx)->value);
1468 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1469 child->page = bt_mappage (bt, child->latch);
1473 bt_lockpage (BtLockDelete, child->latch);
1474 bt_lockpage (BtLockWrite, child->latch);
1476 memcpy (root->page, child->page, bt->mgr->page_size);
1477 root->latch->dirty = 1;
1479 bt_freepage (bt, child);
1481 } while( root->page->lvl > 1 && root->page->act == 1 );
1483 bt_unlockpage (BtLockWrite, root->latch);
1484 bt_unpinlatch (root->latch);
1488 // delete a page and manage keys
1489 // call with page writelocked
1490 // returns with page unpinned
// Delete an emptied page by absorbing its right sibling.
//   set - the page to remove, write-locked by the caller
// Sequence:
//  1. save this page's fence key (lowerfence);
//  2. pin and write-lock the right sibling and save its fence key
//     (higherfence);
//  3. copy the sibling's whole page into this page, then mark the sibling
//     killed with its right link pointing back at this page;
//  4. swap the Write locks on both pages for Parent locks;
//  5. at lvl+1, point higherfence at this page and delete lowerfence;
//  6. take Delete and Write locks on the sibling and free it with
//     bt_freepage; release this page's Parent lock and unpin it.
// NOTE(review): the early return taken when the sibling is already marked
// killed shows no unlock or unpin of either page - confirm whether those
// holds are intentionally left to the caller.
1492 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1494 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1495 unsigned char value[BtId];
1496 uint lvl = set->page->lvl;
1501 // cache copy of fence key
1502 // to post in parent
1504 ptr = keyptr(set->page, set->page->cnt);
1505 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1507 // obtain lock on right page
1509 page_no = bt_getid(set->page->right);
1511 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1512 right->page = bt_mappage (bt, right->latch);
1516 bt_lockpage (BtLockWrite, right->latch);
1518 // cache copy of key to update
1520 ptr = keyptr(right->page, right->page->cnt);
1521 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1523 if( right->page->kill )
1524 return bt->err = BTERR_struct;
1526 // pull contents of right peer into our empty page
1528 memcpy (set->page, right->page, bt->mgr->page_size);
1529 set->latch->dirty = 1;
1531 // mark right page deleted and point it to left page
1532 // until we can post parent updates that remove access
1533 // to the deleted page.
1535 bt_putid (right->page->right, set->latch->page_no);
1536 right->latch->dirty = 1;
1537 right->page->kill = 1;
1539 bt_lockpage (BtLockParent, right->latch);
1540 bt_unlockpage (BtLockWrite, right->latch);
1542 bt_lockpage (BtLockParent, set->latch);
1543 bt_unlockpage (BtLockWrite, set->latch);
1545 // redirect higher key directly to our new node contents
1547 bt_putid (value, set->latch->page_no);
1548 ptr = (BtKey*)higherfence;
1550 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1553 // delete old lower key to our node
1555 ptr = (BtKey*)lowerfence;
1557 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1560 // obtain delete and write locks to right node
1562 bt_unlockpage (BtLockParent, right->latch);
1563 bt_lockpage (BtLockDelete, right->latch);
1564 bt_lockpage (BtLockWrite, right->latch);
1565 bt_freepage (bt, right);
1567 bt_unlockpage (BtLockParent, set->latch);
1568 bt_unpinlatch (set->latch);
1573 // find and delete key on page by marking delete flag bit
1574 // if page becomes empty, delete it from the btree
//   bt  - btree handle
//   key - key bytes to delete
//   len - length of key in bytes
//   lvl - tree level on which to look for the key
// On the visible return paths the result is 0 and bt->found is set to
// whether a live matching key was found; an emptied page is passed to
// bt_deletepage and that call's result is returned.
// NOTE(review): this listing omits some original lines (declarations,
// error returns, closing braces) -- confirm against the full source.
1576 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1578 uint slot, idx, found, fence;
// load the page for this key at level lvl under a write lock
1583 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1584 ptr = keyptr(set->page, slot);
1588 // if librarian slot, advance to real slot
1590 if( slotptr(set->page, slot)->type == Librarian )
1591 ptr = keyptr(set->page, ++slot);
// remember whether the target is the page's last slot (its fence key)
1593 fence = slot == set->page->cnt;
1595 // if key is found delete it, otherwise ignore request
// found ends up true only if the key matches and the slot was not already dead
1597 if( found = !keycmp (ptr, key, len) )
1598 if( found = slotptr(set->page, slot)->dead == 0 ) {
1599 val = valptr(set->page,slot);
1600 slotptr(set->page, slot)->dead = 1;
// account the key and value bytes (plus their headers) as garbage
1601 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1604 // collapse empty slots beneath the fence
// while the slot just below the last one is dead, copy the last slot
// down over it, zero the old last slot and shrink cnt by one
1606 while( idx = set->page->cnt - 1 )
1607 if( slotptr(set->page, idx)->dead ) {
1608 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1609 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1614 // did we delete a fence key in an upper level?
1616 if( found && lvl && set->page->act && fence )
1617 if( bt_fixfence (bt, set, lvl) )
1620 return bt->found = found, 0;
1622 // do we need to collapse root?
// only when the root sits above level 1 and has a single active key
1624 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1625 if( bt_collapseroot (bt, set) )
1628 return bt->found = found, 0;
1630 // delete empty page
1632 if( !set->page->act )
1633 return bt_deletepage (bt, set);
// ordinary case: mark the page dirty, release the write lock and pin
1635 set->latch->dirty = 1;
1636 bt_unlockpage(BtLockWrite, set->latch);
1637 bt_unpinlatch (set->latch);
1638 return bt->found = found, 0;
// Return the key buffer held in bt->key, viewed as a BtKey.
1641 BtKey *bt_foundkey (BtDb *bt)
1643 return (BtKey*)(bt->key);
1646 // advance to next slot
// When `slot` is below the page's slot count the next slot is on the
// current page; otherwise the right sibling page is pinned, mapped and
// read-locked in place of the current one.
//   bt   - btree handle
//   set  - current page set; updated to the right sibling when moving on
//   slot - current slot number
// Returns 0 with bt->err = BTERR_struct on the visible error path.
// NOTE(review): the return statements for the success paths are not
// visible in this listing -- confirm against the full source.
1648 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1650 BtLatchSet *prevlatch;
1653 if( slot < set->page->cnt )
// keep the current latch so it can be released after the sibling is reached
1656 prevlatch = set->latch;
1658 if( page_no = bt_getid(set->page->right) )
1659 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1660 set->page = bt_mappage (bt, set->latch);
1664 return bt->err = BTERR_struct, 0;
1666 // obtain access lock using lock chaining with Access mode
// order: Access lock on new page, drop Read lock and pin on old page,
// Read lock on new page, drop Access lock on new page
1668 bt_lockpage(BtLockAccess, set->latch);
1670 bt_unlockpage(BtLockRead, prevlatch);
1671 bt_unpinlatch (prevlatch);
1673 bt_lockpage(BtLockRead, set->latch);
1674 bt_unlockpage(BtLockAccess, set->latch);
1678 // find unique key or first duplicate key in
1679 // leaf level and return number of value bytes
1680 // or (-1) if not found. Setup key for bt_foundkey
//   bt     - btree handle
//   key    - key bytes to look up
//   keylen - length of key in bytes
//   value  - caller buffer receiving the value bytes
//   valmax - capacity of `value` in bytes
// NOTE(review): this listing omits some original lines (declarations,
// the loop opener, return/break statements) -- confirm against the
// full source.
1682 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// load the level-0 page for the key under a read lock
1690 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1692 ptr = keyptr(set->page, slot);
1694 // skip librarian slot place holder
1696 if( slotptr(set->page, slot)->type == Librarian )
1697 ptr = keyptr(set->page, ++slot);
1699 // return actual key found
// copy the key header and bytes into bt->key
1701 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1704 if( slotptr(set->page, slot)->type == Duplicate )
1707 // not there if we reach the stopper key
// i.e. the last slot of a page that has no right sibling
1709 if( slot == set->page->cnt )
1710 if( !bt_getid (set->page->right) )
1713 // if key exists, return >= 0 value bytes copied
1714 // otherwise return (-1)
1716 if( slotptr(set->page, slot)->dead )
1720 if( !memcmp (ptr->key, key, len) ) {
1721 val = valptr (set->page,slot);
// NOTE(review): the statement guarded by this test is not visible
// here; presumably it clamps valmax to val->len -- confirm
1722 if( valmax > val->len )
1724 memcpy (value, val->value, valmax);
// keep scanning via bt_findnext until it returns 0
1730 } while( slot = bt_findnext (bt, set, slot) );
1732 bt_unlockpage (BtLockRead, set->latch);
1733 bt_unpinlatch (set->latch);
1737 // check page for space available,
1738 // clean if necessary and return
1739 // 0 - page needs splitting
1740 // >0 new slot value
//   bt     - btree handle; bt->frame is used as scratch for the old page image
//   set    - page set of the page to check/compact
//   keylen - length of the key about to be inserted
//   slot   - slot number the caller intends to insert at
//   vallen - length of the value about to be inserted
// NOTE(review): this listing omits some original lines (declarations,
// return statements, slot renumbering, closing braces) -- confirm
// against the full source.
1742 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1744 uint nxt = bt->mgr->page_size;
1745 BtPage page = set->page;
1746 uint cnt = 0, idx = 0;
1747 uint max = page->cnt;
// enough free room already? compare the lowest used offset (page->min)
// with the space needed for the page header, max+2 slots and the new
// key and value with their headers
1752 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1755 // skip cleanup and proceed to split
1756 // if there's not enough garbage
// threshold: garbage below one fifth of the page size
1759 if( page->garbage < nxt / 5 )
// work from a copy of the page held in bt->frame
1762 memcpy (bt->frame, page, bt->mgr->page_size);
1764 // skip page info and set rest of page to zero
1766 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1767 set->latch->dirty = 1;
1771 // clean up page first by
1772 // removing deleted keys
1774 while( cnt++ < max ) {
// the dead test applies to every slot except the last one (cnt == max)
1777 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1780 // copy the value across
// data is packed downward from the end of the page: nxt moves toward 0
1782 val = valptr(bt->frame, cnt);
1783 nxt -= val->len + sizeof(BtVal);
1784 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1786 // copy the key across
1788 key = keyptr(bt->frame, cnt);
1789 nxt -= key->len + sizeof(BtKey);
1790 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1792 // make a librarian slot
// a dead slot of type Librarian that shares the key's offset
1795 slotptr(page, ++idx)->off = nxt;
1796 slotptr(page, idx)->type = Librarian;
1797 slotptr(page, idx)->dead = 1;
// the real slot: same offset, type and dead flag copied from the old page
1802 slotptr(page, ++idx)->off = nxt;
1803 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1805 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1812 // see if page has enough space now, or does it need splitting?
1814 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1820 // split the root and raise the height of the btree
// The current root contents (the lower keys) are written to a new page;
// the root page is then rebuilt with two slots: slot 1 holds the left
// page's fence key pointing at the new left page, slot 2 holds a key
// pointing at `right`.
//   bt    - btree handle
//   root  - page set of the root page; write lock and pin released at the end
//   right - latch of the page holding the upper keys; unpinned at the end
// NOTE(review): this listing omits some original lines (declarations,
// key/value length assignments, error return, level increment) --
// confirm against the full source.
1822 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1824 unsigned char leftkey[BT_keyarray];
1825 uint nxt = bt->mgr->page_size;
1826 unsigned char value[BtId];
1832 // save left page fence key for new root
1834 ptr = keyptr(root->page, root->page->cnt);
1835 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1837 // Obtain an empty page to use, and copy the current
1838 // root contents into it, e.g. lower keys
1840 if( bt_newpage(bt, left, root->page) )
// only the page number of the new left page is needed from here on
1843 left_page_no = left->latch->page_no;
1844 bt_unpinlatch (left->latch);
1846 // preserve the page info at the bottom
1847 // of higher keys and set rest to zero
1849 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1851 // insert stopper key at top of newroot page
1852 // and increase the root height
// value for slot 2: page number of the right page, placed at the page end
1854 nxt -= BtId + sizeof(BtVal);
1855 bt_putid (value, right->page_no);
1856 val = (BtVal *)((unsigned char *)root->page + nxt);
1857 memcpy (val->value, value, BtId);
// reserve a key header plus 2 key bytes for slot 2's key
1860 nxt -= 2 + sizeof(BtKey);
1861 slotptr(root->page, 2)->off = nxt;
1862 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1867 // insert lower keys page fence key on newroot page as first key
1869 nxt -= BtId + sizeof(BtVal);
1870 bt_putid (value, left_page_no);
1871 val = (BtVal *)((unsigned char *)root->page + nxt);
1872 memcpy (val->value, value, BtId);
1875 ptr = (BtKey *)leftkey;
1876 nxt -= ptr->len + sizeof(BtKey);
1877 slotptr(root->page, 1)->off = nxt;
1878 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling and now holds exactly two active keys
1880 bt_putid(root->page->right, 0);
1881 root->page->min = nxt; // reset lowest used offset and key count
1882 root->page->cnt = 2;
1883 root->page->act = 2;
1886 // release and unpin root pages
1888 bt_unlockpage(BtLockWrite, root->latch);
1889 bt_unpinlatch (root->latch);
1891 bt_unpinlatch (right);
1895 // split already locked full node
1897 // return pool entry for new right
// The upper keys are assembled in bt->frame and written to a newly
// allocated right page; the original page is then rebuilt from a copy
// of itself with the smaller keys and linked to the new right page.
//   bt  - btree handle; bt->frame is used as scratch
//   set - page set of the page being split
// Returns the latch table entry of the new right page on the visible
// success path.
// NOTE(review): this listing omits some original lines (declarations,
// split-point computation, `continue`/counter updates, error return)
// -- confirm against the full source.
1900 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1902 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1903 uint lvl = set->page->lvl;
1910 // split higher half of keys to bt->frame
1912 memset (bt->frame, 0, bt->mgr->page_size);
1913 max = set->page->cnt;
1917 while( cnt++ < max ) {
// the dead test applies to every slot except the last one (cnt == max)
1918 if( slotptr(set->page, cnt)->dead && cnt < max )
// copy value, then key, packing downward from the end of the frame
1920 src = valptr(set->page, cnt);
1921 nxt -= src->len + sizeof(BtVal);
1922 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1924 key = keyptr(set->page, cnt);
1925 nxt -= key->len + sizeof(BtKey);
1926 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1927 memcpy (ptr, key, key->len + sizeof(BtKey));
1929 // add librarian slot
// a dead slot of type Librarian that shares the key's offset
1932 slotptr(bt->frame, ++idx)->off = nxt;
1933 slotptr(bt->frame, idx)->type = Librarian;
1934 slotptr(bt->frame, idx)->dead = 1;
// the real slot: type and dead flag copied from the source slot
1939 slotptr(bt->frame, ++idx)->off = nxt;
1940 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1942 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// fill in the header of the new right page image
1946 bt->frame->bits = bt->mgr->page_bits;
1947 bt->frame->min = nxt;
1948 bt->frame->cnt = idx;
1949 bt->frame->lvl = lvl;
// for pages other than the root, the new page inherits the old right link
1953 if( set->latch->page_no > ROOT_page )
1954 bt_putid (bt->frame->right, bt_getid (set->page->right));
1956 // get new free page and write higher keys to it.
1958 if( bt_newpage(bt, right, bt->frame) )
// reuse bt->frame as a copy of the original page, then clear
// everything after the page header on the original
1961 memcpy (bt->frame, set->page, bt->mgr->page_size);
1962 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1963 set->latch->dirty = 1;
1965 nxt = bt->mgr->page_size;
1966 set->page->garbage = 0;
1972 if( slotptr(bt->frame, max)->type == Librarian )
1975 // assemble page of smaller keys
1977 while( cnt++ < max ) {
1978 if( slotptr(bt->frame, cnt)->dead )
1980 val = valptr(bt->frame, cnt);
1981 nxt -= val->len + sizeof(BtVal);
1982 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1984 key = keyptr(bt->frame, cnt);
1985 nxt -= key->len + sizeof(BtKey);
1986 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1988 // add librarian slot
1991 slotptr(set->page, ++idx)->off = nxt;
1992 slotptr(set->page, idx)->type = Librarian;
1993 slotptr(set->page, idx)->dead = 1;
1998 slotptr(set->page, ++idx)->off = nxt;
1999 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the rebuilt left page to the new right page
2003 bt_putid(set->page->right, right->latch->page_no);
2004 set->page->min = nxt;
2005 set->page->cnt = idx;
2007 return right->latch->entry;
2010 // fix keys for newly split page
2011 // call with page locked, return
// Posts the fence keys of a freshly split pair of pages into the level
// above (lvl+1): the left page's new fence key, and the right page's
// fence key re-pointed at the new right page. A split of the root is
// delegated to bt_splitroot.
//   bt    - btree handle
//   set   - page set of the left (original) page; write lock released,
//           latch unpinned at the end
//   right - latch of the new right page; unpinned at the end
// NOTE(review): this listing omits some original lines (declarations,
// error returns after the bt_insertkey calls) -- confirm against the
// full source.
2014 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2016 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2017 unsigned char value[BtId];
2018 uint lvl = set->page->lvl;
2022 // if current page is the root page, split it
2024 if( set->latch->page_no == ROOT_page )
2025 return bt_splitroot (bt, set, right);
// cache both fence keys (last slot of each page) before releasing locks
2027 ptr = keyptr(set->page, set->page->cnt);
2028 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2030 page = bt_mappage (bt, right);
2032 ptr = keyptr(page, page->cnt);
2033 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2035 // insert new fences in their parent pages
// take the parent lock on both pages, then drop the left page's write lock
2037 bt_lockpage (BtLockParent, right);
2039 bt_lockpage (BtLockParent, set->latch);
2040 bt_unlockpage (BtLockWrite, set->latch);
2042 // insert new fence for reformulated left block of smaller keys
2044 bt_putid (value, set->latch->page_no);
2045 ptr = (BtKey *)leftkey;
2047 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2050 // switch fence for right block of larger keys to new right page
2052 bt_putid (value, right->page_no);
2053 ptr = (BtKey *)rightkey;
2055 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2058 bt_unlockpage (BtLockParent, set->latch);
2059 bt_unpinlatch (set->latch);
2061 bt_unlockpage (BtLockParent, right);
2062 bt_unpinlatch (right);
2066 // install new key and value onto page
2067 // page must already be checked for
// Writes the value and key bytes below the page's lowest used offset
// and opens a slot for them at `slot`, reusing a dead slot when one is
// found or growing the slot array by two (adding a librarian slot).
//   bt      - btree handle
//   set     - page set of the page being modified
//   slot    - slot number to insert before
//   key     - key bytes; keylen - key length
//   value   - value bytes; vallen - value length
//   type    - slot type for the new entry (its use is not visible here)
//   release - NOTE(review): presumably controls the unlock/unpin at the
//             end; the test is not visible in this listing -- confirm
// NOTE(review): this listing omits some original lines (declarations,
// length assignments, `break`/`else`, counters, return) -- confirm
// against the full source.
2070 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2072 uint idx, librarian;
2077 // if found slot > desired slot and previous slot
2078 // is a librarian slot, use it
2081 if( slotptr(set->page, slot-1)->type == Librarian )
2084 // copy value onto page
// page->min moves downward by the size of the value and its header
2086 set->page->min -= vallen + sizeof(BtVal);
2087 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2088 memcpy (val->value, value, vallen);
2091 // copy key onto page
2093 set->page->min -= keylen + sizeof(BtKey);
2094 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2095 memcpy (ptr->key, key, keylen);
2098 // find first empty slot
// scans from `slot` upward looking for a slot marked dead
2100 for( idx = slot; idx < set->page->cnt; idx++ )
2101 if( slotptr(set->page, idx)->dead )
2104 // now insert key into array before slot
// no dead slot below cnt: extend the slot array by two entries
2106 if( idx == set->page->cnt )
2107 idx += 2, set->page->cnt += 2, librarian = 2;
2111 set->latch->dirty = 1;
// shift slots upward by `librarian` positions to open the gap
2114 while( idx > slot + librarian - 1 )
2115 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2117 // add librarian slot
// it shares the new key's offset; `slot` then advances to the real slot
2119 if( librarian > 1 ) {
2120 node = slotptr(set->page, slot++);
2121 node->off = set->page->min;
2122 node->type = Librarian;
// the real slot points at the key just written at page->min
2128 node = slotptr(set->page, slot);
2129 node->off = set->page->min;
2134 bt_unlockpage (BtLockWrite, set->latch);
2135 bt_unpinlatch (set->latch);
2141 // Insert new key into the btree at given level.
2142 // either add a new key or update/add an existing one
//   bt     - btree handle
//   key    - key bytes; keylen - key length
//   lvl    - tree level to insert at
//   value  - value bytes; vallen - value length
//   unique - non-zero for a unique key; zero makes the key a duplicate
//            that gets a sequence number from bt_newdup appended
// NOTE(review): this listing omits some original lines (declarations,
// length/type assignments, `else`/`continue`/return statements) --
// confirm against the full source.
2144 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2146 unsigned char newkey[BT_keyarray];
2147 uint slot, idx, len, entry;
2154 // set up the key we're working on
// the working copy lives in the local newkey buffer
2156 ins = (BtKey*)newkey;
2157 memcpy (ins->key, key, keylen);
2160 // is this a non-unique index value?
2166 sequence = bt_newdup (bt);
2167 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2171 while( 1 ) { // find the page and slot for the current key
2172 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2173 ptr = keyptr(set->page, slot);
2176 bt->err = BTERR_ovflw;
2180 // if librarian slot == found slot, advance to real slot
2182 if( slotptr(set->page, slot)->type == Librarian )
2183 if( !keycmp (ptr, key, keylen) )
2184 ptr = keyptr(set->page, ++slot);
2188 if( slotptr(set->page, slot)->type == Duplicate )
2191 // if inserting a duplicate key or unique key
2192 // check for adequate space on the page
2193 // and insert the new key before slot.
// taken when the key is not unique, or is unique but differs from the
// key found at slot (by length or by bytes)
2195 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// bt_cleanpage returning 0 means no room: split the page and post the
// new fence keys through bt_splitkeys
2196 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2197 if( !(entry = bt_splitpage (bt, set)) )
2199 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2204 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2207 // if key already exists, update value and return
2209 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2211 if( val->len >= vallen ) {
2212 if( slotptr(set->page, slot)->dead )
// the bytes no longer used by the shorter value count as garbage
2214 set->page->garbage += val->len - vallen;
2215 set->latch->dirty = 1;
2216 slotptr(set->page, slot)->dead = 0;
2218 memcpy (val->value, value, vallen);
2219 bt_unlockpage(BtLockWrite, set->latch);
2220 bt_unpinlatch (set->latch);
2224 // new update value doesn't fit in existing value area
// a live slot's old key and value become garbage
2226 if( !slotptr(set->page, slot)->dead )
2227 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2229 slotptr(set->page, slot)->dead = 0;
2233 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2234 if( !(entry = bt_splitpage (bt, set)) )
2236 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write a fresh copy of the value and key below page->min and point
// the existing slot at it
2241 set->page->min -= vallen + sizeof(BtVal);
2242 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2243 memcpy (val->value, value, vallen);
2246 set->latch->dirty = 1;
2247 set->page->min -= keylen + sizeof(BtKey);
2248 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2249 memcpy (ptr->key, key, keylen);
2252 slotptr(set->page, slot)->off = set->page->min;
2253 bt_unlockpage(BtLockWrite, set->latch);
2254 bt_unpinlatch (set->latch);
// Per-key bookkeeping fields; these are the members accessed through
// the AtomicMod *locks array (locks[src].entry / .slot / .reuse).
// NOTE(review): the enclosing struct lines are not visible in this
// listing -- confirm against the full source.
2261 uint entry; // latch table entry number
2262 uint slot:31; // page slot number
2263 uint reuse:1; // reused previous page
// Queue element describing a deferred key operation; these are the
// members accessed through AtomicKey pointers (leaf->page_no,
// leaf->entry, leaf->type, leaf->nounlock, leaf->leafkey).
// NOTE(review): the enclosing struct lines are not visible in this
// listing -- confirm against the full source.
2267 uid page_no; // page number for split leaf
2268 void *next; // next key to insert
2269 uint entry:29; // latch table entry number
2270 uint type:2; // 0 == insert, 1 == delete, 2 == free
2271 uint nounlock:1; // don't unlock ParentModification
2272 unsigned char leafkey[BT_keyarray];
2275 // find and load leaf page for given key
2276 // leave page Atomic locked and Read locked.
//   bt  - btree handle
//   set - receives the latch and page of the page reached
//   key - key bytes; len - key length
// Returns 0 on the visible error paths (bt->err = BTERR_struct on two
// of them).
// NOTE(review): this listing omits some original lines (declarations,
// loop openers, `continue`/return statements, pin of the right
// sibling) -- confirm against the full source.
2278 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2280 BtLatchSet *prevlatch;
2284 // find level zero page
// start from the level-1 page for the key, loaded under a read lock
2286 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2289 // find next non-dead entry on this page
2290 // it will be the fence key if nothing else
2292 while( slotptr(set->page, slot)->dead )
2293 if( slot++ < set->page->cnt )
2296 return bt->err = BTERR_struct, 0;
// the slot's value holds the page number of the page to descend to
2298 page_no = bt_getid(valptr(set->page, slot)->value);
2299 prevlatch = set->latch;
2302 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2303 set->page = bt_mappage (bt, set->latch);
2307 // obtain read lock using lock chaining with Access mode
2308 // release & unpin parent/left sibling page
2310 bt_lockpage(BtLockAccess, set->latch);
2312 bt_unlockpage(BtLockRead, prevlatch);
2313 bt_unpinlatch (prevlatch);
2315 bt_lockpage(BtLockRead, set->latch);
2316 bt_unlockpage(BtLockAccess, set->latch);
2318 // find key on page at this level
2319 // and descend to requested level
// the key belongs on this page if the page is not killed and either
// has no right sibling or its fence key compares >= the key
2321 if( !set->page->kill )
2322 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
// drop the read lock, then take Access, Atomic and Read in that
// order before releasing Access
2323 bt_unlockpage(BtLockRead, set->latch);
2324 bt_lockpage(BtLockAccess, set->latch);
2325 bt_lockpage(BtLockAtomic, set->latch);
2326 bt_lockpage(BtLockRead, set->latch);
2327 bt_unlockpage(BtLockAccess, set->latch);
// re-check the kill flag now that the locks are held
2329 if( !set->page->kill )
2330 if( slot = bt_findslot (set->page, key, len) )
2333 bt_unlockpage(BtLockAtomic, set->latch);
2336 // slide right into next page
2338 page_no = bt_getid(set->page->right);
2339 prevlatch = set->latch;
2342 // return error on end of right chain
2344 bt->err = BTERR_struct;
2345 return 0; // return error
2348 // determine actual page where key is located
2349 // return slot number
//   bt     - btree handle
//   source - page holding the transaction's keys
//   locks  - per-key bookkeeping array indexed by source slot number
//   src    - index of the key being located
//   set    - receives the latch and page where the key was found
// Sets bt->err = BTERR_atomic when the key is not found on the starting
// page or on any page of its split chain.
// NOTE(review): this listing omits some original lines (declarations,
// `else`, loop opener, return statements) -- confirm against the
// full source.
2351 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2353 BtKey *key = keyptr(source,src);
2354 uint slot = locks[src].slot;
// a key flagged reuse takes the previous key's latch entry and has
// its slot reset to 0
2357 if( src > 1 && locks[src].reuse )
2358 entry = locks[src-1].entry, slot = 0;
2360 entry = locks[src].entry;
2363 set->latch = bt->mgr->latchsets + entry;
2364 set->page = bt_mappage (bt, set->latch);
2368 // is locks->reuse set?
2369 // if so, find where our key
2370 // is located on previous page or split pages
2373 set->latch = bt->mgr->latchsets + entry;
2374 set->page = bt_mappage (bt, set->latch);
2376 if( slot = bt_findslot(set->page, key->key, key->len) ) {
// remember the entry that was found for a reuse key
2377 if( locks[src].reuse )
2378 locks[src].entry = entry;
// follow the latch's split link until it is 0
2381 } while( entry = set->latch->split );
2383 bt->err = BTERR_atomic;
// Insert the key/value at source slot `src` into the page located by
// bt_atomicpage, splitting that page as often as needed. Each new right
// page is linked into the latch split chain and write-locked.
//   bt     - btree handle
//   source - page holding the transaction's keys
//   locks  - per-key bookkeeping array indexed by source slot number
//   src    - index of the key being inserted
// Returns the result of bt_insertslot once space is available; sets
// and returns BTERR_atomic on the visible failure path.
// NOTE(review): this listing omits some original lines (declarations,
// `else`/return after the split call) -- confirm against the full source.
2387 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2389 BtKey *key = keyptr(source, src);
2390 BtVal *val = valptr(source, src);
2395 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
// non-zero from bt_cleanpage: there is room, insert and return;
// the final argument (release) is 0 here
2396 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2397 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2399 if( entry = bt_splitpage (bt, set) )
2400 latch = bt->mgr->latchsets + entry;
2404 // splice right page into split chain
2405 // and WriteLock it.
2407 latch->split = set->latch->split;
2408 set->latch->split = entry;
2409 bt_lockpage(BtLockWrite, latch);
2412 return bt->err = BTERR_atomic;
// Mark the key at source slot `src` dead on the page located by
// bt_atomicpage and account its key and value bytes as garbage.
//   bt     - btree handle
//   source - page holding the transaction's keys
//   locks  - per-key bookkeeping array indexed by source slot number
//   src    - index of the key being deleted
// Sets and returns BTERR_struct on the visible failure path.
// NOTE(review): this listing omits some original lines (declarations,
// `else`, counters, final return) -- confirm against the full source.
2415 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2417 BtKey *key = keyptr(source, src);
2418 uint idx, entry, slot;
2423 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2424 slotptr(set->page, slot)->dead = 1;
2426 return bt->err = BTERR_struct;
2428 ptr = keyptr(set->page, slot);
2429 val = valptr(set->page, slot);
// key bytes + value bytes + both headers become reclaimable
2431 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2432 set->latch->dirty = 1;
2437 // delete an empty master page for a transaction
2439 // note that the far right page never empties because
2440 // it always contains (at least) the infinite stopper key
2441 // and that all pages that don't contain any keys are
2442 // deleted, or are being held under Atomic lock.
//   bt   - btree handle
//   prev - page set of the empty master page; it receives the contents
//          of its right sibling, which is then freed
// NOTE(review): this listing omits some original lines (declarations,
// error returns, `else` branches, final return) -- confirm against
// the full source.
2444 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2446 BtPageSet right[1], temp[1];
2447 unsigned char value[BtId];
2451 bt_lockpage(BtLockWrite, prev->latch);
2453 // grab the right sibling
2455 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2456 right->page = bt_mappage (bt, right->latch);
2460 bt_lockpage(BtLockAtomic, right->latch);
2461 bt_lockpage(BtLockWrite, right->latch);
2463 // and pull contents over empty page
2464 // while preserving master's left link
// the left link is first copied into the right page so that the
// whole-page copy carries it back into the master page
2466 memcpy (right->page->left, prev->page->left, BtId);
2467 memcpy (prev->page, right->page, bt->mgr->page_size);
2469 // forward seekers to old right sibling
2470 // to new page location in set
2472 bt_putid (right->page->right, prev->latch->page_no);
2473 right->latch->dirty = 1;
2474 right->page->kill = 1;
2476 // remove pointer to right page for searchers by
2477 // changing right fence key to point to the master page
2479 ptr = keyptr(right->page,right->page->cnt);
2480 bt_putid (value, prev->latch->page_no);
2482 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2485 // now that master page is in good shape we can
2486 // remove its locks.
2488 bt_unlockpage (BtLockAtomic, prev->latch);
2489 bt_unlockpage (BtLockWrite, prev->latch);
2491 // fix master's right sibling's left pointer
2492 // to remove scanner's pointer to the right page
2494 if( right_page_no = bt_getid (prev->page->right) ) {
2495 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2496 temp->page = bt_mappage (bt, temp->latch);
2498 bt_lockpage (BtLockWrite, temp->latch);
2499 bt_putid (temp->page->left, prev->latch->page_no);
2500 temp->latch->dirty = 1;
2502 bt_unlockpage (BtLockWrite, temp->latch);
2503 bt_unpinlatch (temp->latch);
2504 } else { // master is now the far right page
// record the master page number in page zero's alloc->left field,
// under the manager's spin lock
2505 bt_spinwritelock (bt->mgr->lock);
2506 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2507 bt_spinreleasewrite(bt->mgr->lock);
2510 // now that there are no pointers to the right page
2511 // we can delete it after the last read access occurs
// drop the locks taken above, then take Delete and Write before freeing
2513 bt_unlockpage (BtLockWrite, right->latch);
2514 bt_unlockpage (BtLockAtomic, right->latch);
2515 bt_lockpage (BtLockDelete, right->latch);
2516 bt_lockpage (BtLockWrite, right->latch);
2517 bt_freepage (bt, right);
2521 // atomic modification of a batch of keys.
2523 // return -1 if BTERR is set
2524 // otherwise return slot number
2525 // causing the key constraint violation
2526 // or zero on successful completion.
//   bt     - btree handle
//   source - page whose slots hold the keys/values of the transaction;
//            each slot's type selects the operation for that key
// Visible phases: sort the source slots by key, load and Atomic-lock
// the page for each key, write-lock each distinct master page, apply
// the inserts/deletes, then post the resulting fence-key changes from
// a queue of AtomicKey entries.
// NOTE(review): this listing omits many original lines (declarations,
// `case` labels, `break`/`continue`/`else`, error returns, queue
// linking, frees) -- confirm all control flow against the full source.
2528 int bt_atomictxn (BtDb *bt, BtPage source)
2530 uint src, idx, slot, samepage, entry;
2531 AtomicKey *head, *tail, *leaf;
2532 BtPageSet set[1], prev[1];
2533 unsigned char value[BtId];
2534 BtKey *key, *ptr, *key2;
// one AtomicMod per source slot; slots are 1-based, hence cnt + 1
2544 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2548 // stable sort the list of keys into order to
2549 // prevent deadlocks between threads.
// insertion sort on the slot array: each slot from 2..cnt is moved
// down past every preceding slot whose key compares greater
2551 for( src = 1; src++ < source->cnt; ) {
2552 *temp = *slotptr(source,src);
2553 key = keyptr (source,src);
2555 for( idx = src; --idx; ) {
2556 key2 = keyptr (source,idx);
2557 if( keycmp (key, key2->key, key2->len) < 0 ) {
2558 *slotptr(source,idx+1) = *slotptr(source,idx);
2559 *slotptr(source,idx) = *temp;
2565 // Load the leaf page for each key
2566 // group same page references with reuse bit
2567 // and determine any constraint violations
2569 for( src = 0; src++ < source->cnt; ) {
2570 key = keyptr(source, src);
2573 // first determine if this modification falls
2574 // on the same page as the previous modification
2575 // note that the far right leaf page is a special case
// same page when the previous page has no right sibling or its fence
// key compares >= this key
2577 if( samepage = src > 1 )
2578 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2579 slot = bt_findslot(set->page, key->key, key->len);
2580 else // release read on previous page
2581 bt_unlockpage(BtLockRead, set->latch);
// a freshly loaded page starts with an empty split chain
2584 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2585 set->latch->split = 0;
2589 if( slotptr(set->page, slot)->type == Librarian )
2590 ptr = keyptr(set->page, ++slot);
2592 ptr = keyptr(set->page, slot);
// record where this key lives: either its own latch entry and slot...
2595 locks[src].entry = set->latch->entry;
2596 locks[src].slot = slot;
2597 locks[src].reuse = 0;
// ...or a reuse marker with entry and slot cleared
2599 locks[src].entry = 0;
2600 locks[src].slot = 0;
2601 locks[src].reuse = 1;
2604 switch( slotptr(source, src)->type ) {
// a live matching key that is not the stopper (last slot of a page
// with no right sibling) already exists on the page
2607 if( !slotptr(set->page, slot)->dead )
2608 if( slot < set->page->cnt || bt_getid (set->page->right) )
2609 if( !keycmp (ptr, key->key, key->len) ) {
2611 // return constraint violation if key already exists
2613 bt_unlockpage(BtLockRead, set->latch);
// release the Atomic lock and pin of every entry recorded so far
2617 if( locks[src].entry ) {
2618 set->latch = bt->mgr->latchsets + locks[src].entry;
2619 bt_unlockpage(BtLockAtomic, set->latch);
2620 bt_unpinlatch (set->latch);
2631 // unlock last loadpage lock
2633 if( source->cnt > 1 )
2634 bt_unlockpage(BtLockRead, set->latch);
2636 // obtain write lock for each master page
2638 for( src = 0; src++ < source->cnt; )
2639 if( locks[src].reuse )
2642 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2644 // insert or delete each key
2645 // process any splits or merges
2646 // release Write & Atomic latches
2647 // set ParentModifications and build
2648 // queue of keys to insert for split pages
2649 // or delete for deleted pages.
2651 // run through txn list backwards
2653 samepage = source->cnt + 1;
2655 for( src = source->cnt; src; src-- ) {
2656 if( locks[src].reuse )
2659 // perform the txn operations
2660 // from smaller to larger on
// idx runs over the keys src..samepage-1
2663 for( idx = src; idx < samepage; idx++ )
2664 switch( slotptr(source,idx)->type ) {
2666 if( bt_atomicdelete (bt, source, locks, idx) )
2672 if( bt_atomicinsert (bt, source, locks, idx) )
2677 // after the same page operations have finished,
2678 // process master page for splits or deletion.
2680 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2681 prev->page = bt_mappage (bt, prev->latch);
2684 // pick-up all splits from master page
2685 // each one is already WriteLocked.
2687 entry = prev->latch->split;
2690 set->latch = bt->mgr->latchsets + entry;
2691 set->page = bt_mappage (bt, set->latch);
2692 entry = set->latch->split;
2694 // delete empty master page by undoing its split
2695 // (this is potentially another empty page)
2696 // note that there are no new left pointers yet
2698 if( !prev->page->act ) {
2699 memcpy (set->page->left, prev->page->left, BtId);
2700 memcpy (prev->page, set->page, bt->mgr->page_size);
2701 bt_lockpage (BtLockDelete, set->latch);
2702 bt_freepage (bt, set);
2704 prev->latch->dirty = 1;
2708 // remove empty page from the split chain
2710 if( !set->page->act ) {
2711 memcpy (prev->page->right, set->page->right, BtId);
2712 prev->latch->split = set->latch->split;
2713 bt_lockpage (BtLockDelete, set->latch);
2714 bt_freepage (bt, set);
2718 // schedule prev fence key update
// the queue entry carries a copy of prev's fence key, its page
// number and its latch entry
2720 ptr = keyptr(prev->page,prev->page->cnt);
2721 leaf = calloc (sizeof(AtomicKey), 1);
2723 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2724 leaf->page_no = prev->latch->page_no;
2725 leaf->entry = prev->latch->entry;
2735 // splice in the left link into the split page
2737 bt_putid (set->page->left, prev->latch->page_no);
2738 bt_lockpage(BtLockParent, prev->latch);
2739 bt_unlockpage(BtLockWrite, prev->latch);
2743 // update left pointer in next right page from last split page
2744 // (if all splits were reversed, latch->split == 0)
2746 if( latch->split ) {
2747 // fix left pointer in master's original (now split)
2748 // far right sibling or set rightmost page in page zero
2750 if( right = bt_getid (prev->page->right) ) {
2751 if( set->latch = bt_pinlatch (bt, right, 1) )
2752 set->page = bt_mappage (bt, set->latch);
2756 bt_lockpage (BtLockWrite, set->latch);
2757 bt_putid (set->page->left, prev->latch->page_no);
2758 set->latch->dirty = 1;
2759 bt_unlockpage (BtLockWrite, set->latch);
2760 bt_unpinlatch (set->latch);
2761 } else { // prev is rightmost page
2762 bt_spinwritelock (bt->mgr->lock);
2763 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2764 bt_spinreleasewrite(bt->mgr->lock);
2767 // Process last page split in chain
2769 ptr = keyptr(prev->page,prev->page->cnt);
2770 leaf = calloc (sizeof(AtomicKey), 1);
2772 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2773 leaf->page_no = prev->latch->page_no;
2774 leaf->entry = prev->latch->entry;
2784 bt_lockpage(BtLockParent, prev->latch);
2785 bt_unlockpage(BtLockWrite, prev->latch);
2787 // remove atomic lock on master page
2789 bt_unlockpage(BtLockAtomic, latch);
2793 // finished if prev page occupied (either master or final split)
2795 if( prev->page->act ) {
2796 bt_unlockpage(BtLockWrite, latch);
2797 bt_unlockpage(BtLockAtomic, latch);
2798 bt_unpinlatch(latch);
2802 // any and all splits were reversed, and the
2803 // master page located in prev is empty, delete it
2804 // by pulling over master's right sibling.
2806 // Remove empty master's fence key
2808 ptr = keyptr(prev->page,prev->page->cnt);
2810 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2813 // perform the remainder of the delete
2814 // from the FIFO queue
2816 leaf = calloc (sizeof(AtomicKey), 1);
2818 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2819 leaf->page_no = prev->latch->page_no;
2820 leaf->entry = prev->latch->entry;
2831 // leave atomic lock in place until
2832 // deletion completes in next phase.
2834 bt_unlockpage(BtLockWrite, prev->latch);
2837 // add & delete keys for any pages split or merged during transaction
// each queue entry names a latch entry, a page number and a key
2841 set->latch = bt->mgr->latchsets + leaf->entry;
2842 set->page = bt_mappage (bt, set->latch);
2844 bt_putid (value, leaf->page_no);
2845 ptr = (BtKey *)leaf->leafkey;
2847 switch( leaf->type ) {
2848 case 0: // insert key
2849 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2854 case 1: // delete key
2855 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2860 case 2: // free page
2861 if( bt_atomicfree (bt, set) )
// the parent lock is released unless the entry is flagged nounlock
2867 if( !leaf->nounlock )
2868 bt_unlockpage (BtLockParent, set->latch);
2870 bt_unpinlatch (set->latch);
2873 } while( leaf = tail );
2881 // set cursor to highest slot on highest page
// The page number is read from page zero's alloc->left field; that page
// is copied into bt->cursor under a read lock.
// Returns the slot count of the cached cursor page.
// NOTE(review): declarations and the error return for a failed pin are
// not visible in this listing -- confirm against the full source.
2883 uint bt_lastkey (BtDb *bt)
2885 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2888 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2889 set->page = bt_mappage (bt, set->latch);
// snapshot the whole page into the cursor, then release it
2893 bt_lockpage(BtLockRead, set->latch);
2894 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2895 bt_unlockpage(BtLockRead, set->latch);
2896 bt_unpinlatch (set->latch);
2898 bt->cursor_page = page_no;
2899 return bt->cursor->cnt;
2902 // return previous slot on cursor page
// When the cursor has to move to another page, the page named by the
// cursor's left link is copied into bt->cursor under a read lock.
//   bt   - btree handle holding the cursor page copy
//   slot - current slot on the cursor page
// Returns the slot count of the newly cached page on the visible
// success path.
// NOTE(review): this listing omits some original lines (the in-page
// case, loop structure, and the statements guarded by the kill and
// `next == ourright` tests) -- confirm against the full source.
2904 uint bt_prevkey (BtDb *bt, uint slot)
2906 uid ourright, next, us = bt->cursor_page;
// remember the current page's right link for the comparison below
2912 ourright = bt_getid(bt->cursor->right);
// a zero left link means there is no page to the left
2915 if( !(next = bt_getid(bt->cursor->left)) )
2919 bt->cursor_page = next;
2921 if( set->latch = bt_pinlatch (bt, next, 1) )
2922 set->page = bt_mappage (bt, set->latch);
2926 bt_lockpage(BtLockRead, set->latch);
2927 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2928 bt_unlockpage(BtLockRead, set->latch);
2929 bt_unpinlatch (set->latch);
// right link of the page just cached
2931 next = bt_getid (bt->cursor->right);
2933 if( bt->cursor->kill )
2937 if( next == ourright )
2942 return bt->cursor->cnt;
2945 // return next slot on cursor page
2946 // or slide cursor right into next page
// NOTE(review): branch bodies and return statements are missing from
// this excerpt; only the visible statements are described.
2948 uint bt_nextkey (BtDb *bt, uint slot)
// right link of the page currently held in the cursor buffer
2954 right = bt_getid(bt->cursor->right);
// advance through the remaining slots of the cursor page
2956 while( slot++ < bt->cursor->cnt )
// slots flagged dead are tested first (branch body not visible)
2957 if( slotptr(bt->cursor,slot)->dead )
// the last slot only qualifies when a right sibling exists
2959 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move the cursor to the right sibling page
2967 bt->cursor_page = right;
2969 if( set->latch = bt_pinlatch (bt, right, 1) )
2970 set->page = bt_mappage (bt, set->latch);
// copy the sibling into the cursor buffer under a read lock,
// then drop both the lock and the pin
2974 bt_lockpage(BtLockRead, set->latch);
2976 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2978 bt_unlockpage(BtLockRead, set->latch);
2979 bt_unpinlatch (set->latch);
2987 // cache page of keys into cursor and return starting slot for given key
// key/len give the search key bytes and their length.
// NOTE(review): the failure branch and the return statement are missing
// from this excerpt.
2989 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2994 // cache page for retrieval
// bt_loadpage is asked for a read lock (BtLockRead); a non-zero slot
// result leads to the page being copied into the cursor buffer
2996 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2997 memcpy (bt->cursor, set->page, bt->mgr->page_size);
// record which page the cursor copy was taken from
3001 bt->cursor_page = set->latch->page_no;
// release the read lock and the pin on the page
3003 bt_unlockpage(BtLockRead, set->latch);
3004 bt_unpinlatch (set->latch);
// return a pointer to the key stored at the given slot of the cursor
// page copy (bt->cursor); the pointer refers into that buffer
3008 BtKey *bt_key(BtDb *bt, uint slot)
3010 return keyptr(bt->cursor, slot);
// return a pointer to the value stored at the given slot of the cursor
// page copy (bt->cursor); the pointer refers into that buffer
3013 BtVal *bt_val(BtDb *bt, uint slot)
3015 return valptr(bt->cursor,slot);
// Windows variant: return a time value in seconds selected by type.
// main() prints getCpuTime(0) as "real", getCpuTime(1) as "user" and
// getCpuTime(2) as "sys".
// NOTE(review): the switch on type is missing from this excerpt, so the
// mapping of the three sections below to type values is presumed from
// that usage.
3021 double getCpuTime(int type)
3024 FILETIME xittime[1];
3025 FILETIME systime[1];
3026 FILETIME usrtime[1];
3027 SYSTEMTIME timeconv[1];
3030 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, starting from the day-of-week in seconds
// NOTE(review): only wDayOfWeek plus the time of day is used, so the
// value appears to wrap at the week boundary - confirm this is acceptable
3034 GetSystemTimeAsFileTime (xittime);
3035 FileTimeToSystemTime (xittime, timeconv);
3036 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user time of the current process
3039 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3040 FileTimeToSystemTime (usrtime, timeconv);
// system (kernel) time of the current process
3043 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3044 FileTimeToSystemTime (systime, timeconv);
// fold hours, minutes, seconds and milliseconds into seconds
3048 ans += (double)timeconv->wHour * 3600;
3049 ans += (double)timeconv->wMinute * 60;
3050 ans += (double)timeconv->wSecond;
3051 ans += (double)timeconv->wMilliseconds / 1000;
3056 #include <sys/resource.h>
// POSIX variant: return a time value in seconds selected by type.
// main() prints getCpuTime(0) as "real", getCpuTime(1) as "user" and
// getCpuTime(2) as "sys".
// NOTE(review): the switch on type is missing from this excerpt, so the
// mapping of the three returns below to type values is presumed from
// that usage.
3058 double getCpuTime(int type)
3060 struct rusage used[1];
3061 struct timeval tv[1];
// wall clock time from gettimeofday, as seconds plus fractional microseconds
3065 gettimeofday(tv, NULL);
3066 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process (ru_utime)
3069 getrusage(RUSAGE_SELF, used);
3070 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process (ru_stime)
3073 getrusage(RUSAGE_SELF, used);
3074 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Walk the deployed latch sets of the manager, print a diagnostic to
// stderr for each lock that still looks held or page that is still
// pinned, and zero the lock structures.
// NOTE(review): braces are missing from this excerpt, so whether each
// memset is conditional on the preceding test cannot be seen here.
// NOTE(review): "%.8x" expects an unsigned int; if latch->page_no is the
// 64-bit uid type the format does not match - confirm.
3081 void bt_poolaudit (BtMgr *mgr)
// slot is incremented before use, so the first entry examined is slot+1
3086 while( slot++ < mgr->latchdeployed ) {
3087 latch = mgr->latchsets + slot;
// read/write lock of the latch set
3089 if( *latch->readwr->rin & MASK )
3090 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3091 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
// access lock of the latch set
3093 if( *latch->access->rin & MASK )
3094 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3095 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// parent lock: ticket and serving values differ while it is held
// NOTE(review): parent is tested through ticket/serving fields but is
// cleared with sizeof(RWLock) - confirm the two types have the same size
3097 if( *latch->parent->ticket != *latch->parent->serving )
3098 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3099 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits other than CLOCK_bit mean the page is still pinned
3101 if( latch->pin & ~CLOCK_bit ) {
3102 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// Audit the latch manager: report and clear leftover locks on the
// allocation lock, every deployed latch set and every hash table entry,
// then read the index pages sequentially and count the active keys on
// non-free level-0 pages. Read failures on the ReadFile path return
// bt->err set to BTERR_map.
// NOTE(review): short lines (braces, some tests, the final return) are
// missing from this excerpt; index_file() uses the result as a key count.
// NOTE(review): "%.8x" expects an unsigned int; if latch->page_no is the
// 64-bit uid type the format does not match - confirm.
3108 uint bt_latchaudit (BtDb *bt)
// NOTE(review): idx is a ushort compared against latchdeployed; if more
// than 65535 latch sets can be deployed the loop below cannot finish
3110 ushort idx, hashidx;
// manager-level lock word, reported as "Alloc page locked" and reset
3116 if( *(ushort *)(bt->mgr->lock) )
3117 fprintf(stderr, "Alloc page locked\n");
3118 *(ushort *)(bt->mgr->lock) = 0;
// latch sets are examined starting at index 1
3120 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3121 latch = bt->mgr->latchsets + idx;
// read/write lock
3122 if( *latch->readwr->rin & MASK )
3123 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3124 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
// access lock
3126 if( *latch->access->rin & MASK )
3127 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3128 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// parent lock: ticket and serving values differ while it is held
3130 if( *latch->parent->ticket != *latch->parent->serving )
3131 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3132 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// pinned-page report (the guarding test is not visible in this excerpt)
3135 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// walk every hash bucket: report and reset its latch word, then follow
// the chain of latch sets starting at the bucket's slot
3140 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3141 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3142 fprintf(stderr, "hash entry %d locked\n", hashidx);
3144 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3146 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3147 latch = bt->mgr->latchsets + idx;
3149 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3150 } while( idx = latch->next );
// sequentially read pages from LEAF_page up to the page number stored
// in pagezero->alloc->right
3153 page_no = LEAF_page;
3155 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page in the index file
3156 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not checked on this line
3158 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// Win32 API path: seek, then read one page; a failed or short read is an error
3162 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3164 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3165 return bt->err = BTERR_map;
3167 if( *amt < bt->mgr->page_size )
3168 return bt->err = BTERR_map;
// only pages that are not free and are at level 0 contribute to the count
3170 if( !bt->frame->free && !bt->frame->lvl )
3171 cnt += bt->frame->act;
3175 cnt--; // remove stopper key
3176 fprintf(stderr, " Total keys read %d\n", cnt);
3190 // standalone program to index file of keys
3191 // then list them onto std-out
// Thread entry point. arg is a ThreadArg carrying the manager, the input
// file name, the command string, the transaction size and this thread's
// index. One command character is chosen per thread and the matching
// section below runs against args->infile.
// NOTE(review): the two signatures below are alternatives (a pthread
// style and a Win32 __stdcall style); the preprocessor lines selecting
// between them, the case labels of the command dispatch and most
// braces are missing from this excerpt. Sections are labelled by their
// own log messages.
3194 void *index_file (void *arg)
3196 uint __stdcall index_file (void *arg)
3199 int line = 0, found = 0, cnt = 0, idx;
3200 uid next, page_no = LEAF_page; // start on first page of leaves
3201 int ch, len = 0, slot, type = 0;
3202 unsigned char key[BT_maxkey];
3203 unsigned char txn[65536];
3204 ThreadArg *args = arg;
// open a per-thread btree handle on the shared manager
3213 bt = bt_open (args->mgr);
// pick this thread's command character: the one at position args->idx,
// or the last character when the command string is shorter than that
3216 if( args->idx < strlen (args->type) )
3217 ch = args->type[args->idx];
3219 ch = args->type[strlen(args->type) - 1];
// --- latch manager audit ---
3224 fprintf(stderr, "started latch mgr audit\n");
3225 cnt = bt_latchaudit (bt);
3226 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// --- pennysort insert/delete, with or without transactions ---
3237 if( type == Delete )
3238 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3240 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3242 if( type == Delete )
3243 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3245 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
// read the input file one character at a time
3247 if( in = fopen (args->infile, "rb") )
3248 while( ch = getc(in), ch != EOF )
// direct insert: the first 10 bytes of the line are the key, the
// remainder is the value
3254 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3255 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// transactional path: pack the value bytes, the value length and the
// 10 key bytes into the txn buffer (the nxt adjustments between these
// lines are not visible in this excerpt)
3261 memcpy (txn + nxt, key + 10, len - 10);
3263 txn[nxt] = len - 10;
3265 memcpy (txn + nxt, key, 10);
// register the packed entry as the next slot of the transaction page
3268 slotptr(page,++cnt)->off = nxt;
3269 slotptr(page,cnt)->type = type;
// batch size check against args->num (branch body not visible);
// bt_atomictxn is then called on the accumulated page
3272 if( cnt < args->num )
3279 if( bt_atomictxn (bt, page) )
3280 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// characters are accepted only while the key buffer has room
3285 else if( len < BT_maxkey )
3287 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// --- index keys from the input file (key only, no value) ---
3291 fprintf(stderr, "started indexing for %s\n", args->infile);
3292 if( in = fopen (args->infile, "r") )
3293 while( ch = getc(in), ch != EOF )
3298 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3299 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3302 else if( len < BT_maxkey )
3304 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// --- look up every key of the input file ---
3308 fprintf(stderr, "started finding keys for %s\n", args->infile);
3309 if( in = fopen (args->infile, "rb") )
3310 while( ch = getc(in), ch != EOF )
// a zero result from bt_findkey is tested here (the branch bodies and
// the error test before the message below are not visible)
3314 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3317 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3320 else if( len < BT_maxkey )
3322 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// --- forward scan: walk the leaf pages through their right links and
// write each live key and value to stdout ---
3326 fprintf(stderr, "started scanning\n");
3328 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3329 set->page = bt_mappage (bt, set->latch);
3331 fprintf(stderr, "unable to obtain latch"), exit(1);
3332 bt_lockpage (BtLockRead, set->latch);
3333 next = bt_getid (set->page->right);
// the last slot of a page is only considered when a right sibling exists
3335 for( slot = 0; slot++ < set->page->cnt; )
3336 if( next || slot < set->page->cnt )
3337 if( !slotptr(set->page, slot)->dead ) {
3338 ptr = keyptr(set->page, slot);
// Duplicate-type slots are special-cased (the len adjustment is not
// visible in this excerpt)
3341 if( slotptr(set->page, slot)->type == Duplicate )
3344 fwrite (ptr->key, len, 1, stdout);
3345 val = valptr(set->page, slot);
3346 fwrite (val->value, val->len, 1, stdout);
3347 fputc ('\n', stdout);
3351 bt_unlockpage (BtLockRead, set->latch);
3352 bt_unpinlatch (set->latch);
// continue with the right sibling until the link is zero
3353 } while( page_no = next );
3355 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// --- reverse scan using the cursor: start at the last key and step
// backwards with bt_prevkey until it returns zero ---
3359 fprintf(stderr, "started reverse scan\n");
3360 if( slot = bt_lastkey (bt) )
3361 while( slot = bt_prevkey (bt, slot) ) {
// dead slots are tested first (branch body not visible)
3362 if( slotptr(bt->cursor, slot)->dead )
3365 ptr = keyptr(bt->cursor, slot);
3368 if( slotptr(bt->cursor, slot)->type == Duplicate )
3371 fwrite (ptr->key, len, 1, stdout);
3372 val = valptr(bt->cursor, slot);
3373 fwrite (val->value, val->len, 1, stdout);
3374 fputc ('\n', stdout);
3378 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// --- count keys by reading pages sequentially ---
// advise the kernel that the index file will be read sequentially
3383 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3385 fprintf(stderr, "started counting\n");
3386 page_no = LEAF_page;
3388 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3389 if( bt_readpage (bt->mgr, bt->frame, page_no) )
// only pages that are not free and are at level 0 contribute to the count
3392 if( !bt->frame->free && !bt->frame->lvl )
3393 cnt += bt->frame->act;
3399 cnt--; // remove stopper key
3400 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3412 typedef struct timeval timer;
// Driver for the standalone program: parses the command line, opens the
// btree manager, starts one index_file thread per source file, waits for
// all of them and prints real/user/sys timings to stderr.
// NOTE(review): the argc checks, the computation of cnt, the platform
// preprocessor lines and the closing lines of the function are missing
// from this excerpt.
3414 int main (int argc, char **argv)
3416 int idx, cnt, len, slot, err;
// default page size is 2^16 bytes unless argv[3] overrides it
3417 int segsize, bits = 16;
// usage text
3434 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3435 fprintf (stderr, " where idx_file is the name of the btree file\n");
3436 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3437 fprintf (stderr, " page_bits is the page size in bits\n");
3438 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3439 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3440 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock reference point for the "real" timing printed at the end
3444 start = getCpuTime(0);
// optional numeric arguments: page bits, buffer pool size, txn size
// NOTE(review): atoi gives no error indication for non-numeric input
3447 bits = atoi(argv[3]);
3450 poolsize = atoi(argv[4]);
3453 fprintf (stderr, "Warning: no mapped_pool\n");
3456 num = atoi(argv[5]);
// per-thread handle storage: a pthread_t array, or a zeroed HANDLE array
// on the Win32 path
// NOTE(review): allocation results are not checked on the visible lines
3460 threads = malloc (cnt * sizeof(pthread_t));
3462 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3464 args = malloc (cnt * sizeof(ThreadArg));
// open the btree manager on the index file named by argv[1]
3466 mgr = bt_mgr ((argv[1]), bits, poolsize);
3469 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// one thread per source file; source files start at argv[6]
3475 for( idx = 0; idx < cnt; idx++ ) {
3476 args[idx].infile = argv[idx + 6];
3477 args[idx].type = argv[2];
3478 args[idx].mgr = mgr;
3479 args[idx].num = num;
3480 args[idx].idx = idx;
3482 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3483 fprintf(stderr, "Error creating thread %d\n", err);
// Win32 path: 65536 is passed as the stack size argument
3485 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3489 // wait for termination
3492 for( idx = 0; idx < cnt; idx++ )
3493 pthread_join (threads[idx], NULL);
3495 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3497 for( idx = 0; idx < cnt; idx++ )
3498 CloseHandle(threads[idx]);
// report elapsed wall time, then user and system CPU time, each printed
// as minutes and fractional seconds
3504 elapsed = getCpuTime(0) - start;
3505 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3506 elapsed = getCpuTime(1);
3507 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3508 elapsed = getCpuTime(2);
3509 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);