1 // btree version threadskv8 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // traditional buffer pool manager
7 // and ACID batched key updates
11 // author: karl malbrain, malbrain@cal.berkeley.edu
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
17 The orginal author is Karl Malbrain.
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
47 #define WIN32_LEAN_AND_MEAN
61 typedef unsigned long long uid;
64 typedef unsigned long long off64_t;
65 typedef unsigned short ushort;
66 typedef unsigned int uint;
69 #define BT_ro 0x6f72 // ro
70 #define BT_rw 0x7772 // rw
72 #define BT_maxbits 24 // maximum page size in bits
73 #define BT_minbits 9 // minimum page size in bits
74 #define BT_minpage (1 << BT_minbits) // minimum page size
75 #define BT_maxpage (1 << BT_maxbits) // maximum page size
77 // BTree page number constants
78 #define ALLOC_page 0 // allocation page
79 #define ROOT_page 1 // root of the btree
80 #define LEAF_page 2 // first page of leaves
82 // Number of levels to create in a new BTree
87 There are six lock types for each node in four independent sets:
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
93 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
105 // definition for phase-fair reader/writer lock implementation
119 // definition for spin latch implementation
121 // exclusive is set for write access
122 // share is count of read accessors
123 // grant write lock when share == 0
125 volatile typedef struct {
136 // hash table entries
139 volatile uint slot; // Latch table entry at head of chain
140 BtSpinLatch latch[1];
143 // latch manager table structure
146 uid page_no; // latch set page number
147 RWLock readwr[1]; // read/write page lock
148 RWLock access[1]; // Access Intent/Page delete
149 RWLock parent[1]; // Posting of fence key in parent
150 RWLock atomic[1]; // Atomic update in progress
151 uint split; // right split page atomic insert
152 uint entry; // entry slot in latch table
153 uint next; // next entry in hash table chain
154 uint prev; // prev entry in hash table chain
155 volatile ushort pin; // number of outstanding threads
156 ushort dirty:1; // page in cache is dirty
158 pthread_t atomictid; // pid holding atomic lock
160 uint atomictid; // pid holding atomic lock
164 // Define the length of the page record numbers
168 // Page key slot definition.
170 // Keys are marked dead, but remain on the page until
171 // it cleanup is called. The fence key (highest key) for
172 // a leaf page is always present, even after cleanup.
176 // In addition to the Unique keys that occupy slots
177 // there are Librarian and Duplicate key
178 // slots occupying the key slot array.
180 // The Librarian slots are dead keys that
181 // serve as filler, available to add new Unique
182 // or Dup slots that are inserted into the B-tree.
184 // The Duplicate slots have had their key bytes extended
185 // by 6 bytes to contain a binary duplicate key uniqueifier.
195 uint off:BT_maxbits; // page offset for key start
196 uint type:3; // type of slot
197 uint dead:1; // set for deleted slot
200 // The key structure occupies space at the upper end of
201 // each page. It's a length byte followed by the key
205 unsigned char len; // this can be changed to a ushort or uint
206 unsigned char key[0];
209 // the value structure also occupies space at the upper
210 // end of the page. Each key is immediately followed by a value.
213 unsigned char len; // this can be changed to a ushort or uint
214 unsigned char value[0];
217 #define BT_maxkey 255 // maximum number of bytes in a key
218 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
220 // The first part of an index page.
221 // It is immediately followed
222 // by the BtSlot array of keys.
224 // note that this structure size
225 // must be a multiple of 8 bytes
226 // in order to place dups correctly.
228 typedef struct BtPage_ {
229 uint cnt; // count of keys in page
230 uint act; // count of active keys
231 uint min; // next key offset
232 uint garbage; // page garbage in bytes
233 unsigned char bits:7; // page size in bits
234 unsigned char free:1; // page is on free chain
235 unsigned char lvl:7; // level of page
236 unsigned char kill:1; // page is being deleted
237 unsigned char right[BtId]; // page number to right
240 // The loadpage interface object
243 BtPage page; // current page pointer
244 BtLatchSet *latch; // current page latch set
247 // structure for latch manager on ALLOC_page
250 struct BtPage_ alloc[1]; // next page_no in right ptr
251 unsigned long long dups[1]; // global duplicate key uniqueifier
252 unsigned char chain[BtId]; // head of free page_nos chain
255 // The object structure for Btree access
258 uint page_size; // page size
259 uint page_bits; // page size in bits
265 BtPageZero *pagezero; // mapped allocation page
266 BtSpinLatch lock[1]; // allocation area lite latch
267 uint latchdeployed; // highest number of latch entries deployed
268 uint nlatchpage; // number of latch pages at BT_latch
269 uint latchtotal; // number of page latch entries
270 uint latchhash; // number of latch hash table slots
271 uint latchvictim; // next latch entry to examine
272 BtHashEntry *hashtable; // the buffer pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 unsigned char *pagepool; // mapped to the buffer pool pages
276 HANDLE halloc; // allocation handle
277 HANDLE hpool; // buffer pool handle
282 BtMgr *mgr; // buffer manager for thread
283 BtPage cursor; // cached frame for start/next (never mapped)
284 BtPage frame; // spare frame for the page split (never mapped)
285 uid cursor_page; // current cursor page number
286 unsigned char *mem; // frame, cursor, page memory buffer
287 unsigned char key[BT_keyarray]; // last found complete key
288 int found; // last delete or insert was found
289 int err; // last error
290 int reads, writes; // number of reads and writes from the btree
304 #define CLOCK_bit 0x8000
307 extern void bt_close (BtDb *bt);
308 extern BtDb *bt_open (BtMgr *mgr);
309 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
310 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
311 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
312 extern BtKey *bt_foundkey (BtDb *bt);
313 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
314 extern uint bt_nextkey (BtDb *bt, uint slot);
317 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
318 void bt_mgrclose (BtMgr *mgr);
320 // Helper functions to return slot values
321 // from the cursor page.
323 extern BtKey *bt_key (BtDb *bt, uint slot);
324 extern BtVal *bt_val (BtDb *bt, uint slot);
326 // The page is allocated from low and hi ends.
327 // The key slots are allocated from the bottom,
328 // while the text and value of the key
329 // are allocated from the top. When the two
330 // areas meet, the page is split into two.
332 // A key consists of a length byte, two bytes of
333 // index number (0 - 65535), and up to 253 bytes
336 // Associated with each key is a value byte string
337 // containing any value desired.
339 // The b-tree root is always located at page 1.
340 // The first leaf page of level zero is always
341 // located on page 2.
343 // The b-tree pages are linked with next
344 // pointers to facilitate enumerators,
345 // and provide for concurrency.
347 // When to root page fills, it is split in two and
348 // the tree height is raised by a new root at page
349 // one with two keys.
351 // Deleted keys are marked with a dead bit until
352 // page cleanup. The fence key for a leaf node is
355 // To achieve maximum concurrency one page is locked at a time
356 // as the tree is traversed to find leaf key in question. The right
357 // page numbers are used in cases where the page is being split,
360 // Page 0 is dedicated to lock for new page extensions,
361 // and chains empty pages together for reuse. It also
362 // contains the latch manager hash table.
364 // The ParentModification lock on a node is obtained to serialize posting
365 // or changing the fence key for a node.
367 // Empty pages are chained together through the ALLOC page and reused.
369 // Access macros to address slot and key values from the page
370 // Page slots use 1 based indexing.
372 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
373 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
374 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
376 void bt_putid(unsigned char *dest, uid id)
381 dest[i] = (unsigned char)id, id >>= 8;
384 uid bt_getid(unsigned char *src)
389 for( i = 0; i < BtId; i++ )
390 id <<= 8, id |= *src++;
395 uid bt_newdup (BtDb *bt)
398 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
400 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
404 // Phase-Fair reader/writer lock implementation
406 void WriteLock (RWLock *lock)
411 tix = __sync_fetch_and_add (lock->ticket, 1);
413 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
415 // wait for our ticket to come up
417 while( tix != lock->serving[0] )
424 w = PRES | (tix & PHID);
426 r = __sync_fetch_and_add (lock->rin, w);
428 r = _InterlockedExchangeAdd16 (lock->rin, w);
430 while( r != *lock->rout )
438 void WriteRelease (RWLock *lock)
441 __sync_fetch_and_and (lock->rin, ~MASK);
443 _InterlockedAnd16 (lock->rin, ~MASK);
448 void ReadLock (RWLock *lock)
452 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
454 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
457 while( w == (*lock->rin & MASK) )
465 void ReadRelease (RWLock *lock)
468 __sync_fetch_and_add (lock->rout, RINC);
470 _InterlockedExchangeAdd16 (lock->rout, RINC);
474 // Spin Latch Manager
476 // wait until write lock mode is clear
477 // and add 1 to the share count
479 void bt_spinreadlock(BtSpinLatch *latch)
485 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
487 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
489 // see if exclusive request is granted or pending
494 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
496 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
499 } while( sched_yield(), 1 );
501 } while( SwitchToThread(), 1 );
505 // wait for other read and write latches to relinquish
507 void bt_spinwritelock(BtSpinLatch *latch)
513 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
515 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
518 if( !(prev & ~BOTH) )
522 __sync_fetch_and_and ((ushort *)latch, ~XCL);
524 _InterlockedAnd16((ushort *)latch, ~XCL);
527 } while( sched_yield(), 1 );
529 } while( SwitchToThread(), 1 );
533 // try to obtain write lock
535 // return 1 if obtained,
538 int bt_spinwritetry(BtSpinLatch *latch)
543 prev = __sync_fetch_and_or((ushort *)latch, XCL);
545 prev = _InterlockedOr16((ushort *)latch, XCL);
547 // take write access if all bits are clear
550 if( !(prev & ~BOTH) )
554 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 _InterlockedAnd16((ushort *)latch, ~XCL);
563 void bt_spinreleasewrite(BtSpinLatch *latch)
566 __sync_fetch_and_and((ushort *)latch, ~BOTH);
568 _InterlockedAnd16((ushort *)latch, ~BOTH);
572 // decrement reader count
574 void bt_spinreleaseread(BtSpinLatch *latch)
577 __sync_fetch_and_add((ushort *)latch, -SHARE);
579 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
583 // read page from permanent location in Btree file
585 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
587 off64_t off = page_no << mgr->page_bits;
590 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
591 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
598 memset (ovl, 0, sizeof(OVERLAPPED));
600 ovl->OffsetHigh = off >> 32;
602 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
603 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
606 if( *amt < mgr->page_size ) {
607 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
614 // write page to permanent location in Btree file
615 // clear the dirty bit
617 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
619 off64_t off = page_no << mgr->page_bits;
622 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
628 memset (ovl, 0, sizeof(OVERLAPPED));
630 ovl->OffsetHigh = off >> 32;
632 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
635 if( *amt < mgr->page_size )
641 // link latch table entry into head of latch hash table
643 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
645 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
646 BtLatchSet *latch = bt->mgr->latchsets + slot;
648 if( latch->next = bt->mgr->hashtable[hashidx].slot )
649 bt->mgr->latchsets[latch->next].prev = slot;
651 bt->mgr->hashtable[hashidx].slot = slot;
652 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
653 latch->page_no = page_no;
660 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
668 // set CLOCK bit in latch
669 // decrement pin count
671 void bt_unpinlatch (BtLatchSet *latch)
674 if( ~latch->pin & CLOCK_bit )
675 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
676 __sync_fetch_and_add(&latch->pin, -1);
678 if( ~latch->pin & CLOCK_bit )
679 _InterlockedOr16 (&latch->pin, CLOCK_bit);
680 _InterlockedDecrement16 (&latch->pin);
684 // return the btree cached page address
686 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
688 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
693 // find existing latchset or inspire new one
694 // return with latchset pinned
696 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
698 uint hashidx = page_no % bt->mgr->latchhash;
706 // try to find our entry
708 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
710 if( slot = bt->mgr->hashtable[hashidx].slot ) do
712 latch = bt->mgr->latchsets + slot;
713 if( page_no == latch->page_no )
715 } while( slot = latch->next );
721 latch = bt->mgr->latchsets + slot;
723 __sync_fetch_and_add(&latch->pin, 1);
725 _InterlockedIncrement16 (&latch->pin);
727 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
731 // see if there are any unused pool entries
733 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
735 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
738 if( slot < bt->mgr->latchtotal ) {
739 latch = bt->mgr->latchsets + slot;
740 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
742 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
747 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
749 _InterlockedDecrement (&bt->mgr->latchdeployed);
751 // find and reuse previous entry on victim
755 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
757 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
759 // try to get write lock on hash chain
760 // skip entry if not obtained
761 // or has outstanding pins
763 slot %= bt->mgr->latchtotal;
768 latch = bt->mgr->latchsets + slot;
769 idx = latch->page_no % bt->mgr->latchhash;
771 // see we are on same chain as hashidx
776 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
779 // skip this slot if it is pinned
780 // or the CLOCK bit is set
783 if( latch->pin & CLOCK_bit ) {
785 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
787 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
790 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
794 // update permanent page area in btree from buffer pool
796 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
799 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
802 latch->dirty = 0, bt->writes++;
804 // unlink our available slot from its hash chain
807 bt->mgr->latchsets[latch->prev].next = latch->next;
809 bt->mgr->hashtable[idx].slot = latch->next;
812 bt->mgr->latchsets[latch->next].prev = latch->prev;
814 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
816 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
819 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
824 void bt_mgrclose (BtMgr *mgr)
831 // flush dirty pool pages to the btree
833 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
834 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
835 latch = mgr->latchsets + slot;
838 bt_writepage(mgr, page, latch->page_no);
839 latch->dirty = 0, num++;
841 // madvise (page, mgr->page_size, MADV_DONTNEED);
844 fprintf(stderr, "%d buffer pool pages flushed\n", num);
847 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
848 munmap (mgr->pagezero, mgr->page_size);
850 FlushViewOfFile(mgr->pagezero, 0);
851 UnmapViewOfFile(mgr->pagezero);
852 UnmapViewOfFile(mgr->hashtable);
853 CloseHandle(mgr->halloc);
854 CloseHandle(mgr->hpool);
860 FlushFileBuffers(mgr->idx);
861 CloseHandle(mgr->idx);
866 // close and release memory
868 void bt_close (BtDb *bt)
875 VirtualFree (bt->mem, 0, MEM_RELEASE);
880 // open/create new btree buffer manager
882 // call with file_name, BT_openmode, bits in page size (e.g. 16),
883 // size of page pool (e.g. 262144)
885 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
887 uint lvl, attr, last, slot, idx;
888 uint nlatchpage, latchhash;
889 unsigned char value[BtId];
890 int flag, initit = 0;
891 BtPageZero *pagezero;
898 // determine sanity of page size and buffer pool
900 if( bits > BT_maxbits )
902 else if( bits < BT_minbits )
906 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
911 mgr = calloc (1, sizeof(BtMgr));
913 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
915 if( mgr->idx == -1 ) {
916 fprintf (stderr, "Unable to open btree file\n");
917 return free(mgr), NULL;
920 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
921 attr = FILE_ATTRIBUTE_NORMAL;
922 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
924 if( mgr->idx == INVALID_HANDLE_VALUE )
925 return GlobalFree(mgr), NULL;
929 pagezero = valloc (BT_maxpage);
932 // read minimum page size to get root info
933 // to support raw disk partition files
934 // check if bits == 0 on the disk.
936 if( size = lseek (mgr->idx, 0L, 2) )
937 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
938 if( pagezero->alloc->bits )
939 bits = pagezero->alloc->bits;
943 return free(mgr), free(pagezero), NULL;
947 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
948 size = GetFileSize(mgr->idx, amt);
951 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
952 return bt_mgrclose (mgr), NULL;
953 bits = pagezero->alloc->bits;
958 mgr->page_size = 1 << bits;
959 mgr->page_bits = bits;
961 // calculate number of latch hash table entries
963 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
964 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
966 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
967 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
968 mgr->latchtotal = nodemax;
973 // initialize an empty b-tree with latch page, root page, page of leaves
974 // and page(s) of latches and page pool cache
976 memset (pagezero, 0, 1 << bits);
977 pagezero->alloc->bits = mgr->page_bits;
978 bt_putid(pagezero->alloc->right, MIN_lvl+1);
980 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
981 fprintf (stderr, "Unable to create btree page zero\n");
982 return bt_mgrclose (mgr), NULL;
985 memset (pagezero, 0, 1 << bits);
986 pagezero->alloc->bits = mgr->page_bits;
988 for( lvl=MIN_lvl; lvl--; ) {
989 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
990 key = keyptr(pagezero->alloc, 1);
991 key->len = 2; // create stopper key
995 bt_putid(value, MIN_lvl - lvl + 1);
996 val = valptr(pagezero->alloc, 1);
997 val->len = lvl ? BtId : 0;
998 memcpy (val->value, value, val->len);
1000 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1001 pagezero->alloc->lvl = lvl;
1002 pagezero->alloc->cnt = 1;
1003 pagezero->alloc->act = 1;
1005 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1006 fprintf (stderr, "Unable to create btree page zero\n");
1007 return bt_mgrclose (mgr), NULL;
1015 VirtualFree (pagezero, 0, MEM_RELEASE);
1018 // mlock the pagezero page
1020 flag = PROT_READ | PROT_WRITE;
1021 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1022 if( mgr->pagezero == MAP_FAILED ) {
1023 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1024 return bt_mgrclose (mgr), NULL;
1026 mlock (mgr->pagezero, mgr->page_size);
1028 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1029 if( mgr->hashtable == MAP_FAILED ) {
1030 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1031 return bt_mgrclose (mgr), NULL;
1034 flag = PAGE_READWRITE;
1035 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1036 if( !mgr->halloc ) {
1037 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1038 return bt_mgrclose (mgr), NULL;
1041 flag = FILE_MAP_WRITE;
1042 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1043 if( !mgr->pagezero ) {
1044 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1045 return bt_mgrclose (mgr), NULL;
1048 flag = PAGE_READWRITE;
1049 size = (uid)mgr->nlatchpage << mgr->page_bits;
1050 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1052 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1053 return bt_mgrclose (mgr), NULL;
1056 flag = FILE_MAP_WRITE;
1057 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1058 if( !mgr->hashtable ) {
1059 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1060 return bt_mgrclose (mgr), NULL;
1064 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1065 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1070 // open BTree access method
1071 // based on buffer manager
1073 BtDb *bt_open (BtMgr *mgr)
1075 BtDb *bt = malloc (sizeof(*bt));
1077 memset (bt, 0, sizeof(*bt));
1080 bt->mem = valloc (2 *mgr->page_size);
1082 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1084 bt->frame = (BtPage)bt->mem;
1085 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1089 // compare two keys, return > 0, = 0, or < 0
1090 // =0: keys are same
1093 // as the comparison value
1095 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1097 uint len1 = key1->len;
1100 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1111 // place write, read, or parent lock on requested page_no.
1113 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1117 ReadLock (latch->readwr);
1120 WriteLock (latch->readwr);
1123 ReadLock (latch->access);
1126 WriteLock (latch->access);
1129 WriteLock (latch->parent);
1132 WriteLock (latch->atomic);
1134 case BtLockAtomic | BtLockRead:
1135 WriteLock (latch->atomic);
1136 ReadLock (latch->readwr);
1141 // remove write, read, or parent lock on requested page
1143 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1147 ReadRelease (latch->readwr);
1150 WriteRelease (latch->readwr);
1153 ReadRelease (latch->access);
1156 WriteRelease (latch->access);
1159 WriteRelease (latch->parent);
1162 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1163 WriteRelease (latch->atomic);
1165 case BtLockAtomic | BtLockRead:
1166 ReadRelease (latch->readwr);
1167 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1168 WriteRelease (latch->atomic);
1173 // allocate a new page
1174 // return with page latched, but unlocked.
1176 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1181 // lock allocation page
1183 bt_spinwritelock(bt->mgr->lock);
1185 // use empty chain first
1186 // else allocate empty page
1188 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1189 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1190 set->page = bt_mappage (bt, set->latch);
1192 return bt->err = BTERR_struct, -1;
1194 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1195 bt_spinreleasewrite(bt->mgr->lock);
1197 memcpy (set->page, contents, bt->mgr->page_size);
1198 set->latch->dirty = 1;
1202 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1203 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1205 // unlock allocation latch
1207 bt_spinreleasewrite(bt->mgr->lock);
1209 // don't load cache from btree page
1211 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1212 set->page = bt_mappage (bt, set->latch);
1214 return bt->err = BTERR_struct;
1216 memcpy (set->page, contents, bt->mgr->page_size);
1217 set->latch->dirty = 1;
1221 // find slot in page for given key at a given level
1223 int bt_findslot (BtPage page, unsigned char *key, uint len)
1225 uint diff, higher = page->cnt, low = 1, slot;
1228 // make stopper key an infinite fence value
1230 if( bt_getid (page->right) )
1235 // low is the lowest candidate.
1236 // loop ends when they meet
1238 // higher is already
1239 // tested as .ge. the passed key.
1241 while( diff = higher - low ) {
1242 slot = low + ( diff >> 1 );
1243 if( keycmp (keyptr(page, slot), key, len) < 0 )
1246 higher = slot, good++;
1249 // return zero if key is on right link page
1251 return good ? higher : 0;
1254 // find and load page at given level for given key
1255 // leave page rd or wr locked as requested
1257 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1259 uid page_no = ROOT_page, prevpage = 0;
1260 uint drill = 0xff, slot;
1261 BtLatchSet *prevlatch;
1262 uint mode, prevmode;
1264 // start at root of btree and drill down
1267 // determine lock mode of drill level
1268 mode = (drill == lvl) ? lock : BtLockRead;
1270 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1273 // obtain access lock using lock chaining with Access mode
1275 if( page_no > ROOT_page )
1276 bt_lockpage(BtLockAccess, set->latch);
1278 set->page = bt_mappage (bt, set->latch);
1280 // release & unpin parent page
1283 bt_unlockpage(prevmode, prevlatch);
1284 bt_unpinlatch (prevlatch);
1288 // skip Atomic lock on leaf page if already held
1291 if( mode & BtLockAtomic )
1292 if( pthread_equal (set->latch->atomictid, pthread_self()) )
1293 mode &= ~BtLockAtomic;
1295 // obtain mode lock using lock chaining through AccessLock
1297 bt_lockpage(mode, set->latch);
1299 if( mode & BtLockAtomic )
1300 set->latch->atomictid = pthread_self();
1302 if( set->page->free )
1303 return bt->err = BTERR_struct, 0;
1305 if( page_no > ROOT_page )
1306 bt_unlockpage(BtLockAccess, set->latch);
1308 // re-read and re-lock root after determining actual level of root
1310 if( set->page->lvl != drill) {
1311 if( set->latch->page_no != ROOT_page )
1312 return bt->err = BTERR_struct, 0;
1314 drill = set->page->lvl;
1316 if( lock != BtLockRead && drill == lvl ) {
1317 bt_unlockpage(mode, set->latch);
1318 bt_unpinlatch (set->latch);
1323 prevpage = set->latch->page_no;
1324 prevlatch = set->latch;
1327 // find key on page at this level
1328 // and descend to requested level
1330 if( set->page->kill )
1333 if( slot = bt_findslot (set->page, key, len) ) {
1337 while( slotptr(set->page, slot)->dead )
1338 if( slot++ < set->page->cnt )
1343 page_no = bt_getid(valptr(set->page, slot)->value);
1348 // or slide right into next page
1351 page_no = bt_getid(set->page->right);
1355 // return error on end of right chain
1357 bt->err = BTERR_struct;
1358 return 0; // return error
1361 // return page to free list
1362 // page must be delete & write locked
1364 void bt_freepage (BtDb *bt, BtPageSet *set)
1366 // lock allocation page
1368 bt_spinwritelock (bt->mgr->lock);
1372 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1373 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1374 set->latch->dirty = 1;
1375 set->page->free = 1;
1377 // unlock released page
1379 bt_unlockpage (BtLockDelete, set->latch);
1380 bt_unlockpage (BtLockWrite, set->latch);
1381 bt_unpinlatch (set->latch);
1383 // unlock allocation page
1385 bt_spinreleasewrite (bt->mgr->lock);
1388 // a fence key was deleted from a page
1389 // push new fence value upwards
1391 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1393 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1394 unsigned char value[BtId];
1398 // remove the old fence value
1400 ptr = keyptr(set->page, set->page->cnt);
1401 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1402 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1403 set->latch->dirty = 1;
1405 // cache new fence value
1407 ptr = keyptr(set->page, set->page->cnt);
1408 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1410 bt_lockpage (BtLockParent, set->latch);
1411 bt_unlockpage (BtLockWrite, set->latch);
1413 // insert new (now smaller) fence key
1415 bt_putid (value, set->latch->page_no);
1416 ptr = (BtKey*)leftkey;
1418 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1421 // now delete old fence key
1423 ptr = (BtKey*)rightkey;
1425 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1428 bt_unlockpage (BtLockParent, set->latch);
1429 bt_unpinlatch(set->latch);
1433 // root has a single child
1434 // collapse a level from the tree
1436 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1442 // find the child entry and promote as new root contents
1445 for( idx = 0; idx++ < root->page->cnt; )
1446 if( !slotptr(root->page, idx)->dead )
1449 page_no = bt_getid (valptr(root->page, idx)->value);
1451 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1452 child->page = bt_mappage (bt, child->latch);
1456 bt_lockpage (BtLockDelete, child->latch);
1457 bt_lockpage (BtLockWrite, child->latch);
1459 memcpy (root->page, child->page, bt->mgr->page_size);
1460 root->latch->dirty = 1;
1462 bt_freepage (bt, child);
1464 } while( root->page->lvl > 1 && root->page->act == 1 );
1466 bt_unlockpage (BtLockWrite, root->latch);
1467 bt_unpinlatch (root->latch);
1471 // delete a page and manage keys
1472 // call with page writelocked
1473 // returns with page unpinned
1475 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1477 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1478 unsigned char value[BtId];
1479 uint lvl = set->page->lvl;
1484 // cache copy of fence key
1485 // to post in parent
1487 ptr = keyptr(set->page, set->page->cnt);
1488 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1490 // obtain lock on right page
1492 page_no = bt_getid(set->page->right);
1494 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1495 right->page = bt_mappage (bt, right->latch);
1499 bt_lockpage (BtLockWrite, right->latch);
1501 // cache copy of key to update
1503 ptr = keyptr(right->page, right->page->cnt);
1504 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1506 if( right->page->kill )
1507 return bt->err = BTERR_struct;
1509 // pull contents of right peer into our empty page
1511 memcpy (set->page, right->page, bt->mgr->page_size);
1512 set->latch->dirty = 1;
1514 // mark right page deleted and point it to left page
1515 // until we can post parent updates that remove access
1516 // to the deleted page.
1518 bt_putid (right->page->right, set->latch->page_no);
1519 right->latch->dirty = 1;
1520 right->page->kill = 1;
1522 bt_lockpage (BtLockParent, right->latch);
1523 bt_unlockpage (BtLockWrite, right->latch);
1525 bt_lockpage (BtLockParent, set->latch);
1526 bt_unlockpage (BtLockWrite, set->latch);
1528 // redirect higher key directly to our new node contents
1530 bt_putid (value, set->latch->page_no);
1531 ptr = (BtKey*)higherfence;
1533 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1536 // delete old lower key to our node
1538 ptr = (BtKey*)lowerfence;
1540 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1543 // obtain delete and write locks to right node
1545 bt_unlockpage (BtLockParent, right->latch);
1546 bt_lockpage (BtLockDelete, right->latch);
1547 bt_lockpage (BtLockWrite, right->latch);
1548 bt_freepage (bt, right);
1550 bt_unlockpage (BtLockParent, set->latch);
1551 bt_unpinlatch (set->latch);
1556 // find and delete key on page by marking delete flag bit
1557 // if page becomes empty, delete it from the btree
1559 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1561 uint slot, idx, found, fence;
1566 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1567 ptr = keyptr(set->page, slot);
1571 // if librarian slot, advance to real slot
1573 if( slotptr(set->page, slot)->type == Librarian )
1574 ptr = keyptr(set->page, ++slot);
1576 fence = slot == set->page->cnt;
1578 // if key is found delete it, otherwise ignore request
1580 if( found = !keycmp (ptr, key, len) )
1581 if( found = slotptr(set->page, slot)->dead == 0 ) {
1582 val = valptr(set->page,slot);
1583 slotptr(set->page, slot)->dead = 1;
1584 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1587 // collapse empty slots beneath the fence
1589 while( idx = set->page->cnt - 1 )
1590 if( slotptr(set->page, idx)->dead ) {
1591 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1592 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1597 // did we delete a fence key in an upper level?
1599 if( found && lvl && set->page->act && fence )
1600 if( bt_fixfence (bt, set, lvl) )
1603 return bt->found = found, 0;
1605 // do we need to collapse root?
1607 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1608 if( bt_collapseroot (bt, set) )
1611 return bt->found = found, 0;
1613 // delete empty page
1615 if( !set->page->act )
1616 return bt_deletepage (bt, set);
1618 set->latch->dirty = 1;
1619 bt_unlockpage(BtLockWrite, set->latch);
1620 bt_unpinlatch (set->latch);
1621 return bt->found = found, 0;
1624 BtKey *bt_foundkey (BtDb *bt)
1626 return (BtKey*)(bt->key);
1629 // advance to next slot
1631 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1633 BtLatchSet *prevlatch;
1636 if( slot < set->page->cnt )
1639 prevlatch = set->latch;
1641 if( page_no = bt_getid(set->page->right) )
1642 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1643 set->page = bt_mappage (bt, set->latch);
1647 return bt->err = BTERR_struct, 0;
1649 // obtain access lock using lock chaining with Access mode
1651 bt_lockpage(BtLockAccess, set->latch);
1653 bt_unlockpage(BtLockRead, prevlatch);
1654 bt_unpinlatch (prevlatch);
1656 bt_lockpage(BtLockRead, set->latch);
1657 bt_unlockpage(BtLockAccess, set->latch);
1661 // find unique key or first duplicate key in
1662 // leaf level and return number of value bytes
1663 // or (-1) if not found. Setup key for bt_foundkey
1665 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1673 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1675 ptr = keyptr(set->page, slot);
1677 // skip librarian slot place holder
1679 if( slotptr(set->page, slot)->type == Librarian )
1680 ptr = keyptr(set->page, ++slot);
1682 // return actual key found
1684 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1687 if( slotptr(set->page, slot)->type == Duplicate )
1690 // not there if we reach the stopper key
1692 if( slot == set->page->cnt )
1693 if( !bt_getid (set->page->right) )
1696 // if key exists, return >= 0 value bytes copied
1697 // otherwise return (-1)
1699 if( slotptr(set->page, slot)->dead )
1703 if( !memcmp (ptr->key, key, len) ) {
1704 val = valptr (set->page,slot);
1705 if( valmax > val->len )
1707 memcpy (value, val->value, valmax);
1713 } while( slot = bt_findnext (bt, set, slot) );
1715 bt_unlockpage (BtLockRead, set->latch);
1716 bt_unpinlatch (set->latch);
1720 // check page for space available,
1721 // clean if necessary and return
1722 // 0 - page needs splitting
1723 // >0 new slot value
1725 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1727 uint nxt = bt->mgr->page_size;
1728 BtPage page = set->page;
1729 uint cnt = 0, idx = 0;
1730 uint max = page->cnt;
1735 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1738 // skip cleanup and proceed to split
1739 // if there's not enough garbage
1742 if( page->garbage < nxt / 5 )
1745 memcpy (bt->frame, page, bt->mgr->page_size);
1747 // skip page info and set rest of page to zero
1749 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1750 set->latch->dirty = 1;
1754 // clean up page first by
1755 // removing deleted keys
1757 while( cnt++ < max ) {
1760 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1763 // copy the value across
1765 val = valptr(bt->frame, cnt);
1766 nxt -= val->len + sizeof(BtVal);
1767 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1769 // copy the key across
1771 key = keyptr(bt->frame, cnt);
1772 nxt -= key->len + sizeof(BtKey);
1773 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1775 // make a librarian slot
1778 slotptr(page, ++idx)->off = nxt;
1779 slotptr(page, idx)->type = Librarian;
1780 slotptr(page, idx)->dead = 1;
1785 slotptr(page, ++idx)->off = nxt;
1786 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1788 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1795 // see if page has enough space now, or does it need splitting?
1797 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1803 // split the root and raise the height of the btree
1805 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1807 unsigned char leftkey[BT_keyarray];
1808 uint nxt = bt->mgr->page_size;
1809 unsigned char value[BtId];
1815 // save left page fence key for new root
1817 ptr = keyptr(root->page, root->page->cnt);
1818 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1820 // Obtain an empty page to use, and copy the current
1821 // root contents into it, e.g. lower keys
1823 if( bt_newpage(bt, left, root->page) )
1826 left_page_no = left->latch->page_no;
1827 bt_unpinlatch (left->latch);
1829 // preserve the page info at the bottom
1830 // of higher keys and set rest to zero
1832 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1834 // insert stopper key at top of newroot page
1835 // and increase the root height
1837 nxt -= BtId + sizeof(BtVal);
1838 bt_putid (value, right->page_no);
1839 val = (BtVal *)((unsigned char *)root->page + nxt);
1840 memcpy (val->value, value, BtId);
1843 nxt -= 2 + sizeof(BtKey);
1844 slotptr(root->page, 2)->off = nxt;
1845 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1850 // insert lower keys page fence key on newroot page as first key
1852 nxt -= BtId + sizeof(BtVal);
1853 bt_putid (value, left_page_no);
1854 val = (BtVal *)((unsigned char *)root->page + nxt);
1855 memcpy (val->value, value, BtId);
1858 ptr = (BtKey *)leftkey;
1859 nxt -= ptr->len + sizeof(BtKey);
1860 slotptr(root->page, 1)->off = nxt;
1861 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1863 bt_putid(root->page->right, 0);
1864 root->page->min = nxt; // reset lowest used offset and key count
1865 root->page->cnt = 2;
1866 root->page->act = 2;
1869 // release and unpin root pages
1871 bt_unlockpage(BtLockWrite, root->latch);
1872 bt_unpinlatch (root->latch);
1874 bt_unpinlatch (right);
1878 // split already locked full node
1880 // return pool entry for new right
1883 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1885 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1886 uint lvl = set->page->lvl;
1893 // split higher half of keys to bt->frame
1895 memset (bt->frame, 0, bt->mgr->page_size);
1896 max = set->page->cnt;
1900 while( cnt++ < max ) {
1901 if( slotptr(set->page, cnt)->dead && cnt < max )
1903 src = valptr(set->page, cnt);
1904 nxt -= src->len + sizeof(BtVal);
1905 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1907 key = keyptr(set->page, cnt);
1908 nxt -= key->len + sizeof(BtKey);
1909 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1910 memcpy (ptr, key, key->len + sizeof(BtKey));
1912 // add librarian slot
1915 slotptr(bt->frame, ++idx)->off = nxt;
1916 slotptr(bt->frame, idx)->type = Librarian;
1917 slotptr(bt->frame, idx)->dead = 1;
1922 slotptr(bt->frame, ++idx)->off = nxt;
1923 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1925 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1929 bt->frame->bits = bt->mgr->page_bits;
1930 bt->frame->min = nxt;
1931 bt->frame->cnt = idx;
1932 bt->frame->lvl = lvl;
1936 if( set->latch->page_no > ROOT_page )
1937 bt_putid (bt->frame->right, bt_getid (set->page->right));
1939 // get new free page and write higher keys to it.
1941 if( bt_newpage(bt, right, bt->frame) )
1944 memcpy (bt->frame, set->page, bt->mgr->page_size);
1945 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1946 set->latch->dirty = 1;
1948 nxt = bt->mgr->page_size;
1949 set->page->garbage = 0;
1955 if( slotptr(bt->frame, max)->type == Librarian )
1958 // assemble page of smaller keys
1960 while( cnt++ < max ) {
1961 if( slotptr(bt->frame, cnt)->dead )
1963 val = valptr(bt->frame, cnt);
1964 nxt -= val->len + sizeof(BtVal);
1965 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1967 key = keyptr(bt->frame, cnt);
1968 nxt -= key->len + sizeof(BtKey);
1969 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1971 // add librarian slot
1974 slotptr(set->page, ++idx)->off = nxt;
1975 slotptr(set->page, idx)->type = Librarian;
1976 slotptr(set->page, idx)->dead = 1;
1981 slotptr(set->page, ++idx)->off = nxt;
1982 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1986 bt_putid(set->page->right, right->latch->page_no);
1987 set->page->min = nxt;
1988 set->page->cnt = idx;
1990 return right->latch->entry;
1993 // fix keys for newly split page
1994 // call with page locked, return
1997 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1999 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2000 unsigned char value[BtId];
2001 uint lvl = set->page->lvl;
2005 // if current page is the root page, split it
2007 if( set->latch->page_no == ROOT_page )
2008 return bt_splitroot (bt, set, right);
2010 ptr = keyptr(set->page, set->page->cnt);
2011 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2013 page = bt_mappage (bt, right);
2015 ptr = keyptr(page, page->cnt);
2016 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2018 // insert new fences in their parent pages
2020 bt_lockpage (BtLockParent, right);
2022 bt_lockpage (BtLockParent, set->latch);
2023 bt_unlockpage (BtLockWrite, set->latch);
2025 // insert new fence for reformulated left block of smaller keys
2027 bt_putid (value, set->latch->page_no);
2028 ptr = (BtKey *)leftkey;
2030 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2033 // switch fence for right block of larger keys to new right page
2035 bt_putid (value, right->page_no);
2036 ptr = (BtKey *)rightkey;
2038 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2041 bt_unlockpage (BtLockParent, set->latch);
2042 bt_unpinlatch (set->latch);
2044 bt_unlockpage (BtLockParent, right);
2045 bt_unpinlatch (right);
2049 // install new key and value onto page
2050 // page must already be checked for
2053 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2055 uint idx, librarian;
2060 // if found slot > desired slot and previous slot
2061 // is a librarian slot, use it
2064 if( slotptr(set->page, slot-1)->type == Librarian )
2067 // copy value onto page
2069 set->page->min -= vallen + sizeof(BtVal);
2070 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2071 memcpy (val->value, value, vallen);
2074 // copy key onto page
2076 set->page->min -= keylen + sizeof(BtKey);
2077 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2078 memcpy (ptr->key, key, keylen);
2081 // find first empty slot
2083 for( idx = slot; idx < set->page->cnt; idx++ )
2084 if( slotptr(set->page, idx)->dead )
2087 // now insert key into array before slot
2089 if( idx == set->page->cnt )
2090 idx += 2, set->page->cnt += 2, librarian = 2;
2094 set->latch->dirty = 1;
2097 while( idx > slot + librarian - 1 )
2098 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2100 // add librarian slot
2102 if( librarian > 1 ) {
2103 node = slotptr(set->page, slot++);
2104 node->off = set->page->min;
2105 node->type = Librarian;
2111 node = slotptr(set->page, slot);
2112 node->off = set->page->min;
2117 bt_unlockpage (BtLockWrite, set->latch);
2118 bt_unpinlatch (set->latch);
2124 // Insert new key into the btree at given level.
2125 // either add a new key or update/add an existing one
2127 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2129 unsigned char newkey[BT_keyarray];
2130 uint slot, idx, len, entry;
2137 // set up the key we're working on
2139 ins = (BtKey*)newkey;
2140 memcpy (ins->key, key, keylen);
2143 // is this a non-unique index value?
2149 sequence = bt_newdup (bt);
2150 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2154 while( 1 ) { // find the page and slot for the current key
2155 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2156 ptr = keyptr(set->page, slot);
2159 bt->err = BTERR_ovflw;
2163 // if librarian slot == found slot, advance to real slot
2165 if( slotptr(set->page, slot)->type == Librarian )
2166 if( !keycmp (ptr, key, keylen) )
2167 ptr = keyptr(set->page, ++slot);
2171 if( slotptr(set->page, slot)->type == Duplicate )
2174 // if inserting a duplicate key or unique key
2175 // check for adequate space on the page
2176 // and insert the new key before slot.
2178 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2179 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2180 if( !(entry = bt_splitpage (bt, set)) )
2182 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2187 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2190 // if key already exists, update value and return
2192 val = valptr(set->page, slot);
2194 if( val->len >= vallen ) {
2195 if( slotptr(set->page, slot)->dead )
2197 set->page->garbage += val->len - vallen;
2198 set->latch->dirty = 1;
2199 slotptr(set->page, slot)->dead = 0;
2201 memcpy (val->value, value, vallen);
2202 bt_unlockpage(BtLockWrite, set->latch);
2203 bt_unpinlatch (set->latch);
2207 // new update value doesn't fit in existing value area
2209 if( !slotptr(set->page, slot)->dead )
2210 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2212 slotptr(set->page, slot)->dead = 0;
2216 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2217 if( !(entry = bt_splitpage (bt, set)) )
2219 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2224 set->page->min -= vallen + sizeof(BtVal);
2225 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2226 memcpy (val->value, value, vallen);
2229 set->latch->dirty = 1;
2230 set->page->min -= keylen + sizeof(BtKey);
2231 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2232 memcpy (ptr->key, key, keylen);
2235 slotptr(set->page, slot)->off = set->page->min;
2236 bt_unlockpage(BtLockWrite, set->latch);
2237 bt_unpinlatch (set->latch);
2244 uint entry; // latch table entry number
2245 uint slot:30; // page slot number
2246 uint reuse:1; // reused previous page
2247 uint emptied:1; // page was emptied
2251 uid page_no; // page number for split leaf
2252 void *next; // next key to insert
2253 uint entry; // latch table entry number
2254 unsigned char leafkey[BT_keyarray];
2257 // determine actual page where key is located
2258 // return slot number with set page locked
2260 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2262 BtKey *key = keyptr(source,src);
2263 uint slot = locks[src].slot;
2266 if( src > 1 && locks[src].reuse )
2267 entry = locks[src-1].entry, slot = 0;
2269 entry = locks[src].entry;
2272 set->latch = bt->mgr->latchsets + entry;
2273 set->page = bt_mappage (bt, set->latch);
2277 // is locks->reuse set?
2278 // if so, find where our key
2279 // is located on previous page or split pages
2282 set->latch = bt->mgr->latchsets + entry;
2283 set->page = bt_mappage (bt, set->latch);
2285 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2286 if( locks[src].reuse )
2287 locks[src].entry = entry;
2290 } while( entry = set->latch->split );
2292 bt->err = BTERR_atomic;
2296 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2298 BtKey *key = keyptr(source, src);
2299 BtVal *val = valptr(source, src);
2304 while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2305 if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2306 return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2308 if( entry = bt_splitpage (bt, set) )
2309 latch = bt->mgr->latchsets + entry;
2313 // splice right page into split chain
2314 // and WriteLock it.
2316 latch->split = set->latch->split;
2317 set->latch->split = entry;
2318 bt_lockpage(BtLockWrite, latch);
2321 return bt->err = BTERR_atomic;
2324 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2326 BtKey *key = keyptr(source, src);
2327 uint idx, entry, slot;
2332 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2333 slotptr(set->page, slot)->dead = 1;
2335 return bt->err = BTERR_struct;
2337 ptr = keyptr(set->page, slot);
2338 val = valptr(set->page, slot);
2340 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2343 // collapse empty slots beneath the fence
2345 while( idx = set->page->cnt - 1 )
2346 if( slotptr(set->page, idx)->dead ) {
2347 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2348 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2352 set->latch->dirty = 1;
2356 // qsort comparison function
2358 int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2360 BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
2361 BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
2363 return keycmp (key1, key2->key, key2->len);
2366 // atomic modification of a batch of keys.
2368 // return -1 if BTERR is set
2369 // otherwise return slot number
2370 // causing the key constraint violation
2371 // or zero on successful completion.
2373 int bt_atomicmods (BtDb *bt, BtPage source)
2375 uint src, idx, slot, samepage, entry, next, split;
2376 AtomicKey *head, *tail, *leaf;
2377 BtPageSet set[1], prev[1];
2378 unsigned char value[BtId];
2379 BtKey *key, *ptr, *key2;
2388 locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2392 // first sort the list of keys into order to
2393 // prevent deadlocks between threads.
2394 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
2396 for( src = 1; src++ < source->cnt; ) {
2397 *temp = *slotptr(source,src);
2398 key = keyptr (source,src);
2400 for( idx = src; --idx; ) {
2401 key2 = keyptr (source,idx);
2402 if( keycmp (key, key2->key, key2->len) < 0 ) {
2403 *slotptr(source,idx+1) = *slotptr(source,idx);
2404 *slotptr(source,idx) = *temp;
2410 // Load the leafpage for each key
2411 // and determine any constraint violations
2413 for( src = 0; src++ < source->cnt; ) {
2414 key = keyptr(source, src);
2415 val = valptr(source, src);
2418 // first determine if this modification falls
2419 // on the same page as the previous modification
2420 // note that the far right leaf page is a special case
2422 if( samepage = src > 1 )
2423 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
2424 slot = bt_findslot(set->page, key->key, key->len);
2425 else // release read on previous page
2426 bt_unlockpage(BtLockRead, set->latch);
2429 if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
2430 set->latch->split = 0;
2434 if( slotptr(set->page, slot)->type == Librarian )
2435 ptr = keyptr(set->page, ++slot);
2437 ptr = keyptr(set->page, slot);
2440 locks[src].entry = set->latch->entry;
2441 locks[src].slot = slot;
2442 locks[src].reuse = 0;
2444 locks[src].entry = 0;
2445 locks[src].slot = 0;
2446 locks[src].reuse = 1;
2449 switch( slotptr(source, src)->type ) {
2452 if( !slotptr(set->page, slot)->dead )
2453 if( slot < set->page->cnt || bt_getid (set->page->right) )
2454 if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
2456 // return constraint violation if key already exists
2461 if( locks[src].entry ) {
2462 set->latch = bt->mgr->latchsets + locks[src].entry;
2463 bt_unlockpage(BtLockAtomic, set->latch);
2464 bt_unlockpage(BtLockRead, set->latch);
2465 bt_unpinlatch (set->latch);
2476 if( !slotptr(set->page, slot)->dead )
2477 if( ptr->len == key->len )
2478 if( !memcmp (ptr->key, key->key, key->len) )
2481 // Key to delete doesn't exist
2486 if( locks[src].entry ) {
2487 set->latch = bt->mgr->latchsets + locks[src].entry;
2488 bt_unlockpage(BtLockAtomic, set->latch);
2489 bt_unlockpage(BtLockRead, set->latch);
2490 bt_unpinlatch (set->latch);
2500 // unlock last loadpage lock
2502 if( source->cnt > 1 )
2503 bt_unlockpage(BtLockRead, set->latch);
2505 // obtain write lock for each page
2507 for( src = 0; src++ < source->cnt; )
2508 if( locks[src].entry )
2509 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2511 // insert or delete each key
2513 for( src = 0; src++ < source->cnt; )
2514 if( slotptr(source,src)->type == Delete )
2515 if( bt_atomicdelete (bt, source, locks, src) )
2520 if( bt_atomicinsert (bt, source, locks, src) )
2525 // set ParentModification and release WriteLock
2526 // leave empty pages locked for later processing
2527 // build queue of keys to insert for split pages
2529 for( src = 0; src++ < source->cnt; ) {
2530 if( locks[src].reuse )
2533 prev->latch = bt->mgr->latchsets + locks[src].entry;
2534 prev->page = bt_mappage (bt, prev->latch);
2536 // pick-up all splits from original page
2538 split = next = prev->latch->split;
2540 if( !prev->page->act )
2541 locks[src].emptied = 1;
2543 while( entry = next ) {
2544 set->latch = bt->mgr->latchsets + entry;
2545 set->page = bt_mappage (bt, set->latch);
2546 next = set->latch->split;
2547 set->latch->split = 0;
2549 // delete empty previous page
2551 if( !prev->page->act ) {
2552 memcpy (prev->page, set->page, bt->mgr->page_size);
2553 bt_lockpage (BtLockDelete, set->latch);
2554 bt_freepage (bt, set);
2556 prev->latch->dirty = 1;
2558 if( prev->page->act )
2559 locks[src].emptied = 0;
2564 // prev page is not emptied
2565 locks[src].emptied = 0;
2567 // schedule previous fence key update
2569 ptr = keyptr(prev->page,prev->page->cnt);
2570 leaf = malloc (sizeof(AtomicKey));
2572 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2573 leaf->page_no = prev->latch->page_no;
2574 leaf->entry = prev->latch->entry;
2584 // remove empty block from the split chain
2586 if( !set->page->act ) {
2587 memcpy (prev->page->right, set->page->right, BtId);
2588 bt_lockpage (BtLockDelete, set->latch);
2589 bt_freepage (bt, set);
2593 bt_lockpage(BtLockParent, prev->latch);
2594 bt_unlockpage(BtLockWrite, prev->latch);
2598 // was entire chain emptied?
2600 if( !prev->page->act )
2604 bt_unlockpage(BtLockWrite, prev->latch);
2608 // Process last page split in chain
2610 ptr = keyptr(prev->page,prev->page->cnt);
2611 leaf = malloc (sizeof(AtomicKey));
2613 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2614 leaf->page_no = prev->latch->page_no;
2615 leaf->entry = prev->latch->entry;
2625 bt_lockpage(BtLockParent, prev->latch);
2626 bt_unlockpage(BtLockWrite, prev->latch);
2629 // remove Atomic locks and
2630 // process any empty pages
2632 for( src = source->cnt; src; src-- ) {
2633 if( locks[src].reuse )
2636 set->latch = bt->mgr->latchsets + locks[src].entry;
2637 set->page = bt_mappage (bt, set->latch);
2639 // clear original page split field
2641 split = set->latch->split;
2642 set->latch->split = 0;
2643 bt_unlockpage (BtLockAtomic, set->latch);
2645 // delete page emptied by our atomic action
2647 if( locks[src].emptied )
2648 if( bt_deletepage (bt, set) )
2654 bt_unpinlatch (set->latch);
2657 // add keys for any pages split during action
2661 ptr = (BtKey *)leaf->leafkey;
2662 bt_putid (value, leaf->page_no);
2664 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2667 bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
2668 bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
2671 } while( leaf = tail );
2679 // return next slot on cursor page
2680 // or slide cursor right into next page
2682 uint bt_nextkey (BtDb *bt, uint slot)
2688 right = bt_getid(bt->cursor->right);
2690 while( slot++ < bt->cursor->cnt )
2691 if( slotptr(bt->cursor,slot)->dead )
2693 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2701 bt->cursor_page = right;
2703 if( set->latch = bt_pinlatch (bt, right, 1) )
2704 set->page = bt_mappage (bt, set->latch);
2708 bt_lockpage(BtLockRead, set->latch);
2710 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2712 bt_unlockpage(BtLockRead, set->latch);
2713 bt_unpinlatch (set->latch);
2721 // cache page of keys into cursor and return starting slot for given key
2723 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2728 // cache page for retrieval
2730 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2731 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2735 bt->cursor_page = set->latch->page_no;
2737 bt_unlockpage(BtLockRead, set->latch);
2738 bt_unpinlatch (set->latch);
2742 BtKey *bt_key(BtDb *bt, uint slot)
2744 return keyptr(bt->cursor, slot);
2747 BtVal *bt_val(BtDb *bt, uint slot)
2749 return valptr(bt->cursor,slot);
2755 double getCpuTime(int type)
2758 FILETIME xittime[1];
2759 FILETIME systime[1];
2760 FILETIME usrtime[1];
2761 SYSTEMTIME timeconv[1];
2764 memset (timeconv, 0, sizeof(SYSTEMTIME));
2768 GetSystemTimeAsFileTime (xittime);
2769 FileTimeToSystemTime (xittime, timeconv);
2770 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2773 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2774 FileTimeToSystemTime (usrtime, timeconv);
2777 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2778 FileTimeToSystemTime (systime, timeconv);
2782 ans += (double)timeconv->wHour * 3600;
2783 ans += (double)timeconv->wMinute * 60;
2784 ans += (double)timeconv->wSecond;
2785 ans += (double)timeconv->wMilliseconds / 1000;
2790 #include <sys/resource.h>
2792 double getCpuTime(int type)
2794 struct rusage used[1];
2795 struct timeval tv[1];
2799 gettimeofday(tv, NULL);
2800 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2803 getrusage(RUSAGE_SELF, used);
2804 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2807 getrusage(RUSAGE_SELF, used);
2808 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2815 void bt_poolaudit (BtMgr *mgr)
2820 while( slot++ < mgr->latchdeployed ) {
2821 latch = mgr->latchsets + slot;
2823 if( *latch->readwr->rin & MASK )
2824 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2825 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2827 if( *latch->access->rin & MASK )
2828 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2829 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2831 if( *latch->parent->rin & MASK )
2832 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2833 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2835 if( latch->pin & ~CLOCK_bit ) {
2836 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2842 uint bt_latchaudit (BtDb *bt)
2844 ushort idx, hashidx;
2850 if( *(ushort *)(bt->mgr->lock) )
2851 fprintf(stderr, "Alloc page locked\n");
2852 *(ushort *)(bt->mgr->lock) = 0;
2854 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2855 latch = bt->mgr->latchsets + idx;
2856 if( *latch->readwr->rin & MASK )
2857 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2858 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2860 if( *latch->access->rin & MASK )
2861 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2862 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2864 if( *latch->parent->rin & MASK )
2865 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2866 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2869 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2874 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2875 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2876 fprintf(stderr, "hash entry %d locked\n", hashidx);
2878 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2880 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2881 latch = bt->mgr->latchsets + idx;
2883 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2884 } while( idx = latch->next );
2887 page_no = LEAF_page;
2889 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2890 uid off = page_no << bt->mgr->page_bits;
2892 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2896 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2898 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2899 return bt->err = BTERR_map;
2901 if( *amt < bt->mgr->page_size )
2902 return bt->err = BTERR_map;
2904 if( !bt->frame->free && !bt->frame->lvl )
2905 cnt += bt->frame->act;
2909 cnt--; // remove stopper key
2910 fprintf(stderr, " Total keys read %d\n", cnt);
2924 // standalone program to index file of keys
2925 // then list them onto std-out
2928 void *index_file (void *arg)
2930 uint __stdcall index_file (void *arg)
2933 int line = 0, found = 0, cnt = 0, unique;
2934 uid next, page_no = LEAF_page; // start on first page of leaves
2935 unsigned char key[BT_maxkey];
2936 unsigned char txn[65536];
2937 ThreadArg *args = arg;
2938 int ch, len = 0, slot;
2947 bt = bt_open (args->mgr);
2950 unique = (args->type[1] | 0x20) != 'd';
2952 switch(args->type[0] | 0x20)
2955 fprintf(stderr, "started latch mgr audit\n");
2956 cnt = bt_latchaudit (bt);
2957 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2961 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
2962 if( in = fopen (args->infile, "rb") )
2963 while( ch = getc(in), ch != EOF )
2968 memcpy (txn + nxt, key + 10, len - 10);
2970 txn[nxt] = len - 10;
2972 memcpy (txn + nxt, key, 10);
2975 slotptr(page,++cnt)->off = nxt;
2976 slotptr(page,cnt)->type = Unique;
2986 if( bt_atomicmods (bt, page) )
2987 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2992 else if( len < BT_maxkey )
2994 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2998 fprintf(stderr, "started pennysort for %s\n", args->infile);
2999 if( in = fopen (args->infile, "rb") )
3000 while( ch = getc(in), ch != EOF )
3005 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3006 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3009 else if( len < BT_maxkey )
3011 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3015 fprintf(stderr, "started indexing for %s\n", args->infile);
3016 if( in = fopen (args->infile, "r") )
3017 while( ch = getc(in), ch != EOF )
3022 if( args->num == 1 )
3023 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3025 else if( args->num )
3026 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3028 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3029 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3032 else if( len < BT_maxkey )
3034 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3038 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3039 if( in = fopen (args->infile, "rb") )
3040 while( ch = getc(in), ch != EOF )
3044 if( args->num == 1 )
3045 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3047 else if( args->num )
3048 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3050 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3051 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3052 ptr = (BtKey*)(bt->key);
3055 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3056 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3059 else if( len < BT_maxkey )
3061 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3065 fprintf(stderr, "started finding keys for %s\n", args->infile);
3066 if( in = fopen (args->infile, "rb") )
3067 while( ch = getc(in), ch != EOF )
3071 if( args->num == 1 )
3072 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3074 else if( args->num )
3075 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3077 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3080 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3083 else if( len < BT_maxkey )
3085 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3089 fprintf(stderr, "started scanning\n");
3091 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3092 set->page = bt_mappage (bt, set->latch);
3094 fprintf(stderr, "unable to obtain latch"), exit(1);
3095 bt_lockpage (BtLockRead, set->latch);
3096 next = bt_getid (set->page->right);
3098 for( slot = 0; slot++ < set->page->cnt; )
3099 if( next || slot < set->page->cnt )
3100 if( !slotptr(set->page, slot)->dead ) {
3101 ptr = keyptr(set->page, slot);
3104 if( slotptr(set->page, slot)->type == Duplicate )
3107 fwrite (ptr->key, len, 1, stdout);
3108 val = valptr(set->page, slot);
3109 fwrite (val->value, val->len, 1, stdout);
3110 fputc ('\n', stdout);
3114 bt_unlockpage (BtLockRead, set->latch);
3115 bt_unpinlatch (set->latch);
3116 } while( page_no = next );
3118 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3123 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3125 fprintf(stderr, "started counting\n");
3126 page_no = LEAF_page;
3128 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3129 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3132 if( !bt->frame->free && !bt->frame->lvl )
3133 cnt += bt->frame->act;
3139 cnt--; // remove stopper key
3140 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3152 typedef struct timeval timer;
3154 int main (int argc, char **argv)
3156 int idx, cnt, len, slot, err;
3157 int segsize, bits = 16;
3174 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3175 fprintf (stderr, " where page_bits is the page size in bits\n");
3176 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3177 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3178 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3182 start = getCpuTime(0);
3185 bits = atoi(argv[3]);
3188 poolsize = atoi(argv[4]);
3191 fprintf (stderr, "Warning: no mapped_pool\n");
3194 num = atoi(argv[5]);
3198 threads = malloc (cnt * sizeof(pthread_t));
3200 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3202 args = malloc (cnt * sizeof(ThreadArg));
3204 mgr = bt_mgr ((argv[1]), bits, poolsize);
3207 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3213 for( idx = 0; idx < cnt; idx++ ) {
3214 args[idx].infile = argv[idx + 6];
3215 args[idx].type = argv[2];
3216 args[idx].mgr = mgr;
3217 args[idx].num = num;
3218 args[idx].idx = idx;
3220 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3221 fprintf(stderr, "Error creating thread %d\n", err);
3223 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3227 // wait for termination
3230 for( idx = 0; idx < cnt; idx++ )
3231 pthread_join (threads[idx], NULL);
3233 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3235 for( idx = 0; idx < cnt; idx++ )
3236 CloseHandle(threads[idx]);
3242 elapsed = getCpuTime(0) - start;
3243 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3244 elapsed = getCpuTime(1);
3245 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3246 elapsed = getCpuTime(2);
3247 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);