1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
115 // write only queue lock
127 // definition for spin latch implementation
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
133 volatile typedef struct {
144 // hash table entries
147 volatile uint slot; // Latch table entry at head of chain
148 BtSpinLatch latch[1];
151 // latch manager table structure
154 uid page_no; // latch set page number
155 RWLock readwr[1]; // read/write page lock
156 RWLock access[1]; // Access Intent/Page delete
157 WOLock parent[1]; // Posting of fence key in parent
158 WOLock atomic[1]; // Atomic update in progress
159 uint split; // right split page atomic insert
160 uint entry; // entry slot in latch table
161 uint next; // next entry in hash table chain
162 uint prev; // prev entry in hash table chain
163 volatile ushort pin; // number of outstanding threads
164 ushort dirty:1; // page in cache is dirty
167 // Define the length of the page record numbers
171 // Page key slot definition.
173 // Keys are marked dead, but remain on the page until
174 // it cleanup is called. The fence key (highest key) for
175 // a leaf page is always present, even after cleanup.
179 // In addition to the Unique keys that occupy slots
180 // there are Librarian and Duplicate key
181 // slots occupying the key slot array.
183 // The Librarian slots are dead keys that
184 // serve as filler, available to add new Unique
185 // or Dup slots that are inserted into the B-tree.
187 // The Duplicate slots have had their key bytes extended
188 // by 6 bytes to contain a binary duplicate key uniqueifier.
199 uint off:BT_maxbits; // page offset for key start
200 uint type:3; // type of slot
201 uint dead:1; // set for deleted slot
204 // The key structure occupies space at the upper end of
205 // each page. It's a length byte followed by the key
209 unsigned char len; // this can be changed to a ushort or uint
210 unsigned char key[0];
213 // the value structure also occupies space at the upper
214 // end of the page. Each key is immediately followed by a value.
217 unsigned char len; // this can be changed to a ushort or uint
218 unsigned char value[0];
221 #define BT_maxkey 255 // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224 // The first part of an index page.
225 // It is immediately followed
226 // by the BtSlot array of keys.
228 // note that this structure size
229 // must be a multiple of 8 bytes
230 // in order to place dups correctly.
232 typedef struct BtPage_ {
233 uint cnt; // count of keys in page
234 uint act; // count of active keys
235 uint min; // next key offset
236 uint garbage; // page garbage in bytes
237 unsigned char bits:7; // page size in bits
238 unsigned char free:1; // page is on free chain
239 unsigned char lvl:7; // level of page
240 unsigned char kill:1; // page is being deleted
241 unsigned char right[BtId]; // page number to right
242 unsigned char left[BtId]; // page number to left
243 unsigned char filler[2]; // padding to multiple of 8
246 // The loadpage interface object
249 BtPage page; // current page pointer
250 BtLatchSet *latch; // current page latch set
253 // structure for latch manager on ALLOC_page
256 struct BtPage_ alloc[1]; // next page_no in right ptr
257 unsigned long long dups[1]; // global duplicate key uniqueifier
258 unsigned char chain[BtId]; // head of free page_nos chain
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
271 BtPageZero *pagezero; // mapped allocation page
272 BtSpinLatch lock[1]; // allocation area lite latch
273 uint latchdeployed; // highest number of latch entries deployed
274 uint nlatchpage; // number of latch pages at BT_latch
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 BtHashEntry *hashtable; // the buffer pool hash table entries
279 BtLatchSet *latchsets; // mapped latch set from buffer pool
280 unsigned char *pagepool; // mapped to the buffer pool pages
282 HANDLE halloc; // allocation handle
283 HANDLE hpool; // buffer pool handle
288 BtMgr *mgr; // buffer manager for thread
289 BtPage cursor; // cached frame for start/next (never mapped)
290 BtPage frame; // spare frame for the page split (never mapped)
291 uid cursor_page; // current cursor page number
292 unsigned char *mem; // frame, cursor, page memory buffer
293 unsigned char key[BT_keyarray]; // last found complete key
294 int found; // last delete or insert was found
295 int err; // last error
296 int reads, writes; // number of reads and writes from the btree
310 #define CLOCK_bit 0x8000
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // The page is allocated from low and hi ends.
333 // The key slots are allocated from the bottom,
334 // while the text and value of the key
335 // are allocated from the top. When the two
336 // areas meet, the page is split into two.
338 // A key consists of a length byte, two bytes of
339 // index number (0 - 65535), and up to 253 bytes
342 // Associated with each key is a value byte string
343 // containing any value desired.
345 // The b-tree root is always located at page 1.
346 // The first leaf page of level zero is always
347 // located on page 2.
349 // The b-tree pages are linked with next
350 // pointers to facilitate enumerators,
351 // and provide for concurrency.
353 // When to root page fills, it is split in two and
354 // the tree height is raised by a new root at page
355 // one with two keys.
357 // Deleted keys are marked with a dead bit until
358 // page cleanup. The fence key for a leaf node is
361 // To achieve maximum concurrency one page is locked at a time
362 // as the tree is traversed to find leaf key in question. The right
363 // page numbers are used in cases where the page is being split,
366 // Page 0 is dedicated to lock for new page extensions,
367 // and chains empty pages together for reuse. It also
368 // contains the latch manager hash table.
370 // The ParentModification lock on a node is obtained to serialize posting
371 // or changing the fence key for a node.
373 // Empty pages are chained together through the ALLOC page and reused.
375 // Access macros to address slot and key values from the page
376 // Page slots use 1 based indexing.
378 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
379 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
380 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
382 void bt_putid(unsigned char *dest, uid id)
387 dest[i] = (unsigned char)id, id >>= 8;
390 uid bt_getid(unsigned char *src)
395 for( i = 0; i < BtId; i++ )
396 id <<= 8, id |= *src++;
401 uid bt_newdup (BtDb *bt)
404 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
406 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
410 // Write-Only Queue Lock
412 void WriteOLock (WOLock *lock)
416 tix = __sync_fetch_and_add (lock->ticket, 1);
418 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
420 // wait for our ticket to come up
422 while( tix != lock->serving[0] )
430 void WriteORelease (WOLock *lock)
435 // Phase-Fair reader/writer lock implementation
437 void WriteLock (RWLock *lock)
442 tix = __sync_fetch_and_add (lock->ticket, 1);
444 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
446 // wait for our ticket to come up
448 while( tix != lock->serving[0] )
455 w = PRES | (tix & PHID);
457 r = __sync_fetch_and_add (lock->rin, w);
459 r = _InterlockedExchangeAdd16 (lock->rin, w);
461 while( r != *lock->rout )
469 void WriteRelease (RWLock *lock)
472 __sync_fetch_and_and (lock->rin, ~MASK);
474 _InterlockedAnd16 (lock->rin, ~MASK);
479 void ReadLock (RWLock *lock)
483 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
485 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 while( w == (*lock->rin & MASK) )
496 void ReadRelease (RWLock *lock)
499 __sync_fetch_and_add (lock->rout, RINC);
501 _InterlockedExchangeAdd16 (lock->rout, RINC);
505 // Spin Latch Manager
507 // wait until write lock mode is clear
508 // and add 1 to the share count
510 void bt_spinreadlock(BtSpinLatch *latch)
516 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
518 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
520 // see if exclusive request is granted or pending
525 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
527 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 } while( sched_yield(), 1 );
532 } while( SwitchToThread(), 1 );
536 // wait for other read and write latches to relinquish
538 void bt_spinwritelock(BtSpinLatch *latch)
544 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
546 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 if( !(prev & ~BOTH) )
553 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 _InterlockedAnd16((ushort *)latch, ~XCL);
558 } while( sched_yield(), 1 );
560 } while( SwitchToThread(), 1 );
564 // try to obtain write lock
566 // return 1 if obtained,
569 int bt_spinwritetry(BtSpinLatch *latch)
574 prev = __sync_fetch_and_or((ushort *)latch, XCL);
576 prev = _InterlockedOr16((ushort *)latch, XCL);
578 // take write access if all bits are clear
581 if( !(prev & ~BOTH) )
585 __sync_fetch_and_and ((ushort *)latch, ~XCL);
587 _InterlockedAnd16((ushort *)latch, ~XCL);
594 void bt_spinreleasewrite(BtSpinLatch *latch)
597 __sync_fetch_and_and((ushort *)latch, ~BOTH);
599 _InterlockedAnd16((ushort *)latch, ~BOTH);
603 // decrement reader count
605 void bt_spinreleaseread(BtSpinLatch *latch)
608 __sync_fetch_and_add((ushort *)latch, -SHARE);
610 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
614 // read page from permanent location in Btree file
616 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
618 off64_t off = page_no << mgr->page_bits;
621 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
622 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
629 memset (ovl, 0, sizeof(OVERLAPPED));
631 ovl->OffsetHigh = off >> 32;
633 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
634 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
637 if( *amt < mgr->page_size ) {
638 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
645 // write page to permanent location in Btree file
646 // clear the dirty bit
648 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
650 off64_t off = page_no << mgr->page_bits;
653 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
659 memset (ovl, 0, sizeof(OVERLAPPED));
661 ovl->OffsetHigh = off >> 32;
663 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666 if( *amt < mgr->page_size )
672 // link latch table entry into head of latch hash table
674 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
676 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
677 BtLatchSet *latch = bt->mgr->latchsets + slot;
679 if( latch->next = bt->mgr->hashtable[hashidx].slot )
680 bt->mgr->latchsets[latch->next].prev = slot;
682 bt->mgr->hashtable[hashidx].slot = slot;
683 latch->page_no = page_no;
690 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
698 // set CLOCK bit in latch
699 // decrement pin count
701 void bt_unpinlatch (BtLatchSet *latch)
704 if( ~latch->pin & CLOCK_bit )
705 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
706 __sync_fetch_and_add(&latch->pin, -1);
708 if( ~latch->pin & CLOCK_bit )
709 _InterlockedOr16 (&latch->pin, CLOCK_bit);
710 _InterlockedDecrement16 (&latch->pin);
714 // return the btree cached page address
716 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
718 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
723 // find existing latchset or inspire new one
724 // return with latchset pinned
726 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
728 uint hashidx = page_no % bt->mgr->latchhash;
736 // try to find our entry
738 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
740 if( slot = bt->mgr->hashtable[hashidx].slot ) do
742 latch = bt->mgr->latchsets + slot;
743 if( page_no == latch->page_no )
745 } while( slot = latch->next );
751 latch = bt->mgr->latchsets + slot;
753 __sync_fetch_and_add(&latch->pin, 1);
755 _InterlockedIncrement16 (&latch->pin);
757 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
761 // see if there are any unused pool entries
763 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
765 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 if( slot < bt->mgr->latchtotal ) {
769 latch = bt->mgr->latchsets + slot;
770 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
772 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
777 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
779 _InterlockedDecrement (&bt->mgr->latchdeployed);
781 // find and reuse previous entry on victim
785 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
787 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
789 // try to get write lock on hash chain
790 // skip entry if not obtained
791 // or has outstanding pins
793 slot %= bt->mgr->latchtotal;
798 latch = bt->mgr->latchsets + slot;
799 idx = latch->page_no % bt->mgr->latchhash;
801 // see we are on same chain as hashidx
806 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809 // skip this slot if it is pinned
810 // or the CLOCK bit is set
813 if( latch->pin & CLOCK_bit ) {
815 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
817 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
824 // update permanent page area in btree from buffer pool
826 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832 latch->dirty = 0, bt->writes++;
834 // unlink our available slot from its hash chain
837 bt->mgr->latchsets[latch->prev].next = latch->next;
839 bt->mgr->hashtable[idx].slot = latch->next;
842 bt->mgr->latchsets[latch->next].prev = latch->prev;
844 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
846 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
854 void bt_mgrclose (BtMgr *mgr)
861 // flush dirty pool pages to the btree
863 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
864 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
865 latch = mgr->latchsets + slot;
868 bt_writepage(mgr, page, latch->page_no);
869 latch->dirty = 0, num++;
871 // madvise (page, mgr->page_size, MADV_DONTNEED);
874 fprintf(stderr, "%d buffer pool pages flushed\n", num);
877 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
878 munmap (mgr->pagezero, mgr->page_size);
880 FlushViewOfFile(mgr->pagezero, 0);
881 UnmapViewOfFile(mgr->pagezero);
882 UnmapViewOfFile(mgr->hashtable);
883 CloseHandle(mgr->halloc);
884 CloseHandle(mgr->hpool);
890 FlushFileBuffers(mgr->idx);
891 CloseHandle(mgr->idx);
896 // close and release memory
898 void bt_close (BtDb *bt)
905 VirtualFree (bt->mem, 0, MEM_RELEASE);
910 // open/create new btree buffer manager
912 // call with file_name, BT_openmode, bits in page size (e.g. 16),
913 // size of page pool (e.g. 262144)
915 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
917 uint lvl, attr, last, slot, idx;
918 uint nlatchpage, latchhash;
919 unsigned char value[BtId];
920 int flag, initit = 0;
921 BtPageZero *pagezero;
928 // determine sanity of page size and buffer pool
930 if( bits > BT_maxbits )
932 else if( bits < BT_minbits )
936 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
941 mgr = calloc (1, sizeof(BtMgr));
943 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
945 if( mgr->idx == -1 ) {
946 fprintf (stderr, "Unable to open btree file\n");
947 return free(mgr), NULL;
950 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
951 attr = FILE_ATTRIBUTE_NORMAL;
952 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
954 if( mgr->idx == INVALID_HANDLE_VALUE )
955 return GlobalFree(mgr), NULL;
959 pagezero = valloc (BT_maxpage);
962 // read minimum page size to get root info
963 // to support raw disk partition files
964 // check if bits == 0 on the disk.
966 if( size = lseek (mgr->idx, 0L, 2) )
967 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
968 if( pagezero->alloc->bits )
969 bits = pagezero->alloc->bits;
973 return free(mgr), free(pagezero), NULL;
977 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
978 size = GetFileSize(mgr->idx, amt);
981 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
982 return bt_mgrclose (mgr), NULL;
983 bits = pagezero->alloc->bits;
988 mgr->page_size = 1 << bits;
989 mgr->page_bits = bits;
991 // calculate number of latch hash table entries
993 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
994 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
996 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
997 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
998 mgr->latchtotal = nodemax;
1003 // initialize an empty b-tree with latch page, root page, page of leaves
1004 // and page(s) of latches and page pool cache
1006 memset (pagezero, 0, 1 << bits);
1007 pagezero->alloc->bits = mgr->page_bits;
1008 bt_putid(pagezero->alloc->right, MIN_lvl+1);
1010 // initialize left-most LEAF page in
1013 bt_putid (pagezero->alloc->left, LEAF_page);
1015 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1016 fprintf (stderr, "Unable to create btree page zero\n");
1017 return bt_mgrclose (mgr), NULL;
1020 memset (pagezero, 0, 1 << bits);
1021 pagezero->alloc->bits = mgr->page_bits;
1023 for( lvl=MIN_lvl; lvl--; ) {
1024 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1025 key = keyptr(pagezero->alloc, 1);
1026 key->len = 2; // create stopper key
1030 bt_putid(value, MIN_lvl - lvl + 1);
1031 val = valptr(pagezero->alloc, 1);
1032 val->len = lvl ? BtId : 0;
1033 memcpy (val->value, value, val->len);
1035 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1036 pagezero->alloc->lvl = lvl;
1037 pagezero->alloc->cnt = 1;
1038 pagezero->alloc->act = 1;
1040 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1041 fprintf (stderr, "Unable to create btree page zero\n");
1042 return bt_mgrclose (mgr), NULL;
1050 VirtualFree (pagezero, 0, MEM_RELEASE);
1053 // mlock the pagezero page
1055 flag = PROT_READ | PROT_WRITE;
1056 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1057 if( mgr->pagezero == MAP_FAILED ) {
1058 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1059 return bt_mgrclose (mgr), NULL;
1061 mlock (mgr->pagezero, mgr->page_size);
1063 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1064 if( mgr->hashtable == MAP_FAILED ) {
1065 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1066 return bt_mgrclose (mgr), NULL;
1069 flag = PAGE_READWRITE;
1070 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1071 if( !mgr->halloc ) {
1072 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1073 return bt_mgrclose (mgr), NULL;
1076 flag = FILE_MAP_WRITE;
1077 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1078 if( !mgr->pagezero ) {
1079 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1080 return bt_mgrclose (mgr), NULL;
1083 flag = PAGE_READWRITE;
1084 size = (uid)mgr->nlatchpage << mgr->page_bits;
1085 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1087 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1088 return bt_mgrclose (mgr), NULL;
1091 flag = FILE_MAP_WRITE;
1092 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1093 if( !mgr->hashtable ) {
1094 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1095 return bt_mgrclose (mgr), NULL;
1099 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1100 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1105 // open BTree access method
1106 // based on buffer manager
1108 BtDb *bt_open (BtMgr *mgr)
1110 BtDb *bt = malloc (sizeof(*bt));
1112 memset (bt, 0, sizeof(*bt));
1115 bt->mem = valloc (2 *mgr->page_size);
1117 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1119 bt->frame = (BtPage)bt->mem;
1120 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1124 // compare two keys, return > 0, = 0, or < 0
1125 // =0: keys are same
1128 // as the comparison value
1130 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1132 uint len1 = key1->len;
1135 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1146 // place write, read, or parent lock on requested page_no.
1148 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1152 ReadLock (latch->readwr);
1155 WriteLock (latch->readwr);
1158 ReadLock (latch->access);
1161 WriteLock (latch->access);
1164 WriteOLock (latch->parent);
1167 WriteOLock (latch->atomic);
1172 // remove write, read, or parent lock on requested page
1174 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1178 ReadRelease (latch->readwr);
1181 WriteRelease (latch->readwr);
1184 ReadRelease (latch->access);
1187 WriteRelease (latch->access);
1190 WriteORelease (latch->parent);
1193 WriteORelease (latch->atomic);
1198 // allocate a new page
1199 // return with page latched, but unlocked.
1201 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1206 // lock allocation page
1208 bt_spinwritelock(bt->mgr->lock);
1210 // use empty chain first
1211 // else allocate empty page
1213 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1214 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1215 set->page = bt_mappage (bt, set->latch);
1217 return bt->err = BTERR_struct, -1;
1219 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1220 bt_spinreleasewrite(bt->mgr->lock);
1222 memcpy (set->page, contents, bt->mgr->page_size);
1223 set->latch->dirty = 1;
1227 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1228 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1230 // unlock allocation latch
1232 bt_spinreleasewrite(bt->mgr->lock);
1234 // don't load cache from btree page
1236 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1237 set->page = bt_mappage (bt, set->latch);
1239 return bt->err = BTERR_struct;
1241 memcpy (set->page, contents, bt->mgr->page_size);
1242 set->latch->dirty = 1;
1246 // find slot in page for given key at a given level
1248 int bt_findslot (BtPage page, unsigned char *key, uint len)
1250 uint diff, higher = page->cnt, low = 1, slot;
1253 // make stopper key an infinite fence value
1255 if( bt_getid (page->right) )
1260 // low is the lowest candidate.
1261 // loop ends when they meet
1263 // higher is already
1264 // tested as .ge. the passed key.
1266 while( diff = higher - low ) {
1267 slot = low + ( diff >> 1 );
1268 if( keycmp (keyptr(page, slot), key, len) < 0 )
1271 higher = slot, good++;
1274 // return zero if key is on right link page
1276 return good ? higher : 0;
1279 // find and load page at given level for given key
1280 // leave page rd or wr locked as requested
1282 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1284 uid page_no = ROOT_page, prevpage = 0;
1285 uint drill = 0xff, slot;
1286 BtLatchSet *prevlatch;
1287 uint mode, prevmode;
1289 // start at root of btree and drill down
1292 // determine lock mode of drill level
1293 mode = (drill == lvl) ? lock : BtLockRead;
1295 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1298 // obtain access lock using lock chaining with Access mode
1300 if( page_no > ROOT_page )
1301 bt_lockpage(BtLockAccess, set->latch);
1303 set->page = bt_mappage (bt, set->latch);
1305 // release & unpin parent or left sibling page
1308 bt_unlockpage(prevmode, prevlatch);
1309 bt_unpinlatch (prevlatch);
1313 // obtain mode lock using lock chaining through AccessLock
1315 bt_lockpage(mode, set->latch);
1317 if( set->page->free )
1318 return bt->err = BTERR_struct, 0;
1320 if( page_no > ROOT_page )
1321 bt_unlockpage(BtLockAccess, set->latch);
1323 // re-read and re-lock root after determining actual level of root
1325 if( set->page->lvl != drill) {
1326 if( set->latch->page_no != ROOT_page )
1327 return bt->err = BTERR_struct, 0;
1329 drill = set->page->lvl;
1331 if( lock != BtLockRead && drill == lvl ) {
1332 bt_unlockpage(mode, set->latch);
1333 bt_unpinlatch (set->latch);
1338 prevpage = set->latch->page_no;
1339 prevlatch = set->latch;
1342 // find key on page at this level
1343 // and descend to requested level
1345 if( set->page->kill )
1348 if( slot = bt_findslot (set->page, key, len) ) {
1352 // find next non-dead slot -- the fence key if nothing else
1354 while( slotptr(set->page, slot)->dead )
1355 if( slot++ < set->page->cnt )
1358 return bt->err = BTERR_struct, 0;
1360 page_no = bt_getid(valptr(set->page, slot)->value);
1365 // or slide right into next page
1368 page_no = bt_getid(set->page->right);
1372 // return error on end of right chain
1374 bt->err = BTERR_struct;
1375 return 0; // return error
1378 // return page to free list
1379 // page must be delete & write locked
1381 void bt_freepage (BtDb *bt, BtPageSet *set)
1383 // lock allocation page
1385 bt_spinwritelock (bt->mgr->lock);
1389 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1390 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1391 set->latch->dirty = 1;
1392 set->page->free = 1;
1394 // unlock released page
1396 bt_unlockpage (BtLockDelete, set->latch);
1397 bt_unlockpage (BtLockWrite, set->latch);
1399 // unlock allocation page
1401 bt_spinreleasewrite (bt->mgr->lock);
1404 // a fence key was deleted from a page
1405 // push new fence value upwards
1407 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1409 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1410 unsigned char value[BtId];
1414 // remove the old fence value
1416 ptr = keyptr(set->page, set->page->cnt);
1417 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1418 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1419 set->latch->dirty = 1;
1421 // cache new fence value
1423 ptr = keyptr(set->page, set->page->cnt);
1424 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1426 bt_lockpage (BtLockParent, set->latch);
1427 bt_unlockpage (BtLockWrite, set->latch);
1429 // insert new (now smaller) fence key
1431 bt_putid (value, set->latch->page_no);
1432 ptr = (BtKey*)leftkey;
1434 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1437 // now delete old fence key
1439 ptr = (BtKey*)rightkey;
1441 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1444 bt_unlockpage (BtLockParent, set->latch);
1445 bt_unpinlatch(set->latch);
1449 // root has a single child
1450 // collapse a level from the tree
1452 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1458 // find the child entry and promote as new root contents
1461 for( idx = 0; idx++ < root->page->cnt; )
1462 if( !slotptr(root->page, idx)->dead )
1465 page_no = bt_getid (valptr(root->page, idx)->value);
1467 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1468 child->page = bt_mappage (bt, child->latch);
1472 bt_lockpage (BtLockDelete, child->latch);
1473 bt_lockpage (BtLockWrite, child->latch);
1475 memcpy (root->page, child->page, bt->mgr->page_size);
1476 root->latch->dirty = 1;
1478 bt_freepage (bt, child);
1479 bt_unpinlatch (child->latch);
1481 } while( root->page->lvl > 1 && root->page->act == 1 );
1483 bt_unlockpage (BtLockWrite, root->latch);
1484 bt_unpinlatch (root->latch);
1488 // delete a page and manage keys
1489 // call with page writelocked
1490 // returns with page unpinned
1492 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1494 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1495 unsigned char value[BtId];
1496 uint lvl = set->page->lvl;
1501 // cache copy of fence key
1502 // to post in parent
1504 ptr = keyptr(set->page, set->page->cnt);
1505 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1507 // obtain lock on right page
1509 page_no = bt_getid(set->page->right);
1511 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1512 right->page = bt_mappage (bt, right->latch);
1516 bt_lockpage (BtLockWrite, right->latch);
1518 // cache copy of key to update
1520 ptr = keyptr(right->page, right->page->cnt);
1521 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1523 if( right->page->kill )
1524 return bt->err = BTERR_struct;
1526 // pull contents of right peer into our empty page
1528 memcpy (set->page, right->page, bt->mgr->page_size);
1529 set->latch->dirty = 1;
1531 // mark right page deleted and point it to left page
1532 // until we can post parent updates that remove access
1533 // to the deleted page.
1535 bt_putid (right->page->right, set->latch->page_no);
1536 right->latch->dirty = 1;
1537 right->page->kill = 1;
1539 bt_lockpage (BtLockParent, right->latch);
1540 bt_unlockpage (BtLockWrite, right->latch);
1542 bt_lockpage (BtLockParent, set->latch);
1543 bt_unlockpage (BtLockWrite, set->latch);
1545 // redirect higher key directly to our new node contents
1547 bt_putid (value, set->latch->page_no);
1548 ptr = (BtKey*)higherfence;
1550 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1553 // delete old lower key to our node
1555 ptr = (BtKey*)lowerfence;
1557 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1560 // obtain delete and write locks to right node
1562 bt_unlockpage (BtLockParent, right->latch);
1563 bt_lockpage (BtLockDelete, right->latch);
1564 bt_lockpage (BtLockWrite, right->latch);
1565 bt_freepage (bt, right);
1566 bt_unpinlatch (right->latch);
1568 bt_unlockpage (BtLockParent, set->latch);
1569 bt_unpinlatch (set->latch);
1574 // find and delete key on page by marking delete flag bit
1575 // if page becomes empty, delete it from the btree
1577 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1579 uint slot, idx, found, fence;
1584 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1585 ptr = keyptr(set->page, slot);
1589 // if librarian slot, advance to real slot
1591 if( slotptr(set->page, slot)->type == Librarian )
1592 ptr = keyptr(set->page, ++slot);
1594 fence = slot == set->page->cnt;
1596 // if key is found delete it, otherwise ignore request
1598 if( found = !keycmp (ptr, key, len) )
1599 if( found = slotptr(set->page, slot)->dead == 0 ) {
1600 val = valptr(set->page,slot);
1601 slotptr(set->page, slot)->dead = 1;
1602 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1605 // collapse empty slots beneath the fence
1607 while( idx = set->page->cnt - 1 )
1608 if( slotptr(set->page, idx)->dead ) {
1609 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1610 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1615 // did we delete a fence key in an upper level?
1617 if( found && lvl && set->page->act && fence )
1618 if( bt_fixfence (bt, set, lvl) )
1621 return bt->found = found, 0;
1623 // do we need to collapse root?
1625 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1626 if( bt_collapseroot (bt, set) )
1629 return bt->found = found, 0;
1631 // delete empty page
1633 if( !set->page->act )
1634 return bt_deletepage (bt, set);
1636 set->latch->dirty = 1;
1637 bt_unlockpage(BtLockWrite, set->latch);
1638 bt_unpinlatch (set->latch);
1639 return bt->found = found, 0;
1642 BtKey *bt_foundkey (BtDb *bt)
1644 return (BtKey*)(bt->key);
1647 // advance to next slot
1649 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1651 BtLatchSet *prevlatch;
1654 if( slot < set->page->cnt )
1657 prevlatch = set->latch;
1659 if( page_no = bt_getid(set->page->right) )
1660 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1661 set->page = bt_mappage (bt, set->latch);
1665 return bt->err = BTERR_struct, 0;
1667 // obtain access lock using lock chaining with Access mode
1669 bt_lockpage(BtLockAccess, set->latch);
1671 bt_unlockpage(BtLockRead, prevlatch);
1672 bt_unpinlatch (prevlatch);
1674 bt_lockpage(BtLockRead, set->latch);
1675 bt_unlockpage(BtLockAccess, set->latch);
1679 // find unique key or first duplicate key in
1680 // leaf level and return number of value bytes
1681 // or (-1) if not found. Setup key for bt_foundkey
1683 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1691 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1693 ptr = keyptr(set->page, slot);
1695 // skip librarian slot place holder
1697 if( slotptr(set->page, slot)->type == Librarian )
1698 ptr = keyptr(set->page, ++slot);
1700 // return actual key found
1702 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1705 if( slotptr(set->page, slot)->type == Duplicate )
1708 // not there if we reach the stopper key
1710 if( slot == set->page->cnt )
1711 if( !bt_getid (set->page->right) )
1714 // if key exists, return >= 0 value bytes copied
1715 // otherwise return (-1)
1717 if( slotptr(set->page, slot)->dead )
1721 if( !memcmp (ptr->key, key, len) ) {
1722 val = valptr (set->page,slot);
1723 if( valmax > val->len )
1725 memcpy (value, val->value, valmax);
1731 } while( slot = bt_findnext (bt, set, slot) );
1733 bt_unlockpage (BtLockRead, set->latch);
1734 bt_unpinlatch (set->latch);
1738 // check page for space available,
1739 // clean if necessary and return
1740 // 0 - page needs splitting
1741 // >0 new slot value
1743 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1745 uint nxt = bt->mgr->page_size;
1746 BtPage page = set->page;
1747 uint cnt = 0, idx = 0;
1748 uint max = page->cnt;
1753 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1756 // skip cleanup and proceed to split
1757 // if there's not enough garbage
1760 if( page->garbage < nxt / 5 )
1763 memcpy (bt->frame, page, bt->mgr->page_size);
1765 // skip page info and set rest of page to zero
1767 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1768 set->latch->dirty = 1;
1772 // clean up page first by
1773 // removing deleted keys
1775 while( cnt++ < max ) {
1778 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1781 // copy the value across
1783 val = valptr(bt->frame, cnt);
1784 nxt -= val->len + sizeof(BtVal);
1785 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1787 // copy the key across
1789 key = keyptr(bt->frame, cnt);
1790 nxt -= key->len + sizeof(BtKey);
1791 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1793 // make a librarian slot
1796 slotptr(page, ++idx)->off = nxt;
1797 slotptr(page, idx)->type = Librarian;
1798 slotptr(page, idx)->dead = 1;
1803 slotptr(page, ++idx)->off = nxt;
1804 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1806 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1813 // see if page has enough space now, or does it need splitting?
1815 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1821 // split the root and raise the height of the btree
1823 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1825 unsigned char leftkey[BT_keyarray];
1826 uint nxt = bt->mgr->page_size;
1827 unsigned char value[BtId];
1833 // save left page fence key for new root
1835 ptr = keyptr(root->page, root->page->cnt);
1836 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1838 // Obtain an empty page to use, and copy the current
1839 // root contents into it, e.g. lower keys
1841 if( bt_newpage(bt, left, root->page) )
1844 left_page_no = left->latch->page_no;
1845 bt_unpinlatch (left->latch);
1847 // preserve the page info at the bottom
1848 // of higher keys and set rest to zero
1850 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1852 // insert stopper key at top of newroot page
1853 // and increase the root height
1855 nxt -= BtId + sizeof(BtVal);
1856 bt_putid (value, right->page_no);
1857 val = (BtVal *)((unsigned char *)root->page + nxt);
1858 memcpy (val->value, value, BtId);
1861 nxt -= 2 + sizeof(BtKey);
1862 slotptr(root->page, 2)->off = nxt;
1863 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1868 // insert lower keys page fence key on newroot page as first key
1870 nxt -= BtId + sizeof(BtVal);
1871 bt_putid (value, left_page_no);
1872 val = (BtVal *)((unsigned char *)root->page + nxt);
1873 memcpy (val->value, value, BtId);
1876 ptr = (BtKey *)leftkey;
1877 nxt -= ptr->len + sizeof(BtKey);
1878 slotptr(root->page, 1)->off = nxt;
1879 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1881 bt_putid(root->page->right, 0);
1882 root->page->min = nxt; // reset lowest used offset and key count
1883 root->page->cnt = 2;
1884 root->page->act = 2;
1887 // release and unpin root pages
1889 bt_unlockpage(BtLockWrite, root->latch);
1890 bt_unpinlatch (root->latch);
1892 bt_unpinlatch (right);
1896 // split already locked full node
1898 // return pool entry for new right
1901 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1903 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1904 uint lvl = set->page->lvl;
1911 // split higher half of keys to bt->frame
1913 memset (bt->frame, 0, bt->mgr->page_size);
1914 max = set->page->cnt;
1918 while( cnt++ < max ) {
1919 if( slotptr(set->page, cnt)->dead && cnt < max )
1921 src = valptr(set->page, cnt);
1922 nxt -= src->len + sizeof(BtVal);
1923 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1925 key = keyptr(set->page, cnt);
1926 nxt -= key->len + sizeof(BtKey);
1927 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1928 memcpy (ptr, key, key->len + sizeof(BtKey));
1930 // add librarian slot
1933 slotptr(bt->frame, ++idx)->off = nxt;
1934 slotptr(bt->frame, idx)->type = Librarian;
1935 slotptr(bt->frame, idx)->dead = 1;
1940 slotptr(bt->frame, ++idx)->off = nxt;
1941 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1943 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1947 bt->frame->bits = bt->mgr->page_bits;
1948 bt->frame->min = nxt;
1949 bt->frame->cnt = idx;
1950 bt->frame->lvl = lvl;
1954 if( set->latch->page_no > ROOT_page )
1955 bt_putid (bt->frame->right, bt_getid (set->page->right));
1957 // get new free page and write higher keys to it.
1959 if( bt_newpage(bt, right, bt->frame) )
1962 memcpy (bt->frame, set->page, bt->mgr->page_size);
1963 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1964 set->latch->dirty = 1;
1966 nxt = bt->mgr->page_size;
1967 set->page->garbage = 0;
1973 if( slotptr(bt->frame, max)->type == Librarian )
1976 // assemble page of smaller keys
1978 while( cnt++ < max ) {
1979 if( slotptr(bt->frame, cnt)->dead )
1981 val = valptr(bt->frame, cnt);
1982 nxt -= val->len + sizeof(BtVal);
1983 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1985 key = keyptr(bt->frame, cnt);
1986 nxt -= key->len + sizeof(BtKey);
1987 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1989 // add librarian slot
1992 slotptr(set->page, ++idx)->off = nxt;
1993 slotptr(set->page, idx)->type = Librarian;
1994 slotptr(set->page, idx)->dead = 1;
1999 slotptr(set->page, ++idx)->off = nxt;
2000 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2004 bt_putid(set->page->right, right->latch->page_no);
2005 set->page->min = nxt;
2006 set->page->cnt = idx;
2008 return right->latch->entry;
2011 // fix keys for newly split page
2012 // call with page locked, return
2015 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2017 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2018 unsigned char value[BtId];
2019 uint lvl = set->page->lvl;
2023 // if current page is the root page, split it
2025 if( set->latch->page_no == ROOT_page )
2026 return bt_splitroot (bt, set, right);
2028 ptr = keyptr(set->page, set->page->cnt);
2029 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2031 page = bt_mappage (bt, right);
2033 ptr = keyptr(page, page->cnt);
2034 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2036 // insert new fences in their parent pages
2038 bt_lockpage (BtLockParent, right);
2040 bt_lockpage (BtLockParent, set->latch);
2041 bt_unlockpage (BtLockWrite, set->latch);
2043 // insert new fence for reformulated left block of smaller keys
2045 bt_putid (value, set->latch->page_no);
2046 ptr = (BtKey *)leftkey;
2048 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2051 // switch fence for right block of larger keys to new right page
2053 bt_putid (value, right->page_no);
2054 ptr = (BtKey *)rightkey;
2056 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2059 bt_unlockpage (BtLockParent, set->latch);
2060 bt_unpinlatch (set->latch);
2062 bt_unlockpage (BtLockParent, right);
2063 bt_unpinlatch (right);
2067 // install new key and value onto page
2068 // page must already be checked for
2071 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2073 uint idx, librarian;
2078 // if found slot > desired slot and previous slot
2079 // is a librarian slot, use it
2082 if( slotptr(set->page, slot-1)->type == Librarian )
2085 // copy value onto page
2087 set->page->min -= vallen + sizeof(BtVal);
2088 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2089 memcpy (val->value, value, vallen);
2092 // copy key onto page
2094 set->page->min -= keylen + sizeof(BtKey);
2095 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2096 memcpy (ptr->key, key, keylen);
2099 // find first empty slot
2101 for( idx = slot; idx < set->page->cnt; idx++ )
2102 if( slotptr(set->page, idx)->dead )
2105 // now insert key into array before slot
2107 if( idx == set->page->cnt )
2108 idx += 2, set->page->cnt += 2, librarian = 2;
2112 set->latch->dirty = 1;
2115 while( idx > slot + librarian - 1 )
2116 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2118 // add librarian slot
2120 if( librarian > 1 ) {
2121 node = slotptr(set->page, slot++);
2122 node->off = set->page->min;
2123 node->type = Librarian;
2129 node = slotptr(set->page, slot);
2130 node->off = set->page->min;
2135 bt_unlockpage (BtLockWrite, set->latch);
2136 bt_unpinlatch (set->latch);
2142 // Insert new key into the btree at given level.
2143 // either add a new key or update/add an existing one
2145 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2147 unsigned char newkey[BT_keyarray];
2148 uint slot, idx, len, entry;
2155 // set up the key we're working on
2157 ins = (BtKey*)newkey;
2158 memcpy (ins->key, key, keylen);
2161 // is this a non-unique index value?
2167 sequence = bt_newdup (bt);
2168 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2172 while( 1 ) { // find the page and slot for the current key
2173 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2174 ptr = keyptr(set->page, slot);
2177 bt->err = BTERR_ovflw;
2181 // if librarian slot == found slot, advance to real slot
2183 if( slotptr(set->page, slot)->type == Librarian )
2184 if( !keycmp (ptr, key, keylen) )
2185 ptr = keyptr(set->page, ++slot);
2189 if( slotptr(set->page, slot)->type == Duplicate )
2192 // if inserting a duplicate key or unique key
2193 // check for adequate space on the page
2194 // and insert the new key before slot.
2196 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2197 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2198 if( !(entry = bt_splitpage (bt, set)) )
2200 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2205 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2208 // if key already exists, update value and return
2210 val = valptr(set->page, slot);
2212 if( val->len >= vallen ) {
2213 if( slotptr(set->page, slot)->dead )
2215 set->page->garbage += val->len - vallen;
2216 set->latch->dirty = 1;
2217 slotptr(set->page, slot)->dead = 0;
2219 memcpy (val->value, value, vallen);
2220 bt_unlockpage(BtLockWrite, set->latch);
2221 bt_unpinlatch (set->latch);
2225 // new update value doesn't fit in existing value area
2227 if( !slotptr(set->page, slot)->dead )
2228 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2230 slotptr(set->page, slot)->dead = 0;
2234 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2235 if( !(entry = bt_splitpage (bt, set)) )
2237 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2242 set->page->min -= vallen + sizeof(BtVal);
2243 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2244 memcpy (val->value, value, vallen);
2247 set->latch->dirty = 1;
2248 set->page->min -= keylen + sizeof(BtKey);
2249 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2250 memcpy (ptr->key, key, keylen);
2253 slotptr(set->page, slot)->off = set->page->min;
2254 bt_unlockpage(BtLockWrite, set->latch);
2255 bt_unpinlatch (set->latch);
2262 uint entry; // latch table entry number
2263 uint slot:31; // page slot number
2264 uint reuse:1; // reused previous page
2268 uid page_no; // page number for split leaf
2269 void *next; // next key to insert
2270 uint entry:29; // latch table entry number
2271 uint type:2; // 0 == insert, 1 == delete, 2 == free
2272 uint nounlock:1; // don't unlock ParentModification
2273 unsigned char leafkey[BT_keyarray];
2276 // find and load leaf page for given key
2277 // leave page Atomic locked and Read locked.
2279 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2281 BtLatchSet *prevlatch;
2285 // find level zero page
2287 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2290 // find next non-dead entry on this page
2291 // it will be the fence key if nothing else
2293 while( slotptr(set->page, slot)->dead )
2294 if( slot++ < set->page->cnt )
2297 return bt->err = BTERR_struct, 0;
2299 page_no = bt_getid(valptr(set->page, slot)->value);
2300 prevlatch = set->latch;
2303 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2304 set->page = bt_mappage (bt, set->latch);
2308 if( set->page->free || set->page->lvl )
2309 return bt->err = BTERR_struct, 0;
2311 // obtain read lock using lock chaining with Access mode
2312 // release & unpin parent/left sibling page
2314 bt_lockpage(BtLockAccess, set->latch);
2316 bt_unlockpage(BtLockRead, prevlatch);
2317 bt_unpinlatch (prevlatch);
2319 bt_lockpage(BtLockRead, set->latch);
2320 bt_unlockpage(BtLockAccess, set->latch);
2322 // find key on page at this level
2323 // and descend to requested level
2325 if( !set->page->kill )
2326 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2327 bt_unlockpage(BtLockRead, set->latch);
2328 bt_lockpage(BtLockAtomic, set->latch);
2329 bt_lockpage(BtLockAccess, set->latch);
2330 bt_lockpage(BtLockRead, set->latch);
2331 bt_unlockpage(BtLockAccess, set->latch);
2333 if( slot = bt_findslot (set->page, key, len) )
2336 bt_unlockpage(BtLockAtomic, set->latch);
2339 // slide right into next page
2341 page_no = bt_getid(set->page->right);
2342 prevlatch = set->latch;
2345 // return error on end of right chain
2347 bt->err = BTERR_struct;
2348 return 0; // return error
2351 // determine actual page where key is located
2352 // return slot number
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
2360 if( src > 1 && locks[src].reuse )
2361 entry = locks[src-1].entry, slot = 0;
2363 entry = locks[src].entry;
2366 set->latch = bt->mgr->latchsets + entry;
2367 set->page = bt_mappage (bt, set->latch);
2371 // is locks->reuse set?
2372 // if so, find where our key
2373 // is located on previous page or split pages
2376 set->latch = bt->mgr->latchsets + entry;
2377 set->page = bt_mappage (bt, set->latch);
2379 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2380 if( locks[src].reuse )
2381 locks[src].entry = entry;
2384 } while( entry = set->latch->split );
2386 bt->err = BTERR_atomic;
2390 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2392 BtKey *key = keyptr(source, src);
2393 BtVal *val = valptr(source, src);
2398 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2399 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2400 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2402 if( entry = bt_splitpage (bt, set) )
2403 latch = bt->mgr->latchsets + entry;
2407 // splice right page into split chain
2408 // and WriteLock it.
2410 latch->split = set->latch->split;
2411 set->latch->split = entry;
2412 bt_lockpage(BtLockWrite, latch);
2415 return bt->err = BTERR_atomic;
2418 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2420 BtKey *key = keyptr(source, src);
2421 uint idx, entry, slot;
2426 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2427 slotptr(set->page, slot)->dead = 1;
2429 return bt->err = BTERR_struct;
2431 ptr = keyptr(set->page, slot);
2432 val = valptr(set->page, slot);
2434 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2435 set->latch->dirty = 1;
2440 uint bt_parentmatch (AtomicKey *head, uint entry)
2445 do if( head->entry == entry )
2446 head->nounlock = 1, cnt++;
2447 while( head = head->next );
2452 // atomic modification of a batch of keys.
2454 // return -1 if BTERR is set
2455 // otherwise return slot number
2456 // causing the key constraint violation
2457 // or zero on successful completion.
2459 int bt_atomictxn (BtDb *bt, BtPage source)
2461 uint src, idx, slot, samepage, entry;
2462 AtomicKey *head, *tail, *leaf;
2463 BtPageSet set[1], prev[1];
2464 unsigned char value[BtId];
2465 BtKey *key, *ptr, *key2;
2475 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2479 // stable sort the list of keys into order to
2480 // prevent deadlocks between threads.
2482 for( src = 1; src++ < source->cnt; ) {
2483 *temp = *slotptr(source,src);
2484 key = keyptr (source,src);
2486 for( idx = src; --idx; ) {
2487 key2 = keyptr (source,idx);
2488 if( keycmp (key, key2->key, key2->len) < 0 ) {
2489 *slotptr(source,idx+1) = *slotptr(source,idx);
2490 *slotptr(source,idx) = *temp;
2496 // Load the leaf page for each key
2497 // group same page references with reuse bit
2498 // and determine any constraint violations
2500 for( src = 0; src++ < source->cnt; ) {
2501 key = keyptr(source, src);
2504 // first determine if this modification falls
2505 // on the same page as the previous modification
2506 // note that the far right leaf page is a special case
2508 if( samepage = src > 1 )
2509 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2510 slot = bt_findslot(set->page, key->key, key->len);
2511 else // release read on previous page
2512 bt_unlockpage(BtLockRead, set->latch);
2515 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2516 set->latch->split = 0;
2520 if( slotptr(set->page, slot)->type == Librarian )
2521 ptr = keyptr(set->page, ++slot);
2523 ptr = keyptr(set->page, slot);
2526 locks[src].entry = set->latch->entry;
2527 locks[src].slot = slot;
2528 locks[src].reuse = 0;
2530 locks[src].entry = 0;
2531 locks[src].slot = 0;
2532 locks[src].reuse = 1;
2535 switch( slotptr(source, src)->type ) {
2538 if( !slotptr(set->page, slot)->dead )
2539 if( slot < set->page->cnt || bt_getid (set->page->right) )
2540 if( !keycmp (ptr, key->key, key->len) ) {
2542 // return constraint violation if key already exists
2544 bt_unlockpage(BtLockRead, set->latch);
2548 if( locks[src].entry ) {
2549 set->latch = bt->mgr->latchsets + locks[src].entry;
2550 bt_unlockpage(BtLockAtomic, set->latch);
2551 bt_unpinlatch (set->latch);
2562 // unlock last loadpage lock
2564 if( source->cnt > 1 )
2565 bt_unlockpage(BtLockRead, set->latch);
2567 // obtain write lock for each master page
2569 for( src = 0; src++ < source->cnt; )
2570 if( locks[src].reuse )
2573 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2575 // insert or delete each key
2576 // process any splits or merges
2577 // release Write & Atomic latches
2578 // set ParentModifications and build
2579 // queue of keys to insert for split pages
2580 // or delete for deleted pages.
2582 // run through txn list backwards
2584 samepage = source->cnt + 1;
2586 for( src = source->cnt; src; src-- ) {
2587 if( locks[src].reuse )
2590 // perform the txn operations
2591 // from smaller to larger on
2594 for( idx = src; idx < samepage; idx++ )
2595 switch( slotptr(source,idx)->type ) {
2597 if( bt_atomicdelete (bt, source, locks, idx) )
2603 if( bt_atomicinsert (bt, source, locks, idx) )
2608 // after the same page operations have finished,
2609 // process master page for splits or deletion.
2611 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2612 prev->page = bt_mappage (bt, prev->latch);
2615 // pick-up all splits from master page
2617 entry = prev->latch->split;
2620 set->latch = bt->mgr->latchsets + entry;
2621 set->page = bt_mappage (bt, set->latch);
2622 entry = set->latch->split;
2624 // delete empty master page
2626 if( !prev->page->act ) {
2627 memcpy (set->page->left, prev->page->left, BtId);
2628 memcpy (prev->page, set->page, bt->mgr->page_size);
2629 bt_lockpage (BtLockDelete, set->latch);
2630 bt_freepage (bt, set);
2631 bt_unpinlatch (set->latch);
2633 prev->latch->dirty = 1;
2637 // remove empty page from the split chain
2639 if( !set->page->act ) {
2640 memcpy (prev->page->right, set->page->right, BtId);
2641 prev->latch->split = set->latch->split;
2642 bt_lockpage (BtLockDelete, set->latch);
2643 bt_freepage (bt, set);
2644 bt_unpinlatch (set->latch);
2648 // schedule prev fence key update
2650 ptr = keyptr(prev->page,prev->page->cnt);
2651 leaf = calloc (sizeof(AtomicKey), 1);
2653 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2654 leaf->page_no = prev->latch->page_no;
2655 leaf->entry = prev->latch->entry;
2665 bt_putid (set->page->left, prev->latch->page_no);
2666 bt_lockpage(BtLockParent, prev->latch);
2667 bt_unlockpage(BtLockWrite, prev->latch);
2671 // update left pointer in next right page from last split page
2672 // (if all splits were reversed, latch->split == 0)
2674 if( latch->split ) {
2675 // fix left pointer in master's original (now split) right sibling
2676 // or set rightmost page in page zero
2678 if( right = bt_getid (prev->page->right) ) {
2679 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2680 set->page = bt_mappage (bt, set->latch);
2684 bt_lockpage (BtLockWrite, set->latch);
2685 bt_putid (set->page->left, prev->latch->page_no);
2686 set->latch->dirty = 1;
2687 bt_unlockpage (BtLockWrite, set->latch);
2688 bt_unpinlatch (set->latch);
2689 } else { // prev is rightmost page
2690 bt_spinwritelock (bt->mgr->lock);
2691 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2692 bt_spinreleasewrite(bt->mgr->lock);
2695 // Process last page split in chain
2697 ptr = keyptr(prev->page,prev->page->cnt);
2698 leaf = calloc (sizeof(AtomicKey), 1);
2700 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2701 leaf->page_no = prev->latch->page_no;
2702 leaf->entry = prev->latch->entry;
2712 bt_lockpage(BtLockParent, prev->latch);
2713 bt_unlockpage(BtLockWrite, prev->latch);
2715 // remove atomic lock on master page
2717 bt_unlockpage(BtLockAtomic, latch);
2721 // finished if prev page occupied (either master or final split)
2723 if( prev->page->act ) {
2724 bt_unlockpage(BtLockWrite, latch);
2725 bt_unlockpage(BtLockAtomic, latch);
2726 bt_unpinlatch(latch);
2730 // any and all splits were reversed, and the
2731 // master page located in prev is empty, delete it
2732 // by pulling over master's right sibling.
2734 // Delete empty master's fence key
2736 ptr = keyptr(prev->page,prev->page->cnt);
2737 leaf = calloc (sizeof(AtomicKey), 1);
2739 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2740 leaf->page_no = prev->latch->page_no;
2741 leaf->entry = prev->latch->entry;
2751 bt_lockpage(BtLockParent, prev->latch);
2752 bt_unlockpage(BtLockWrite, prev->latch);
2754 // grab master's right sibling
2755 // note that the far right page never empties because
2756 // it always contains (at least) the infinite stopper key
2757 // and that all pages that don't contain any keys have
2758 // been deleted, or are being deleted under write lock.
2760 if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2761 set->page = bt_mappage (bt, set->latch);
2765 bt_lockpage(BtLockWrite, set->latch);
2767 // and pull contents over empty page
2768 // while preserving master's left link
2770 memcpy (set->page->left, prev->page->left, BtId);
2771 memcpy (prev->page, set->page, bt->mgr->page_size);
2773 // forward seekers to old right sibling
2774 // to new page location in prev
2776 bt_putid (set->page->right, prev->latch->page_no);
2777 set->latch->dirty = 1;
2778 set->page->kill = 1;
2780 // add new fence key for new master page contents, delete
2781 // right sibling after parent posts the new master fence.
2783 ptr = keyptr(set->page,set->page->cnt);
2784 leaf = calloc (sizeof(AtomicKey), 1);
2786 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2787 leaf->page_no = prev->latch->page_no;
2788 leaf->entry = set->latch->entry;
2791 // see if right sibling key update is in the FIFO,
2792 // and ParentLock if not there.
2794 if( !bt_parentmatch (head, leaf->entry) )
2795 bt_lockpage(BtLockParent, set->latch);
2797 bt_unlockpage(BtLockWrite, set->latch);
2806 // fix new master's right sibling's left pointer
2808 if( right = bt_getid (set->page->right) ) {
2809 if( set->latch = bt_pinlatch (bt, right, 1) )
2810 set->page = bt_mappage (bt, set->latch);
2812 bt_lockpage (BtLockWrite, set->latch);
2813 bt_putid (set->page->left, prev->latch->page_no);
2814 set->latch->dirty = 1;
2816 bt_unlockpage (BtLockWrite, set->latch);
2817 bt_unpinlatch (set->latch);
2818 } else { // master is now the far right page
2819 bt_spinwritelock (bt->mgr->lock);
2820 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2821 bt_spinreleasewrite(bt->mgr->lock);
2824 bt_unlockpage(BtLockAtomic, latch);
2827 // add & delete keys for any pages split or merged during transaction
2831 set->latch = bt->mgr->latchsets + leaf->entry;
2832 set->page = bt_mappage (bt, set->latch);
2834 bt_putid (value, leaf->page_no);
2835 ptr = (BtKey *)leaf->leafkey;
2837 switch( leaf->type ) {
2838 case 0: // insert key
2839 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2844 case 1: // delete key
2845 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2850 case 2: // insert key & free
2851 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2854 bt_lockpage (BtLockDelete, set->latch);
2855 bt_lockpage (BtLockWrite, set->latch);
2856 bt_freepage (bt, set);
2860 if( !leaf->nounlock )
2861 bt_unlockpage (BtLockParent, set->latch);
2863 bt_unpinlatch (set->latch);
2866 } while( leaf = tail );
2874 // set cursor to highest slot on highest page
2876 uint bt_lastkey (BtDb *bt)
2878 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2881 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2882 set->page = bt_mappage (bt, set->latch);
2886 bt_lockpage(BtLockRead, set->latch);
2887 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2888 bt_unlockpage(BtLockRead, set->latch);
2889 bt_unpinlatch (set->latch);
2891 bt->cursor_page = page_no;
2892 return bt->cursor->cnt;
2895 // return previous slot on cursor page
2897 uint bt_prevkey (BtDb *bt, uint slot)
2899 uid ourright, next, us = bt->cursor_page;
2905 ourright = bt_getid(bt->cursor->right);
2908 if( !(next = bt_getid(bt->cursor->left)) )
2912 bt->cursor_page = next;
2914 if( set->latch = bt_pinlatch (bt, next, 1) )
2915 set->page = bt_mappage (bt, set->latch);
2919 bt_lockpage(BtLockRead, set->latch);
2920 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2921 bt_unlockpage(BtLockRead, set->latch);
2922 bt_unpinlatch (set->latch);
2924 next = bt_getid (bt->cursor->right);
2926 if( bt->cursor->kill )
2930 if( next == ourright )
2935 return bt->cursor->cnt;
2938 // return next slot on cursor page
2939 // or slide cursor right into next page
2941 uint bt_nextkey (BtDb *bt, uint slot)
2947 right = bt_getid(bt->cursor->right);
2949 while( slot++ < bt->cursor->cnt )
2950 if( slotptr(bt->cursor,slot)->dead )
2952 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2960 bt->cursor_page = right;
2962 if( set->latch = bt_pinlatch (bt, right, 1) )
2963 set->page = bt_mappage (bt, set->latch);
2967 bt_lockpage(BtLockRead, set->latch);
2969 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2971 bt_unlockpage(BtLockRead, set->latch);
2972 bt_unpinlatch (set->latch);
2980 // cache page of keys into cursor and return starting slot for given key
2982 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2987 // cache page for retrieval
2989 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2990 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2994 bt->cursor_page = set->latch->page_no;
2996 bt_unlockpage(BtLockRead, set->latch);
2997 bt_unpinlatch (set->latch);
3001 BtKey *bt_key(BtDb *bt, uint slot)
3003 return keyptr(bt->cursor, slot);
3006 BtVal *bt_val(BtDb *bt, uint slot)
3008 return valptr(bt->cursor,slot);
3014 double getCpuTime(int type)
3017 FILETIME xittime[1];
3018 FILETIME systime[1];
3019 FILETIME usrtime[1];
3020 SYSTEMTIME timeconv[1];
3023 memset (timeconv, 0, sizeof(SYSTEMTIME));
3027 GetSystemTimeAsFileTime (xittime);
3028 FileTimeToSystemTime (xittime, timeconv);
3029 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3032 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3033 FileTimeToSystemTime (usrtime, timeconv);
3036 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3037 FileTimeToSystemTime (systime, timeconv);
3041 ans += (double)timeconv->wHour * 3600;
3042 ans += (double)timeconv->wMinute * 60;
3043 ans += (double)timeconv->wSecond;
3044 ans += (double)timeconv->wMilliseconds / 1000;
3049 #include <sys/resource.h>
3051 double getCpuTime(int type)
3053 struct rusage used[1];
3054 struct timeval tv[1];
3058 gettimeofday(tv, NULL);
3059 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3062 getrusage(RUSAGE_SELF, used);
3063 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3066 getrusage(RUSAGE_SELF, used);
3067 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3074 void bt_poolaudit (BtMgr *mgr)
3079 while( slot++ < mgr->latchdeployed ) {
3080 latch = mgr->latchsets + slot;
3082 if( *latch->readwr->rin & MASK )
3083 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3084 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3086 if( *latch->access->rin & MASK )
3087 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3088 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3090 if( *latch->parent->ticket != *latch->parent->serving )
3091 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3092 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3094 if( latch->pin & ~CLOCK_bit ) {
3095 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3101 uint bt_latchaudit (BtDb *bt)
3103 ushort idx, hashidx;
3109 if( *(ushort *)(bt->mgr->lock) )
3110 fprintf(stderr, "Alloc page locked\n");
3111 *(ushort *)(bt->mgr->lock) = 0;
3113 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3114 latch = bt->mgr->latchsets + idx;
3115 if( *latch->readwr->rin & MASK )
3116 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3117 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3119 if( *latch->access->rin & MASK )
3120 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3121 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3123 if( *latch->parent->ticket != *latch->parent->serving )
3124 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3125 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3128 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3133 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3134 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3135 fprintf(stderr, "hash entry %d locked\n", hashidx);
3137 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3139 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3140 latch = bt->mgr->latchsets + idx;
3142 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3143 } while( idx = latch->next );
3146 page_no = LEAF_page;
3148 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3149 uid off = page_no << bt->mgr->page_bits;
3151 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3155 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3157 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3158 return bt->err = BTERR_map;
3160 if( *amt < bt->mgr->page_size )
3161 return bt->err = BTERR_map;
3163 if( !bt->frame->free && !bt->frame->lvl )
3164 cnt += bt->frame->act;
3168 cnt--; // remove stopper key
3169 fprintf(stderr, " Total keys read %d\n", cnt);
3183 // standalone program to index file of keys
3184 // then list them onto std-out
3187 void *index_file (void *arg)
3189 uint __stdcall index_file (void *arg)
3192 int line = 0, found = 0, cnt = 0, idx;
3193 uid next, page_no = LEAF_page; // start on first page of leaves
3194 int ch, len = 0, slot, type = 0;
3195 unsigned char key[BT_maxkey];
3196 unsigned char txn[65536];
3197 ThreadArg *args = arg;
3206 bt = bt_open (args->mgr);
3209 if( args->idx < strlen (args->type) )
3210 ch = args->type[args->idx];
3212 ch = args->type[strlen(args->type) - 1];
3217 fprintf(stderr, "started latch mgr audit\n");
3218 cnt = bt_latchaudit (bt);
3219 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3230 if( type == Delete )
3231 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3233 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3235 if( type == Delete )
3236 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3238 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3240 if( in = fopen (args->infile, "rb") )
3241 while( ch = getc(in), ch != EOF )
3247 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3248 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3254 memcpy (txn + nxt, key + 10, len - 10);
3256 txn[nxt] = len - 10;
3258 memcpy (txn + nxt, key, 10);
3261 slotptr(page,++cnt)->off = nxt;
3262 slotptr(page,cnt)->type = type;
3265 if( cnt < args->num )
3272 if( bt_atomictxn (bt, page) )
3273 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3278 else if( len < BT_maxkey )
3280 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3284 fprintf(stderr, "started indexing for %s\n", args->infile);
3285 if( in = fopen (args->infile, "r") )
3286 while( ch = getc(in), ch != EOF )
3291 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3292 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3295 else if( len < BT_maxkey )
3297 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3301 fprintf(stderr, "started finding keys for %s\n", args->infile);
3302 if( in = fopen (args->infile, "rb") )
3303 while( ch = getc(in), ch != EOF )
3307 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3310 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3313 else if( len < BT_maxkey )
3315 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3319 fprintf(stderr, "started scanning\n");
3321 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3322 set->page = bt_mappage (bt, set->latch);
3324 fprintf(stderr, "unable to obtain latch"), exit(1);
3325 bt_lockpage (BtLockRead, set->latch);
3326 next = bt_getid (set->page->right);
3328 for( slot = 0; slot++ < set->page->cnt; )
3329 if( next || slot < set->page->cnt )
3330 if( !slotptr(set->page, slot)->dead ) {
3331 ptr = keyptr(set->page, slot);
3334 if( slotptr(set->page, slot)->type == Duplicate )
3337 fwrite (ptr->key, len, 1, stdout);
3338 val = valptr(set->page, slot);
3339 fwrite (val->value, val->len, 1, stdout);
3340 fputc ('\n', stdout);
3344 bt_unlockpage (BtLockRead, set->latch);
3345 bt_unpinlatch (set->latch);
3346 } while( page_no = next );
3348 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3352 fprintf(stderr, "started reverse scan\n");
3353 if( slot = bt_lastkey (bt) )
3354 while( slot = bt_prevkey (bt, slot) ) {
3355 if( slotptr(bt->cursor, slot)->dead )
3358 ptr = keyptr(bt->cursor, slot);
3361 if( slotptr(bt->cursor, slot)->type == Duplicate )
3364 fwrite (ptr->key, len, 1, stdout);
3365 val = valptr(bt->cursor, slot);
3366 fwrite (val->value, val->len, 1, stdout);
3367 fputc ('\n', stdout);
3371 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3376 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3378 fprintf(stderr, "started counting\n");
3379 page_no = LEAF_page;
3381 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3382 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3385 if( !bt->frame->free && !bt->frame->lvl )
3386 cnt += bt->frame->act;
3392 cnt--; // remove stopper key
3393 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3405 typedef struct timeval timer;
3407 int main (int argc, char **argv)
3409 int idx, cnt, len, slot, err;
3410 int segsize, bits = 16;
3427 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3428 fprintf (stderr, " where idx_file is the name of the btree file\n");
3429 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3430 fprintf (stderr, " page_bits is the page size in bits\n");
3431 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3432 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3433 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3437 start = getCpuTime(0);
3440 bits = atoi(argv[3]);
3443 poolsize = atoi(argv[4]);
3446 fprintf (stderr, "Warning: no mapped_pool\n");
3449 num = atoi(argv[5]);
3453 threads = malloc (cnt * sizeof(pthread_t));
3455 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3457 args = malloc (cnt * sizeof(ThreadArg));
3459 mgr = bt_mgr ((argv[1]), bits, poolsize);
3462 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3468 for( idx = 0; idx < cnt; idx++ ) {
3469 args[idx].infile = argv[idx + 6];
3470 args[idx].type = argv[2];
3471 args[idx].mgr = mgr;
3472 args[idx].num = num;
3473 args[idx].idx = idx;
3475 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3476 fprintf(stderr, "Error creating thread %d\n", err);
3478 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3482 // wait for termination
3485 for( idx = 0; idx < cnt; idx++ )
3486 pthread_join (threads[idx], NULL);
3488 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3490 for( idx = 0; idx < cnt; idx++ )
3491 CloseHandle(threads[idx]);
3497 elapsed = getCpuTime(0) - start;
3498 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3499 elapsed = getCpuTime(1);
3500 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3501 elapsed = getCpuTime(2);
3502 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);