1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
120 // definition for spin latch implementation
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
126 volatile typedef struct {
137 // write only reentrant lock
141 volatile ushort tid[1];
142 volatile ushort dup[1];
145 // hash table entries
148 volatile uint slot; // Latch table entry at head of chain
149 BtSpinLatch latch[1];
152 // latch manager table structure
155 uid page_no; // latch set page number
156 RWLock readwr[1]; // read/write page lock
157 RWLock access[1]; // Access Intent/Page delete
158 WOLock parent[1]; // Posting of fence key in parent
159 WOLock atomic[1]; // Atomic update in progress
160 uint split; // right split page atomic insert
161 uint entry; // entry slot in latch table
162 uint next; // next entry in hash table chain
163 uint prev; // prev entry in hash table chain
164 volatile ushort pin; // number of outstanding threads
165 ushort dirty:1; // page in cache is dirty
168 // Define the length of the page record numbers
172 // Page key slot definition.
174 // Keys are marked dead, but remain on the page until
175 // it cleanup is called. The fence key (highest key) for
176 // a leaf page is always present, even after cleanup.
180 // In addition to the Unique keys that occupy slots
181 // there are Librarian and Duplicate key
182 // slots occupying the key slot array.
184 // The Librarian slots are dead keys that
185 // serve as filler, available to add new Unique
186 // or Dup slots that are inserted into the B-tree.
188 // The Duplicate slots have had their key bytes extended
189 // by 6 bytes to contain a binary duplicate key uniqueifier.
200 uint off:BT_maxbits; // page offset for key start
201 uint type:3; // type of slot
202 uint dead:1; // set for deleted slot
205 // The key structure occupies space at the upper end of
206 // each page. It's a length byte followed by the key
210 unsigned char len; // this can be changed to a ushort or uint
211 unsigned char key[0];
214 // the value structure also occupies space at the upper
215 // end of the page. Each key is immediately followed by a value.
218 unsigned char len; // this can be changed to a ushort or uint
219 unsigned char value[0];
222 #define BT_maxkey 255 // maximum number of bytes in a key
223 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
225 // The first part of an index page.
226 // It is immediately followed
227 // by the BtSlot array of keys.
229 // note that this structure size
230 // must be a multiple of 8 bytes
231 // in order to place dups correctly.
233 typedef struct BtPage_ {
234 uint cnt; // count of keys in page
235 uint act; // count of active keys
236 uint min; // next key offset
237 uint garbage; // page garbage in bytes
238 unsigned char bits:7; // page size in bits
239 unsigned char free:1; // page is on free chain
240 unsigned char lvl:7; // level of page
241 unsigned char kill:1; // page is being deleted
242 unsigned char right[BtId]; // page number to right
243 unsigned char left[BtId]; // page number to left
244 unsigned char filler[2]; // padding to multiple of 8
247 // The loadpage interface object
250 BtPage page; // current page pointer
251 BtLatchSet *latch; // current page latch set
254 // structure for latch manager on ALLOC_page
257 struct BtPage_ alloc[1]; // next page_no in right ptr
258 unsigned long long dups[1]; // global duplicate key uniqueifier
259 unsigned char chain[BtId]; // head of free page_nos chain
262 // The object structure for Btree access
265 uint page_size; // page size
266 uint page_bits; // page size in bits
272 BtPageZero *pagezero; // mapped allocation page
273 BtSpinLatch lock[1]; // allocation area lite latch
274 uint latchdeployed; // highest number of latch entries deployed
275 uint nlatchpage; // number of latch pages at BT_latch
276 uint latchtotal; // number of page latch entries
277 uint latchhash; // number of latch hash table slots
278 uint latchvictim; // next latch entry to examine
279 ushort thread_no[1]; // next thread number
280 BtHashEntry *hashtable; // the buffer pool hash table entries
281 BtLatchSet *latchsets; // mapped latch set from buffer pool
282 unsigned char *pagepool; // mapped to the buffer pool pages
284 HANDLE halloc; // allocation handle
285 HANDLE hpool; // buffer pool handle
290 BtMgr *mgr; // buffer manager for thread
291 BtPage cursor; // cached frame for start/next (never mapped)
292 BtPage frame; // spare frame for the page split (never mapped)
293 uid cursor_page; // current cursor page number
294 unsigned char *mem; // frame, cursor, page memory buffer
295 unsigned char key[BT_keyarray]; // last found complete key
296 int found; // last delete or insert was found
297 int err; // last error
298 int reads, writes; // number of reads and writes from the btree
299 ushort thread_no; // thread number
313 #define CLOCK_bit 0x8000
316 extern void bt_close (BtDb *bt);
317 extern BtDb *bt_open (BtMgr *mgr);
318 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
319 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
320 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
321 extern BtKey *bt_foundkey (BtDb *bt);
322 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
323 extern uint bt_nextkey (BtDb *bt, uint slot);
326 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
327 void bt_mgrclose (BtMgr *mgr);
329 // Helper functions to return slot values
330 // from the cursor page.
332 extern BtKey *bt_key (BtDb *bt, uint slot);
333 extern BtVal *bt_val (BtDb *bt, uint slot);
335 // The page is allocated from low and hi ends.
336 // The key slots are allocated from the bottom,
337 // while the text and value of the key
338 // are allocated from the top. When the two
339 // areas meet, the page is split into two.
341 // A key consists of a length byte, two bytes of
342 // index number (0 - 65535), and up to 253 bytes
345 // Associated with each key is a value byte string
346 // containing any value desired.
348 // The b-tree root is always located at page 1.
349 // The first leaf page of level zero is always
350 // located on page 2.
352 // The b-tree pages are linked with next
353 // pointers to facilitate enumerators,
354 // and provide for concurrency.
356 // When to root page fills, it is split in two and
357 // the tree height is raised by a new root at page
358 // one with two keys.
360 // Deleted keys are marked with a dead bit until
361 // page cleanup. The fence key for a leaf node is
364 // To achieve maximum concurrency one page is locked at a time
365 // as the tree is traversed to find leaf key in question. The right
366 // page numbers are used in cases where the page is being split,
369 // Page 0 is dedicated to lock for new page extensions,
370 // and chains empty pages together for reuse. It also
371 // contains the latch manager hash table.
373 // The ParentModification lock on a node is obtained to serialize posting
374 // or changing the fence key for a node.
376 // Empty pages are chained together through the ALLOC page and reused.
378 // Access macros to address slot and key values from the page
379 // Page slots use 1 based indexing.
381 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
382 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
383 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
385 void bt_putid(unsigned char *dest, uid id)
390 dest[i] = (unsigned char)id, id >>= 8;
393 uid bt_getid(unsigned char *src)
398 for( i = 0; i < BtId; i++ )
399 id <<= 8, id |= *src++;
404 uid bt_newdup (BtDb *bt)
407 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
409 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
413 void bt_spinreleasewrite(BtSpinLatch *latch);
414 void bt_spinwritelock(BtSpinLatch *latch);
416 // Write-Only Reentrant Lock
418 void WriteOLock (WOLock *lock, ushort tid)
421 bt_spinwritelock(lock->xcl);
423 if( *lock->tid == tid ) {
425 bt_spinreleasewrite(lock->xcl);
430 bt_spinreleasewrite(lock->xcl);
433 bt_spinreleasewrite(lock->xcl);
438 void WriteORelease (WOLock *lock)
448 // Phase-Fair reader/writer lock implementation
450 void WriteLock (RWLock *lock)
455 tix = __sync_fetch_and_add (lock->ticket, 1);
457 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
459 // wait for our ticket to come up
461 while( tix != lock->serving[0] )
468 w = PRES | (tix & PHID);
470 r = __sync_fetch_and_add (lock->rin, w);
472 r = _InterlockedExchangeAdd16 (lock->rin, w);
474 while( r != *lock->rout )
482 void WriteRelease (RWLock *lock)
485 __sync_fetch_and_and (lock->rin, ~MASK);
487 _InterlockedAnd16 (lock->rin, ~MASK);
492 void ReadLock (RWLock *lock)
496 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
498 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
501 while( w == (*lock->rin & MASK) )
509 void ReadRelease (RWLock *lock)
512 __sync_fetch_and_add (lock->rout, RINC);
514 _InterlockedExchangeAdd16 (lock->rout, RINC);
518 // Spin Latch Manager
520 // wait until write lock mode is clear
521 // and add 1 to the share count
523 void bt_spinreadlock(BtSpinLatch *latch)
529 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
531 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
533 // see if exclusive request is granted or pending
538 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
540 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
543 } while( sched_yield(), 1 );
545 } while( SwitchToThread(), 1 );
549 // wait for other read and write latches to relinquish
551 void bt_spinwritelock(BtSpinLatch *latch)
557 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
559 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
562 if( !(prev & ~BOTH) )
566 __sync_fetch_and_and ((ushort *)latch, ~XCL);
568 _InterlockedAnd16((ushort *)latch, ~XCL);
571 } while( sched_yield(), 1 );
573 } while( SwitchToThread(), 1 );
577 // try to obtain write lock
579 // return 1 if obtained,
582 int bt_spinwritetry(BtSpinLatch *latch)
587 prev = __sync_fetch_and_or((ushort *)latch, XCL);
589 prev = _InterlockedOr16((ushort *)latch, XCL);
591 // take write access if all bits are clear
594 if( !(prev & ~BOTH) )
598 __sync_fetch_and_and ((ushort *)latch, ~XCL);
600 _InterlockedAnd16((ushort *)latch, ~XCL);
607 void bt_spinreleasewrite(BtSpinLatch *latch)
610 __sync_fetch_and_and((ushort *)latch, ~BOTH);
612 _InterlockedAnd16((ushort *)latch, ~BOTH);
616 // decrement reader count
618 void bt_spinreleaseread(BtSpinLatch *latch)
621 __sync_fetch_and_add((ushort *)latch, -SHARE);
623 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
627 // read page from permanent location in Btree file
629 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
631 off64_t off = page_no << mgr->page_bits;
634 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
635 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
642 memset (ovl, 0, sizeof(OVERLAPPED));
644 ovl->OffsetHigh = off >> 32;
646 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
647 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
650 if( *amt < mgr->page_size ) {
651 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
658 // write page to permanent location in Btree file
659 // clear the dirty bit
661 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
663 off64_t off = page_no << mgr->page_bits;
666 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
672 memset (ovl, 0, sizeof(OVERLAPPED));
674 ovl->OffsetHigh = off >> 32;
676 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
679 if( *amt < mgr->page_size )
685 // link latch table entry into head of latch hash table
687 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
689 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
690 BtLatchSet *latch = bt->mgr->latchsets + slot;
692 if( latch->next = bt->mgr->hashtable[hashidx].slot )
693 bt->mgr->latchsets[latch->next].prev = slot;
695 bt->mgr->hashtable[hashidx].slot = slot;
696 latch->page_no = page_no;
703 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
711 // set CLOCK bit in latch
712 // decrement pin count
714 void bt_unpinlatch (BtLatchSet *latch)
717 if( ~latch->pin & CLOCK_bit )
718 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
719 __sync_fetch_and_add(&latch->pin, -1);
721 if( ~latch->pin & CLOCK_bit )
722 _InterlockedOr16 (&latch->pin, CLOCK_bit);
723 _InterlockedDecrement16 (&latch->pin);
727 // return the btree cached page address
729 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
731 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
736 // find existing latchset or inspire new one
737 // return with latchset pinned
739 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
741 uint hashidx = page_no % bt->mgr->latchhash;
749 // try to find our entry
751 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
753 if( slot = bt->mgr->hashtable[hashidx].slot ) do
755 latch = bt->mgr->latchsets + slot;
756 if( page_no == latch->page_no )
758 } while( slot = latch->next );
764 latch = bt->mgr->latchsets + slot;
766 __sync_fetch_and_add(&latch->pin, 1);
768 _InterlockedIncrement16 (&latch->pin);
770 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
774 // see if there are any unused pool entries
776 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
778 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
781 if( slot < bt->mgr->latchtotal ) {
782 latch = bt->mgr->latchsets + slot;
783 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
785 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
790 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
792 _InterlockedDecrement (&bt->mgr->latchdeployed);
794 // find and reuse previous entry on victim
798 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
800 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
802 // try to get write lock on hash chain
803 // skip entry if not obtained
804 // or has outstanding pins
806 slot %= bt->mgr->latchtotal;
811 latch = bt->mgr->latchsets + slot;
812 idx = latch->page_no % bt->mgr->latchhash;
814 // see we are on same chain as hashidx
819 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
822 // skip this slot if it is pinned
823 // or the CLOCK bit is set
826 if( latch->pin & CLOCK_bit ) {
828 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
830 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
833 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
837 // update permanent page area in btree from buffer pool
839 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
842 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
845 latch->dirty = 0, bt->writes++;
847 // unlink our available slot from its hash chain
850 bt->mgr->latchsets[latch->prev].next = latch->next;
852 bt->mgr->hashtable[idx].slot = latch->next;
855 bt->mgr->latchsets[latch->next].prev = latch->prev;
857 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
859 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
862 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
867 void bt_mgrclose (BtMgr *mgr)
874 // flush dirty pool pages to the btree
876 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
877 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
878 latch = mgr->latchsets + slot;
881 bt_writepage(mgr, page, latch->page_no);
882 latch->dirty = 0, num++;
884 // madvise (page, mgr->page_size, MADV_DONTNEED);
887 fprintf(stderr, "%d buffer pool pages flushed\n", num);
890 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
891 munmap (mgr->pagezero, mgr->page_size);
893 FlushViewOfFile(mgr->pagezero, 0);
894 UnmapViewOfFile(mgr->pagezero);
895 UnmapViewOfFile(mgr->hashtable);
896 CloseHandle(mgr->halloc);
897 CloseHandle(mgr->hpool);
903 FlushFileBuffers(mgr->idx);
904 CloseHandle(mgr->idx);
909 // close and release memory
911 void bt_close (BtDb *bt)
918 VirtualFree (bt->mem, 0, MEM_RELEASE);
923 // open/create new btree buffer manager
925 // call with file_name, BT_openmode, bits in page size (e.g. 16),
926 // size of page pool (e.g. 262144)
928 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
930 uint lvl, attr, last, slot, idx;
931 uint nlatchpage, latchhash;
932 unsigned char value[BtId];
933 int flag, initit = 0;
934 BtPageZero *pagezero;
941 // determine sanity of page size and buffer pool
943 if( bits > BT_maxbits )
945 else if( bits < BT_minbits )
949 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
954 mgr = calloc (1, sizeof(BtMgr));
956 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
958 if( mgr->idx == -1 ) {
959 fprintf (stderr, "Unable to open btree file\n");
960 return free(mgr), NULL;
963 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
964 attr = FILE_ATTRIBUTE_NORMAL;
965 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
967 if( mgr->idx == INVALID_HANDLE_VALUE )
968 return GlobalFree(mgr), NULL;
972 pagezero = valloc (BT_maxpage);
975 // read minimum page size to get root info
976 // to support raw disk partition files
977 // check if bits == 0 on the disk.
979 if( size = lseek (mgr->idx, 0L, 2) )
980 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
981 if( pagezero->alloc->bits )
982 bits = pagezero->alloc->bits;
986 return free(mgr), free(pagezero), NULL;
990 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
991 size = GetFileSize(mgr->idx, amt);
994 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
995 return bt_mgrclose (mgr), NULL;
996 bits = pagezero->alloc->bits;
1001 mgr->page_size = 1 << bits;
1002 mgr->page_bits = bits;
1004 // calculate number of latch hash table entries
1006 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1007 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1009 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1010 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1011 mgr->latchtotal = nodemax;
1016 // initialize an empty b-tree with latch page, root page, page of leaves
1017 // and page(s) of latches and page pool cache
1019 memset (pagezero, 0, 1 << bits);
1020 pagezero->alloc->bits = mgr->page_bits;
1021 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1023 // initialize left-most LEAF page in
1026 bt_putid (pagezero->alloc->left, LEAF_page);
1028 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1029 fprintf (stderr, "Unable to create btree page zero\n");
1030 return bt_mgrclose (mgr), NULL;
1033 memset (pagezero, 0, 1 << bits);
1034 pagezero->alloc->bits = mgr->page_bits;
1036 for( lvl=MIN_lvl; lvl--; ) {
1037 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1038 key = keyptr(pagezero->alloc, 1);
1039 key->len = 2; // create stopper key
1043 bt_putid(value, MIN_lvl - lvl + 1);
1044 val = valptr(pagezero->alloc, 1);
1045 val->len = lvl ? BtId : 0;
1046 memcpy (val->value, value, val->len);
1048 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1049 pagezero->alloc->lvl = lvl;
1050 pagezero->alloc->cnt = 1;
1051 pagezero->alloc->act = 1;
1053 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1054 fprintf (stderr, "Unable to create btree page zero\n");
1055 return bt_mgrclose (mgr), NULL;
1063 VirtualFree (pagezero, 0, MEM_RELEASE);
1066 // mlock the pagezero page
1068 flag = PROT_READ | PROT_WRITE;
1069 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1070 if( mgr->pagezero == MAP_FAILED ) {
1071 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1072 return bt_mgrclose (mgr), NULL;
1074 mlock (mgr->pagezero, mgr->page_size);
1076 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1077 if( mgr->hashtable == MAP_FAILED ) {
1078 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1079 return bt_mgrclose (mgr), NULL;
1082 flag = PAGE_READWRITE;
1083 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1084 if( !mgr->halloc ) {
1085 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1086 return bt_mgrclose (mgr), NULL;
1089 flag = FILE_MAP_WRITE;
1090 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1091 if( !mgr->pagezero ) {
1092 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1093 return bt_mgrclose (mgr), NULL;
1096 flag = PAGE_READWRITE;
1097 size = (uid)mgr->nlatchpage << mgr->page_bits;
1098 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1100 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1101 return bt_mgrclose (mgr), NULL;
1104 flag = FILE_MAP_WRITE;
1105 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1106 if( !mgr->hashtable ) {
1107 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1108 return bt_mgrclose (mgr), NULL;
1112 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1113 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1118 // open BTree access method
1119 // based on buffer manager
1121 BtDb *bt_open (BtMgr *mgr)
1123 BtDb *bt = malloc (sizeof(*bt));
1125 memset (bt, 0, sizeof(*bt));
1128 bt->mem = valloc (2 *mgr->page_size);
1130 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1132 bt->frame = (BtPage)bt->mem;
1133 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1135 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1137 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1142 // compare two keys, return > 0, = 0, or < 0
1143 // =0: keys are same
1146 // as the comparison value
1148 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1150 uint len1 = key1->len;
1153 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1164 // place write, read, or parent lock on requested page_no.
1166 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1170 ReadLock (latch->readwr);
1173 WriteLock (latch->readwr);
1176 ReadLock (latch->access);
1179 WriteLock (latch->access);
1182 WriteOLock (latch->parent, bt->thread_no);
1185 WriteOLock (latch->atomic, bt->thread_no);
1187 case BtLockAtomic | BtLockRead:
1188 WriteOLock (latch->atomic, bt->thread_no);
1189 ReadLock (latch->readwr);
1194 // remove write, read, or parent lock on requested page
1196 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1200 ReadRelease (latch->readwr);
1203 WriteRelease (latch->readwr);
1206 ReadRelease (latch->access);
1209 WriteRelease (latch->access);
1212 WriteORelease (latch->parent);
1215 WriteORelease (latch->atomic);
1217 case BtLockAtomic | BtLockRead:
1218 WriteORelease (latch->atomic);
1219 ReadRelease (latch->readwr);
1224 // allocate a new page
1225 // return with page latched, but unlocked.
1227 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1232 // lock allocation page
1234 bt_spinwritelock(bt->mgr->lock);
1236 // use empty chain first
1237 // else allocate empty page
1239 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1240 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1241 set->page = bt_mappage (bt, set->latch);
1243 return bt->err = BTERR_struct, -1;
1245 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1246 bt_spinreleasewrite(bt->mgr->lock);
1248 memcpy (set->page, contents, bt->mgr->page_size);
1249 set->latch->dirty = 1;
1253 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1254 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1256 // unlock allocation latch
1258 bt_spinreleasewrite(bt->mgr->lock);
1260 // don't load cache from btree page
1262 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1263 set->page = bt_mappage (bt, set->latch);
1265 return bt->err = BTERR_struct;
1267 memcpy (set->page, contents, bt->mgr->page_size);
1268 set->latch->dirty = 1;
1272 // find slot in page for given key at a given level
1274 int bt_findslot (BtPage page, unsigned char *key, uint len)
1276 uint diff, higher = page->cnt, low = 1, slot;
1279 // make stopper key an infinite fence value
1281 if( bt_getid (page->right) )
1286 // low is the lowest candidate.
1287 // loop ends when they meet
1289 // higher is already
1290 // tested as .ge. the passed key.
1292 while( diff = higher - low ) {
1293 slot = low + ( diff >> 1 );
1294 if( keycmp (keyptr(page, slot), key, len) < 0 )
1297 higher = slot, good++;
1300 // return zero if key is on right link page
1302 return good ? higher : 0;
1305 // find and load page at given level for given key
1306 // leave page rd or wr locked as requested
1308 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1310 uid page_no = ROOT_page, prevpage = 0;
1311 uint drill = 0xff, slot;
1312 BtLatchSet *prevlatch;
1313 uint mode, prevmode;
1315 // start at root of btree and drill down
1318 // determine lock mode of drill level
1319 mode = (drill == lvl) ? lock : BtLockRead;
1321 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1324 // obtain access lock using lock chaining with Access mode
1326 if( page_no > ROOT_page )
1327 bt_lockpage(bt, BtLockAccess, set->latch);
1329 set->page = bt_mappage (bt, set->latch);
1331 // release & unpin parent or left sibling page
1334 bt_unlockpage(bt, prevmode, prevlatch);
1335 bt_unpinlatch (prevlatch);
1339 // obtain mode lock using lock chaining through AccessLock
1341 bt_lockpage(bt, mode, set->latch);
1343 if( set->page->free )
1344 return bt->err = BTERR_struct, 0;
1346 if( page_no > ROOT_page )
1347 bt_unlockpage(bt, BtLockAccess, set->latch);
1349 // re-read and re-lock root after determining actual level of root
1351 if( set->page->lvl != drill) {
1352 if( set->latch->page_no != ROOT_page )
1353 return bt->err = BTERR_struct, 0;
1355 drill = set->page->lvl;
1357 if( lock != BtLockRead && drill == lvl ) {
1358 bt_unlockpage(bt, mode, set->latch);
1359 bt_unpinlatch (set->latch);
1364 prevpage = set->latch->page_no;
1365 prevlatch = set->latch;
1368 // find key on page at this level
1369 // and descend to requested level
1371 if( !set->page->kill )
1372 if( slot = bt_findslot (set->page, key, len) ) {
1376 // find next non-dead slot -- the fence key if nothing else
1378 while( slotptr(set->page, slot)->dead )
1379 if( slot++ < set->page->cnt )
1382 return bt->err = BTERR_struct, 0;
1384 page_no = bt_getid(valptr(set->page, slot)->value);
1389 // or slide right into next page
1391 page_no = bt_getid(set->page->right);
1394 // return error on end of right chain
1396 bt->err = BTERR_struct;
1397 return 0; // return error
1400 // return page to free list
1401 // page must be delete & write locked
1403 void bt_freepage (BtDb *bt, BtPageSet *set)
1405 // lock allocation page
1407 bt_spinwritelock (bt->mgr->lock);
1411 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1412 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1413 set->latch->dirty = 1;
1414 set->page->free = 1;
1416 // unlock released page
1418 bt_unlockpage (bt, BtLockDelete, set->latch);
1419 bt_unlockpage (bt, BtLockWrite, set->latch);
1420 bt_unpinlatch (set->latch);
1422 // unlock allocation page
1424 bt_spinreleasewrite (bt->mgr->lock);
1427 // a fence key was deleted from a page
1428 // push new fence value upwards
1430 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1432 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1433 unsigned char value[BtId];
1437 // remove the old fence value
1439 ptr = keyptr(set->page, set->page->cnt);
1440 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1441 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1442 set->latch->dirty = 1;
1444 // cache new fence value
1446 ptr = keyptr(set->page, set->page->cnt);
1447 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1449 bt_lockpage (bt, BtLockParent, set->latch);
1450 bt_unlockpage (bt, BtLockWrite, set->latch);
1452 // insert new (now smaller) fence key
1454 bt_putid (value, set->latch->page_no);
1455 ptr = (BtKey*)leftkey;
1457 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1460 // now delete old fence key
1462 ptr = (BtKey*)rightkey;
1464 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1467 bt_unlockpage (bt, BtLockParent, set->latch);
1468 bt_unpinlatch(set->latch);
1472 // root has a single child
1473 // collapse a level from the tree
1475 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1481 // find the child entry and promote as new root contents
1484 for( idx = 0; idx++ < root->page->cnt; )
1485 if( !slotptr(root->page, idx)->dead )
1488 page_no = bt_getid (valptr(root->page, idx)->value);
1490 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1491 child->page = bt_mappage (bt, child->latch);
1495 bt_lockpage (bt, BtLockDelete, child->latch);
1496 bt_lockpage (bt, BtLockWrite, child->latch);
1498 memcpy (root->page, child->page, bt->mgr->page_size);
1499 root->latch->dirty = 1;
1501 bt_freepage (bt, child);
1503 } while( root->page->lvl > 1 && root->page->act == 1 );
1505 bt_unlockpage (bt, BtLockWrite, root->latch);
1506 bt_unpinlatch (root->latch);
1510 // delete a page and manage keys
1511 // call with page writelocked
1512 // returns with page unpinned
1514 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1516 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1517 unsigned char value[BtId];
1518 uint lvl = set->page->lvl;
1523 // cache copy of fence key
1524 // to post in parent
1526 ptr = keyptr(set->page, set->page->cnt);
1527 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1529 // obtain lock on right page
1531 page_no = bt_getid(set->page->right);
1533 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1534 right->page = bt_mappage (bt, right->latch);
1538 bt_lockpage (bt, BtLockWrite, right->latch);
1540 // cache copy of key to update
1542 ptr = keyptr(right->page, right->page->cnt);
1543 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1545 if( right->page->kill )
1546 return bt->err = BTERR_struct;
1548 // pull contents of right peer into our empty page
1550 memcpy (set->page, right->page, bt->mgr->page_size);
1551 set->latch->dirty = 1;
1553 // mark right page deleted and point it to left page
1554 // until we can post parent updates that remove access
1555 // to the deleted page.
1557 bt_putid (right->page->right, set->latch->page_no);
1558 right->latch->dirty = 1;
1559 right->page->kill = 1;
1561 bt_lockpage (bt, BtLockParent, right->latch);
1562 bt_unlockpage (bt, BtLockWrite, right->latch);
1564 bt_lockpage (bt, BtLockParent, set->latch);
1565 bt_unlockpage (bt, BtLockWrite, set->latch);
1567 // redirect higher key directly to our new node contents
1569 bt_putid (value, set->latch->page_no);
1570 ptr = (BtKey*)higherfence;
1572 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1575 // delete old lower key to our node
1577 ptr = (BtKey*)lowerfence;
1579 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1582 // obtain delete and write locks to right node
1584 bt_unlockpage (bt, BtLockParent, right->latch);
1585 bt_lockpage (bt, BtLockDelete, right->latch);
1586 bt_lockpage (bt, BtLockWrite, right->latch);
1587 bt_freepage (bt, right);
1589 bt_unlockpage (bt, BtLockParent, set->latch);
1590 bt_unpinlatch (set->latch);
1594 // find and delete key on page by marking delete flag bit
1595 // if page becomes empty, delete it from the btree
1597 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1599 uint slot, idx, found, fence;
1604 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1605 ptr = keyptr(set->page, slot);
1609 // if librarian slot, advance to real slot
1611 if( slotptr(set->page, slot)->type == Librarian )
1612 ptr = keyptr(set->page, ++slot);
1614 fence = slot == set->page->cnt;
1616 // if key is found delete it, otherwise ignore request
1618 if( found = !keycmp (ptr, key, len) )
1619 if( found = slotptr(set->page, slot)->dead == 0 ) {
1620 val = valptr(set->page,slot);
1621 slotptr(set->page, slot)->dead = 1;
1622 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1625 // collapse empty slots beneath the fence
1627 while( idx = set->page->cnt - 1 )
1628 if( slotptr(set->page, idx)->dead ) {
1629 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1630 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1635 // did we delete a fence key in an upper level?
1637 if( found && lvl && set->page->act && fence )
1638 if( bt_fixfence (bt, set, lvl) )
1643 // do we need to collapse root?
1645 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1646 if( bt_collapseroot (bt, set) )
1651 // delete empty page
1653 if( !set->page->act )
1654 return bt_deletepage (bt, set);
1656 set->latch->dirty = 1;
1657 bt_unlockpage(bt, BtLockWrite, set->latch);
1658 bt_unpinlatch (set->latch);
1662 BtKey *bt_foundkey (BtDb *bt)
1664 return (BtKey*)(bt->key);
1667 // advance to next slot
1669 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1671 BtLatchSet *prevlatch;
1674 if( slot < set->page->cnt )
1677 prevlatch = set->latch;
1679 if( page_no = bt_getid(set->page->right) )
1680 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1681 set->page = bt_mappage (bt, set->latch);
1685 return bt->err = BTERR_struct, 0;
1687 // obtain access lock using lock chaining with Access mode
1689 bt_lockpage(bt, BtLockAccess, set->latch);
1691 bt_unlockpage(bt, BtLockRead, prevlatch);
1692 bt_unpinlatch (prevlatch);
1694 bt_lockpage(bt, BtLockRead, set->latch);
1695 bt_unlockpage(bt, BtLockAccess, set->latch);
1699 // find unique key or first duplicate key in
1700 // leaf level and return number of value bytes
1701 // or (-1) if not found. Setup key for bt_foundkey
1703 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1711 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1713 ptr = keyptr(set->page, slot);
1715 // skip librarian slot place holder
1717 if( slotptr(set->page, slot)->type == Librarian )
1718 ptr = keyptr(set->page, ++slot);
1720 // return actual key found
1722 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1725 if( slotptr(set->page, slot)->type == Duplicate )
1728 // not there if we reach the stopper key
1730 if( slot == set->page->cnt )
1731 if( !bt_getid (set->page->right) )
1734 // if key exists, return >= 0 value bytes copied
1735 // otherwise return (-1)
1737 if( slotptr(set->page, slot)->dead )
1741 if( !memcmp (ptr->key, key, len) ) {
1742 val = valptr (set->page,slot);
1743 if( valmax > val->len )
1745 memcpy (value, val->value, valmax);
1751 } while( slot = bt_findnext (bt, set, slot) );
1753 bt_unlockpage (bt, BtLockRead, set->latch);
1754 bt_unpinlatch (set->latch);
1758 // check page for space available,
1759 // clean if necessary and return
1760 // 0 - page needs splitting
1761 // >0 new slot value
1763 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1765 uint nxt = bt->mgr->page_size;
1766 BtPage page = set->page;
1767 uint cnt = 0, idx = 0;
1768 uint max = page->cnt;
1773 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1776 // skip cleanup and proceed to split
1777 // if there's not enough garbage
1780 if( page->garbage < nxt / 5 )
1783 memcpy (bt->frame, page, bt->mgr->page_size);
1785 // skip page info and set rest of page to zero
1787 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1788 set->latch->dirty = 1;
1792 // clean up page first by
1793 // removing deleted keys
1795 while( cnt++ < max ) {
1799 if( cnt < max || bt->frame->lvl )
1800 if( slotptr(bt->frame,cnt)->dead )
1803 // copy the value across
1805 val = valptr(bt->frame, cnt);
1806 nxt -= val->len + sizeof(BtVal);
1807 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1809 // copy the key across
1811 key = keyptr(bt->frame, cnt);
1812 nxt -= key->len + sizeof(BtKey);
1813 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1815 // make a librarian slot
1817 slotptr(page, ++idx)->off = nxt;
1818 slotptr(page, idx)->type = Librarian;
1819 slotptr(page, idx)->dead = 1;
1823 slotptr(page, ++idx)->off = nxt;
1824 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1826 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1833 // see if page has enough space now, or does it need splitting?
1835 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1841 // split the root and raise the height of the btree
1843 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1845 unsigned char leftkey[BT_keyarray];
1846 uint nxt = bt->mgr->page_size;
1847 unsigned char value[BtId];
1853 // save left page fence key for new root
1855 ptr = keyptr(root->page, root->page->cnt);
1856 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1858 // Obtain an empty page to use, and copy the current
1859 // root contents into it, e.g. lower keys
1861 if( bt_newpage(bt, left, root->page) )
1864 left_page_no = left->latch->page_no;
1865 bt_unpinlatch (left->latch);
1867 // preserve the page info at the bottom
1868 // of higher keys and set rest to zero
1870 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1872 // insert stopper key at top of newroot page
1873 // and increase the root height
1875 nxt -= BtId + sizeof(BtVal);
1876 bt_putid (value, right->page_no);
1877 val = (BtVal *)((unsigned char *)root->page + nxt);
1878 memcpy (val->value, value, BtId);
1881 nxt -= 2 + sizeof(BtKey);
1882 slotptr(root->page, 2)->off = nxt;
1883 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1888 // insert lower keys page fence key on newroot page as first key
1890 nxt -= BtId + sizeof(BtVal);
1891 bt_putid (value, left_page_no);
1892 val = (BtVal *)((unsigned char *)root->page + nxt);
1893 memcpy (val->value, value, BtId);
1896 ptr = (BtKey *)leftkey;
1897 nxt -= ptr->len + sizeof(BtKey);
1898 slotptr(root->page, 1)->off = nxt;
1899 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1901 bt_putid(root->page->right, 0);
1902 root->page->min = nxt; // reset lowest used offset and key count
1903 root->page->cnt = 2;
1904 root->page->act = 2;
1907 // release and unpin root pages
1909 bt_unlockpage(bt, BtLockWrite, root->latch);
1910 bt_unpinlatch (root->latch);
1912 bt_unpinlatch (right);
1916 // split already locked full node
1918 // return pool entry for new right
1921 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1923 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1924 uint lvl = set->page->lvl;
1931 // split higher half of keys to bt->frame
1933 memset (bt->frame, 0, bt->mgr->page_size);
1934 max = set->page->cnt;
1938 while( cnt++ < max ) {
1939 if( cnt < max || set->page->lvl )
1940 if( slotptr(set->page, cnt)->dead )
1943 src = valptr(set->page, cnt);
1944 nxt -= src->len + sizeof(BtVal);
1945 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1947 key = keyptr(set->page, cnt);
1948 nxt -= key->len + sizeof(BtKey);
1949 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1950 memcpy (ptr, key, key->len + sizeof(BtKey));
1952 // add librarian slot
1954 slotptr(bt->frame, ++idx)->off = nxt;
1955 slotptr(bt->frame, idx)->type = Librarian;
1956 slotptr(bt->frame, idx)->dead = 1;
1960 slotptr(bt->frame, ++idx)->off = nxt;
1961 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1963 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1967 bt->frame->bits = bt->mgr->page_bits;
1968 bt->frame->min = nxt;
1969 bt->frame->cnt = idx;
1970 bt->frame->lvl = lvl;
1974 if( set->latch->page_no > ROOT_page )
1975 bt_putid (bt->frame->right, bt_getid (set->page->right));
1977 // get new free page and write higher keys to it.
1979 if( bt_newpage(bt, right, bt->frame) )
1982 memcpy (bt->frame, set->page, bt->mgr->page_size);
1983 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1984 set->latch->dirty = 1;
1986 nxt = bt->mgr->page_size;
1987 set->page->garbage = 0;
1993 if( slotptr(bt->frame, max)->type == Librarian )
1996 // assemble page of smaller keys
1998 while( cnt++ < max ) {
1999 if( slotptr(bt->frame, cnt)->dead )
2001 val = valptr(bt->frame, cnt);
2002 nxt -= val->len + sizeof(BtVal);
2003 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2005 key = keyptr(bt->frame, cnt);
2006 nxt -= key->len + sizeof(BtKey);
2007 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2009 // add librarian slot
2011 slotptr(set->page, ++idx)->off = nxt;
2012 slotptr(set->page, idx)->type = Librarian;
2013 slotptr(set->page, idx)->dead = 1;
2017 slotptr(set->page, ++idx)->off = nxt;
2018 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2022 bt_putid(set->page->right, right->latch->page_no);
2023 set->page->min = nxt;
2024 set->page->cnt = idx;
2026 return right->latch->entry;
2029 // fix keys for newly split page
2030 // call with page locked, return
2033 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2035 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2036 unsigned char value[BtId];
2037 uint lvl = set->page->lvl;
2041 // if current page is the root page, split it
2043 if( set->latch->page_no == ROOT_page )
2044 return bt_splitroot (bt, set, right);
2046 ptr = keyptr(set->page, set->page->cnt);
2047 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2049 page = bt_mappage (bt, right);
2051 ptr = keyptr(page, page->cnt);
2052 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2054 // insert new fences in their parent pages
2056 bt_lockpage (bt, BtLockParent, right);
2058 bt_lockpage (bt, BtLockParent, set->latch);
2059 bt_unlockpage (bt, BtLockWrite, set->latch);
2061 // insert new fence for reformulated left block of smaller keys
2063 bt_putid (value, set->latch->page_no);
2064 ptr = (BtKey *)leftkey;
2066 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2069 // switch fence for right block of larger keys to new right page
2071 bt_putid (value, right->page_no);
2072 ptr = (BtKey *)rightkey;
2074 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2077 bt_unlockpage (bt, BtLockParent, set->latch);
2078 bt_unpinlatch (set->latch);
2080 bt_unlockpage (bt, BtLockParent, right);
2081 bt_unpinlatch (right);
2085 // install new key and value onto page
2086 // page must already be checked for
2089 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2091 uint idx, librarian;
2096 // if found slot > desired slot and previous slot
2097 // is a librarian slot, use it
2100 if( slotptr(set->page, slot-1)->type == Librarian )
2103 // copy value onto page
2105 set->page->min -= vallen + sizeof(BtVal);
2106 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2107 memcpy (val->value, value, vallen);
2110 // copy key onto page
2112 set->page->min -= keylen + sizeof(BtKey);
2113 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2114 memcpy (ptr->key, key, keylen);
2117 // find first empty slot
2119 for( idx = slot; idx < set->page->cnt; idx++ )
2120 if( slotptr(set->page, idx)->dead )
2123 // now insert key into array before slot
2125 if( idx == set->page->cnt )
2126 idx += 2, set->page->cnt += 2, librarian = 2;
2130 set->latch->dirty = 1;
2133 while( idx > slot + librarian - 1 )
2134 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2136 // add librarian slot
2138 if( librarian > 1 ) {
2139 node = slotptr(set->page, slot++);
2140 node->off = set->page->min;
2141 node->type = Librarian;
2147 node = slotptr(set->page, slot);
2148 node->off = set->page->min;
2153 bt_unlockpage (bt, BtLockWrite, set->latch);
2154 bt_unpinlatch (set->latch);
2160 // Insert new key into the btree at given level.
2161 // either add a new key or update/add an existing one
2163 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2165 unsigned char newkey[BT_keyarray];
2166 uint slot, idx, len, entry;
2173 // set up the key we're working on
2175 ins = (BtKey*)newkey;
2176 memcpy (ins->key, key, keylen);
2179 // is this a non-unique index value?
2185 sequence = bt_newdup (bt);
2186 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2190 while( 1 ) { // find the page and slot for the current key
2191 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2192 ptr = keyptr(set->page, slot);
2195 bt->err = BTERR_ovflw;
2199 // if librarian slot == found slot, advance to real slot
2201 if( slotptr(set->page, slot)->type == Librarian )
2202 if( !keycmp (ptr, key, keylen) )
2203 ptr = keyptr(set->page, ++slot);
2207 if( slotptr(set->page, slot)->type == Duplicate )
2210 // if inserting a duplicate key or unique key
2211 // check for adequate space on the page
2212 // and insert the new key before slot.
2214 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2215 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2216 if( !(entry = bt_splitpage (bt, set)) )
2218 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2223 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2226 // if key already exists, update value and return
2228 val = valptr(set->page, slot);
2230 if( val->len >= vallen ) {
2231 if( slotptr(set->page, slot)->dead )
2233 set->page->garbage += val->len - vallen;
2234 set->latch->dirty = 1;
2235 slotptr(set->page, slot)->dead = 0;
2237 memcpy (val->value, value, vallen);
2238 bt_unlockpage(bt, BtLockWrite, set->latch);
2239 bt_unpinlatch (set->latch);
2243 // new update value doesn't fit in existing value area
2245 if( !slotptr(set->page, slot)->dead )
2246 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2248 slotptr(set->page, slot)->dead = 0;
2252 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2253 if( !(entry = bt_splitpage (bt, set)) )
2255 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2260 set->page->min -= vallen + sizeof(BtVal);
2261 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2262 memcpy (val->value, value, vallen);
2265 set->latch->dirty = 1;
2266 set->page->min -= keylen + sizeof(BtKey);
2267 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2268 memcpy (ptr->key, key, keylen);
2271 slotptr(set->page, slot)->off = set->page->min;
2272 bt_unlockpage(bt, BtLockWrite, set->latch);
2273 bt_unpinlatch (set->latch);
2280 uint entry; // latch table entry number
2281 uint slot:31; // page slot number
2282 uint reuse:1; // reused previous page
2286 uid page_no; // page number for split leaf
2287 void *next; // next key to insert
2288 uint entry:29; // latch table entry number
2289 uint type:2; // 0 == insert, 1 == delete, 2 == free
2290 uint nounlock:1; // don't unlock ParentModification
2291 unsigned char leafkey[BT_keyarray];
2294 // determine actual page where key is located
2295 // return slot number
2297 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2299 BtKey *key = keyptr(source,src);
2300 uint slot = locks[src].slot;
2303 if( src > 1 && locks[src].reuse )
2304 entry = locks[src-1].entry, slot = 0;
2306 entry = locks[src].entry;
2309 set->latch = bt->mgr->latchsets + entry;
2310 set->page = bt_mappage (bt, set->latch);
2314 // is locks->reuse set? or was slot zeroed?
2315 // if so, find where our key is located
2316 // on current page or pages split on
2317 // same page txn operations.
2320 set->latch = bt->mgr->latchsets + entry;
2321 set->page = bt_mappage (bt, set->latch);
2323 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2324 if( slotptr(set->page, slot)->type == Librarian )
2326 if( locks[src].reuse )
2327 locks[src].entry = entry;
2330 } while( entry = set->latch->split );
2332 bt->err = BTERR_atomic;
2336 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2338 BtKey *key = keyptr(source, src);
2339 BtVal *val = valptr(source, src);
2344 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2345 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2346 return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2348 if( entry = bt_splitpage (bt, set) )
2349 latch = bt->mgr->latchsets + entry;
2353 // splice right page into split chain
2354 // and WriteLock it.
2356 bt_lockpage(bt, BtLockWrite, latch);
2357 latch->split = set->latch->split;
2358 set->latch->split = entry;
2359 locks[src].slot = 0;
2362 return bt->err = BTERR_atomic;
2365 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2367 BtKey *key = keyptr(source, src);
2368 uint idx, entry, slot;
2373 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2374 ptr = keyptr(set->page, slot);
2376 return bt->err = BTERR_struct;
2378 if( !keycmp (ptr, key->key, key->len) )
2379 if( !slotptr(set->page, slot)->dead )
2380 slotptr(set->page, slot)->dead = 1;
2386 val = valptr(set->page, slot);
2387 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2388 set->latch->dirty = 1;
2394 // delete an empty master page for a transaction
2396 // note that the far right page never empties because
2397 // it always contains (at least) the infinite stopper key
2398 // and that all pages that don't contain any keys are
2399 // deleted, or are being held under Atomic lock.
2401 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2403 BtPageSet right[1], temp[1];
2404 unsigned char value[BtId];
2408 bt_lockpage(bt, BtLockWrite, prev->latch);
2410 // grab the right sibling
2412 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2413 right->page = bt_mappage (bt, right->latch);
2417 bt_lockpage(bt, BtLockAtomic, right->latch);
2418 bt_lockpage(bt, BtLockWrite, right->latch);
2420 // and pull contents over empty page
2421 // while preserving master's left link
2423 memcpy (right->page->left, prev->page->left, BtId);
2424 memcpy (prev->page, right->page, bt->mgr->page_size);
2426 // forward seekers to old right sibling
2427 // to new page location in set
2429 bt_putid (right->page->right, prev->latch->page_no);
2430 right->latch->dirty = 1;
2431 right->page->kill = 1;
2433 // remove pointer to right page for searchers by
2434 // changing right fence key to point to the master page
2436 ptr = keyptr(right->page,right->page->cnt);
2437 bt_putid (value, prev->latch->page_no);
2439 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2442 // now that master page is in good shape we can
2443 // remove its locks.
2445 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2446 bt_unlockpage (bt, BtLockWrite, prev->latch);
2448 // fix master's right sibling's left pointer
2449 // to remove scanner's poiner to the right page
2451 if( right_page_no = bt_getid (prev->page->right) ) {
2452 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2453 temp->page = bt_mappage (bt, temp->latch);
2455 bt_lockpage (bt, BtLockWrite, temp->latch);
2456 bt_putid (temp->page->left, prev->latch->page_no);
2457 temp->latch->dirty = 1;
2459 bt_unlockpage (bt, BtLockWrite, temp->latch);
2460 bt_unpinlatch (temp->latch);
2461 } else { // master is now the far right page
2462 bt_spinwritelock (bt->mgr->lock);
2463 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2464 bt_spinreleasewrite(bt->mgr->lock);
2467 // now that there are no pointers to the right page
2468 // we can delete it after the last read access occurs
2470 bt_unlockpage (bt, BtLockWrite, right->latch);
2471 bt_unlockpage (bt, BtLockAtomic, right->latch);
2472 bt_lockpage (bt, BtLockDelete, right->latch);
2473 bt_lockpage (bt, BtLockWrite, right->latch);
2474 bt_freepage (bt, right);
2478 // atomic modification of a batch of keys.
2480 // return -1 if BTERR is set
2481 // otherwise return slot number
2482 // causing the key constraint violation
2483 // or zero on successful completion.
2485 int bt_atomictxn (BtDb *bt, BtPage source)
2487 uint src, idx, slot, samepage, entry;
2488 AtomicKey *head, *tail, *leaf;
2489 BtPageSet set[1], prev[1];
2490 unsigned char value[BtId];
2491 BtKey *key, *ptr, *key2;
2501 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2505 // stable sort the list of keys into order to
2506 // prevent deadlocks between threads.
2508 for( src = 1; src++ < source->cnt; ) {
2509 *temp = *slotptr(source,src);
2510 key = keyptr (source,src);
2512 for( idx = src; --idx; ) {
2513 key2 = keyptr (source,idx);
2514 if( keycmp (key, key2->key, key2->len) < 0 ) {
2515 *slotptr(source,idx+1) = *slotptr(source,idx);
2516 *slotptr(source,idx) = *temp;
2522 // Load the leaf page for each key
2523 // group same page references with reuse bit
2524 // and determine any constraint violations
2526 for( src = 0; src++ < source->cnt; ) {
2527 key = keyptr(source, src);
2530 // first determine if this modification falls
2531 // on the same page as the previous modification
2532 // note that the far right leaf page is a special case
2534 if( samepage = src > 1 )
2535 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2536 slot = bt_findslot(set->page, key->key, key->len);
2538 bt_unlockpage(bt, BtLockRead, set->latch);
2541 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2542 set->latch->split = 0;
2546 if( slotptr(set->page, slot)->type == Librarian )
2547 ptr = keyptr(set->page, ++slot);
2549 ptr = keyptr(set->page, slot);
2552 locks[src].entry = set->latch->entry;
2553 locks[src].slot = slot;
2554 locks[src].reuse = 0;
2556 locks[src].entry = 0;
2557 locks[src].slot = 0;
2558 locks[src].reuse = 1;
2561 switch( slotptr(source, src)->type ) {
2564 if( !slotptr(set->page, slot)->dead )
2565 if( slot < set->page->cnt || bt_getid (set->page->right) )
2566 if( !keycmp (ptr, key->key, key->len) ) {
2568 // return constraint violation if key already exists
2570 bt_unlockpage(bt, BtLockRead, set->latch);
2574 if( locks[src].entry ) {
2575 set->latch = bt->mgr->latchsets + locks[src].entry;
2576 bt_unlockpage(bt, BtLockAtomic, set->latch);
2577 bt_unpinlatch (set->latch);
2588 // unlock last loadpage lock
2591 bt_unlockpage(bt, BtLockRead, set->latch);
2593 // obtain write lock for each master page
2595 for( src = 0; src++ < source->cnt; )
2596 if( locks[src].reuse )
2599 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2601 // insert or delete each key
2602 // process any splits or merges
2603 // release Write & Atomic latches
2604 // set ParentModifications and build
2605 // queue of keys to insert for split pages
2606 // or delete for deleted pages.
2608 // run through txn list backwards
2610 samepage = source->cnt + 1;
2612 for( src = source->cnt; src; src-- ) {
2613 if( locks[src].reuse )
2616 // perform the txn operations
2617 // from smaller to larger on
2620 for( idx = src; idx < samepage; idx++ )
2621 switch( slotptr(source,idx)->type ) {
2623 if( bt_atomicdelete (bt, source, locks, idx) )
2629 if( bt_atomicinsert (bt, source, locks, idx) )
2634 // after the same page operations have finished,
2635 // process master page for splits or deletion.
2637 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2638 prev->page = bt_mappage (bt, prev->latch);
2641 // pick-up all splits from master page
2642 // each one is already WriteLocked.
2644 entry = prev->latch->split;
2647 set->latch = bt->mgr->latchsets + entry;
2648 set->page = bt_mappage (bt, set->latch);
2649 entry = set->latch->split;
2651 // delete empty master page by undoing its split
2652 // (this is potentially another empty page)
2653 // note that there are no new left pointers yet
2655 if( !prev->page->act ) {
2656 memcpy (set->page->left, prev->page->left, BtId);
2657 memcpy (prev->page, set->page, bt->mgr->page_size);
2658 bt_lockpage (bt, BtLockDelete, set->latch);
2659 bt_freepage (bt, set);
2661 prev->latch->dirty = 1;
2665 // remove empty page from the split chain
2667 if( !set->page->act ) {
2668 memcpy (prev->page->right, set->page->right, BtId);
2669 prev->latch->split = set->latch->split;
2670 bt_lockpage (bt, BtLockDelete, set->latch);
2671 bt_freepage (bt, set);
2675 // schedule prev fence key update
2677 ptr = keyptr(prev->page,prev->page->cnt);
2678 leaf = calloc (sizeof(AtomicKey), 1);
2680 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2681 leaf->page_no = prev->latch->page_no;
2682 leaf->entry = prev->latch->entry;
2692 // splice in the left link into the split page
2694 bt_putid (set->page->left, prev->latch->page_no);
2695 bt_lockpage(bt, BtLockParent, prev->latch);
2696 bt_unlockpage(bt, BtLockWrite, prev->latch);
2700 // update left pointer in next right page from last split page
2701 // (if all splits were reversed, latch->split == 0)
2703 if( latch->split ) {
2704 // fix left pointer in master's original (now split)
2705 // far right sibling or set rightmost page in page zero
2707 if( right = bt_getid (prev->page->right) ) {
2708 if( set->latch = bt_pinlatch (bt, right, 1) )
2709 set->page = bt_mappage (bt, set->latch);
2713 bt_lockpage (bt, BtLockWrite, set->latch);
2714 bt_putid (set->page->left, prev->latch->page_no);
2715 set->latch->dirty = 1;
2716 bt_unlockpage (bt, BtLockWrite, set->latch);
2717 bt_unpinlatch (set->latch);
2718 } else { // prev is rightmost page
2719 bt_spinwritelock (bt->mgr->lock);
2720 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2721 bt_spinreleasewrite(bt->mgr->lock);
2724 // Process last page split in chain
2726 ptr = keyptr(prev->page,prev->page->cnt);
2727 leaf = calloc (sizeof(AtomicKey), 1);
2729 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2730 leaf->page_no = prev->latch->page_no;
2731 leaf->entry = prev->latch->entry;
2741 bt_lockpage(bt, BtLockParent, prev->latch);
2742 bt_unlockpage(bt, BtLockWrite, prev->latch);
2744 // remove atomic lock on master page
2746 bt_unlockpage(bt, BtLockAtomic, latch);
2750 // finished if prev page occupied (either master or final split)
2752 if( prev->page->act ) {
2753 bt_unlockpage(bt, BtLockWrite, latch);
2754 bt_unlockpage(bt, BtLockAtomic, latch);
2755 bt_unpinlatch(latch);
2759 // any and all splits were reversed, and the
2760 // master page located in prev is empty, delete it
2761 // by pulling over master's right sibling.
2763 // Remove empty master's fence key
2765 ptr = keyptr(prev->page,prev->page->cnt);
2767 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2770 // perform the remainder of the delete
2771 // from the FIFO queue
2773 leaf = calloc (sizeof(AtomicKey), 1);
2775 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2776 leaf->page_no = prev->latch->page_no;
2777 leaf->entry = prev->latch->entry;
2788 // leave atomic lock in place until
2789 // deletion completes in next phase.
2791 bt_unlockpage(bt, BtLockWrite, prev->latch);
2794 // add & delete keys for any pages split or merged during transaction
2798 set->latch = bt->mgr->latchsets + leaf->entry;
2799 set->page = bt_mappage (bt, set->latch);
2801 bt_putid (value, leaf->page_no);
2802 ptr = (BtKey *)leaf->leafkey;
2804 switch( leaf->type ) {
2805 case 0: // insert key
2806 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2811 case 1: // delete key
2812 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2817 case 2: // free page
2818 if( bt_atomicfree (bt, set) )
2824 if( !leaf->nounlock )
2825 bt_unlockpage (bt, BtLockParent, set->latch);
2827 bt_unpinlatch (set->latch);
2830 } while( leaf = tail );
2838 // set cursor to highest slot on highest page
2840 uint bt_lastkey (BtDb *bt)
2842 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2845 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2846 set->page = bt_mappage (bt, set->latch);
2850 bt_lockpage(bt, BtLockRead, set->latch);
2851 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2852 bt_unlockpage(bt, BtLockRead, set->latch);
2853 bt_unpinlatch (set->latch);
2855 bt->cursor_page = page_no;
2856 return bt->cursor->cnt;
2859 // return previous slot on cursor page
2861 uint bt_prevkey (BtDb *bt, uint slot)
2863 uid ourright, next, us = bt->cursor_page;
2869 ourright = bt_getid(bt->cursor->right);
2872 if( !(next = bt_getid(bt->cursor->left)) )
2876 bt->cursor_page = next;
2878 if( set->latch = bt_pinlatch (bt, next, 1) )
2879 set->page = bt_mappage (bt, set->latch);
2883 bt_lockpage(bt, BtLockRead, set->latch);
2884 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2885 bt_unlockpage(bt, BtLockRead, set->latch);
2886 bt_unpinlatch (set->latch);
2888 next = bt_getid (bt->cursor->right);
2890 if( bt->cursor->kill )
2894 if( next == ourright )
2899 return bt->cursor->cnt;
2902 // return next slot on cursor page
2903 // or slide cursor right into next page
2905 uint bt_nextkey (BtDb *bt, uint slot)
2911 right = bt_getid(bt->cursor->right);
2913 while( slot++ < bt->cursor->cnt )
2914 if( slotptr(bt->cursor,slot)->dead )
2916 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2924 bt->cursor_page = right;
2926 if( set->latch = bt_pinlatch (bt, right, 1) )
2927 set->page = bt_mappage (bt, set->latch);
2931 bt_lockpage(bt, BtLockRead, set->latch);
2933 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2935 bt_unlockpage(bt, BtLockRead, set->latch);
2936 bt_unpinlatch (set->latch);
2944 // cache page of keys into cursor and return starting slot for given key
2946 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2951 // cache page for retrieval
2953 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2954 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2958 bt->cursor_page = set->latch->page_no;
2960 bt_unlockpage(bt, BtLockRead, set->latch);
2961 bt_unpinlatch (set->latch);
2965 BtKey *bt_key(BtDb *bt, uint slot)
2967 return keyptr(bt->cursor, slot);
2970 BtVal *bt_val(BtDb *bt, uint slot)
2972 return valptr(bt->cursor,slot);
2978 double getCpuTime(int type)
2981 FILETIME xittime[1];
2982 FILETIME systime[1];
2983 FILETIME usrtime[1];
2984 SYSTEMTIME timeconv[1];
2987 memset (timeconv, 0, sizeof(SYSTEMTIME));
2991 GetSystemTimeAsFileTime (xittime);
2992 FileTimeToSystemTime (xittime, timeconv);
2993 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2996 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2997 FileTimeToSystemTime (usrtime, timeconv);
3000 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3001 FileTimeToSystemTime (systime, timeconv);
3005 ans += (double)timeconv->wHour * 3600;
3006 ans += (double)timeconv->wMinute * 60;
3007 ans += (double)timeconv->wSecond;
3008 ans += (double)timeconv->wMilliseconds / 1000;
3013 #include <sys/resource.h>
3015 double getCpuTime(int type)
3017 struct rusage used[1];
3018 struct timeval tv[1];
3022 gettimeofday(tv, NULL);
3023 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3026 getrusage(RUSAGE_SELF, used);
3027 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3030 getrusage(RUSAGE_SELF, used);
3031 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3038 void bt_poolaudit (BtMgr *mgr)
3043 while( slot++ < mgr->latchdeployed ) {
3044 latch = mgr->latchsets + slot;
3046 if( *latch->readwr->rin & MASK )
3047 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3048 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3050 if( *latch->access->rin & MASK )
3051 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3052 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3054 if( *latch->parent->tid )
3055 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3056 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3058 if( latch->pin & ~CLOCK_bit ) {
3059 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3065 uint bt_latchaudit (BtDb *bt)
3067 ushort idx, hashidx;
3073 if( *(ushort *)(bt->mgr->lock) )
3074 fprintf(stderr, "Alloc page locked\n");
3075 *(ushort *)(bt->mgr->lock) = 0;
3077 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3078 latch = bt->mgr->latchsets + idx;
3079 if( *latch->readwr->rin & MASK )
3080 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3081 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3083 if( *latch->access->rin & MASK )
3084 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3085 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3087 if( *latch->parent->tid )
3088 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3089 memset ((ushort *)latch->parent, 0, sizeof(WOLock));
3092 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3097 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3098 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3099 fprintf(stderr, "hash entry %d locked\n", hashidx);
3101 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3103 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3104 latch = bt->mgr->latchsets + idx;
3106 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3107 } while( idx = latch->next );
3110 page_no = LEAF_page;
3112 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3113 uid off = page_no << bt->mgr->page_bits;
3115 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3119 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3121 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3122 return bt->err = BTERR_map;
3124 if( *amt < bt->mgr->page_size )
3125 return bt->err = BTERR_map;
3127 if( !bt->frame->free && !bt->frame->lvl )
3128 cnt += bt->frame->act;
3132 cnt--; // remove stopper key
3133 fprintf(stderr, " Total keys read %d\n", cnt);
3147 // standalone program to index file of keys
3148 // then list them onto std-out
3151 void *index_file (void *arg)
3153 uint __stdcall index_file (void *arg)
3156 int line = 0, found = 0, cnt = 0, idx;
3157 uid next, page_no = LEAF_page; // start on first page of leaves
3158 int ch, len = 0, slot, type = 0;
3159 unsigned char key[BT_maxkey];
3160 unsigned char txn[65536];
3161 ThreadArg *args = arg;
3170 bt = bt_open (args->mgr);
3173 if( args->idx < strlen (args->type) )
3174 ch = args->type[args->idx];
3176 ch = args->type[strlen(args->type) - 1];
3181 fprintf(stderr, "started latch mgr audit\n");
3182 cnt = bt_latchaudit (bt);
3183 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3194 if( type == Delete )
3195 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3197 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3199 if( type == Delete )
3200 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3202 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3204 if( in = fopen (args->infile, "rb") )
3205 while( ch = getc(in), ch != EOF )
3211 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3212 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3218 memcpy (txn + nxt, key + 10, len - 10);
3220 txn[nxt] = len - 10;
3222 memcpy (txn + nxt, key, 10);
3225 slotptr(page,++cnt)->off = nxt;
3226 slotptr(page,cnt)->type = type;
3229 if( cnt < args->num )
3236 if( bt_atomictxn (bt, page) )
3237 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3242 else if( len < BT_maxkey )
3244 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3248 fprintf(stderr, "started indexing for %s\n", args->infile);
3249 if( in = fopen (args->infile, "r") )
3250 while( ch = getc(in), ch != EOF )
3255 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3256 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3259 else if( len < BT_maxkey )
3261 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3265 fprintf(stderr, "started finding keys for %s\n", args->infile);
3266 if( in = fopen (args->infile, "rb") )
3267 while( ch = getc(in), ch != EOF )
3271 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3274 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3277 else if( len < BT_maxkey )
3279 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3283 fprintf(stderr, "started scanning\n");
3285 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3286 set->page = bt_mappage (bt, set->latch);
3288 fprintf(stderr, "unable to obtain latch"), exit(1);
3289 bt_lockpage (bt, BtLockRead, set->latch);
3290 next = bt_getid (set->page->right);
3292 for( slot = 0; slot++ < set->page->cnt; )
3293 if( next || slot < set->page->cnt )
3294 if( !slotptr(set->page, slot)->dead ) {
3295 ptr = keyptr(set->page, slot);
3298 if( slotptr(set->page, slot)->type == Duplicate )
3301 fwrite (ptr->key, len, 1, stdout);
3302 val = valptr(set->page, slot);
3303 fwrite (val->value, val->len, 1, stdout);
3304 fputc ('\n', stdout);
3308 bt_unlockpage (bt, BtLockRead, set->latch);
3309 bt_unpinlatch (set->latch);
3310 } while( page_no = next );
3312 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3316 fprintf(stderr, "started reverse scan\n");
3317 if( slot = bt_lastkey (bt) )
3318 while( slot = bt_prevkey (bt, slot) ) {
3319 if( slotptr(bt->cursor, slot)->dead )
3322 ptr = keyptr(bt->cursor, slot);
3325 if( slotptr(bt->cursor, slot)->type == Duplicate )
3328 fwrite (ptr->key, len, 1, stdout);
3329 val = valptr(bt->cursor, slot);
3330 fwrite (val->value, val->len, 1, stdout);
3331 fputc ('\n', stdout);
3335 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3340 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3342 fprintf(stderr, "started counting\n");
3343 page_no = LEAF_page;
3345 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3346 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3349 if( !bt->frame->free && !bt->frame->lvl )
3350 cnt += bt->frame->act;
3356 cnt--; // remove stopper key
3357 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3369 typedef struct timeval timer;
3371 int main (int argc, char **argv)
3373 int idx, cnt, len, slot, err;
3374 int segsize, bits = 16;
3391 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3392 fprintf (stderr, " where idx_file is the name of the btree file\n");
3393 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3394 fprintf (stderr, " page_bits is the page size in bits\n");
3395 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3396 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3397 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3401 start = getCpuTime(0);
3404 bits = atoi(argv[3]);
3407 poolsize = atoi(argv[4]);
3410 fprintf (stderr, "Warning: no mapped_pool\n");
3413 num = atoi(argv[5]);
3417 threads = malloc (cnt * sizeof(pthread_t));
3419 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3421 args = malloc (cnt * sizeof(ThreadArg));
3423 mgr = bt_mgr ((argv[1]), bits, poolsize);
3426 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3432 for( idx = 0; idx < cnt; idx++ ) {
3433 args[idx].infile = argv[idx + 6];
3434 args[idx].type = argv[2];
3435 args[idx].mgr = mgr;
3436 args[idx].num = num;
3437 args[idx].idx = idx;
3439 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3440 fprintf(stderr, "Error creating thread %d\n", err);
3442 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3446 // wait for termination
3449 for( idx = 0; idx < cnt; idx++ )
3450 pthread_join (threads[idx], NULL);
3452 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3454 for( idx = 0; idx < cnt; idx++ )
3455 CloseHandle(threads[idx]);
3461 elapsed = getCpuTime(0) - start;
3462 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3463 elapsed = getCpuTime(1);
3464 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3465 elapsed = getCpuTime(2);
3466 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);