1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // and ACID batched key-value updates
12 // author: karl malbrain, malbrain@cal.berkeley.edu
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
18 The original author is Karl Malbrain.
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
48 #define WIN32_LEAN_AND_MEAN
62 typedef unsigned long long uid;
65 typedef unsigned long long off64_t;
66 typedef unsigned short ushort;
67 typedef unsigned int uint;
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
78 // BTree page number constants
79 #define ALLOC_page 0 // allocation page
80 #define ROOT_page 1 // root of the btree
81 #define LEAF_page 2 // first page of leaves
83 // Number of levels to create in a new BTree
88 There are six lock types for each node in four independent sets:
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
106 // definition for phase-fair reader/writer lock implementation
120 // definition for spin latch implementation
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
126 volatile typedef struct {
137 // hash table entries
140 volatile uint slot; // Latch table entry at head of chain
141 BtSpinLatch latch[1];
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock atomic[1]; // Atomic update in progress
152 uint split; // right split page atomic insert
153 uint entry; // entry slot in latch table
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of outstanding threads
157 ushort dirty:1; // page in cache is dirty
159 pthread_t atomictid; // pid holding atomic lock
161 uint atomictid; // pid holding atomic lock
165 // Define the length of the page record numbers
169 // Page key slot definition.
171 // Keys are marked dead, but remain on the page until
172 // cleanup is called. The fence key (highest key) for
173 // a leaf page is always present, even after cleanup.
177 // In addition to the Unique keys that occupy slots
178 // there are Librarian and Duplicate key
179 // slots occupying the key slot array.
181 // The Librarian slots are dead keys that
182 // serve as filler, available to add new Unique
183 // or Dup slots that are inserted into the B-tree.
185 // The Duplicate slots have had their key bytes extended
186 // by 6 bytes to contain a binary duplicate key uniqueifier.
196 uint off:BT_maxbits; // page offset for key start
197 uint type:3; // type of slot
198 uint dead:1; // set for deleted slot
201 // The key structure occupies space at the upper end of
202 // each page. It's a length byte followed by the key
206 unsigned char len; // this can be changed to a ushort or uint
207 unsigned char key[0];
210 // the value structure also occupies space at the upper
211 // end of the page. Each key is immediately followed by a value.
214 unsigned char len; // this can be changed to a ushort or uint
215 unsigned char value[0];
218 #define BT_maxkey 255 // maximum number of bytes in a key
219 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
221 // The first part of an index page.
222 // It is immediately followed
223 // by the BtSlot array of keys.
225 // note that this structure size
226 // must be a multiple of 8 bytes
227 // in order to place dups correctly.
229 typedef struct BtPage_ {
230 uint cnt; // count of keys in page
231 uint act; // count of active keys
232 uint min; // next key offset
233 uint garbage; // page garbage in bytes
234 unsigned char bits:7; // page size in bits
235 unsigned char free:1; // page is on free chain
236 unsigned char lvl:7; // level of page
237 unsigned char kill:1; // page is being deleted
238 unsigned char right[BtId]; // page number to right
239 unsigned char left[BtId]; // page number to left
240 unsigned char filler[2]; // padding to multiple of 8
243 // The loadpage interface object
246 BtPage page; // current page pointer
247 BtLatchSet *latch; // current page latch set
250 // structure for latch manager on ALLOC_page
253 struct BtPage_ alloc[1]; // next page_no in right ptr
254 unsigned long long dups[1]; // global duplicate key uniqueifier
255 unsigned char chain[BtId]; // head of free page_nos chain
258 // The object structure for Btree access
261 uint page_size; // page size
262 uint page_bits; // page size in bits
268 BtPageZero *pagezero; // mapped allocation page
269 BtSpinLatch lock[1]; // allocation area lite latch
270 uint latchdeployed; // highest number of latch entries deployed
271 uint nlatchpage; // number of latch pages at BT_latch
272 uint latchtotal; // number of page latch entries
273 uint latchhash; // number of latch hash table slots
274 uint latchvictim; // next latch entry to examine
275 BtHashEntry *hashtable; // the buffer pool hash table entries
276 BtLatchSet *latchsets; // mapped latch set from buffer pool
277 unsigned char *pagepool; // mapped to the buffer pool pages
279 HANDLE halloc; // allocation handle
280 HANDLE hpool; // buffer pool handle
285 BtMgr *mgr; // buffer manager for thread
286 BtPage cursor; // cached frame for start/next (never mapped)
287 BtPage frame; // spare frame for the page split (never mapped)
288 uid cursor_page; // current cursor page number
289 unsigned char *mem; // frame, cursor, page memory buffer
290 unsigned char key[BT_keyarray]; // last found complete key
291 int found; // last delete or insert was found
292 int err; // last error
293 int reads, writes; // number of reads and writes from the btree
307 #define CLOCK_bit 0x8000
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey *bt_foundkey (BtDb *bt);
316 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey (BtDb *bt, uint slot);
320 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
321 void bt_mgrclose (BtMgr *mgr);
323 // Helper functions to return slot values
324 // from the cursor page.
326 extern BtKey *bt_key (BtDb *bt, uint slot);
327 extern BtVal *bt_val (BtDb *bt, uint slot);
329 // The page is allocated from low and hi ends.
330 // The key slots are allocated from the bottom,
331 // while the text and value of the key
332 // are allocated from the top. When the two
333 // areas meet, the page is split into two.
335 // A key consists of a length byte, two bytes of
336 // index number (0 - 65535), and up to 253 bytes
339 // Associated with each key is a value byte string
340 // containing any value desired.
342 // The b-tree root is always located at page 1.
343 // The first leaf page of level zero is always
344 // located on page 2.
346 // The b-tree pages are linked with next
347 // pointers to facilitate enumerators,
348 // and provide for concurrency.
350 // When the root page fills, it is split in two and
351 // the tree height is raised by a new root at page
352 // one with two keys.
354 // Deleted keys are marked with a dead bit until
355 // page cleanup. The fence key for a leaf node is
358 // To achieve maximum concurrency one page is locked at a time
359 // as the tree is traversed to find leaf key in question. The right
360 // page numbers are used in cases where the page is being split,
363 // Page 0 is dedicated to lock for new page extensions,
364 // and chains empty pages together for reuse. It also
365 // contains the latch manager hash table.
367 // The ParentModification lock on a node is obtained to serialize posting
368 // or changing the fence key for a node.
370 // Empty pages are chained together through the ALLOC page and reused.
372 // Access macros to address slot and key values from the page
373 // Page slots use 1 based indexing.
375 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
376 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
377 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
379 void bt_putid(unsigned char *dest, uid id)
384 dest[i] = (unsigned char)id, id >>= 8;
387 uid bt_getid(unsigned char *src)
392 for( i = 0; i < BtId; i++ )
393 id <<= 8, id |= *src++;
398 uid bt_newdup (BtDb *bt)
401 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
403 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
407 // Phase-Fair reader/writer lock implementation
409 void WriteLock (RWLock *lock)
414 tix = __sync_fetch_and_add (lock->ticket, 1);
416 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
418 // wait for our ticket to come up
420 while( tix != lock->serving[0] )
427 w = PRES | (tix & PHID);
429 r = __sync_fetch_and_add (lock->rin, w);
431 r = _InterlockedExchangeAdd16 (lock->rin, w);
433 while( r != *lock->rout )
441 void WriteRelease (RWLock *lock)
444 __sync_fetch_and_and (lock->rin, ~MASK);
446 _InterlockedAnd16 (lock->rin, ~MASK);
451 void ReadLock (RWLock *lock)
455 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
457 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
460 while( w == (*lock->rin & MASK) )
468 void ReadRelease (RWLock *lock)
471 __sync_fetch_and_add (lock->rout, RINC);
473 _InterlockedExchangeAdd16 (lock->rout, RINC);
477 // Spin Latch Manager
479 // wait until write lock mode is clear
480 // and add 1 to the share count
482 void bt_spinreadlock(BtSpinLatch *latch)
488 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
490 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
492 // see if exclusive request is granted or pending
497 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
499 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
502 } while( sched_yield(), 1 );
504 } while( SwitchToThread(), 1 );
508 // wait for other read and write latches to relinquish
510 void bt_spinwritelock(BtSpinLatch *latch)
516 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
518 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
521 if( !(prev & ~BOTH) )
525 __sync_fetch_and_and ((ushort *)latch, ~XCL);
527 _InterlockedAnd16((ushort *)latch, ~XCL);
530 } while( sched_yield(), 1 );
532 } while( SwitchToThread(), 1 );
536 // try to obtain write lock
538 // return 1 if obtained,
541 int bt_spinwritetry(BtSpinLatch *latch)
546 prev = __sync_fetch_and_or((ushort *)latch, XCL);
548 prev = _InterlockedOr16((ushort *)latch, XCL);
550 // take write access if all bits are clear
553 if( !(prev & ~BOTH) )
557 __sync_fetch_and_and ((ushort *)latch, ~XCL);
559 _InterlockedAnd16((ushort *)latch, ~XCL);
566 void bt_spinreleasewrite(BtSpinLatch *latch)
569 __sync_fetch_and_and((ushort *)latch, ~BOTH);
571 _InterlockedAnd16((ushort *)latch, ~BOTH);
575 // decrement reader count
577 void bt_spinreleaseread(BtSpinLatch *latch)
580 __sync_fetch_and_add((ushort *)latch, -SHARE);
582 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
586 // read page from permanent location in Btree file
588 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
590 off64_t off = page_no << mgr->page_bits;
593 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
594 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
601 memset (ovl, 0, sizeof(OVERLAPPED));
603 ovl->OffsetHigh = off >> 32;
605 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
606 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
609 if( *amt < mgr->page_size ) {
610 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
617 // write page to permanent location in Btree file
618 // clear the dirty bit
620 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
622 off64_t off = page_no << mgr->page_bits;
625 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
631 memset (ovl, 0, sizeof(OVERLAPPED));
633 ovl->OffsetHigh = off >> 32;
635 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
638 if( *amt < mgr->page_size )
644 // link latch table entry into head of latch hash table
646 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
648 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
649 BtLatchSet *latch = bt->mgr->latchsets + slot;
651 if( latch->next = bt->mgr->hashtable[hashidx].slot )
652 bt->mgr->latchsets[latch->next].prev = slot;
654 bt->mgr->hashtable[hashidx].slot = slot;
655 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
656 latch->page_no = page_no;
663 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
671 // set CLOCK bit in latch
672 // decrement pin count
674 void bt_unpinlatch (BtLatchSet *latch)
677 if( ~latch->pin & CLOCK_bit )
678 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
679 __sync_fetch_and_add(&latch->pin, -1);
681 if( ~latch->pin & CLOCK_bit )
682 _InterlockedOr16 (&latch->pin, CLOCK_bit);
683 _InterlockedDecrement16 (&latch->pin);
687 // return the btree cached page address
689 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
691 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
696 // find existing latchset or set up a new one
697 // return with latchset pinned
699 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
701 uint hashidx = page_no % bt->mgr->latchhash;
709 // try to find our entry
711 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
713 if( slot = bt->mgr->hashtable[hashidx].slot ) do
715 latch = bt->mgr->latchsets + slot;
716 if( page_no == latch->page_no )
718 } while( slot = latch->next );
724 latch = bt->mgr->latchsets + slot;
726 __sync_fetch_and_add(&latch->pin, 1);
728 _InterlockedIncrement16 (&latch->pin);
730 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
734 // see if there are any unused pool entries
736 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
738 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
741 if( slot < bt->mgr->latchtotal ) {
742 latch = bt->mgr->latchsets + slot;
743 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
745 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
750 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
752 _InterlockedDecrement (&bt->mgr->latchdeployed);
754 // find and reuse previous entry on victim
758 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
760 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
762 // try to get write lock on hash chain
763 // skip entry if not obtained
764 // or has outstanding pins
766 slot %= bt->mgr->latchtotal;
771 latch = bt->mgr->latchsets + slot;
772 idx = latch->page_no % bt->mgr->latchhash;
774 // see if we are on the same chain as hashidx
779 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
782 // skip this slot if it is pinned
783 // or the CLOCK bit is set
786 if( latch->pin & CLOCK_bit ) {
788 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
790 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
793 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
797 // update permanent page area in btree from buffer pool
799 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
802 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
805 latch->dirty = 0, bt->writes++;
807 // unlink our available slot from its hash chain
810 bt->mgr->latchsets[latch->prev].next = latch->next;
812 bt->mgr->hashtable[idx].slot = latch->next;
815 bt->mgr->latchsets[latch->next].prev = latch->prev;
817 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
819 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
822 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
827 void bt_mgrclose (BtMgr *mgr)
834 // flush dirty pool pages to the btree
836 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
837 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
838 latch = mgr->latchsets + slot;
841 bt_writepage(mgr, page, latch->page_no);
842 latch->dirty = 0, num++;
844 // madvise (page, mgr->page_size, MADV_DONTNEED);
847 fprintf(stderr, "%d buffer pool pages flushed\n", num);
850 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
851 munmap (mgr->pagezero, mgr->page_size);
853 FlushViewOfFile(mgr->pagezero, 0);
854 UnmapViewOfFile(mgr->pagezero);
855 UnmapViewOfFile(mgr->hashtable);
856 CloseHandle(mgr->halloc);
857 CloseHandle(mgr->hpool);
863 FlushFileBuffers(mgr->idx);
864 CloseHandle(mgr->idx);
869 // close and release memory
871 void bt_close (BtDb *bt)
878 VirtualFree (bt->mem, 0, MEM_RELEASE);
883 // open/create new btree buffer manager
885 // call with file_name, bits in page size (e.g. 16),
886 // size of page pool (e.g. 262144)
888 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
890 uint lvl, attr, last, slot, idx;
891 uint nlatchpage, latchhash;
892 unsigned char value[BtId];
893 int flag, initit = 0;
894 BtPageZero *pagezero;
901 // determine sanity of page size and buffer pool
903 if( bits > BT_maxbits )
905 else if( bits < BT_minbits )
909 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
914 mgr = calloc (1, sizeof(BtMgr));
916 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
918 if( mgr->idx == -1 ) {
919 fprintf (stderr, "Unable to open btree file\n");
920 return free(mgr), NULL;
923 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
924 attr = FILE_ATTRIBUTE_NORMAL;
925 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
927 if( mgr->idx == INVALID_HANDLE_VALUE )
928 return GlobalFree(mgr), NULL;
932 pagezero = valloc (BT_maxpage);
935 // read minimum page size to get root info
936 // to support raw disk partition files
937 // check if bits == 0 on the disk.
939 if( size = lseek (mgr->idx, 0L, 2) )
940 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
941 if( pagezero->alloc->bits )
942 bits = pagezero->alloc->bits;
946 return free(mgr), free(pagezero), NULL;
950 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
951 size = GetFileSize(mgr->idx, amt);
954 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
955 return bt_mgrclose (mgr), NULL;
956 bits = pagezero->alloc->bits;
961 mgr->page_size = 1 << bits;
962 mgr->page_bits = bits;
964 // calculate number of latch hash table entries
966 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
967 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
969 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
970 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
971 mgr->latchtotal = nodemax;
976 // initialize an empty b-tree with latch page, root page, page of leaves
977 // and page(s) of latches and page pool cache
979 memset (pagezero, 0, 1 << bits);
980 pagezero->alloc->bits = mgr->page_bits;
981 bt_putid(pagezero->alloc->right, MIN_lvl+1);
983 // initialize left-most LEAF page in
986 bt_putid (pagezero->alloc->left, LEAF_page);
988 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
989 fprintf (stderr, "Unable to create btree page zero\n");
990 return bt_mgrclose (mgr), NULL;
993 memset (pagezero, 0, 1 << bits);
994 pagezero->alloc->bits = mgr->page_bits;
996 for( lvl=MIN_lvl; lvl--; ) {
997 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
998 key = keyptr(pagezero->alloc, 1);
999 key->len = 2; // create stopper key
1003 bt_putid(value, MIN_lvl - lvl + 1);
1004 val = valptr(pagezero->alloc, 1);
1005 val->len = lvl ? BtId : 0;
1006 memcpy (val->value, value, val->len);
1008 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1009 pagezero->alloc->lvl = lvl;
1010 pagezero->alloc->cnt = 1;
1011 pagezero->alloc->act = 1;
1013 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1014 fprintf (stderr, "Unable to create btree page zero\n");
1015 return bt_mgrclose (mgr), NULL;
1023 VirtualFree (pagezero, 0, MEM_RELEASE);
1026 // mlock the pagezero page
1028 flag = PROT_READ | PROT_WRITE;
1029 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1030 if( mgr->pagezero == MAP_FAILED ) {
1031 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1032 return bt_mgrclose (mgr), NULL;
1034 mlock (mgr->pagezero, mgr->page_size);
1036 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1037 if( mgr->hashtable == MAP_FAILED ) {
1038 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1039 return bt_mgrclose (mgr), NULL;
1042 flag = PAGE_READWRITE;
1043 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1044 if( !mgr->halloc ) {
1045 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1046 return bt_mgrclose (mgr), NULL;
1049 flag = FILE_MAP_WRITE;
1050 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1051 if( !mgr->pagezero ) {
1052 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1053 return bt_mgrclose (mgr), NULL;
1056 flag = PAGE_READWRITE;
1057 size = (uid)mgr->nlatchpage << mgr->page_bits;
1058 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1060 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1061 return bt_mgrclose (mgr), NULL;
1064 flag = FILE_MAP_WRITE;
1065 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1066 if( !mgr->hashtable ) {
1067 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1068 return bt_mgrclose (mgr), NULL;
1072 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1073 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1078 // open BTree access method
1079 // based on buffer manager
1081 BtDb *bt_open (BtMgr *mgr)
1083 BtDb *bt = malloc (sizeof(*bt));
1085 memset (bt, 0, sizeof(*bt));
1088 bt->mem = valloc (2 *mgr->page_size);
1090 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1092 bt->frame = (BtPage)bt->mem;
1093 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1097 // compare two keys, return > 0, = 0, or < 0
1098 // =0: keys are same
1101 // as the comparison value
1103 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1105 uint len1 = key1->len;
1108 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1119 // place write, read, or parent lock on requested page_no.
1121 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1125 ReadLock (latch->readwr);
1128 WriteLock (latch->readwr);
1131 ReadLock (latch->access);
1134 WriteLock (latch->access);
1137 WriteLock (latch->parent);
1140 WriteLock (latch->atomic);
1142 case BtLockAtomic | BtLockRead:
1143 WriteLock (latch->atomic);
1144 ReadLock (latch->readwr);
1149 // remove write, read, or parent lock on requested page
1151 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1155 ReadRelease (latch->readwr);
1158 WriteRelease (latch->readwr);
1161 ReadRelease (latch->access);
1164 WriteRelease (latch->access);
1167 WriteRelease (latch->parent);
1170 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1171 WriteRelease (latch->atomic);
1173 case BtLockAtomic | BtLockRead:
1174 ReadRelease (latch->readwr);
1175 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1176 WriteRelease (latch->atomic);
1181 // allocate a new page
1182 // return with page latched, but unlocked.
1184 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1189 // lock allocation page
1191 bt_spinwritelock(bt->mgr->lock);
1193 // use empty chain first
1194 // else allocate empty page
1196 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1197 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1198 set->page = bt_mappage (bt, set->latch);
1200 return bt->err = BTERR_struct, -1;
1202 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1203 bt_spinreleasewrite(bt->mgr->lock);
1205 memcpy (set->page, contents, bt->mgr->page_size);
1206 set->latch->dirty = 1;
1210 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1211 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1213 // unlock allocation latch
1215 bt_spinreleasewrite(bt->mgr->lock);
1217 // don't load cache from btree page
1219 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1220 set->page = bt_mappage (bt, set->latch);
1222 return bt->err = BTERR_struct;
1224 memcpy (set->page, contents, bt->mgr->page_size);
1225 set->latch->dirty = 1;
1229 // find slot in page for given key at a given level
1231 int bt_findslot (BtPage page, unsigned char *key, uint len)
1233 uint diff, higher = page->cnt, low = 1, slot;
1236 // make stopper key an infinite fence value
1238 if( bt_getid (page->right) )
1243 // low is the lowest candidate.
1244 // loop ends when they meet
1246 // higher is already
1247 // tested as .ge. the passed key.
1249 while( diff = higher - low ) {
1250 slot = low + ( diff >> 1 );
1251 if( keycmp (keyptr(page, slot), key, len) < 0 )
1254 higher = slot, good++;
1257 // return zero if key is on right link page
1259 return good ? higher : 0;
1262 // find and load page at given level for given key
1263 // leave page rd or wr locked as requested
1265 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1267 uid page_no = ROOT_page, prevpage = 0;
1268 uint drill = 0xff, slot;
1269 BtLatchSet *prevlatch;
1270 uint mode, prevmode;
1272 // start at root of btree and drill down
1275 // determine lock mode of drill level
1276 mode = (drill == lvl) ? lock : BtLockRead;
1278 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1281 // obtain access lock using lock chaining with Access mode
1283 if( page_no > ROOT_page )
1284 bt_lockpage(BtLockAccess, set->latch);
1286 set->page = bt_mappage (bt, set->latch);
1288 // release & unpin parent or left sibling page
1291 bt_unlockpage(prevmode, prevlatch);
1292 bt_unpinlatch (prevlatch);
1296 // skip Atomic lock on leaf page we need to slide over
1299 if( mode & BtLockAtomic )
1300 if( pthread_equal (set->latch->atomictid, pthread_self()) )
1301 mode &= ~BtLockAtomic;
1303 // obtain mode lock using lock chaining through AccessLock
1305 bt_lockpage(mode, set->latch);
1307 if( mode & BtLockAtomic )
1308 set->latch->atomictid = pthread_self();
1310 if( set->page->free )
1311 return bt->err = BTERR_struct, 0;
1313 if( page_no > ROOT_page )
1314 bt_unlockpage(BtLockAccess, set->latch);
1316 // re-read and re-lock root after determining actual level of root
1318 if( set->page->lvl != drill) {
1319 if( set->latch->page_no != ROOT_page )
1320 return bt->err = BTERR_struct, 0;
1322 drill = set->page->lvl;
1324 if( lock != BtLockRead && drill == lvl ) {
1325 bt_unlockpage(mode, set->latch);
1326 bt_unpinlatch (set->latch);
1331 prevpage = set->latch->page_no;
1332 prevlatch = set->latch;
1335 // find key on page at this level
1336 // and descend to requested level
1338 if( set->page->kill )
1341 if( slot = bt_findslot (set->page, key, len) ) {
1345 while( slotptr(set->page, slot)->dead )
1346 if( slot++ < set->page->cnt )
1351 page_no = bt_getid(valptr(set->page, slot)->value);
1356 // or slide right into next page
1359 page_no = bt_getid(set->page->right);
1363 // return error on end of right chain
1365 bt->err = BTERR_struct;
1366 return 0; // return error
1369 // return page to free list
1370 // page must be delete & write locked
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1374 // lock allocation page
1376 bt_spinwritelock (bt->mgr->lock);
1380 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1381 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1382 set->latch->dirty = 1;
1383 set->page->free = 1;
1385 // unlock released page
1387 bt_unlockpage (BtLockDelete, set->latch);
1388 bt_unlockpage (BtLockWrite, set->latch);
1390 // unlock allocation page
1392 bt_spinreleasewrite (bt->mgr->lock);
1395 // a fence key was deleted from a page
1396 // push new fence value upwards
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1405 // remove the old fence value
1407 ptr = keyptr(set->page, set->page->cnt);
1408 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410 set->latch->dirty = 1;
1412 // cache new fence value
1414 ptr = keyptr(set->page, set->page->cnt);
1415 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1417 bt_lockpage (BtLockParent, set->latch);
1418 bt_unlockpage (BtLockWrite, set->latch);
1420 // insert new (now smaller) fence key
1422 bt_putid (value, set->latch->page_no);
1423 ptr = (BtKey*)leftkey;
1425 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1428 // now delete old fence key
1430 ptr = (BtKey*)rightkey;
1432 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1435 bt_unlockpage (BtLockParent, set->latch);
1436 bt_unpinlatch(set->latch);
1440 // root has a single child
1441 // collapse a level from the tree
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1449 // find the child entry and promote as new root contents
1452 for( idx = 0; idx++ < root->page->cnt; )
1453 if( !slotptr(root->page, idx)->dead )
1456 page_no = bt_getid (valptr(root->page, idx)->value);
1458 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1459 child->page = bt_mappage (bt, child->latch);
1463 bt_lockpage (BtLockDelete, child->latch);
1464 bt_lockpage (BtLockWrite, child->latch);
1466 memcpy (root->page, child->page, bt->mgr->page_size);
1467 root->latch->dirty = 1;
1469 bt_freepage (bt, child);
1470 bt_unpinlatch (child->latch);
1472 } while( root->page->lvl > 1 && root->page->act == 1 );
1474 bt_unlockpage (BtLockWrite, root->latch);
1475 bt_unpinlatch (root->latch);
1479 // delete a page and manage keys
1480 // call with page writelocked
1481 // returns with page unpinned
// The right sibling's contents are pulled into this page, the sibling is
// marked killed and pointed back at this page, the parent keys at lvl+1
// are fixed, and finally the sibling page is freed.
// bt  - b-tree handle
// set - page to delete, with its latch
1483 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1485 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1486 unsigned char value[BtId];
1487 uint lvl = set->page->lvl;
1492 // cache copy of fence key
1493 // to post in parent
1495 ptr = keyptr(set->page, set->page->cnt);
1496 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1498 // obtain lock on right page
1500 page_no = bt_getid(set->page->right);
1502 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1503 right->page = bt_mappage (bt, right->latch);
1507 bt_lockpage (BtLockWrite, right->latch);
1509 // cache copy of key to update
// (the right sibling's fence key)
1511 ptr = keyptr(right->page, right->page->cnt);
1512 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// a right sibling that is already marked killed is reported as BTERR_struct
// NOTE(review): this return happens with the write lock on right and both
// pins still held - presumably acceptable because the error is fatal; confirm
1514 if( right->page->kill )
1515 return bt->err = BTERR_struct;
1517 // pull contents of right peer into our empty page
1519 memcpy (set->page, right->page, bt->mgr->page_size);
1520 set->latch->dirty = 1;
1522 // mark right page deleted and point it to left page
1523 // until we can post parent updates that remove access
1524 // to the deleted page.
1526 bt_putid (right->page->right, set->latch->page_no);
1527 right->latch->dirty = 1;
1528 right->page->kill = 1;
// on both pages, swap the write lock for a parent lock
1530 bt_lockpage (BtLockParent, right->latch);
1531 bt_unlockpage (BtLockWrite, right->latch);
1533 bt_lockpage (BtLockParent, set->latch);
1534 bt_unlockpage (BtLockWrite, set->latch);
1536 // redirect higher key directly to our new node contents
// (insert the sibling's fence key at lvl+1 with this page's number as value)
1538 bt_putid (value, set->latch->page_no);
1539 ptr = (BtKey*)higherfence;
1541 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1544 // delete old lower key to our node
1546 ptr = (BtKey*)lowerfence;
1548 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1551 // obtain delete and write locks to right node
// then free the right page and drop its pin
1553 bt_unlockpage (BtLockParent, right->latch);
1554 bt_lockpage (BtLockDelete, right->latch);
1555 bt_lockpage (BtLockWrite, right->latch);
1556 bt_freepage (bt, right);
1557 bt_unpinlatch (right->latch);
// release the surviving page
1559 bt_unlockpage (BtLockParent, set->latch);
1560 bt_unpinlatch (set->latch);
1565 // find and delete key on page by marking delete flag bit
1566 // if page becomes empty, delete it from the btree
// bt  - b-tree handle; bt->found is set to whether a live key was deleted
// key - key bytes, len - key length
// lvl - tree level at which to look for the key
// After the slot is marked dead the function may fix the parent fence,
// collapse the root, or delete the now-empty page, as tested below.
1568 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1570 uint slot, idx, found, fence;
// load the page covering the key at level lvl, write-locked
1575 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1576 ptr = keyptr(set->page, slot);
1580 // if librarian slot, advance to real slot
1582 if( slotptr(set->page, slot)->type == Librarian )
1583 ptr = keyptr(set->page, ++slot);
// remember whether the slot is the page's last (fence) slot
1585 fence = slot == set->page->cnt;
1587 // if key is found delete it, otherwise ignore request
// found ends up non-zero only when the key matches and was not already dead
1589 if( found = !keycmp (ptr, key, len) )
1590 if( found = slotptr(set->page, slot)->dead == 0 ) {
1591 val = valptr(set->page,slot);
1592 slotptr(set->page, slot)->dead = 1;
// account the key and value bytes (with their headers) as garbage
1593 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1596 // collapse empty slots beneath the fence
// a dead slot just below the fence is overwritten by the fence slot and
// the vacated last slot is zeroed, shrinking cnt by one
1598 while( idx = set->page->cnt - 1 )
1599 if( slotptr(set->page, idx)->dead ) {
1600 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1601 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1606 // did we delete a fence key in an upper level?
// only when the page still has active keys; bt_fixfence releases the page
1608 if( found && lvl && set->page->act && fence )
1609 if( bt_fixfence (bt, set, lvl) )
1612 return bt->found = found, 0;
1614 // do we need to collapse root?
// root above level 1 left with a single active key
1616 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1617 if( bt_collapseroot (bt, set) )
1620 return bt->found = found, 0;
1622 // delete empty page
1624 if( !set->page->act )
1625 return bt_deletepage (bt, set);
// ordinary case: mark the page dirty, release the write lock and the pin
1627 set->latch->dirty = 1;
1628 bt_unlockpage(BtLockWrite, set->latch);
1629 bt_unpinlatch (set->latch);
1630 return bt->found = found, 0;
// Return the handle's key buffer (bt->key) viewed as a BtKey.
// bt_findkey copies the key it located into that buffer.
1633 BtKey *bt_foundkey (BtDb *bt)
1635 return (BtKey*)(bt->key);
1638 // advance to next slot
// bt   - b-tree handle
// set  - current page and latch; replaced by the right sibling when the
//        current page is exhausted
// slot - current slot number
// When slot is the page's last, the page named by the right pointer is
// pinned and read-locked and the previous page is released. If pinning
// fails, bt->err is set to BTERR_struct and 0 is returned.
1640 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1642 BtLatchSet *prevlatch;
// still inside the current page?
1645 if( slot < set->page->cnt )
1648 prevlatch = set->latch;
// follow the right sibling pointer; zero means there is no right page
1650 if( page_no = bt_getid(set->page->right) )
1651 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1652 set->page = bt_mappage (bt, set->latch);
1656 return bt->err = BTERR_struct, 0;
1658 // obtain access lock using lock chaining with Access mode
// order: access lock on the new page, release the old page, read lock on
// the new page, then drop the access lock
1660 bt_lockpage(BtLockAccess, set->latch);
1662 bt_unlockpage(BtLockRead, prevlatch);
1663 bt_unpinlatch (prevlatch);
1665 bt_lockpage(BtLockRead, set->latch);
1666 bt_unlockpage(BtLockAccess, set->latch);
1670 // find unique key or first duplicate key in
1671 // leaf level and return number of value bytes
1672 // or (-1) if not found. Setup key for bt_foundkey
// bt     - b-tree handle; bt->key receives a copy of the key examined
// key    - key bytes to look up, keylen - key length
// value  - caller buffer that receives the value bytes
// valmax - size of the caller buffer
1674 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// load the leaf (level 0) page covering the key, read-locked
1682 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1684 ptr = keyptr(set->page, slot);
1686 // skip librarian slot place holder
1688 if( slotptr(set->page, slot)->type == Librarian )
1689 ptr = keyptr(set->page, ++slot);
1691 // return actual key found
1693 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// Duplicate-type slots get special handling here
1696 if( slotptr(set->page, slot)->type == Duplicate )
1699 // not there if we reach the stopper key
// i.e. the last slot of a page that has no right sibling
1701 if( slot == set->page->cnt )
1702 if( !bt_getid (set->page->right) )
1705 // if key exists, return >= 0 value bytes copied
1706 // otherwise return (-1)
1708 if( slotptr(set->page, slot)->dead )
// compare key bytes; on a match copy the value out to the caller
1712 if( !memcmp (ptr->key, key, len) ) {
1713 val = valptr (set->page,slot);
1714 if( valmax > val->len )
1716 memcpy (value, val->value, valmax);
// move on to the next slot (possibly on the right sibling page)
1722 } while( slot = bt_findnext (bt, set, slot) );
1724 bt_unlockpage (BtLockRead, set->latch);
1725 bt_unpinlatch (set->latch);
1729 // check page for space available,
1730 // clean if necessary and return
1731 // 0 - page needs splitting
1732 // >0 new slot value
// bt     - b-tree handle; bt->frame is used as scratch for the old page image
// set    - page to check, with its latch
// keylen - length of the key about to be inserted
// slot   - slot the caller intends to insert at
// vallen - length of the value about to be inserted
// Cleaning rebuilds the page from a copy, dropping dead entries and
// placing a dead librarian slot in front of each surviving key.
1734 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1736 uint nxt = bt->mgr->page_size;
1737 BtPage page = set->page;
1738 uint cnt = 0, idx = 0;
1739 uint max = page->cnt;
// room check: space for two more slots plus the new key and value, with
// their headers, must fit below the lowest used offset (page->min)
1744 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1747 // skip cleanup and proceed to split
1748 // if there's not enough garbage
// threshold is one fifth of the page size
1751 if( page->garbage < nxt / 5 )
// keep the old image in bt->frame and rebuild the page in place
1754 memcpy (bt->frame, page, bt->mgr->page_size);
1756 // skip page info and set rest of page to zero
1758 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1759 set->latch->dirty = 1;
1763 // clean up page first by
1764 // removing deleted keys
// cnt walks the old slots 1..max, idx counts slots written to the new page
1766 while( cnt++ < max ) {
// dead slots are tested here, except the last (fence) slot
1769 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1772 // copy the value across
// data is packed downward from the end of the page; nxt is the low-water mark
1774 val = valptr(bt->frame, cnt);
1775 nxt -= val->len + sizeof(BtVal);
1776 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1778 // copy the key across
1780 key = keyptr(bt->frame, cnt);
1781 nxt -= key->len + sizeof(BtKey);
1782 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1784 // make a librarian slot
// (a dead placeholder slot pointing at the same key offset)
1787 slotptr(page, ++idx)->off = nxt;
1788 slotptr(page, idx)->type = Librarian;
1789 slotptr(page, idx)->dead = 1;
// the real slot: same offset, type and dead flag copied from the old slot
1794 slotptr(page, ++idx)->off = nxt;
1795 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1797 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1804 // see if page has enough space now, or does it need splitting?
1806 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1812 // split the root and raise the height of the btree
// bt    - b-tree handle
// root  - root page and latch; currently holds the lower keys
// right - latch of the page that holds the higher keys
// The root's contents move to a newly allocated left page. The root is
// then rebuilt with two keys: slot 1 is the left page's fence key pointing
// at the left page, slot 2 is a 2-byte key pointing at the right page.
// The root write lock and the pins on root and right are released.
1814 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1816 unsigned char leftkey[BT_keyarray];
1817 uint nxt = bt->mgr->page_size;
1818 unsigned char value[BtId];
1824 // save left page fence key for new root
1826 ptr = keyptr(root->page, root->page->cnt);
1827 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1829 // Obtain an empty page to use, and copy the current
1830 // root contents into it, e.g. lower keys
1832 if( bt_newpage(bt, left, root->page) )
// only the page number of the left page is needed from here on
1835 left_page_no = left->latch->page_no;
1836 bt_unpinlatch (left->latch);
1838 // preserve the page info at the bottom
1839 // of higher keys and set rest to zero
1841 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1843 // insert stopper key at top of newroot page
1844 // and increase the root height
// value for slot 2: the right page's number, placed at the end of the page
1846 nxt -= BtId + sizeof(BtVal);
1847 bt_putid (value, right->page_no);
1848 val = (BtVal *)((unsigned char *)root->page + nxt);
1849 memcpy (val->value, value, BtId);
// reserve room for a 2-byte key plus its header; slot 2 points at it
1852 nxt -= 2 + sizeof(BtKey);
1853 slotptr(root->page, 2)->off = nxt;
1854 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1859 // insert lower keys page fence key on newroot page as first key
1861 nxt -= BtId + sizeof(BtVal);
1862 bt_putid (value, left_page_no);
1863 val = (BtVal *)((unsigned char *)root->page + nxt);
1864 memcpy (val->value, value, BtId);
1867 ptr = (BtKey *)leftkey;
1868 nxt -= ptr->len + sizeof(BtKey);
1869 slotptr(root->page, 1)->off = nxt;
1870 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
1872 bt_putid(root->page->right, 0);
1873 root->page->min = nxt; // reset lowest used offset and key count
1874 root->page->cnt = 2;
1875 root->page->act = 2;
1878 // release and unpin root pages
1880 bt_unlockpage(BtLockWrite, root->latch);
1881 bt_unpinlatch (root->latch);
1883 bt_unpinlatch (right);
1887 // split already locked full node
1889 // return pool entry for new right
// bt  - b-tree handle; bt->frame is used as scratch
// set - the full page and its latch
// The higher keys are assembled in bt->frame and written to a new right
// page. The original page is then rebuilt with the smaller keys and
// linked to the new right page. Dead librarian slots are placed in front
// of the keys on both pages.
1892 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1894 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1895 uint lvl = set->page->lvl;
1902 // split higher half of keys to bt->frame
1904 memset (bt->frame, 0, bt->mgr->page_size);
1905 max = set->page->cnt;
// cnt walks source slots up to max; idx counts slots written to the frame
1909 while( cnt++ < max ) {
// dead slots are tested here, except the last (fence) slot
1910 if( slotptr(set->page, cnt)->dead && cnt < max )
// copy value then key, packing downward from the end of the frame
1912 src = valptr(set->page, cnt);
1913 nxt -= src->len + sizeof(BtVal);
1914 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1916 key = keyptr(set->page, cnt);
1917 nxt -= key->len + sizeof(BtKey);
1918 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1919 memcpy (ptr, key, key->len + sizeof(BtKey));
1921 // add librarian slot
1924 slotptr(bt->frame, ++idx)->off = nxt;
1925 slotptr(bt->frame, idx)->type = Librarian;
1926 slotptr(bt->frame, idx)->dead = 1;
// the real slot: same offset, type and dead flag copied from the source
1931 slotptr(bt->frame, ++idx)->off = nxt;
1932 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1934 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// fill in the header of the new right page image
1938 bt->frame->bits = bt->mgr->page_bits;
1939 bt->frame->min = nxt;
1940 bt->frame->cnt = idx;
1941 bt->frame->lvl = lvl;
// a non-root page hands its right-sibling link on to the new page
1945 if( set->latch->page_no > ROOT_page )
1946 bt_putid (bt->frame->right, bt_getid (set->page->right));
1948 // get new free page and write higher keys to it.
1950 if( bt_newpage(bt, right, bt->frame) )
// reuse bt->frame for the old page image and clear the page body
1953 memcpy (bt->frame, set->page, bt->mgr->page_size);
1954 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1955 set->latch->dirty = 1;
1957 nxt = bt->mgr->page_size;
1958 set->page->garbage = 0;
// a librarian slot at position max gets special handling
1964 if( slotptr(bt->frame, max)->type == Librarian )
1967 // assemble page of smaller keys
1969 while( cnt++ < max ) {
1970 if( slotptr(bt->frame, cnt)->dead )
1972 val = valptr(bt->frame, cnt);
1973 nxt -= val->len + sizeof(BtVal);
1974 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1976 key = keyptr(bt->frame, cnt);
1977 nxt -= key->len + sizeof(BtKey);
1978 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1980 // add librarian slot
1983 slotptr(set->page, ++idx)->off = nxt;
1984 slotptr(set->page, idx)->type = Librarian;
1985 slotptr(set->page, idx)->dead = 1;
1990 slotptr(set->page, ++idx)->off = nxt;
1991 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the rebuilt left page to the new right page and finish its header
1995 bt_putid(set->page->right, right->latch->page_no);
1996 set->page->min = nxt;
1997 set->page->cnt = idx;
// the result is the latch table entry of the new right page
1999 return right->latch->entry;
2002 // fix keys for newly split page
2003 // call with page locked, return
// bt    - b-tree handle
// set   - left (original) page and latch; its write lock is released here
// right - latch of the new right page produced by the split
// A root split is delegated to bt_splitroot. Otherwise the fence keys of
// both halves are inserted at level lvl+1, each pointing at its page, and
// both pages are released (parent lock and pin).
2006 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2008 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2009 unsigned char value[BtId];
2010 uint lvl = set->page->lvl;
2014 // if current page is the root page, split it
2016 if( set->latch->page_no == ROOT_page )
2017 return bt_splitroot (bt, set, right);
// copy the left page's fence key
2019 ptr = keyptr(set->page, set->page->cnt);
2020 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// map the right page and copy its fence key
2022 page = bt_mappage (bt, right);
2024 ptr = keyptr(page, page->cnt);
2025 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2027 // insert new fences in their parent pages
// both halves are parent-locked; the left page's write lock is dropped
2029 bt_lockpage (BtLockParent, right);
2031 bt_lockpage (BtLockParent, set->latch);
2032 bt_unlockpage (BtLockWrite, set->latch);
2034 // insert new fence for reformulated left block of smaller keys
2036 bt_putid (value, set->latch->page_no);
2037 ptr = (BtKey *)leftkey;
2039 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2042 // switch fence for right block of larger keys to new right page
2044 bt_putid (value, right->page_no);
2045 ptr = (BtKey *)rightkey;
2047 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
// release both halves
2050 bt_unlockpage (BtLockParent, set->latch);
2051 bt_unpinlatch (set->latch);
2053 bt_unlockpage (BtLockParent, right);
2054 bt_unpinlatch (right);
2058 // install new key and value onto page
2059 // page must already be checked for
// bt      - b-tree handle
// set     - target page and latch
// slot    - slot in front of which the key is inserted
// key     - key bytes, keylen - key length
// value   - value bytes, vallen - value length
// type    - slot type for the new slot
// release - passed as 1 by bt_insertkey and 0 by bt_atomicinsert;
//           presumably controls whether the page is unlocked and unpinned
//           on exit - confirm
2062 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2064 uint idx, librarian;
2069 // if found slot > desired slot and previous slot
2070 // is a librarian slot, use it
2073 if( slotptr(set->page, slot-1)->type == Librarian )
2076 // copy value onto page
// data grows downward: lower page->min, then write at the new offset
2078 set->page->min -= vallen + sizeof(BtVal);
2079 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2080 memcpy (val->value, value, vallen);
2083 // copy key onto page
2085 set->page->min -= keylen + sizeof(BtKey);
2086 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2087 memcpy (ptr->key, key, keylen);
2090 // find first empty slot
// (first dead slot at or after the insertion point)
2092 for( idx = slot; idx < set->page->cnt; idx++ )
2093 if( slotptr(set->page, idx)->dead )
2096 // now insert key into array before slot
// no dead slot found: grow the slot array by two so that a librarian slot
// can be added with the new key
2098 if( idx == set->page->cnt )
2099 idx += 2, set->page->cnt += 2, librarian = 2;
2103 set->latch->dirty = 1;
// shift the slots up by `librarian` positions to open the gap
2106 while( idx > slot + librarian - 1 )
2107 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2109 // add librarian slot
2111 if( librarian > 1 ) {
2112 node = slotptr(set->page, slot++);
2113 node->off = set->page->min;
2114 node->type = Librarian;
// fill in the real slot; it points at the key just written
2120 node = slotptr(set->page, slot);
2121 node->off = set->page->min;
2126 bt_unlockpage (BtLockWrite, set->latch);
2127 bt_unpinlatch (set->latch);
2133 // Insert new key into the btree at given level.
2134 // either add a new key or update/add an existing one
// bt     - b-tree handle
// key    - key bytes, keylen - key length
// lvl    - tree level to insert at
// value  - value bytes, vallen - value length
// unique - non-zero: an existing equal key has its value updated;
//          zero: a new slot is always inserted (see the test below)
2136 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2138 unsigned char newkey[BT_keyarray];
2139 uint slot, idx, len, entry;
2146 // set up the key we're working on
// (a private copy in newkey, addressed through ins)
2148 ins = (BtKey*)newkey;
2149 memcpy (ins->key, key, keylen);
2152 // is this a non-unique index value?
// a fresh sequence number from bt_newdup is stored with the key copy
2158 sequence = bt_newdup (bt);
2159 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2163 while( 1 ) { // find the page and slot for the current key
2164 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2165 ptr = keyptr(set->page, slot);
2168 bt->err = BTERR_ovflw;
2172 // if librarian slot == found slot, advance to real slot
2174 if( slotptr(set->page, slot)->type == Librarian )
2175 if( !keycmp (ptr, key, keylen) )
2176 ptr = keyptr(set->page, ++slot);
2180 if( slotptr(set->page, slot)->type == Duplicate )
2183 // if inserting a duplicate key or unique key
2184 // check for adequate space on the page
2185 // and insert the new key before slot.
// taken when the key at slot differs from ins, or when unique is zero
2187 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// no room even after cleaning: split the page and post the split keys
2188 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2189 if( !(entry = bt_splitpage (bt, set)) )
2191 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2196 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2199 // if key already exists, update value and return
2201 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2203 if( val->len >= vallen ) {
2204 if( slotptr(set->page, slot)->dead )
// the bytes no longer used by the shorter value count as garbage
2206 set->page->garbage += val->len - vallen;
2207 set->latch->dirty = 1;
2208 slotptr(set->page, slot)->dead = 0;
2210 memcpy (val->value, value, vallen);
2211 bt_unlockpage(BtLockWrite, set->latch);
2212 bt_unpinlatch (set->latch);
2216 // new update value doesn't fit in existing value area
// a live old key/value pair becomes garbage
2218 if( !slotptr(set->page, slot)->dead )
2219 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2221 slotptr(set->page, slot)->dead = 0;
// make room for the new copy, splitting the page if cleaning is not enough
2225 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2226 if( !(entry = bt_splitpage (bt, set)) )
2228 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write the new value, then the key, below the current low-water mark
2233 set->page->min -= vallen + sizeof(BtVal);
2234 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2235 memcpy (val->value, value, vallen);
2238 set->latch->dirty = 1;
2239 set->page->min -= keylen + sizeof(BtKey);
2240 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2241 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the new copy and release the page
2244 slotptr(set->page, slot)->off = set->page->min;
2245 bt_unlockpage(BtLockWrite, set->latch);
2246 bt_unpinlatch (set->latch);
// per-key bookkeeping for an atomic batch; the atomic routines access
// these fields as locks[src].entry, locks[src].slot and locks[src].reuse
2253 uint entry; // latch table entry number
2254 uint slot:31; // page slot number
2255 uint reuse:1; // reused previous page
// one queued parent-level key operation; bt_atomictxn mallocs these as
// AtomicKey and fills leafkey, page_no and entry from a page's fence key
2259 uid page_no; // page number for split leaf
2260 void *next; // next key to insert
2261 uint entry:30; // latch table entry number
2262 uint type:2; // 0 == insert, 1 == delete, 2 == free
// copy of the key (BtKey header followed by the key bytes)
2263 unsigned char leafkey[BT_keyarray];
2266 // determine actual page where key is located
2267 // return slot number
// bt     - b-tree handle
// source - page holding the batch of keys
// locks  - per-key lock records, indexed by source slot number
// src    - index of the key being located
// set    - receives the latch and mapped page that hold the key
// When no page in the split chain contains the key, bt->err is set to
// BTERR_atomic.
2269 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2271 BtKey *key = keyptr(source,src);
2272 uint slot = locks[src].slot;
// a key flagged reuse starts from the previous key's latch entry and has
// its slot looked up again
2275 if( src > 1 && locks[src].reuse )
2276 entry = locks[src-1].entry, slot = 0;
2278 entry = locks[src].entry;
2281 set->latch = bt->mgr->latchsets + entry;
2282 set->page = bt_mappage (bt, set->latch);
2286 // is locks->reuse set?
2287 // if so, find where our key
2288 // is located on previous page or split pages
2291 set->latch = bt->mgr->latchsets + entry;
2292 set->page = bt_mappage (bt, set->latch);
// search this page; on a hit, a reuse key records the entry where it was found
2294 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2295 if( locks[src].reuse )
2296 locks[src].entry = entry;
// follow the chain of split pages via latch->split
2299 } while( entry = set->latch->split );
2301 bt->err = BTERR_atomic;
// Insert the key and value at source slot src as part of an atomic batch.
// bt     - b-tree handle
// source - page holding the batch of keys and values
// locks  - per-key lock records; locks[src].slot is updated here
// src    - index of the key to insert
// The key's page is located with bt_atomicpage. When the page has no room
// it is split, the new right page is spliced into the latch split chain
// and write-locked, and the lookup is retried. Returns BTERR_atomic once
// the lookup yields no slot.
2305 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2307 BtKey *key = keyptr(source, src);
2308 BtVal *val = valptr(source, src);
2313 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
// room on the page (possibly after cleaning): insert, keeping the page
// locked (release argument is 0)
2314 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2315 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2317 if( entry = bt_splitpage (bt, set) )
2318 latch = bt->mgr->latchsets + entry;
2322 // splice right page into split chain
2323 // and WriteLock it.
2325 latch->split = set->latch->split;
2326 set->latch->split = entry;
2327 bt_lockpage(BtLockWrite, latch);
2330 return bt->err = BTERR_atomic;
// Delete the key at source slot src as part of an atomic batch, by marking
// its slot dead on the page where bt_atomicpage finds it.
// bt     - b-tree handle
// source - page holding the batch of keys
// locks  - per-key lock records
// src    - index of the key to delete
// Returns BTERR_struct when the key's slot cannot be located.
2333 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2335 BtKey *key = keyptr(source, src);
2336 uint idx, entry, slot;
2341 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2342 slotptr(set->page, slot)->dead = 1;
2344 return bt->err = BTERR_struct;
2346 ptr = keyptr(set->page, slot);
2347 val = valptr(set->page, slot);
// account the key and value bytes (with their headers) as garbage
2349 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2350 set->latch->dirty = 1;
2355 // atomic modification of a batch of keys.
2357 // return -1 if BTERR is set
2358 // otherwise return slot number
2359 // causing the key constraint violation
2360 // or zero on successful completion.
// bt     - b-tree handle
// source - page whose slots 1..cnt hold the batch; each slot's type selects
//          the operation applied to its key
// Phases, in order:
//   1. sort the source slots by key
//   2. load the leaf page for every key, recording it in locks[]
//   3. write-lock every distinct (master) page
//   4. walk the batch backwards, applying the operations page by page and
//      queueing parent key changes for split or emptied pages
//   5. apply the queued parent key changes
2362 int bt_atomictxn (BtDb *bt, BtPage source)
2364 uint src, idx, slot, samepage, entry;
2365 AtomicKey *head, *tail, *leaf;
2366 BtPageSet set[1], prev[1];
2367 unsigned char value[BtId];
2368 BtKey *key, *ptr, *key2;
// one lock record per source slot; slots are numbered from 1, hence cnt + 1
// NOTE(review): confirm the calloc result is checked before use
2378 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2382 // stable sort the list of keys into order to
2383 // prevent deadlocks between threads.
// insertion sort over the slot array: each slot from 2..cnt is moved down
// past the slots whose keys compare greater
2385 for( src = 1; src++ < source->cnt; ) {
2386 *temp = *slotptr(source,src);
2387 key = keyptr (source,src);
2389 for( idx = src; --idx; ) {
2390 key2 = keyptr (source,idx);
2391 if( keycmp (key, key2->key, key2->len) < 0 ) {
2392 *slotptr(source,idx+1) = *slotptr(source,idx);
2393 *slotptr(source,idx) = *temp;
2399 // Load the leaf page for each key
2400 // group same page references with reuse bit
2401 // and determine any constraint violations
2403 for( src = 0; src++ < source->cnt; ) {
2404 key = keyptr(source, src);
2407 // first determine if this modification falls
2408 // on the same page as the previous modification
2409 // note that the far right leaf page is a special case
// same page when the previous page has no right sibling, or when its
// fence key is >= this key
2411 if( samepage = src > 1 )
2412 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2413 slot = bt_findslot(set->page, key->key, key->len);
2414 else // release read on previous page
2415 bt_unlockpage(BtLockRead, set->latch);
// different page: load it with atomic and read locks, start a fresh split chain
2418 if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
2419 set->latch->split = 0;
// step over a librarian slot to the real slot
2423 if( slotptr(set->page, slot)->type == Librarian )
2424 ptr = keyptr(set->page, ++slot);
2426 ptr = keyptr(set->page, slot);
// record for a newly loaded page
2429 locks[src].entry = set->latch->entry;
2430 locks[src].slot = slot;
2431 locks[src].reuse = 0;
// record for a key that shares the previous key's page
2433 locks[src].entry = 0;
2434 locks[src].slot = 0;
2435 locks[src].reuse = 1;
2438 switch( slotptr(source, src)->type ) {
// a live slot holding an equal key, other than the stopper on the
// rightmost page, is a constraint violation
2441 if( !slotptr(set->page, slot)->dead )
2442 if( slot < set->page->cnt || bt_getid (set->page->right) )
2443 if( !keycmp (ptr, key->key, key->len) ) {
2445 // return constraint violation if key already exists
2447 bt_unlockpage(BtLockRead, set->latch);
// release the atomic lock and pin of a page recorded so far
2451 if( locks[src].entry ) {
2452 set->latch = bt->mgr->latchsets + locks[src].entry;
2453 bt_unlockpage(BtLockAtomic, set->latch);
2454 bt_unpinlatch (set->latch);
2465 // unlock last loadpage lock
2467 if( source->cnt > 1 )
2468 bt_unlockpage(BtLockRead, set->latch);
2470 // obtain write lock for each master page
// (a test on the reuse flag precedes the lock call)
2472 for( src = 0; src++ < source->cnt; )
2473 if( locks[src].reuse )
2476 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2478 // insert or delete each key
2479 // process any splits or merges
2480 // release Write & Atomic latches
2481 // set ParentModifications and build
2482 // queue of keys to insert for split pages
2483 // or delete for deleted pages.
2485 // run through txn list backwards
// samepage marks one past the last key of the current page group
2487 samepage = source->cnt + 1;
2489 for( src = source->cnt; src; src-- ) {
2490 if( locks[src].reuse )
2493 // perform the txn operations
2494 // from smaller to larger on
2497 for( idx = src; idx < samepage; idx++ )
2498 switch( slotptr(source,idx)->type ) {
2500 if( bt_atomicdelete (bt, source, locks, idx) )
2506 if( bt_atomicinsert (bt, source, locks, idx) )
2511 // after the same page operations have finished,
2512 // process master page for splits or deletion.
// latch keeps the master page; prev tracks the last kept page of the chain
2514 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2515 prev->page = bt_mappage (bt, prev->latch);
2518 // pick-up all splits from master page
2520 entry = prev->latch->split;
2523 set->latch = bt->mgr->latchsets + entry;
2524 set->page = bt_mappage (bt, set->latch);
2525 entry = set->latch->split;
2527 // delete empty master page
// the split page's contents replace the empty master, keeping the master's
// left link, and the split page is freed
2529 if( !prev->page->act ) {
2530 memcpy (set->page->left, prev->page->left, BtId);
2531 memcpy (prev->page, set->page, bt->mgr->page_size);
2532 bt_lockpage (BtLockDelete, set->latch);
2533 bt_freepage (bt, set);
2534 bt_unpinlatch (set->latch);
2536 prev->latch->dirty = 1;
2540 // remove empty page from the split chain
// unlink it from both the page's right link and the latch split chain
2542 if( !set->page->act ) {
2543 memcpy (prev->page->right, set->page->right, BtId);
2544 prev->latch->split = set->latch->split;
2545 bt_lockpage (BtLockDelete, set->latch);
2546 bt_freepage (bt, set);
2547 bt_unpinlatch (set->latch);
2551 // schedule prev fence key update
// NOTE(review): confirm the malloc result is checked before use
2553 ptr = keyptr(prev->page,prev->page->cnt);
2554 leaf = malloc (sizeof(AtomicKey));
2556 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2557 leaf->page_no = prev->latch->page_no;
2558 leaf->entry = prev->latch->entry;
// link the split page back to prev, then swap prev's write lock for a
// parent lock
2569 bt_putid (set->page->left, prev->latch->page_no);
2570 bt_lockpage(BtLockParent, prev->latch);
2571 bt_unlockpage(BtLockWrite, prev->latch);
2575 // update left pointer in next right page
2576 // if we did any now non-empty splits
2578 if( latch->split ) {
2579 // fix left pointer in master's original right sibling
2580 // or set rightmost page in page zero
2582 if( right = bt_getid (prev->page->right) ) {
2583 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2584 set->page = bt_mappage (bt, set->latch);
2588 bt_lockpage (BtLockWrite, set->latch);
2589 bt_putid (set->page->left, prev->latch->page_no);
2590 set->latch->dirty = 1;
2591 bt_unlockpage (BtLockWrite, set->latch);
2592 bt_unpinlatch (set->latch);
// no right sibling: the rightmost page number is recorded in page zero
2594 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2596 // Process last page split in chain
2598 ptr = keyptr(prev->page,prev->page->cnt);
2599 leaf = malloc (sizeof(AtomicKey));
2601 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2602 leaf->page_no = prev->latch->page_no;
2603 leaf->entry = prev->latch->entry;
2614 bt_lockpage(BtLockParent, prev->latch);
2615 bt_unlockpage(BtLockWrite, prev->latch);
2616 bt_unlockpage(BtLockAtomic, latch);
2620 // finished if prev page occupied (either master or final split)
2622 if( prev->page->act ) {
2623 bt_unlockpage(BtLockWrite, latch);
2624 bt_unlockpage(BtLockAtomic, latch);
2625 bt_unpinlatch(latch);
2629 // any splits were reversed, and the
2630 // master page located in prev is empty, delete it
2631 // by pulling over master's right sibling.
2632 // Delete empty master's fence
2634 ptr = keyptr(prev->page,prev->page->cnt);
2635 leaf = malloc (sizeof(AtomicKey));
2637 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2638 leaf->page_no = prev->latch->page_no;
2639 leaf->entry = prev->latch->entry;
2650 bt_lockpage(BtLockParent, prev->latch);
2651 bt_unlockpage(BtLockWrite, prev->latch);
// pin the master's right sibling
2653 if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2654 set->page = bt_mappage (bt, set->latch);
2658 // add page to our transaction
2660 bt_lockpage(BtLockAtomic, set->latch);
2661 bt_lockpage(BtLockWrite, set->latch);
2663 // pull contents over empty page
// (the master's left link is kept by copying it into the sibling first)
2665 memcpy (set->page->left, prev->page->left, BtId);
2666 memcpy (prev->page, set->page, bt->mgr->page_size);
// the emptied sibling is marked killed and pointed back at the master
2668 bt_putid (set->page->right, prev->latch->page_no);
2669 set->latch->dirty = 1;
2670 set->page->kill = 1;
2672 // add new parent key for new master page contents
2673 // delete it after parent posts the new master fence.
// this entry carries the master's page number but the sibling's latch entry
2675 ptr = keyptr(set->page,set->page->cnt);
2676 leaf = malloc (sizeof(AtomicKey));
2678 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2679 leaf->page_no = prev->latch->page_no;
2680 leaf->entry = set->latch->entry;
2691 // bt_lockpage(BtLockParent, set->latch);
2692 bt_unlockpage(BtLockWrite, set->latch);
2694 // fix master's far right sibling's left pointer
2696 if( right = bt_getid (set->page->right) ) {
2697 if( set->latch = bt_pinlatch (bt, right, 1) )
2698 set->page = bt_mappage (bt, set->latch);
2700 bt_lockpage (BtLockWrite, set->latch);
2701 bt_putid (set->page->left, prev->latch->page_no);
2702 set->latch->dirty = 1;
2704 bt_unlockpage (BtLockWrite, set->latch);
2705 bt_unpinlatch (set->latch);
2708 bt_unlockpage(BtLockAtomic, latch);
2711 // add & delete keys for any pages split or merged during transaction
// each queued AtomicKey names the latch entry to release and the key to
// post at level 1, with the page number as its value
2715 set->latch = bt->mgr->latchsets + leaf->entry;
2716 set->page = bt_mappage (bt, set->latch);
2718 bt_putid (value, leaf->page_no);
2719 ptr = (BtKey *)leaf->leafkey;
2721 switch( leaf->type ) {
2722 case 0: // insert key
2723 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2726 bt_unlockpage (BtLockParent, set->latch);
2729 case 1: // delete key
2730 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2733 bt_unlockpage (BtLockParent, set->latch);
2736 case 2: // insert key & free
// after the key is posted the page itself is freed under delete and write locks
2737 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2740 bt_unlockpage (BtLockAtomic, set->latch);
2741 bt_lockpage (BtLockDelete, set->latch);
2742 bt_lockpage (BtLockWrite, set->latch);
2743 bt_freepage (bt, set);
2747 bt_unpinlatch (set->latch);
2750 } while( leaf = tail );
2758 // set cursor to highest slot on highest page
// The page number is read from page zero's alloc->left field. That page is
// pinned and read-locked, copied into bt->cursor, and released again.
// slot is set to the page's slot count.
2760 uint bt_lastkey (BtDb *bt)
2762 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2766 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2767 set->page = bt_mappage (bt, set->latch);
2771 bt_lockpage(BtLockRead, set->latch);
// snapshot the whole page into the cursor buffer
2773 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2774 slot = set->page->cnt;
2776 bt_unlockpage(BtLockRead, set->latch);
2777 bt_unpinlatch (set->latch);
2781 // return previous slot on cursor page
// bt   - b-tree handle; bt->cursor and bt->cursor_page hold the cursor state
// slot - current slot on the cursor page
// To move to a page on the left, that page is copied into the cursor
// under a read lock and the slot returned is its slot count.
2783 uint bt_prevkey (BtDb *bt, uint slot)
2785 uid ourright, next, us = bt->cursor_page;
// remember the current page's right link for the comparison further down
2791 ourright = bt_getid(bt->cursor->right);
// no left sibling to move to
2794 if( !(next = bt_getid(bt->cursor->left)) )
2798 bt->cursor_page = next;
2800 if( set->latch = bt_pinlatch (bt, next, 1) )
2801 set->page = bt_mappage (bt, set->latch);
// snapshot that page into the cursor buffer under a read lock
2805 bt_lockpage(BtLockRead, set->latch);
2806 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2807 bt_unlockpage(BtLockRead, set->latch);
2808 bt_unpinlatch (set->latch);
// compare the loaded page's right link with the one saved above
2810 next = bt_getid (bt->cursor->right);
2813 if( next == ourright )
2818 return bt->cursor->cnt;
2821 // return next slot on cursor page
2822 // or slide cursor right into next page
// bt   - b-tree handle; bt->cursor and bt->cursor_page hold the cursor state
// slot - current slot on the cursor page
2824 uint bt_nextkey (BtDb *bt, uint slot)
2830 right = bt_getid(bt->cursor->right);
// walk the remaining slots of the cursor page, testing each for the dead flag
2832 while( slot++ < bt->cursor->cnt )
2833 if( slotptr(bt->cursor,slot)->dead )
2835 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move the cursor to the right sibling page
2843 bt->cursor_page = right;
2845 if( set->latch = bt_pinlatch (bt, right, 1) )
2846 set->page = bt_mappage (bt, set->latch);
// snapshot that page into the cursor buffer under a read lock
2850 bt_lockpage(BtLockRead, set->latch);
2852 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2854 bt_unlockpage(BtLockRead, set->latch);
2855 bt_unpinlatch (set->latch);
2863 // cache page of keys into cursor and return starting slot for given key
// bt  - b-tree handle; bt->cursor receives the page image and
//       bt->cursor_page its page number
// key - key bytes to position on, len - key length
2865 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2870 // cache page for retrieval
// (the leaf page covering the key, loaded with a read lock)
2872 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2873 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2877 bt->cursor_page = set->latch->page_no;
2879 bt_unlockpage(BtLockRead, set->latch);
2880 bt_unpinlatch (set->latch);
// Return a pointer to the key stored at the given slot of the cursor page
// (bt->cursor).
2884 BtKey *bt_key(BtDb *bt, uint slot)
2886 return keyptr(bt->cursor, slot);
// Return a pointer to the value stored at the given slot of the cursor
// page (bt->cursor).
2889 BtVal *bt_val(BtDb *bt, uint slot)
2891 return valptr(bt->cursor,slot);
// Windows variant: returns a time in seconds as a double. `type` selects
// among the current system time, the process user time and the process
// system time, matching the three queries below.
// The chosen FILETIME is converted to a SYSTEMTIME and its hour, minute,
// second and millisecond fields are summed.
2897 double getCpuTime(int type)
2900 FILETIME xittime[1];
2901 FILETIME systime[1];
2902 FILETIME usrtime[1];
2903 SYSTEMTIME timeconv[1];
2906 memset (timeconv, 0, sizeof(SYSTEMTIME));
// current system time; the day-of-week field contributes whole days
2910 GetSystemTimeAsFileTime (xittime);
2911 FileTimeToSystemTime (xittime, timeconv);
2912 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time
2915 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2916 FileTimeToSystemTime (usrtime, timeconv);
// process system time
2919 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2920 FileTimeToSystemTime (systime, timeconv);
// fold the broken-down time into seconds
2924 ans += (double)timeconv->wHour * 3600;
2925 ans += (double)timeconv->wMinute * 60;
2926 ans += (double)timeconv->wSecond;
2927 ans += (double)timeconv->wMilliseconds / 1000;
2932 #include <sys/resource.h>
// POSIX variant: returns a time in seconds as a double. `type` selects
// among the gettimeofday clock, the process user CPU time and the process
// system CPU time, matching the three returns below.
2934 double getCpuTime(int type)
2936 struct rusage used[1];
2937 struct timeval tv[1];
// time of day: seconds plus microseconds
2941 gettimeofday(tv, NULL);
2942 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
2945 getrusage(RUSAGE_SELF, used);
2946 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
2949 getrusage(RUSAGE_SELF, used);
2950 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Diagnostic pass over the latch sets owned by mgr: reports on stderr any
// latch set whose readwr/access/parent lock has MASK bits set in its rin
// word or whose pin count has bits other than CLOCK_bit, and zeroes the
// three lock structures of every latch set visited.
// NOTE(review): local declarations and the remainder of the pinned branch
//	are not part of this listing.
2957 void bt_poolaudit (BtMgr *mgr)
// post-increment in the test: the body runs with the already-incremented
// slot, up to and including latchdeployed (presumably slot starts at 0,
// so entry 0 is never visited -- confirm)
2962 while( slot++ < mgr->latchdeployed ) {
2963 latch = mgr->latchsets + slot;
// NOTE(review): %.8x expects an unsigned int; other page numbers in this
//	file are uid (unsigned long long) -- confirm latch->page_no's type
2965 if( *latch->readwr->rin & MASK )
2966 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
// not guarded by the test above: the lock is cleared whether or not it was reported
2967 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2969 if( *latch->access->rin & MASK )
2970 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2971 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2973 if( *latch->parent->rin & MASK )
2974 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2975 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits besides CLOCK_bit mean the latch set is still pinned
2977 if( latch->pin & ~CLOCK_bit ) {
2978 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// Audit the latch manager and count keys by reading pages from the index file.
// Visible steps: (1) report and clear the lock at bt->mgr->lock,
// (2) report and clear the readwr/access/parent locks of latch sets
// 1..latchdeployed, (3) report and clear each hash-table bucket latch and
// walk its chain, (4) read pages starting at LEAF_page and sum the act
// counts of non-free level-0 pages.
// NOTE(review): several declarations, the pin tests, the loop increment and
//	the final return are not part of this listing; index_file stores the
//	result as a key count, so the function presumably returns cnt.
2984 uint bt_latchaudit (BtDb *bt)
// NOTE(review): 16-bit indices -- if latchdeployed, latchhash or the slot/next
//	links can exceed 65535, the loops below would truncate or never
//	terminate; confirm the field types in BtMgr
2986 ushort idx, hashidx;
// the lock word is read and reset through a ushort view
2992 if( *(ushort *)(bt->mgr->lock) )
2993 fprintf(stderr, "Alloc page locked\n");
2994 *(ushort *)(bt->mgr->lock) = 0;
// latch set 0 is skipped; entries 1..latchdeployed inclusive are audited
2996 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2997 latch = bt->mgr->latchsets + idx;
// NOTE(review): %.8x expects an unsigned int; confirm latch->page_no's type
2998 if( *latch->readwr->rin & MASK )
2999 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
// cleared whether or not the lock was reported
3000 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3002 if( *latch->access->rin & MASK )
3003 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3004 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3006 if( *latch->parent->rin & MASK )
3007 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3008 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// the condition guarding this report is not part of this listing
3011 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// second pass: every hash bucket and the latch sets chained from it
3016 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3017 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3018 fprintf(stderr, "hash entry %d locked\n", hashidx);
3020 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
// assignment in the condition is deliberate: a zero slot means an empty bucket
3022 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3023 latch = bt->mgr->latchsets + idx;
// the condition guarding this report is not part of this listing
3025 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// follow the chain until a zero next link
3026 } while( idx = latch->next );
// third pass: pages from LEAF_page up to the limit stored in the
// allocation page's right field
3029 page_no = LEAF_page;
3031 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the index file
3032 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not checked in the visible lines, so a
//	short or failed read would leave bt->frame with stale contents
3034 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// Win32 path: low half of the offset by value, high half through a pointer
// into off (relies on a 32-bit long and little-endian layout)
3038 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
// NOTE(review): on failure an error code is returned through the same uint
//	that index_file prints as the number of keys found
3040 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3041 return bt->err = BTERR_map;
3043 if( *amt < bt->mgr->page_size )
3044 return bt->err = BTERR_map;
// only pages that are in use (free == 0) at level 0 contribute
3046 if( !bt->frame->free && !bt->frame->lvl )
3047 cnt += bt->frame->act;
3051 cnt--; // remove stopper key
3052 fprintf(stderr, " Total keys read %d\n", cnt);
3066 // standalone program to index file of keys
3067 // then list them onto std-out
// Thread entry point of the standalone driver. arg is a ThreadArg carrying
// the shared manager (mgr), the command string (type), the input file name
// (infile), the numeric option (num) and the thread index (idx).
// The first character of args->type, lower-cased with | 0x20, selects one of
// the modes below; each mode announces itself on stderr.
// Two signatures appear because the pthread and Win32 (_beginthreadex)
// builds need different prototypes.
// NOTE(review): the case labels, the #ifdef lines, several declarations and
//	many loop bodies are not part of this listing; comments below
//	describe only what the visible statements do.
// NOTE(review): every error path below calls exit(0), so the process reports
//	success to its parent even after a failure -- confirm this is intended.
3070 void *index_file (void *arg)
3072 uint __stdcall index_file (void *arg)
3075 int line = 0, found = 0, cnt = 0, unique;
3076 uid next, page_no = LEAF_page; // start on first page of leaves
3077 unsigned char key[BT_maxkey];
3078 unsigned char txn[65536];
3079 ThreadArg *args = arg;
3080 int ch, len = 0, slot;
// per-thread handle onto the shared manager
3089 bt = bt_open (args->mgr);
// second command character: anything other than 'd'/'D' requests unique keys
3092 unique = (args->type[1] | 0x20) != 'd';
3094 switch(args->type[0] | 0x20)
// ---- latch manager audit mode ----
3097 fprintf(stderr, "started latch mgr audit\n");
3098 cnt = bt_latchaudit (bt);
3099 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// ---- batched (TXN) pennysort mode: entries are accumulated into txn/page
// and submitted through bt_atomictxn ----
3103 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
// assignment in the condition is deliberate: proceed only if the file opened
3104 if( in = fopen (args->infile, "rb") )
3105 while( ch = getc(in), ch != EOF )
// bytes after the first 10 of the collected line are copied as the value
// (the updates of nxt between these statements are not visible)
3110 memcpy (txn + nxt, key + 10, len - 10);
// value length stored in a single byte
3112 txn[nxt] = len - 10;
// the first 10 bytes of the line are copied as the key
3114 memcpy (txn + nxt, key, 10);
// append a slot that points at offset nxt and mark it Unique
3117 slotptr(page,++cnt)->off = nxt;
3118 slotptr(page,cnt)->type = Unique;
// batch not yet at args->num entries (the statement this guards is not visible)
3121 if( cnt < args->num )
// submit the accumulated batch; non-zero result is an error
3128 if( bt_atomictxn (bt, page) )
3129 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// otherwise keep collecting the current line while it fits in key
// (the guarded statement is not visible; presumably key[len++] = ch)
3134 else if( len < BT_maxkey )
3136 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- pennysort mode: 10-byte key, remainder of the line as value ----
3140 fprintf(stderr, "started pennysort for %s\n", args->infile);
3141 if( in = fopen (args->infile, "rb") )
3142 while( ch = getc(in), ch != EOF )
3147 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3148 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3151 else if( len < BT_maxkey )
3153 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- indexing mode: each input line becomes a key with no value ----
3157 fprintf(stderr, "started indexing for %s\n", args->infile);
3158 if( in = fopen (args->infile, "r") )
3159 while( ch = getc(in), ch != EOF )
// num == 1: append a 9-digit descending counter to the key
// NOTE(review): sprintf writes 9 digits plus a terminating NUL at key+len;
//	no bounds check against BT_maxkey is visible here
3164 if( args->num == 1 )
3165 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
// other non-zero num: append a 9-digit number offset by the thread index
3167 else if( args->num )
3168 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3170 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3171 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3174 else if( len < BT_maxkey )
3176 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- delete mode: look each key up, then delete the key found ----
3180 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3181 if( in = fopen (args->infile, "rb") )
3182 while( ch = getc(in), ch != EOF )
// same key suffix rules as in indexing mode
3186 if( args->num == 1 )
3187 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3189 else if( args->num )
3190 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
// a negative result is treated as "not found" and is fatal
3192 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3193 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
// the delete uses the key stored in bt->key rather than the text just read
3194 ptr = (BtKey*)(bt->key);
3197 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3198 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3201 else if( len < BT_maxkey )
3203 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- find mode: look each key up and tally the hits ----
3207 fprintf(stderr, "started finding keys for %s\n", args->infile);
3208 if( in = fopen (args->infile, "rb") )
3209 while( ch = getc(in), ch != EOF )
3213 if( args->num == 1 )
3214 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3216 else if( args->num )
3217 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
// the statements guarded by this test and by the error report below are not
// visible (presumably found++ on a zero result -- confirm)
3219 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3222 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3225 else if( len < BT_maxkey )
3227 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- scan mode: follow the right-sibling links from page_no and write
// each live key and value to stdout ----
3231 fprintf(stderr, "started scanning\n");
// pin and map the page; failure to pin is fatal
3233 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3234 set->page = bt_mappage (bt, set->latch);
3236 fprintf(stderr, "unable to obtain latch"), exit(1);
3237 bt_lockpage (BtLockRead, set->latch);
// page number of the right sibling; zero ends the scan
3238 next = bt_getid (set->page->right);
// slots are visited 1..cnt (increment happens in the loop test)
3240 for( slot = 0; slot++ < set->page->cnt; )
// on the last page (next == 0) the final slot is skipped
3241 if( next || slot < set->page->cnt )
// entries flagged dead are not emitted
3242 if( !slotptr(set->page, slot)->dead ) {
3243 ptr = keyptr(set->page, slot);
// the adjustment made for Duplicate slots is not part of this listing
3246 if( slotptr(set->page, slot)->type == Duplicate )
// output format: key bytes, value bytes, newline
3249 fwrite (ptr->key, len, 1, stdout);
3250 val = valptr(set->page, slot);
3251 fwrite (val->value, val->len, 1, stdout);
3252 fputc ('\n', stdout);
3256 bt_unlockpage (BtLockRead, set->latch);
3257 bt_unpinlatch (set->latch);
// assignment in the condition is deliberate: continue with the sibling
3258 } while( page_no = next );
3260 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// ---- count mode: read pages from LEAF_page upward and sum the act counts
// of non-free level-0 pages ----
// advise the kernel that the index file will be read sequentially
3265 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3267 fprintf(stderr, "started counting\n");
3268 page_no = LEAF_page;
// upper bound comes from the allocation page's right field
3270 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// the statement guarded by a non-zero bt_readpage result is not visible
3271 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3274 if( !bt->frame->free && !bt->frame->lvl )
3275 cnt += bt->frame->act;
3281 cnt--; // remove stopper key
3282 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// short alias for struct timeval
3294 typedef struct timeval timer;
// Driver for the standalone program: parses the command line, opens the
// index through bt_mgr, starts one index_file thread per source file, waits
// for all of them and prints real/user/sys timings on stderr.
// Positional arguments as used below: argv[1] index file, argv[2] command,
// argv[3] page bits, argv[4] buffer pool size, argv[5] line-number option,
// argv[6..] source files.
// NOTE(review): the argc tests, the computation of cnt, the #ifdef lines and
//	the close-down calls are not part of this listing.
3296 int main (int argc, char **argv)
3298 int idx, cnt, len, slot, err;
// default page size is 2^16 bytes unless argv[3] overrides it
3299 int segsize, bits = 16;
// usage text (the condition that triggers it is not visible)
3316 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3317 fprintf (stderr, " where page_bits is the page size in bits\n");
3318 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3319 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3320 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// starting reading for the "real" figure printed at the end
3324 start = getCpuTime(0);
// NOTE(review): atoi gives no error indication, so non-numeric arguments
//	silently become 0
3327 bits = atoi(argv[3]);
3330 poolsize = atoi(argv[4]);
// the condition guarding this warning is not visible
3333 fprintf (stderr, "Warning: no mapped_pool\n");
3336 num = atoi(argv[5]);
// one thread handle per source file: malloc on the pthread build,
// GlobalAlloc on the Win32 build
// NOTE(review): neither allocation result (threads, args) is checked in the
//	visible lines
3340 threads = malloc (cnt * sizeof(pthread_t));
3342 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3344 args = malloc (cnt * sizeof(ThreadArg));
// a single manager shared by all worker threads
3346 mgr = bt_mgr ((argv[1]), bits, poolsize);
3349 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// hand each worker its own source file plus the shared settings
3355 for( idx = 0; idx < cnt; idx++ ) {
3356 args[idx].infile = argv[idx + 6];
3357 args[idx].type = argv[2];
3358 args[idx].mgr = mgr;
3359 args[idx].num = num;
3360 args[idx].idx = idx;
// NOTE(review): a pthread_create failure is only reported; the join loop
//	below still covers every index, including a thread that never started
3362 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3363 fprintf(stderr, "Error creating thread %d\n", err);
// Win32 build: 65536 is the stack size argument; the result is not checked
3365 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3369 // wait for termination
3372 for( idx = 0; idx < cnt; idx++ )
3373 pthread_join (threads[idx], NULL);
// NOTE(review): the Win32 documentation limits WaitForMultipleObjects to
//	MAXIMUM_WAIT_OBJECTS handles per call -- confirm cnt stays below it
3375 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3377 for( idx = 0; idx < cnt; idx++ )
3378 CloseHandle(threads[idx]);
// each figure is printed as whole minutes plus the remaining seconds
3384 elapsed = getCpuTime(0) - start;
3385 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3386 elapsed = getCpuTime(1);
3387 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3388 elapsed = getCpuTime(2);
3389 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);