1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The orginal author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
120 // definition for spin latch implementation
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
126 volatile typedef struct {
137 // hash table entries
140 volatile uint slot; // Latch table entry at head of chain
141 BtSpinLatch latch[1];
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock atomic[1]; // Atomic update in progress
152 uint split; // right split page atomic insert
153 uint entry; // entry slot in latch table
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of outstanding threads
157 ushort dirty:1; // page in cache is dirty
160 // Define the length of the page record numbers
164 // Page key slot definition.
166 // Keys are marked dead, but remain on the page until
167 // it cleanup is called. The fence key (highest key) for
168 // a leaf page is always present, even after cleanup.
172 // In addition to the Unique keys that occupy slots
173 // there are Librarian and Duplicate key
174 // slots occupying the key slot array.
176 // The Librarian slots are dead keys that
177 // serve as filler, available to add new Unique
178 // or Dup slots that are inserted into the B-tree.
180 // The Duplicate slots have had their key bytes extended
181 // by 6 bytes to contain a binary duplicate key uniqueifier.
192 uint off:BT_maxbits; // page offset for key start
193 uint type:3; // type of slot
194 uint dead:1; // set for deleted slot
197 // The key structure occupies space at the upper end of
198 // each page. It's a length byte followed by the key
202 unsigned char len; // this can be changed to a ushort or uint
203 unsigned char key[0];
206 // the value structure also occupies space at the upper
207 // end of the page. Each key is immediately followed by a value.
210 unsigned char len; // this can be changed to a ushort or uint
211 unsigned char value[0];
214 #define BT_maxkey 255 // maximum number of bytes in a key
215 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217 // The first part of an index page.
218 // It is immediately followed
219 // by the BtSlot array of keys.
221 // note that this structure size
222 // must be a multiple of 8 bytes
223 // in order to place dups correctly.
225 typedef struct BtPage_ {
226 uint cnt; // count of keys in page
227 uint act; // count of active keys
228 uint min; // next key offset
229 uint garbage; // page garbage in bytes
230 unsigned char bits:7; // page size in bits
231 unsigned char free:1; // page is on free chain
232 unsigned char lvl:7; // level of page
233 unsigned char kill:1; // page is being deleted
234 unsigned char right[BtId]; // page number to right
235 unsigned char left[BtId]; // page number to left
236 unsigned char filler[2]; // padding to multiple of 8
239 // The loadpage interface object
242 BtPage page; // current page pointer
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned long long dups[1]; // global duplicate key uniqueifier
251 unsigned char chain[BtId]; // head of free page_nos chain
254 // The object structure for Btree access
257 uint page_size; // page size
258 uint page_bits; // page size in bits
264 BtPageZero *pagezero; // mapped allocation page
265 BtSpinLatch lock[1]; // allocation area lite latch
266 uint latchdeployed; // highest number of latch entries deployed
267 uint nlatchpage; // number of latch pages at BT_latch
268 uint latchtotal; // number of page latch entries
269 uint latchhash; // number of latch hash table slots
270 uint latchvictim; // next latch entry to examine
271 BtHashEntry *hashtable; // the buffer pool hash table entries
272 BtLatchSet *latchsets; // mapped latch set from buffer pool
273 unsigned char *pagepool; // mapped to the buffer pool pages
275 HANDLE halloc; // allocation handle
276 HANDLE hpool; // buffer pool handle
281 BtMgr *mgr; // buffer manager for thread
282 BtPage cursor; // cached frame for start/next (never mapped)
283 BtPage frame; // spare frame for the page split (never mapped)
284 uid cursor_page; // current cursor page number
285 unsigned char *mem; // frame, cursor, page memory buffer
286 unsigned char key[BT_keyarray]; // last found complete key
287 int found; // last delete or insert was found
288 int err; // last error
289 int reads, writes; // number of reads and writes from the btree
303 #define CLOCK_bit 0x8000
306 extern void bt_close (BtDb *bt);
307 extern BtDb *bt_open (BtMgr *mgr);
308 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
309 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
310 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
311 extern BtKey *bt_foundkey (BtDb *bt);
312 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
313 extern uint bt_nextkey (BtDb *bt, uint slot);
316 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
317 void bt_mgrclose (BtMgr *mgr);
319 // Helper functions to return slot values
320 // from the cursor page.
322 extern BtKey *bt_key (BtDb *bt, uint slot);
323 extern BtVal *bt_val (BtDb *bt, uint slot);
325 // The page is allocated from low and hi ends.
326 // The key slots are allocated from the bottom,
327 // while the text and value of the key
328 // are allocated from the top. When the two
329 // areas meet, the page is split into two.
331 // A key consists of a length byte, two bytes of
332 // index number (0 - 65535), and up to 253 bytes
335 // Associated with each key is a value byte string
336 // containing any value desired.
338 // The b-tree root is always located at page 1.
339 // The first leaf page of level zero is always
340 // located on page 2.
342 // The b-tree pages are linked with next
343 // pointers to facilitate enumerators,
344 // and provide for concurrency.
346 // When to root page fills, it is split in two and
347 // the tree height is raised by a new root at page
348 // one with two keys.
350 // Deleted keys are marked with a dead bit until
351 // page cleanup. The fence key for a leaf node is
354 // To achieve maximum concurrency one page is locked at a time
355 // as the tree is traversed to find leaf key in question. The right
356 // page numbers are used in cases where the page is being split,
359 // Page 0 is dedicated to lock for new page extensions,
360 // and chains empty pages together for reuse. It also
361 // contains the latch manager hash table.
363 // The ParentModification lock on a node is obtained to serialize posting
364 // or changing the fence key for a node.
366 // Empty pages are chained together through the ALLOC page and reused.
368 // Access macros to address slot and key values from the page
369 // Page slots use 1 based indexing.
371 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
372 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
373 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
375 void bt_putid(unsigned char *dest, uid id)
380 dest[i] = (unsigned char)id, id >>= 8;
383 uid bt_getid(unsigned char *src)
388 for( i = 0; i < BtId; i++ )
389 id <<= 8, id |= *src++;
394 uid bt_newdup (BtDb *bt)
397 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
399 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
403 // Phase-Fair reader/writer lock implementation
405 void WriteLock (RWLock *lock)
410 tix = __sync_fetch_and_add (lock->ticket, 1);
412 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
414 // wait for our ticket to come up
416 while( tix != lock->serving[0] )
423 w = PRES | (tix & PHID);
425 r = __sync_fetch_and_add (lock->rin, w);
427 r = _InterlockedExchangeAdd16 (lock->rin, w);
429 while( r != *lock->rout )
437 void WriteRelease (RWLock *lock)
440 __sync_fetch_and_and (lock->rin, ~MASK);
442 _InterlockedAnd16 (lock->rin, ~MASK);
447 void ReadLock (RWLock *lock)
451 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
453 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
456 while( w == (*lock->rin & MASK) )
464 void ReadRelease (RWLock *lock)
467 __sync_fetch_and_add (lock->rout, RINC);
469 _InterlockedExchangeAdd16 (lock->rout, RINC);
473 // Spin Latch Manager
475 // wait until write lock mode is clear
476 // and add 1 to the share count
478 void bt_spinreadlock(BtSpinLatch *latch)
484 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
486 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
488 // see if exclusive request is granted or pending
493 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
495 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
498 } while( sched_yield(), 1 );
500 } while( SwitchToThread(), 1 );
504 // wait for other read and write latches to relinquish
506 void bt_spinwritelock(BtSpinLatch *latch)
512 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
514 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
517 if( !(prev & ~BOTH) )
521 __sync_fetch_and_and ((ushort *)latch, ~XCL);
523 _InterlockedAnd16((ushort *)latch, ~XCL);
526 } while( sched_yield(), 1 );
528 } while( SwitchToThread(), 1 );
532 // try to obtain write lock
534 // return 1 if obtained,
537 int bt_spinwritetry(BtSpinLatch *latch)
542 prev = __sync_fetch_and_or((ushort *)latch, XCL);
544 prev = _InterlockedOr16((ushort *)latch, XCL);
546 // take write access if all bits are clear
549 if( !(prev & ~BOTH) )
553 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 _InterlockedAnd16((ushort *)latch, ~XCL);
562 void bt_spinreleasewrite(BtSpinLatch *latch)
565 __sync_fetch_and_and((ushort *)latch, ~BOTH);
567 _InterlockedAnd16((ushort *)latch, ~BOTH);
571 // decrement reader count
573 void bt_spinreleaseread(BtSpinLatch *latch)
576 __sync_fetch_and_add((ushort *)latch, -SHARE);
578 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
582 // read page from permanent location in Btree file
584 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
586 off64_t off = page_no << mgr->page_bits;
589 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
590 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
597 memset (ovl, 0, sizeof(OVERLAPPED));
599 ovl->OffsetHigh = off >> 32;
601 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
602 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
605 if( *amt < mgr->page_size ) {
606 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
613 // write page to permanent location in Btree file
614 // clear the dirty bit
616 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
618 off64_t off = page_no << mgr->page_bits;
621 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
627 memset (ovl, 0, sizeof(OVERLAPPED));
629 ovl->OffsetHigh = off >> 32;
631 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
634 if( *amt < mgr->page_size )
640 // link latch table entry into head of latch hash table
642 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
644 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
645 BtLatchSet *latch = bt->mgr->latchsets + slot;
647 if( latch->next = bt->mgr->hashtable[hashidx].slot )
648 bt->mgr->latchsets[latch->next].prev = slot;
650 bt->mgr->hashtable[hashidx].slot = slot;
651 latch->page_no = page_no;
658 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
666 // set CLOCK bit in latch
667 // decrement pin count
669 void bt_unpinlatch (BtLatchSet *latch)
672 if( ~latch->pin & CLOCK_bit )
673 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
674 __sync_fetch_and_add(&latch->pin, -1);
676 if( ~latch->pin & CLOCK_bit )
677 _InterlockedOr16 (&latch->pin, CLOCK_bit);
678 _InterlockedDecrement16 (&latch->pin);
682 // return the btree cached page address
684 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
686 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
691 // find existing latchset or inspire new one
692 // return with latchset pinned
694 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
696 uint hashidx = page_no % bt->mgr->latchhash;
704 // try to find our entry
706 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
708 if( slot = bt->mgr->hashtable[hashidx].slot ) do
710 latch = bt->mgr->latchsets + slot;
711 if( page_no == latch->page_no )
713 } while( slot = latch->next );
719 latch = bt->mgr->latchsets + slot;
721 __sync_fetch_and_add(&latch->pin, 1);
723 _InterlockedIncrement16 (&latch->pin);
725 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
729 // see if there are any unused pool entries
731 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
733 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
736 if( slot < bt->mgr->latchtotal ) {
737 latch = bt->mgr->latchsets + slot;
738 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
740 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
745 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
747 _InterlockedDecrement (&bt->mgr->latchdeployed);
749 // find and reuse previous entry on victim
753 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
755 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
757 // try to get write lock on hash chain
758 // skip entry if not obtained
759 // or has outstanding pins
761 slot %= bt->mgr->latchtotal;
766 latch = bt->mgr->latchsets + slot;
767 idx = latch->page_no % bt->mgr->latchhash;
769 // see we are on same chain as hashidx
774 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
777 // skip this slot if it is pinned
778 // or the CLOCK bit is set
781 if( latch->pin & CLOCK_bit ) {
783 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
785 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
788 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
792 // update permanent page area in btree from buffer pool
794 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
797 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
800 latch->dirty = 0, bt->writes++;
802 // unlink our available slot from its hash chain
805 bt->mgr->latchsets[latch->prev].next = latch->next;
807 bt->mgr->hashtable[idx].slot = latch->next;
810 bt->mgr->latchsets[latch->next].prev = latch->prev;
812 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
814 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
817 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
822 void bt_mgrclose (BtMgr *mgr)
829 // flush dirty pool pages to the btree
831 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
832 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
833 latch = mgr->latchsets + slot;
836 bt_writepage(mgr, page, latch->page_no);
837 latch->dirty = 0, num++;
839 // madvise (page, mgr->page_size, MADV_DONTNEED);
842 fprintf(stderr, "%d buffer pool pages flushed\n", num);
845 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
846 munmap (mgr->pagezero, mgr->page_size);
848 FlushViewOfFile(mgr->pagezero, 0);
849 UnmapViewOfFile(mgr->pagezero);
850 UnmapViewOfFile(mgr->hashtable);
851 CloseHandle(mgr->halloc);
852 CloseHandle(mgr->hpool);
858 FlushFileBuffers(mgr->idx);
859 CloseHandle(mgr->idx);
864 // close and release memory
866 void bt_close (BtDb *bt)
873 VirtualFree (bt->mem, 0, MEM_RELEASE);
878 // open/create new btree buffer manager
880 // call with file_name, BT_openmode, bits in page size (e.g. 16),
881 // size of page pool (e.g. 262144)
883 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
885 uint lvl, attr, last, slot, idx;
886 uint nlatchpage, latchhash;
887 unsigned char value[BtId];
888 int flag, initit = 0;
889 BtPageZero *pagezero;
896 // determine sanity of page size and buffer pool
898 if( bits > BT_maxbits )
900 else if( bits < BT_minbits )
904 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
909 mgr = calloc (1, sizeof(BtMgr));
911 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
913 if( mgr->idx == -1 ) {
914 fprintf (stderr, "Unable to open btree file\n");
915 return free(mgr), NULL;
918 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
919 attr = FILE_ATTRIBUTE_NORMAL;
920 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
922 if( mgr->idx == INVALID_HANDLE_VALUE )
923 return GlobalFree(mgr), NULL;
927 pagezero = valloc (BT_maxpage);
930 // read minimum page size to get root info
931 // to support raw disk partition files
932 // check if bits == 0 on the disk.
934 if( size = lseek (mgr->idx, 0L, 2) )
935 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
936 if( pagezero->alloc->bits )
937 bits = pagezero->alloc->bits;
941 return free(mgr), free(pagezero), NULL;
945 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
946 size = GetFileSize(mgr->idx, amt);
949 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
950 return bt_mgrclose (mgr), NULL;
951 bits = pagezero->alloc->bits;
956 mgr->page_size = 1 << bits;
957 mgr->page_bits = bits;
959 // calculate number of latch hash table entries
961 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
962 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
964 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
965 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
966 mgr->latchtotal = nodemax;
971 // initialize an empty b-tree with latch page, root page, page of leaves
972 // and page(s) of latches and page pool cache
974 memset (pagezero, 0, 1 << bits);
975 pagezero->alloc->bits = mgr->page_bits;
976 bt_putid(pagezero->alloc->right, MIN_lvl+1);
978 // initialize left-most LEAF page in
981 bt_putid (pagezero->alloc->left, LEAF_page);
983 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
984 fprintf (stderr, "Unable to create btree page zero\n");
985 return bt_mgrclose (mgr), NULL;
988 memset (pagezero, 0, 1 << bits);
989 pagezero->alloc->bits = mgr->page_bits;
991 for( lvl=MIN_lvl; lvl--; ) {
992 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
993 key = keyptr(pagezero->alloc, 1);
994 key->len = 2; // create stopper key
998 bt_putid(value, MIN_lvl - lvl + 1);
999 val = valptr(pagezero->alloc, 1);
1000 val->len = lvl ? BtId : 0;
1001 memcpy (val->value, value, val->len);
1003 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1004 pagezero->alloc->lvl = lvl;
1005 pagezero->alloc->cnt = 1;
1006 pagezero->alloc->act = 1;
1008 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1009 fprintf (stderr, "Unable to create btree page zero\n");
1010 return bt_mgrclose (mgr), NULL;
1018 VirtualFree (pagezero, 0, MEM_RELEASE);
1021 // mlock the pagezero page
1023 flag = PROT_READ | PROT_WRITE;
1024 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1025 if( mgr->pagezero == MAP_FAILED ) {
1026 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1027 return bt_mgrclose (mgr), NULL;
1029 mlock (mgr->pagezero, mgr->page_size);
1031 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1032 if( mgr->hashtable == MAP_FAILED ) {
1033 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1034 return bt_mgrclose (mgr), NULL;
1037 flag = PAGE_READWRITE;
1038 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1039 if( !mgr->halloc ) {
1040 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1041 return bt_mgrclose (mgr), NULL;
1044 flag = FILE_MAP_WRITE;
1045 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1046 if( !mgr->pagezero ) {
1047 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1048 return bt_mgrclose (mgr), NULL;
1051 flag = PAGE_READWRITE;
1052 size = (uid)mgr->nlatchpage << mgr->page_bits;
1053 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1055 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1056 return bt_mgrclose (mgr), NULL;
1059 flag = FILE_MAP_WRITE;
1060 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1061 if( !mgr->hashtable ) {
1062 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1063 return bt_mgrclose (mgr), NULL;
1067 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1068 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1073 // open BTree access method
1074 // based on buffer manager
1076 BtDb *bt_open (BtMgr *mgr)
1078 BtDb *bt = malloc (sizeof(*bt));
1080 memset (bt, 0, sizeof(*bt));
1083 bt->mem = valloc (2 *mgr->page_size);
1085 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1087 bt->frame = (BtPage)bt->mem;
1088 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1092 // compare two keys, return > 0, = 0, or < 0
1093 // =0: keys are same
1096 // as the comparison value
1098 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1100 uint len1 = key1->len;
1103 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1114 // place write, read, or parent lock on requested page_no.
1116 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1120 ReadLock (latch->readwr);
1123 WriteLock (latch->readwr);
1126 ReadLock (latch->access);
1129 WriteLock (latch->access);
1132 WriteLock (latch->parent);
1135 WriteLock (latch->atomic);
1140 // remove write, read, or parent lock on requested page
1142 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1146 ReadRelease (latch->readwr);
1149 WriteRelease (latch->readwr);
1152 ReadRelease (latch->access);
1155 WriteRelease (latch->access);
1158 WriteRelease (latch->parent);
1161 WriteRelease (latch->atomic);
1166 // allocate a new page
1167 // return with page latched, but unlocked.
1169 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1174 // lock allocation page
1176 bt_spinwritelock(bt->mgr->lock);
1178 // use empty chain first
1179 // else allocate empty page
1181 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1182 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1183 set->page = bt_mappage (bt, set->latch);
1185 return bt->err = BTERR_struct, -1;
1187 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1188 bt_spinreleasewrite(bt->mgr->lock);
1190 memcpy (set->page, contents, bt->mgr->page_size);
1191 set->latch->dirty = 1;
1195 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1196 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1198 // unlock allocation latch
1200 bt_spinreleasewrite(bt->mgr->lock);
1202 // don't load cache from btree page
1204 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1205 set->page = bt_mappage (bt, set->latch);
1207 return bt->err = BTERR_struct;
1209 memcpy (set->page, contents, bt->mgr->page_size);
1210 set->latch->dirty = 1;
1214 // find slot in page for given key at a given level
1216 int bt_findslot (BtPage page, unsigned char *key, uint len)
1218 uint diff, higher = page->cnt, low = 1, slot;
1221 // make stopper key an infinite fence value
1223 if( bt_getid (page->right) )
1228 // low is the lowest candidate.
1229 // loop ends when they meet
1231 // higher is already
1232 // tested as .ge. the passed key.
1234 while( diff = higher - low ) {
1235 slot = low + ( diff >> 1 );
1236 if( keycmp (keyptr(page, slot), key, len) < 0 )
1239 higher = slot, good++;
1242 // return zero if key is on right link page
1244 return good ? higher : 0;
1247 // find and load page at given level for given key
1248 // leave page rd or wr locked as requested
1250 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1252 uid page_no = ROOT_page, prevpage = 0;
1253 uint drill = 0xff, slot;
1254 BtLatchSet *prevlatch;
1255 uint mode, prevmode;
1257 // start at root of btree and drill down
1260 // determine lock mode of drill level
1261 mode = (drill == lvl) ? lock : BtLockRead;
1263 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1266 // obtain access lock using lock chaining with Access mode
1268 if( page_no > ROOT_page )
1269 bt_lockpage(BtLockAccess, set->latch);
1271 set->page = bt_mappage (bt, set->latch);
1273 // release & unpin parent or left sibling page
1276 bt_unlockpage(prevmode, prevlatch);
1277 bt_unpinlatch (prevlatch);
1281 // obtain mode lock using lock chaining through AccessLock
1283 bt_lockpage(mode, set->latch);
1285 if( set->page->free )
1286 return bt->err = BTERR_struct, 0;
1288 if( page_no > ROOT_page )
1289 bt_unlockpage(BtLockAccess, set->latch);
1291 // re-read and re-lock root after determining actual level of root
1293 if( set->page->lvl != drill) {
1294 if( set->latch->page_no != ROOT_page )
1295 return bt->err = BTERR_struct, 0;
1297 drill = set->page->lvl;
1299 if( lock != BtLockRead && drill == lvl ) {
1300 bt_unlockpage(mode, set->latch);
1301 bt_unpinlatch (set->latch);
1306 prevpage = set->latch->page_no;
1307 prevlatch = set->latch;
1310 // find key on page at this level
1311 // and descend to requested level
1313 if( set->page->kill )
1316 if( slot = bt_findslot (set->page, key, len) ) {
1320 // find next non-dead slot -- the fence key if nothing else
1322 while( slotptr(set->page, slot)->dead )
1323 if( slot++ < set->page->cnt )
1326 return bt->err = BTERR_struct, 0;
1328 page_no = bt_getid(valptr(set->page, slot)->value);
1333 // or slide right into next page
1336 page_no = bt_getid(set->page->right);
1340 // return error on end of right chain
1342 bt->err = BTERR_struct;
1343 return 0; // return error
1346 // return page to free list
1347 // page must be delete & write locked
1349 void bt_freepage (BtDb *bt, BtPageSet *set)
1351 // lock allocation page
1353 bt_spinwritelock (bt->mgr->lock);
1357 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1358 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1359 set->latch->dirty = 1;
1360 set->page->free = 1;
1362 // unlock released page
1364 bt_unlockpage (BtLockDelete, set->latch);
1365 bt_unlockpage (BtLockWrite, set->latch);
1367 // unlock allocation page
1369 bt_spinreleasewrite (bt->mgr->lock);
1372 // a fence key was deleted from a page
1373 // push new fence value upwards
1375 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1377 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1378 unsigned char value[BtId];
1382 // remove the old fence value
1384 ptr = keyptr(set->page, set->page->cnt);
1385 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1386 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1387 set->latch->dirty = 1;
1389 // cache new fence value
1391 ptr = keyptr(set->page, set->page->cnt);
1392 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1394 bt_lockpage (BtLockParent, set->latch);
1395 bt_unlockpage (BtLockWrite, set->latch);
1397 // insert new (now smaller) fence key
1399 bt_putid (value, set->latch->page_no);
1400 ptr = (BtKey*)leftkey;
1402 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1405 // now delete old fence key
1407 ptr = (BtKey*)rightkey;
1409 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1412 bt_unlockpage (BtLockParent, set->latch);
1413 bt_unpinlatch(set->latch);
1417 // root has a single child
1418 // collapse a level from the tree
1420 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1426 // find the child entry and promote as new root contents
1429 for( idx = 0; idx++ < root->page->cnt; )
1430 if( !slotptr(root->page, idx)->dead )
1433 page_no = bt_getid (valptr(root->page, idx)->value);
1435 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1436 child->page = bt_mappage (bt, child->latch);
1440 bt_lockpage (BtLockDelete, child->latch);
1441 bt_lockpage (BtLockWrite, child->latch);
1443 memcpy (root->page, child->page, bt->mgr->page_size);
1444 root->latch->dirty = 1;
1446 bt_freepage (bt, child);
1447 bt_unpinlatch (child->latch);
1449 } while( root->page->lvl > 1 && root->page->act == 1 );
1451 bt_unlockpage (BtLockWrite, root->latch);
1452 bt_unpinlatch (root->latch);
1456 // delete a page and manage keys
1457 // call with page writelocked
1458 // returns with page unpinned
1460 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1462 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1463 unsigned char value[BtId];
1464 uint lvl = set->page->lvl;
1469 // cache copy of fence key
1470 // to post in parent
1472 ptr = keyptr(set->page, set->page->cnt);
1473 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1475 // obtain lock on right page
1477 page_no = bt_getid(set->page->right);
1479 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1480 right->page = bt_mappage (bt, right->latch);
1484 bt_lockpage (BtLockWrite, right->latch);
1486 // cache copy of key to update
1488 ptr = keyptr(right->page, right->page->cnt);
1489 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1491 if( right->page->kill )
1492 return bt->err = BTERR_struct;
1494 // pull contents of right peer into our empty page
1496 memcpy (set->page, right->page, bt->mgr->page_size);
1497 set->latch->dirty = 1;
1499 // mark right page deleted and point it to left page
1500 // until we can post parent updates that remove access
1501 // to the deleted page.
1503 bt_putid (right->page->right, set->latch->page_no);
1504 right->latch->dirty = 1;
1505 right->page->kill = 1;
1507 bt_lockpage (BtLockParent, right->latch);
1508 bt_unlockpage (BtLockWrite, right->latch);
1510 bt_lockpage (BtLockParent, set->latch);
1511 bt_unlockpage (BtLockWrite, set->latch);
1513 // redirect higher key directly to our new node contents
1515 bt_putid (value, set->latch->page_no);
1516 ptr = (BtKey*)higherfence;
1518 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1521 // delete old lower key to our node
1523 ptr = (BtKey*)lowerfence;
1525 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1528 // obtain delete and write locks to right node
1530 bt_unlockpage (BtLockParent, right->latch);
1531 bt_lockpage (BtLockDelete, right->latch);
1532 bt_lockpage (BtLockWrite, right->latch);
1533 bt_freepage (bt, right);
1534 bt_unpinlatch (right->latch);
1536 bt_unlockpage (BtLockParent, set->latch);
1537 bt_unpinlatch (set->latch);
1542 // find and delete key on page by marking delete flag bit
1543 // if page becomes empty, delete it from the btree
1545 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1547 uint slot, idx, found, fence;
1552 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1553 ptr = keyptr(set->page, slot);
1557 // if librarian slot, advance to real slot
1559 if( slotptr(set->page, slot)->type == Librarian )
1560 ptr = keyptr(set->page, ++slot);
1562 fence = slot == set->page->cnt;
1564 // if key is found delete it, otherwise ignore request
1566 if( found = !keycmp (ptr, key, len) )
1567 if( found = slotptr(set->page, slot)->dead == 0 ) {
1568 val = valptr(set->page,slot);
1569 slotptr(set->page, slot)->dead = 1;
1570 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1573 // collapse empty slots beneath the fence
1575 while( idx = set->page->cnt - 1 )
1576 if( slotptr(set->page, idx)->dead ) {
1577 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1578 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1583 // did we delete a fence key in an upper level?
1585 if( found && lvl && set->page->act && fence )
1586 if( bt_fixfence (bt, set, lvl) )
1589 return bt->found = found, 0;
1591 // do we need to collapse root?
1593 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1594 if( bt_collapseroot (bt, set) )
1597 return bt->found = found, 0;
1599 // delete empty page
1601 if( !set->page->act )
1602 return bt_deletepage (bt, set);
1604 set->latch->dirty = 1;
1605 bt_unlockpage(BtLockWrite, set->latch);
1606 bt_unpinlatch (set->latch);
1607 return bt->found = found, 0;
1610 BtKey *bt_foundkey (BtDb *bt)
1612 return (BtKey*)(bt->key);
1615 // advance to next slot
1617 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1619 BtLatchSet *prevlatch;
1622 if( slot < set->page->cnt )
1625 prevlatch = set->latch;
1627 if( page_no = bt_getid(set->page->right) )
1628 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1629 set->page = bt_mappage (bt, set->latch);
1633 return bt->err = BTERR_struct, 0;
1635 // obtain access lock using lock chaining with Access mode
1637 bt_lockpage(BtLockAccess, set->latch);
1639 bt_unlockpage(BtLockRead, prevlatch);
1640 bt_unpinlatch (prevlatch);
1642 bt_lockpage(BtLockRead, set->latch);
1643 bt_unlockpage(BtLockAccess, set->latch);
1647 // find unique key or first duplicate key in
1648 // leaf level and return number of value bytes
1649 // or (-1) if not found. Setup key for bt_foundkey
1651 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1659 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1661 ptr = keyptr(set->page, slot);
1663 // skip librarian slot place holder
1665 if( slotptr(set->page, slot)->type == Librarian )
1666 ptr = keyptr(set->page, ++slot);
1668 // return actual key found
1670 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1673 if( slotptr(set->page, slot)->type == Duplicate )
1676 // not there if we reach the stopper key
1678 if( slot == set->page->cnt )
1679 if( !bt_getid (set->page->right) )
1682 // if key exists, return >= 0 value bytes copied
1683 // otherwise return (-1)
1685 if( slotptr(set->page, slot)->dead )
1689 if( !memcmp (ptr->key, key, len) ) {
1690 val = valptr (set->page,slot);
1691 if( valmax > val->len )
1693 memcpy (value, val->value, valmax);
1699 } while( slot = bt_findnext (bt, set, slot) );
1701 bt_unlockpage (BtLockRead, set->latch);
1702 bt_unpinlatch (set->latch);
1706 // check page for space available,
1707 // clean if necessary and return
1708 // 0 - page needs splitting
1709 // >0 new slot value
1711 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1713 uint nxt = bt->mgr->page_size;
1714 BtPage page = set->page;
1715 uint cnt = 0, idx = 0;
1716 uint max = page->cnt;
1721 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1724 // skip cleanup and proceed to split
1725 // if there's not enough garbage
1728 if( page->garbage < nxt / 5 )
1731 memcpy (bt->frame, page, bt->mgr->page_size);
1733 // skip page info and set rest of page to zero
1735 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1736 set->latch->dirty = 1;
1740 // clean up page first by
1741 // removing deleted keys
1743 while( cnt++ < max ) {
1746 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1749 // copy the value across
1751 val = valptr(bt->frame, cnt);
1752 nxt -= val->len + sizeof(BtVal);
1753 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1755 // copy the key across
1757 key = keyptr(bt->frame, cnt);
1758 nxt -= key->len + sizeof(BtKey);
1759 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1761 // make a librarian slot
1764 slotptr(page, ++idx)->off = nxt;
1765 slotptr(page, idx)->type = Librarian;
1766 slotptr(page, idx)->dead = 1;
1771 slotptr(page, ++idx)->off = nxt;
1772 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1774 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1781 // see if page has enough space now, or does it need splitting?
1783 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1789 // split the root and raise the height of the btree
1791 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1793 unsigned char leftkey[BT_keyarray];
1794 uint nxt = bt->mgr->page_size;
1795 unsigned char value[BtId];
1801 // save left page fence key for new root
1803 ptr = keyptr(root->page, root->page->cnt);
1804 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1806 // Obtain an empty page to use, and copy the current
1807 // root contents into it, e.g. lower keys
1809 if( bt_newpage(bt, left, root->page) )
1812 left_page_no = left->latch->page_no;
1813 bt_unpinlatch (left->latch);
1815 // preserve the page info at the bottom
1816 // of higher keys and set rest to zero
1818 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1820 // insert stopper key at top of newroot page
1821 // and increase the root height
1823 nxt -= BtId + sizeof(BtVal);
1824 bt_putid (value, right->page_no);
1825 val = (BtVal *)((unsigned char *)root->page + nxt);
1826 memcpy (val->value, value, BtId);
1829 nxt -= 2 + sizeof(BtKey);
1830 slotptr(root->page, 2)->off = nxt;
1831 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1836 // insert lower keys page fence key on newroot page as first key
1838 nxt -= BtId + sizeof(BtVal);
1839 bt_putid (value, left_page_no);
1840 val = (BtVal *)((unsigned char *)root->page + nxt);
1841 memcpy (val->value, value, BtId);
1844 ptr = (BtKey *)leftkey;
1845 nxt -= ptr->len + sizeof(BtKey);
1846 slotptr(root->page, 1)->off = nxt;
1847 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1849 bt_putid(root->page->right, 0);
1850 root->page->min = nxt; // reset lowest used offset and key count
1851 root->page->cnt = 2;
1852 root->page->act = 2;
1855 // release and unpin root pages
1857 bt_unlockpage(BtLockWrite, root->latch);
1858 bt_unpinlatch (root->latch);
1860 bt_unpinlatch (right);
1864 // split already locked full node
1866 // return pool entry for new right
1869 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1871 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1872 uint lvl = set->page->lvl;
1879 // split higher half of keys to bt->frame
1881 memset (bt->frame, 0, bt->mgr->page_size);
1882 max = set->page->cnt;
1886 while( cnt++ < max ) {
1887 if( slotptr(set->page, cnt)->dead && cnt < max )
1889 src = valptr(set->page, cnt);
1890 nxt -= src->len + sizeof(BtVal);
1891 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1893 key = keyptr(set->page, cnt);
1894 nxt -= key->len + sizeof(BtKey);
1895 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1896 memcpy (ptr, key, key->len + sizeof(BtKey));
1898 // add librarian slot
1901 slotptr(bt->frame, ++idx)->off = nxt;
1902 slotptr(bt->frame, idx)->type = Librarian;
1903 slotptr(bt->frame, idx)->dead = 1;
1908 slotptr(bt->frame, ++idx)->off = nxt;
1909 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1911 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1915 bt->frame->bits = bt->mgr->page_bits;
1916 bt->frame->min = nxt;
1917 bt->frame->cnt = idx;
1918 bt->frame->lvl = lvl;
1922 if( set->latch->page_no > ROOT_page )
1923 bt_putid (bt->frame->right, bt_getid (set->page->right));
1925 // get new free page and write higher keys to it.
1927 if( bt_newpage(bt, right, bt->frame) )
1930 memcpy (bt->frame, set->page, bt->mgr->page_size);
1931 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1932 set->latch->dirty = 1;
1934 nxt = bt->mgr->page_size;
1935 set->page->garbage = 0;
1941 if( slotptr(bt->frame, max)->type == Librarian )
1944 // assemble page of smaller keys
1946 while( cnt++ < max ) {
1947 if( slotptr(bt->frame, cnt)->dead )
1949 val = valptr(bt->frame, cnt);
1950 nxt -= val->len + sizeof(BtVal);
1951 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1953 key = keyptr(bt->frame, cnt);
1954 nxt -= key->len + sizeof(BtKey);
1955 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1957 // add librarian slot
1960 slotptr(set->page, ++idx)->off = nxt;
1961 slotptr(set->page, idx)->type = Librarian;
1962 slotptr(set->page, idx)->dead = 1;
1967 slotptr(set->page, ++idx)->off = nxt;
1968 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1972 bt_putid(set->page->right, right->latch->page_no);
1973 set->page->min = nxt;
1974 set->page->cnt = idx;
1976 return right->latch->entry;
1979 // fix keys for newly split page
1980 // call with page locked, return
1983 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1985 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1986 unsigned char value[BtId];
1987 uint lvl = set->page->lvl;
1991 // if current page is the root page, split it
1993 if( set->latch->page_no == ROOT_page )
1994 return bt_splitroot (bt, set, right);
1996 ptr = keyptr(set->page, set->page->cnt);
1997 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1999 page = bt_mappage (bt, right);
2001 ptr = keyptr(page, page->cnt);
2002 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2004 // insert new fences in their parent pages
2006 bt_lockpage (BtLockParent, right);
2008 bt_lockpage (BtLockParent, set->latch);
2009 bt_unlockpage (BtLockWrite, set->latch);
2011 // insert new fence for reformulated left block of smaller keys
2013 bt_putid (value, set->latch->page_no);
2014 ptr = (BtKey *)leftkey;
2016 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2019 // switch fence for right block of larger keys to new right page
2021 bt_putid (value, right->page_no);
2022 ptr = (BtKey *)rightkey;
2024 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2027 bt_unlockpage (BtLockParent, set->latch);
2028 bt_unpinlatch (set->latch);
2030 bt_unlockpage (BtLockParent, right);
2031 bt_unpinlatch (right);
2035 // install new key and value onto page
2036 // page must already be checked for
2039 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2041 uint idx, librarian;
2046 // if found slot > desired slot and previous slot
2047 // is a librarian slot, use it
2050 if( slotptr(set->page, slot-1)->type == Librarian )
2053 // copy value onto page
2055 set->page->min -= vallen + sizeof(BtVal);
2056 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2057 memcpy (val->value, value, vallen);
2060 // copy key onto page
2062 set->page->min -= keylen + sizeof(BtKey);
2063 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2064 memcpy (ptr->key, key, keylen);
2067 // find first empty slot
2069 for( idx = slot; idx < set->page->cnt; idx++ )
2070 if( slotptr(set->page, idx)->dead )
2073 // now insert key into array before slot
2075 if( idx == set->page->cnt )
2076 idx += 2, set->page->cnt += 2, librarian = 2;
2080 set->latch->dirty = 1;
2083 while( idx > slot + librarian - 1 )
2084 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2086 // add librarian slot
2088 if( librarian > 1 ) {
2089 node = slotptr(set->page, slot++);
2090 node->off = set->page->min;
2091 node->type = Librarian;
2097 node = slotptr(set->page, slot);
2098 node->off = set->page->min;
2103 bt_unlockpage (BtLockWrite, set->latch);
2104 bt_unpinlatch (set->latch);
2110 // Insert new key into the btree at given level.
2111 // either add a new key or update/add an existing one
2113 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2115 unsigned char newkey[BT_keyarray];
2116 uint slot, idx, len, entry;
2123 // set up the key we're working on
2125 ins = (BtKey*)newkey;
2126 memcpy (ins->key, key, keylen);
2129 // is this a non-unique index value?
2135 sequence = bt_newdup (bt);
2136 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2140 while( 1 ) { // find the page and slot for the current key
2141 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2142 ptr = keyptr(set->page, slot);
2145 bt->err = BTERR_ovflw;
2149 // if librarian slot == found slot, advance to real slot
2151 if( slotptr(set->page, slot)->type == Librarian )
2152 if( !keycmp (ptr, key, keylen) )
2153 ptr = keyptr(set->page, ++slot);
2157 if( slotptr(set->page, slot)->type == Duplicate )
2160 // if inserting a duplicate key or unique key
2161 // check for adequate space on the page
2162 // and insert the new key before slot.
2164 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2165 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2166 if( !(entry = bt_splitpage (bt, set)) )
2168 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2173 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2176 // if key already exists, update value and return
2178 val = valptr(set->page, slot);
2180 if( val->len >= vallen ) {
2181 if( slotptr(set->page, slot)->dead )
2183 set->page->garbage += val->len - vallen;
2184 set->latch->dirty = 1;
2185 slotptr(set->page, slot)->dead = 0;
2187 memcpy (val->value, value, vallen);
2188 bt_unlockpage(BtLockWrite, set->latch);
2189 bt_unpinlatch (set->latch);
2193 // new update value doesn't fit in existing value area
2195 if( !slotptr(set->page, slot)->dead )
2196 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2198 slotptr(set->page, slot)->dead = 0;
2202 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2203 if( !(entry = bt_splitpage (bt, set)) )
2205 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2210 set->page->min -= vallen + sizeof(BtVal);
2211 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2212 memcpy (val->value, value, vallen);
2215 set->latch->dirty = 1;
2216 set->page->min -= keylen + sizeof(BtKey);
2217 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2218 memcpy (ptr->key, key, keylen);
2221 slotptr(set->page, slot)->off = set->page->min;
2222 bt_unlockpage(BtLockWrite, set->latch);
2223 bt_unpinlatch (set->latch);
2230 uint entry; // latch table entry number
2231 uint slot:31; // page slot number
2232 uint reuse:1; // reused previous page
2236 uid page_no; // page number for split leaf
2237 void *next; // next key to insert
2238 uint entry:29; // latch table entry number
2239 uint type:2; // 0 == insert, 1 == delete, 2 == free
2240 uint nounlock:1; // don't unlock ParentModification
2241 unsigned char leafkey[BT_keyarray];
2244 // find and load leaf page for given key
2245 // leave page Atomic locked and Read locked.
2247 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2249 BtLatchSet *prevlatch;
2253 // find level zero page
2255 if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2258 // find next non-dead entry on this page
2259 // it will be the fence key if nothing else
2261 while( slotptr(set->page, slot)->dead )
2262 if( slot++ < set->page->cnt )
2265 return bt->err = BTERR_struct, 0;
2267 page_no = bt_getid(valptr(set->page, slot)->value);
2268 prevlatch = set->latch;
2271 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2272 set->page = bt_mappage (bt, set->latch);
2276 if( set->page->free || set->page->lvl )
2277 return bt->err = BTERR_struct, 0;
2279 // obtain read lock using lock chaining with Access mode
2280 // release & unpin parent/left sibling page
2282 bt_lockpage(BtLockAccess, set->latch);
2284 bt_unlockpage(BtLockRead, prevlatch);
2285 bt_unpinlatch (prevlatch);
2287 bt_lockpage(BtLockRead, set->latch);
2288 bt_unlockpage(BtLockAccess, set->latch);
2290 // find key on page at this level
2291 // and descend to requested level
2293 if( !set->page->kill )
2294 if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2295 bt_unlockpage(BtLockRead, set->latch);
2296 bt_lockpage(BtLockAtomic, set->latch);
2297 bt_lockpage(BtLockAccess, set->latch);
2298 bt_lockpage(BtLockRead, set->latch);
2299 bt_unlockpage(BtLockAccess, set->latch);
2301 if( slot = bt_findslot (set->page, key, len) )
2304 bt_unlockpage(BtLockAtomic, set->latch);
2307 // slide right into next page
2309 page_no = bt_getid(set->page->right);
2310 prevlatch = set->latch;
2313 // return error on end of right chain
2315 bt->err = BTERR_struct;
2316 return 0; // return error
2319 // determine actual page where key is located
2320 // return slot number
2322 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2324 BtKey *key = keyptr(source,src);
2325 uint slot = locks[src].slot;
2328 if( src > 1 && locks[src].reuse )
2329 entry = locks[src-1].entry, slot = 0;
2331 entry = locks[src].entry;
2334 set->latch = bt->mgr->latchsets + entry;
2335 set->page = bt_mappage (bt, set->latch);
2339 // is locks->reuse set?
2340 // if so, find where our key
2341 // is located on previous page or split pages
2344 set->latch = bt->mgr->latchsets + entry;
2345 set->page = bt_mappage (bt, set->latch);
2347 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2348 if( locks[src].reuse )
2349 locks[src].entry = entry;
2352 } while( entry = set->latch->split );
2354 bt->err = BTERR_atomic;
2358 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2360 BtKey *key = keyptr(source, src);
2361 BtVal *val = valptr(source, src);
2366 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2367 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2368 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2370 if( entry = bt_splitpage (bt, set) )
2371 latch = bt->mgr->latchsets + entry;
2375 // splice right page into split chain
2376 // and WriteLock it.
2378 latch->split = set->latch->split;
2379 set->latch->split = entry;
2380 bt_lockpage(BtLockWrite, latch);
2383 return bt->err = BTERR_atomic;
2386 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2388 BtKey *key = keyptr(source, src);
2389 uint idx, entry, slot;
2394 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2395 slotptr(set->page, slot)->dead = 1;
2397 return bt->err = BTERR_struct;
2399 ptr = keyptr(set->page, slot);
2400 val = valptr(set->page, slot);
2402 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2403 set->latch->dirty = 1;
2408 uint bt_parentmatch (AtomicKey *head, uint entry)
2413 do if( head->entry == entry )
2414 head->nounlock = 1, cnt++;
2415 while( head = head->next );
2420 // atomic modification of a batch of keys.
2422 // return -1 if BTERR is set
2423 // otherwise return slot number
2424 // causing the key constraint violation
2425 // or zero on successful completion.
2427 int bt_atomictxn (BtDb *bt, BtPage source)
2429 uint src, idx, slot, samepage, entry;
2430 AtomicKey *head, *tail, *leaf;
2431 BtPageSet set[1], prev[1];
2432 unsigned char value[BtId];
2433 BtKey *key, *ptr, *key2;
2443 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2447 // stable sort the list of keys into order to
2448 // prevent deadlocks between threads.
2450 for( src = 1; src++ < source->cnt; ) {
2451 *temp = *slotptr(source,src);
2452 key = keyptr (source,src);
2454 for( idx = src; --idx; ) {
2455 key2 = keyptr (source,idx);
2456 if( keycmp (key, key2->key, key2->len) < 0 ) {
2457 *slotptr(source,idx+1) = *slotptr(source,idx);
2458 *slotptr(source,idx) = *temp;
2464 // Load the leaf page for each key
2465 // group same page references with reuse bit
2466 // and determine any constraint violations
2468 for( src = 0; src++ < source->cnt; ) {
2469 key = keyptr(source, src);
2472 // first determine if this modification falls
2473 // on the same page as the previous modification
2474 // note that the far right leaf page is a special case
2476 if( samepage = src > 1 )
2477 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2478 slot = bt_findslot(set->page, key->key, key->len);
2479 else // release read on previous page
2480 bt_unlockpage(BtLockRead, set->latch);
2483 if( slot = bt_atomicload(bt, set, key->key, key->len) )
2484 set->latch->split = 0;
2488 if( slotptr(set->page, slot)->type == Librarian )
2489 ptr = keyptr(set->page, ++slot);
2491 ptr = keyptr(set->page, slot);
2494 locks[src].entry = set->latch->entry;
2495 locks[src].slot = slot;
2496 locks[src].reuse = 0;
2498 locks[src].entry = 0;
2499 locks[src].slot = 0;
2500 locks[src].reuse = 1;
2503 switch( slotptr(source, src)->type ) {
2506 if( !slotptr(set->page, slot)->dead )
2507 if( slot < set->page->cnt || bt_getid (set->page->right) )
2508 if( !keycmp (ptr, key->key, key->len) ) {
2510 // return constraint violation if key already exists
2512 bt_unlockpage(BtLockRead, set->latch);
2516 if( locks[src].entry ) {
2517 set->latch = bt->mgr->latchsets + locks[src].entry;
2518 bt_unlockpage(BtLockAtomic, set->latch);
2519 bt_unpinlatch (set->latch);
2530 // unlock last loadpage lock
2532 if( source->cnt > 1 )
2533 bt_unlockpage(BtLockRead, set->latch);
2535 // obtain write lock for each master page
2537 for( src = 0; src++ < source->cnt; )
2538 if( locks[src].reuse )
2541 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2543 // insert or delete each key
2544 // process any splits or merges
2545 // release Write & Atomic latches
2546 // set ParentModifications and build
2547 // queue of keys to insert for split pages
2548 // or delete for deleted pages.
2550 // run through txn list backwards
2552 samepage = source->cnt + 1;
2554 for( src = source->cnt; src; src-- ) {
2555 if( locks[src].reuse )
2558 // perform the txn operations
2559 // from smaller to larger on
2562 for( idx = src; idx < samepage; idx++ )
2563 switch( slotptr(source,idx)->type ) {
2565 if( bt_atomicdelete (bt, source, locks, idx) )
2571 if( bt_atomicinsert (bt, source, locks, idx) )
2576 // after the same page operations have finished,
2577 // process master page for splits or deletion.
2579 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2580 prev->page = bt_mappage (bt, prev->latch);
2583 // pick-up all splits from master page
2585 entry = prev->latch->split;
2588 set->latch = bt->mgr->latchsets + entry;
2589 set->page = bt_mappage (bt, set->latch);
2590 entry = set->latch->split;
2592 // delete empty master page
2594 if( !prev->page->act ) {
2595 memcpy (set->page->left, prev->page->left, BtId);
2596 memcpy (prev->page, set->page, bt->mgr->page_size);
2597 bt_lockpage (BtLockDelete, set->latch);
2598 bt_freepage (bt, set);
2599 bt_unpinlatch (set->latch);
2601 prev->latch->dirty = 1;
2605 // remove empty page from the split chain
2607 if( !set->page->act ) {
2608 memcpy (prev->page->right, set->page->right, BtId);
2609 prev->latch->split = set->latch->split;
2610 bt_lockpage (BtLockDelete, set->latch);
2611 bt_freepage (bt, set);
2612 bt_unpinlatch (set->latch);
2616 // schedule prev fence key update
2618 ptr = keyptr(prev->page,prev->page->cnt);
2619 leaf = calloc (sizeof(AtomicKey), 1);
2621 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2622 leaf->page_no = prev->latch->page_no;
2623 leaf->entry = prev->latch->entry;
2633 bt_putid (set->page->left, prev->latch->page_no);
2634 bt_lockpage(BtLockParent, prev->latch);
2635 bt_unlockpage(BtLockWrite, prev->latch);
2639 // update left pointer in next right page
2640 // if we did any now non-empty splits
2642 if( latch->split ) {
2643 // fix left pointer in master's original right sibling
2644 // or set rightmost page in page zero
2646 if( right = bt_getid (prev->page->right) ) {
2647 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2648 set->page = bt_mappage (bt, set->latch);
2652 bt_lockpage (BtLockWrite, set->latch);
2653 bt_putid (set->page->left, prev->latch->page_no);
2654 set->latch->dirty = 1;
2655 bt_unlockpage (BtLockWrite, set->latch);
2656 bt_unpinlatch (set->latch);
2658 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2660 // Process last page split in chain
2662 ptr = keyptr(prev->page,prev->page->cnt);
2663 leaf = calloc (sizeof(AtomicKey), 1);
2665 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2666 leaf->page_no = prev->latch->page_no;
2667 leaf->entry = prev->latch->entry;
2677 bt_lockpage(BtLockParent, prev->latch);
2678 bt_unlockpage(BtLockWrite, prev->latch);
2679 bt_unlockpage(BtLockAtomic, latch);
2683 // finished if prev page occupied (either master or final split)
2685 if( prev->page->act ) {
2686 bt_unlockpage(BtLockWrite, latch);
2687 bt_unlockpage(BtLockAtomic, latch);
2688 bt_unpinlatch(latch);
2692 // any splits were reversed, and the
2693 // master page located in prev is empty, delete it
2694 // by pulling over master's right sibling.
2695 // Delete empty master's fence
2697 ptr = keyptr(prev->page,prev->page->cnt);
2698 leaf = calloc (sizeof(AtomicKey), 1);
2700 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2701 leaf->page_no = prev->latch->page_no;
2702 leaf->entry = prev->latch->entry;
2712 bt_lockpage(BtLockParent, prev->latch);
2713 bt_unlockpage(BtLockWrite, prev->latch);
2715 // grab master's right sibling
2717 if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2718 set->page = bt_mappage (bt, set->latch);
2722 bt_lockpage(BtLockWrite, set->latch);
2724 // pull contents over empty page
2726 memcpy (set->page->left, prev->page->left, BtId);
2727 memcpy (prev->page, set->page, bt->mgr->page_size);
2729 bt_putid (set->page->right, prev->latch->page_no);
2730 set->latch->dirty = 1;
2731 set->page->kill = 1;
2733 // add new fence key for new master page contents, delete
2734 // right sibling after parent posts the new master fence.
2736 ptr = keyptr(set->page,set->page->cnt);
2737 leaf = calloc (sizeof(AtomicKey), 1);
2739 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2740 leaf->page_no = prev->latch->page_no;
2741 leaf->entry = set->latch->entry;
2744 // see if right sibling is not yet in the FIFO
2746 if( !bt_parentmatch (head, leaf->entry) )
2747 bt_lockpage(BtLockParent, set->latch);
2749 bt_unlockpage(BtLockWrite, set->latch);
2758 // fix master's far right sibling's left pointer
2760 if( right = bt_getid (set->page->right) ) {
2761 if( set->latch = bt_pinlatch (bt, right, 1) )
2762 set->page = bt_mappage (bt, set->latch);
2764 bt_lockpage (BtLockWrite, set->latch);
2765 bt_putid (set->page->left, prev->latch->page_no);
2766 set->latch->dirty = 1;
2768 bt_unlockpage (BtLockWrite, set->latch);
2769 bt_unpinlatch (set->latch);
2772 bt_unlockpage(BtLockAtomic, latch);
2775 // add & delete keys for any pages split or merged during transaction
2779 set->latch = bt->mgr->latchsets + leaf->entry;
2780 set->page = bt_mappage (bt, set->latch);
2782 bt_putid (value, leaf->page_no);
2783 ptr = (BtKey *)leaf->leafkey;
2785 switch( leaf->type ) {
2786 case 0: // insert key
2787 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2792 case 1: // delete key
2793 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2798 case 2: // insert key & free
2799 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2802 bt_lockpage (BtLockDelete, set->latch);
2803 bt_lockpage (BtLockWrite, set->latch);
2804 bt_freepage (bt, set);
2808 if( !leaf->nounlock )
2809 bt_unlockpage (BtLockParent, set->latch);
2811 bt_unpinlatch (set->latch);
2814 } while( leaf = tail );
2822 // set cursor to highest slot on highest page
2824 uint bt_lastkey (BtDb *bt)
2826 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2830 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2831 set->page = bt_mappage (bt, set->latch);
2835 bt_lockpage(BtLockRead, set->latch);
2837 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2838 slot = set->page->cnt;
2840 bt_unlockpage(BtLockRead, set->latch);
2841 bt_unpinlatch (set->latch);
2845 // return previous slot on cursor page
2847 uint bt_prevkey (BtDb *bt, uint slot)
2849 uid ourright, next, us = bt->cursor_page;
2855 ourright = bt_getid(bt->cursor->right);
2858 if( !(next = bt_getid(bt->cursor->left)) )
2862 bt->cursor_page = next;
2864 if( set->latch = bt_pinlatch (bt, next, 1) )
2865 set->page = bt_mappage (bt, set->latch);
2869 bt_lockpage(BtLockRead, set->latch);
2870 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2871 bt_unlockpage(BtLockRead, set->latch);
2872 bt_unpinlatch (set->latch);
2874 next = bt_getid (bt->cursor->right);
2877 if( next == ourright )
2882 return bt->cursor->cnt;
2885 // return next slot on cursor page
2886 // or slide cursor right into next page
2888 uint bt_nextkey (BtDb *bt, uint slot)
2894 right = bt_getid(bt->cursor->right);
2896 while( slot++ < bt->cursor->cnt )
2897 if( slotptr(bt->cursor,slot)->dead )
2899 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2907 bt->cursor_page = right;
2909 if( set->latch = bt_pinlatch (bt, right, 1) )
2910 set->page = bt_mappage (bt, set->latch);
2914 bt_lockpage(BtLockRead, set->latch);
2916 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2918 bt_unlockpage(BtLockRead, set->latch);
2919 bt_unpinlatch (set->latch);
2927 // cache page of keys into cursor and return starting slot for given key
2929 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2934 // cache page for retrieval
2936 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2937 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2941 bt->cursor_page = set->latch->page_no;
2943 bt_unlockpage(BtLockRead, set->latch);
2944 bt_unpinlatch (set->latch);
2948 BtKey *bt_key(BtDb *bt, uint slot)
2950 return keyptr(bt->cursor, slot);
2953 BtVal *bt_val(BtDb *bt, uint slot)
2955 return valptr(bt->cursor,slot);
2961 double getCpuTime(int type)
2964 FILETIME xittime[1];
2965 FILETIME systime[1];
2966 FILETIME usrtime[1];
2967 SYSTEMTIME timeconv[1];
2970 memset (timeconv, 0, sizeof(SYSTEMTIME));
2974 GetSystemTimeAsFileTime (xittime);
2975 FileTimeToSystemTime (xittime, timeconv);
2976 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2979 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2980 FileTimeToSystemTime (usrtime, timeconv);
2983 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2984 FileTimeToSystemTime (systime, timeconv);
2988 ans += (double)timeconv->wHour * 3600;
2989 ans += (double)timeconv->wMinute * 60;
2990 ans += (double)timeconv->wSecond;
2991 ans += (double)timeconv->wMilliseconds / 1000;
2996 #include <sys/resource.h>
2998 double getCpuTime(int type)
3000 struct rusage used[1];
3001 struct timeval tv[1];
3005 gettimeofday(tv, NULL);
3006 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3009 getrusage(RUSAGE_SELF, used);
3010 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3013 getrusage(RUSAGE_SELF, used);
3014 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3021 void bt_poolaudit (BtMgr *mgr)
3026 while( slot++ < mgr->latchdeployed ) {
3027 latch = mgr->latchsets + slot;
3029 if( *latch->readwr->rin & MASK )
3030 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3031 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3033 if( *latch->access->rin & MASK )
3034 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3035 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3037 if( *latch->parent->rin & MASK )
3038 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3039 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3041 if( latch->pin & ~CLOCK_bit ) {
3042 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3048 uint bt_latchaudit (BtDb *bt)
3050 ushort idx, hashidx;
3056 if( *(ushort *)(bt->mgr->lock) )
3057 fprintf(stderr, "Alloc page locked\n");
3058 *(ushort *)(bt->mgr->lock) = 0;
3060 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3061 latch = bt->mgr->latchsets + idx;
3062 if( *latch->readwr->rin & MASK )
3063 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3064 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3066 if( *latch->access->rin & MASK )
3067 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3068 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3070 if( *latch->parent->rin & MASK )
3071 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3072 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3075 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3080 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3081 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3082 fprintf(stderr, "hash entry %d locked\n", hashidx);
3084 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3086 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3087 latch = bt->mgr->latchsets + idx;
3089 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3090 } while( idx = latch->next );
3093 page_no = LEAF_page;
3095 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3096 uid off = page_no << bt->mgr->page_bits;
3098 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3102 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3104 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3105 return bt->err = BTERR_map;
3107 if( *amt < bt->mgr->page_size )
3108 return bt->err = BTERR_map;
3110 if( !bt->frame->free && !bt->frame->lvl )
3111 cnt += bt->frame->act;
3115 cnt--; // remove stopper key
3116 fprintf(stderr, " Total keys read %d\n", cnt);
3130 // standalone program to index file of keys
3131 // then list them onto std-out
3134 void *index_file (void *arg)
3136 uint __stdcall index_file (void *arg)
3139 int line = 0, found = 0, cnt = 0, unique;
3140 uid next, page_no = LEAF_page; // start on first page of leaves
3141 unsigned char key[BT_maxkey];
3142 unsigned char txn[65536];
3143 ThreadArg *args = arg;
3144 int ch, len = 0, slot;
3153 bt = bt_open (args->mgr);
3156 unique = (args->type[1] | 0x20) != 'd';
3158 switch(args->type[0] | 0x20)
3161 fprintf(stderr, "started latch mgr audit\n");
3162 cnt = bt_latchaudit (bt);
3163 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3167 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
3168 if( in = fopen (args->infile, "rb") )
3169 while( ch = getc(in), ch != EOF )
3174 memcpy (txn + nxt, key + 10, len - 10);
3176 txn[nxt] = len - 10;
3178 memcpy (txn + nxt, key, 10);
3181 slotptr(page,++cnt)->off = nxt;
3182 slotptr(page,cnt)->type = Delete;
3185 if( cnt < args->num )
3192 if( bt_atomictxn (bt, page) )
3193 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3198 else if( len < BT_maxkey )
3200 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3204 fprintf(stderr, "started pennysort for %s\n", args->infile);
3205 if( in = fopen (args->infile, "rb") )
3206 while( ch = getc(in), ch != EOF )
3211 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3212 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3215 else if( len < BT_maxkey )
3217 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3221 fprintf(stderr, "started indexing for %s\n", args->infile);
3222 if( in = fopen (args->infile, "r") )
3223 while( ch = getc(in), ch != EOF )
3228 if( args->num == 1 )
3229 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3231 else if( args->num )
3232 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3234 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3235 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3238 else if( len < BT_maxkey )
3240 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3244 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3245 if( in = fopen (args->infile, "rb") )
3246 while( ch = getc(in), ch != EOF )
3250 if( args->num == 1 )
3251 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3253 else if( args->num )
3254 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3256 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3257 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3258 ptr = (BtKey*)(bt->key);
3261 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3262 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3265 else if( len < BT_maxkey )
3267 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3271 fprintf(stderr, "started finding keys for %s\n", args->infile);
3272 if( in = fopen (args->infile, "rb") )
3273 while( ch = getc(in), ch != EOF )
3277 if( args->num == 1 )
3278 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3280 else if( args->num )
3281 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3283 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3286 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3289 else if( len < BT_maxkey )
3291 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3295 fprintf(stderr, "started scanning\n");
3297 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3298 set->page = bt_mappage (bt, set->latch);
3300 fprintf(stderr, "unable to obtain latch"), exit(1);
3301 bt_lockpage (BtLockRead, set->latch);
3302 next = bt_getid (set->page->right);
3304 for( slot = 0; slot++ < set->page->cnt; )
3305 if( next || slot < set->page->cnt )
3306 if( !slotptr(set->page, slot)->dead ) {
3307 ptr = keyptr(set->page, slot);
3310 if( slotptr(set->page, slot)->type == Duplicate )
3313 fwrite (ptr->key, len, 1, stdout);
3314 val = valptr(set->page, slot);
3315 fwrite (val->value, val->len, 1, stdout);
3316 fputc ('\n', stdout);
3320 bt_unlockpage (BtLockRead, set->latch);
3321 bt_unpinlatch (set->latch);
3322 } while( page_no = next );
3324 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3329 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3331 fprintf(stderr, "started counting\n");
3332 page_no = LEAF_page;
3334 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3335 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3338 if( !bt->frame->free && !bt->frame->lvl )
3339 cnt += bt->frame->act;
3345 cnt--; // remove stopper key
3346 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3358 typedef struct timeval timer;
3360 int main (int argc, char **argv)
3362 int idx, cnt, len, slot, err;
3363 int segsize, bits = 16;
3380 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3381 fprintf (stderr, " where page_bits is the page size in bits\n");
3382 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3383 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3384 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3388 start = getCpuTime(0);
3391 bits = atoi(argv[3]);
3394 poolsize = atoi(argv[4]);
3397 fprintf (stderr, "Warning: no mapped_pool\n");
3400 num = atoi(argv[5]);
3404 threads = malloc (cnt * sizeof(pthread_t));
3406 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3408 args = malloc (cnt * sizeof(ThreadArg));
3410 mgr = bt_mgr ((argv[1]), bits, poolsize);
3413 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3419 for( idx = 0; idx < cnt; idx++ ) {
3420 args[idx].infile = argv[idx + 6];
3421 args[idx].type = argv[2];
3422 args[idx].mgr = mgr;
3423 args[idx].num = num;
3424 args[idx].idx = idx;
3426 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3427 fprintf(stderr, "Error creating thread %d\n", err);
3429 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3433 // wait for termination
3436 for( idx = 0; idx < cnt; idx++ )
3437 pthread_join (threads[idx], NULL);
3439 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3441 for( idx = 0; idx < cnt; idx++ )
3442 CloseHandle(threads[idx]);
3448 elapsed = getCpuTime(0) - start;
3449 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3450 elapsed = getCpuTime(1);
3451 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3452 elapsed = getCpuTime(2);
3453 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);