// btree version threads2h pthread rw lock version
-// 30 JAN 2014
+// with reworked bt_deletekey code
+// 12 FEB 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
#include <memory.h>
#include <string.h>
+#include <stddef.h>
typedef unsigned long long uid;
// It is immediately followed
// by the BtSlot array of keys.
-typedef struct Page {
+typedef struct BtPage_ {
uint cnt; // count of keys in page
uint act; // count of active keys
uint min; // next key offset
- unsigned char bits; // page size in bits
- unsigned char lvl:7; // level of page
+ unsigned char bits:7; // page size in bits
+ unsigned char free:1; // page is on free list
+ unsigned char lvl:4; // level of page
+ unsigned char kill:1; // page is being killed
unsigned char dirty:1; // page has deleted keys
+ unsigned char posted:1; // page fence is posted
+ unsigned char goright:1; // page is being killed, continue to right
unsigned char right[BtId]; // page number to right
+ unsigned char fence[256]; // page fence key
} *BtPage;
// The memory mapping pool table buffer manager entry
#endif
} BtPool;
+// The loadpage interface object
+
+typedef struct {
+ uid page_no; // current page number
+ BtPage page; // current page pointer
+ BtPool *pool; // current page pool
+ BtLatchSet *latch; // current page latch set
+} BtPageSet;
+
// structure for latch manager on ALLOC_page
typedef struct {
- struct Page alloc[2]; // next & free page_nos in right ptr
+ struct BtPage_ alloc[2]; // next & free page_nos in right ptr
BtSpinLatch lock[1]; // allocation area lite latch
ushort latchdeployed; // highest number of latch entries deployed
ushort nlatchpage; // number of latch pages at BT_latch
uint seg_bits; // seg size in pages in bits
uint mode; // read-write mode
#ifdef unix
- char *pooladvise; // bit maps for pool page advisements
int idx;
#else
HANDLE idx;
BtPage cursor; // cached frame for start/next (never mapped)
BtPage frame; // spare frame for the page split (never mapped)
BtPage zero; // page frame for zeroes at end of file
- BtPage page; // current page
- uid page_no; // current page number
uid cursor_page; // current cursor page number
- BtLatchSet *set; // current page latch set
- BtPool *pool; // current page pool
unsigned char *mem; // frame, cursor, page memory buffer
- int parent; // last loadpage was from a parent level
int found; // last delete or insert was found
int err; // last error
} BtDb;
extern uint bt_nextkey (BtDb *bt, uint slot);
// internal functions
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
-uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
// manager functions
extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
// one with two keys.
// Deleted keys are marked with a dead bit until
-// page cleanup The fence key for a node is always
-// present, even after deletion and cleanup.
+// page cleanup The fence key for a node is
+// present in a special array.
// Groups of pages called segments from the btree are optionally
// cached with a memory mapped pool. A hash table is used to keep
// Page 0 is dedicated to lock for new page extensions,
// and chains empty pages together for reuse.
-// The ParentModification lock on a node is obtained to prevent resplitting
-// or deleting a node before its fence is posted into its upper level.
+// The ParentModification lock on a node is obtained to serialize posting
+// or changing the fence key for a node.
// Empty pages are chained together through the ALLOC page and reused.
-// Access macros to address slot and key values from the page
+// Access macros to address slot and key values from the page.
+// Page slots use 1 based indexing.
#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
#endif
}
-void bt_initlockset (BtLatchSet *set, int reuse)
+void bt_initlockset (BtLatchSet *set)
{
#ifdef unix
pthread_rwlockattr_t rwattr[1];
- if( reuse ) {
- pthread_rwlock_destroy (set->readwr->lock);
- pthread_rwlock_destroy (set->access->lock);
- pthread_rwlock_destroy (set->parent->lock);
- }
-
pthread_rwlockattr_init (rwattr);
- pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
pthread_rwlock_init (set->readwr->lock, rwattr);
#else
_InterlockedIncrement16 (&set->pin);
#endif
- bt_initlockset (set, 0);
+ bt_initlockset (set);
bt_latchlink (bt, hashidx, victim, page_no);
bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
return set;
#else
_InterlockedIncrement16 (&set->pin);
#endif
- bt_initlockset (set, 1);
bt_latchlink (bt, hashidx, victim, page_no);
bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
bt_spinreleasewrite (set->busy);
free (mgr->pool);
free (mgr->hash);
free (mgr->latch);
- free (mgr->pooladvise);
free (mgr);
#else
FlushFileBuffers(mgr->idx);
off64_t size;
uint amt[1];
BtMgr* mgr;
-BtKey key;
int flag;
#ifndef unix
mgr->pool = calloc (poolmax, sizeof(BtPool));
mgr->hash = calloc (hashsize, sizeof(ushort));
mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
- mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
#else
mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
latchmgr->alloc->bits = mgr->page_bits;
for( lvl=MIN_lvl; lvl--; ) {
- slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
+ slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number
- key = keyptr(latchmgr->alloc, 1);
- key->len = 2; // create stopper key
- key->key[0] = 0xff;
- key->key[1] = 0xff;
- latchmgr->alloc->min = mgr->page_size - 3;
+ latchmgr->alloc->fence[0] = 2;
+ latchmgr->alloc->fence[1] = 0xff;
+ latchmgr->alloc->fence[2] = 0xff;
+ latchmgr->alloc->min = mgr->page_size;
latchmgr->alloc->lvl = lvl;
latchmgr->alloc->cnt = 1;
latchmgr->alloc->act = 1;
#ifdef unix
flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
- pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+ pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
if( pool->map == MAP_FAILED )
return bt->err = BTERR_map;
-
- // clear out madvise issued bits
- memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
#else
flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
BtPage page;
page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
- {
- uint idx = subpage / 8;
- uint bit = subpage % 8;
-
- if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
- madvise (page, bt->mgr->page_size, MADV_WILLNEED);
- (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
- }
- }
-#endif
return page;
}
uid bt_newpage(BtDb *bt, BtPage page)
{
-BtLatchSet *set;
-BtPool *pool;
+BtPageSet set[1];
uid new_page;
-BtPage pmap;
int reuse;
// lock allocation page
// else allocate empty page
if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
- if( pool = bt_pinpool (bt, new_page) )
- pmap = bt_page (bt, pool, new_page);
+ if( set->pool = bt_pinpool (bt, new_page) )
+ set->page = bt_page (bt, set->pool, new_page);
else
return 0;
- bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
- bt_unpinpool (pool);
+
+ bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
+ bt_unpinpool (set->pool);
reuse = 1;
} else {
new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
{
// use zero buffer to write zeros
- memset(bt->zero, 0, bt->mgr->page_size);
if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
return bt->err = BTERR_wrt, 0;
}
// bring new page into pool and copy page.
// this will extend the file into the new pages.
- if( pool = bt_pinpool (bt, new_page) )
- pmap = bt_page (bt, pool, new_page);
+ if( set->pool = bt_pinpool (bt, new_page) )
+ set->page = bt_page (bt, set->pool, new_page);
else
return 0;
- memcpy(pmap, page, bt->mgr->page_size);
- bt_unpinpool (pool);
+ memcpy(set->page, page, bt->mgr->page_size);
+ bt_unpinpool (set->pool);
#endif
// unlock allocation latch and return new page no
// find slot in page for given key at a given level
-int bt_findslot (BtDb *bt, unsigned char *key, uint len)
+int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
{
-uint diff, higher = bt->page->cnt, low = 1, slot;
-uint good = 0;
+uint diff, higher = set->page->cnt, low = 1, slot;
- // if no right link
// make stopper key an infinite fence value
- // by setting the good flag
- if( bt_getid (bt->page->right) )
+ if( bt_getid (set->page->right) )
higher++;
- else
- good++;
- // low is the next candidate.
+ // low is the lowest candidate.
// loop ends when they meet
- // if good, higher is already
+ // higher is already
// tested as .ge. the given key.
while( diff = higher - low ) {
slot = low + ( diff >> 1 );
- if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
+ if( keycmp (keyptr(set->page, slot), key, len) < 0 )
low = slot + 1;
else
- higher = slot, good++;
+ higher = slot;
}
+ if( higher <= set->page->cnt )
+ return higher;
+
+ // if leaf page, compare against fence value
+
// return zero if key is on right link page
+ // or return slot beyond last key
- return good ? higher : 0;
+ if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
+ return 0;
+
+ return higher;
}
// find and load page at given level for given key
// leave page rd or wr locked as requested
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
+int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
{
uid page_no = ROOT_page, prevpage = 0;
-BtLatchSet *set, *prevset;
uint drill = 0xff, slot;
+BtLatchSet *prevlatch;
uint mode, prevmode;
BtPool *prevpool;
-int parent = 1;
// start at root of btree and drill down
- bt->set = NULL;
-
do {
// determine lock mode of drill level
mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead;
- bt->set = bt_pinlatch (bt, page_no);
- bt->page_no = page_no;
+ set->latch = bt_pinlatch (bt, page_no);
+ set->page_no = page_no;
// pin page contents
- if( bt->pool = bt_pinpool (bt, page_no) )
- bt->page = bt_page (bt, bt->pool, page_no);
+ if( set->pool = bt_pinpool (bt, page_no) )
+ set->page = bt_page (bt, set->pool, page_no);
else
return 0;
// obtain access lock using lock chaining with Access mode
if( page_no > ROOT_page )
- bt_lockpage(BtLockAccess, bt->set);
+ bt_lockpage(BtLockAccess, set->latch);
// release & unpin parent page
if( prevpage ) {
- bt_unlockpage(prevmode, prevset);
- bt_unpinlatch (prevset);
+ bt_unlockpage(prevmode, prevlatch);
+ bt_unpinlatch (prevlatch);
bt_unpinpool (prevpool);
prevpage = 0;
}
// obtain read lock using lock chaining
- bt_lockpage(mode, bt->set);
+ bt_lockpage(mode, set->latch);
if( page_no > ROOT_page )
- bt_unlockpage(BtLockAccess, bt->set);
+ bt_unlockpage(BtLockAccess, set->latch);
// re-read and re-lock root after determining actual level of root
- if( bt->page->lvl != drill) {
- if ( bt->page_no != ROOT_page )
+ if( set->page->lvl != drill) {
+ if ( set->page_no != ROOT_page )
return bt->err = BTERR_struct, 0;
- drill = bt->page->lvl;
+ drill = set->page->lvl;
if( lock == BtLockWrite && drill == lvl ) {
- bt_unlockpage(mode, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ bt_unlockpage(mode, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
continue;
}
}
+ prevpage = set->page_no;
+ prevlatch = set->latch;
+ prevpool = set->pool;
+ prevmode = mode;
+
+ // if page is being deleted and we should continue right
+
+ if( set->page->kill && set->page->goright ) {
+ page_no = bt_getid (set->page->right);
+ continue;
+ }
+
+ // otherwise, wait for deleted node to clear
+
+ if( set->page->kill ) {
+ bt_unlockpage(mode, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ page_no = ROOT_page;
+ prevpage = 0;
+ drill = 0xff;
+#ifdef unix
+ sched_yield();
+#else
+ SwitchToThread();
+#endif
+ continue;
+ }
+
// find key on page at this level
// and descend to requested level
- if( slot = bt_findslot (bt, key, len) ) {
+ if( slot = bt_findslot (set, key, len) ) {
if( drill == lvl )
- return bt->parent = parent, slot;
+ return slot;
- while( slotptr(bt->page, slot)->dead )
- if( slot++ < bt->page->cnt )
+ if( slot > set->page->cnt )
+ return bt->err = BTERR_struct;
+
+ while( slotptr(set->page, slot)->dead )
+ if( slot++ < set->page->cnt )
continue;
- else {
- page_no = bt_getid(bt->page->right);
- parent = 0;
- goto slideright;
- }
+ else
+ return bt->err = BTERR_struct, 0;
- page_no = bt_getid(slotptr(bt->page, slot)->id);
- parent = 1;
+ page_no = bt_getid(slotptr(set->page, slot)->id);
drill--;
+ continue;
}
// or slide right into next page
- else {
- page_no = bt_getid(bt->page->right);
- parent = 0;
- }
-
- // continue down / right using overlapping locks
- // to protect pages being split.
-
-slideright:
- prevpage = bt->page_no;
- prevpool = bt->pool;
- prevset = bt->set;
- prevmode = mode;
+ page_no = bt_getid(set->page->right);
} while( page_no );
return 0; // return error
}
-// remove empty page from the B-tree
-// by pulling our right node left over ourselves
+// drill down fixing fence values for left sibling tree
-// call with bt->page, etc, set to page's locked parent
-// returns with page locked.
+// call with set write locked
+// return with set unlocked & unpinned.
-BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
{
-BtLatchSet *rset, *pset, *rpset;
-BtPool *rpool, *ppool, *rppool;
-BtPage rpage, ppage, rppage;
-uid right, parent, rparent;
-BtKey ptr;
-uint idx;
+unsigned char oldfence[256];
+BtPageSet next[1];
+int chk;
+
+ memcpy (oldfence, set->page->fence, 256);
+ next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+
+ while( !set->page->kill && set->page->lvl ) {
+ next->latch = bt_pinlatch (bt, next->page_no);
+ bt_lockpage (BtLockParent, next->latch);
+ bt_lockpage (BtLockAccess, next->latch);
+ bt_lockpage (BtLockWrite, next->latch);
+ bt_unlockpage (BtLockAccess, next->latch);
+
+ if( next->pool = bt_pinpool (bt, next->page_no) )
+ next->page = bt_page (bt, next->pool, next->page_no);
+ else
+ return bt->err;
- // cache node's parent page
+ chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
- parent = bt->page_no;
- ppage = bt->page;
- ppool = bt->pool;
- pset = bt->set;
+ if( chk < 0 ) {
+ next->page_no = bt_getid (next->page->right);
+ bt_unlockpage (BtLockWrite, next->latch);
+ bt_unlockpage (BtLockParent, next->latch);
+ bt_unpinlatch (next->latch);
+ bt_unpinpool (next->pool);
+ continue;
+ }
- // lock and map our right page
- // it cannot be NULL because of the stopper
- // in the last right page
+ if( chk > 0 )
+ return bt->err = BTERR_struct;
- bt_lockpage (BtLockWrite, set);
+ if( bt_fixfences (bt, next, newfence) )
+ return bt->err;
- // if we aren't dead yet
+ break;
+ }
- if( page->act )
- goto rmergexit;
+ memcpy (set->page->fence, newfence, 256);
- if( right = bt_getid (page->right) )
- if( rpool = bt_pinpool (bt, right) )
- rpage = bt_page (bt, rpool, right);
- else
- return bt->err;
- else
- return bt->err = BTERR_struct;
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unlockpage (BtLockParent, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+}
- rset = bt_pinlatch (bt, right);
+// return page to free list
+// page must be delete & write locked
- // find our right neighbor
+void bt_freepage (BtDb *bt, BtPageSet *set)
+{
+ // lock allocation page
- if( ppage->act > 1 ) {
- for( idx = slot; idx++ < ppage->cnt; )
- if( !slotptr(ppage, idx)->dead )
- break;
+ bt_spinwritelock (bt->mgr->latchmgr->lock);
- if( idx > ppage->cnt )
- return bt->err = BTERR_struct;
+ // store chain in second right
+ bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+ bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
+ set->page->free = 1;
- // redirect right neighbor in parent to left node
+ // unlock released page
- bt_putid(slotptr(ppage,idx)->id, page_no);
- }
+ bt_unlockpage (BtLockDelete, set->latch);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
- // if parent has only our deleted page, e.g. no right neighbor
- // prepare to merge parent itself
+ // unlock allocation page
- if( ppage->act == 1 ) {
- if( rparent = bt_getid (ppage->right) )
- if( rppool = bt_pinpool (bt, rparent) )
- rppage = bt_page (bt, rppool, rparent);
- else
- return bt->err;
+ bt_spinreleasewrite (bt->mgr->latchmgr->lock);
+}
+
+// remove the root level by promoting its only child
+// call with parent and child pages
+
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
+{
+uid next = 0;
+
+ do {
+ if( next ) {
+ child->latch = bt_pinlatch (bt, next);
+ bt_lockpage (BtLockDelete, child->latch);
+ bt_lockpage (BtLockWrite, child->latch);
+
+ if( child->pool = bt_pinpool (bt, next) )
+ child->page = bt_page (bt, child->pool, next);
else
- return bt->err = BTERR_struct;
+ return bt->err;
+
+ child->page_no = next;
+ }
+
+ memcpy (root->page, child->page, bt->mgr->page_size);
+ next = bt_getid (slotptr(child->page, child->page->cnt)->id);
+ bt_freepage (bt, child);
+ } while( root->page->lvl > 1 && root->page->cnt == 1 );
- rpset = bt_pinlatch (bt, rparent);
- bt_lockpage (BtLockWrite, rpset);
+ bt_unlockpage (BtLockWrite, root->latch);
+ bt_unpinlatch (root->latch);
+ bt_unpinpool (root->pool);
+ return 0;
+}
+
+// pull right page over ourselves in simple merge
+
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
+{
+ // install ourselves as child page
+ // and delete ourselves from parent
+
+ bt_putid (slotptr(parent->page, idx)->id, set->page_no);
+ slotptr(parent->page, slot)->dead = 1;
+ parent->page->act--;
- // find our right neighbor on right parent page
+ // collapse any empty slots
- for( idx = 0; idx++ < rppage->cnt; )
- if( !slotptr(rppage, idx)->dead ) {
- bt_putid (slotptr(rppage, idx)->id, page_no);
+ while( idx = parent->page->cnt - 1 )
+ if( slotptr(parent->page, idx)->dead ) {
+ *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
+ memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+ } else
break;
- }
- if( idx > rppage->cnt )
- return bt->err = BTERR_struct;
- }
+ memcpy (set->page, right->page, bt->mgr->page_size);
+ bt_unlockpage (BtLockParent, right->latch);
- // now that there are no more pointers to our right node
- // we can wait for delete lock on it
+ bt_freepage (bt, right);
- bt_lockpage(BtLockDelete, rset);
- bt_lockpage(BtLockWrite, rset);
+ // do we need to remove a btree level?
+ // (leave the first page of leaves alone)
- // pull contents of right page into our empty page
+ if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+ if( set->page->lvl )
+ return bt_removeroot (bt, parent, set);
- memcpy (page, rpage, bt->mgr->page_size);
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unlockpage (BtLockDelete, set->latch);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+}
- // ready to release right parent lock
- // now that we have a new page in place
+// remove both child and parent from the btree
+// from the fence position in the parent
+// call with both pages locked for writing
- if( ppage->act == 1 ) {
- bt_unlockpage (BtLockWrite, rpset);
- bt_unpinlatch (rpset);
- bt_unpinpool (rppool);
- }
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
+{
+unsigned char pagefence[256];
+uint idx;
- // add killed right block to free chain
- // lock latch mgr
+ // pull right sibling over ourselves and unlock
- bt_spinwritelock(bt->mgr->latchmgr->lock);
+ memcpy (child->page, right->page, bt->mgr->page_size);
- // store free chain in allocation page second right
+ bt_unlockpage (BtLockWrite, child->latch);
+ bt_unpinlatch (child->latch);
+ bt_unpinpool (child->pool);
- bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
- bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+ // install ourselves into right link of old right page
- // unlock latch mgr and right page
+ bt_putid (right->page->right, child->page_no);
+ right->page->goright = 1; // tell bt_loadpage to go right to us
+ right->page->kill = 1;
- bt_unlockpage(BtLockDelete, rset);
- bt_unlockpage(BtLockWrite, rset);
- bt_unpinlatch (rset);
- bt_unpinpool (rpool);
+ bt_unlockpage (BtLockWrite, right->latch);
- bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+ // remove our slot from our parent
+ // signal to move right
- // delete our obsolete fence key from our parent
+ parent->page->goright = 1; // tell bt_loadpage to go right to rparent
+ parent->page->kill = 1;
+ parent->page->act--;
- slotptr(ppage, slot)->dead = 1;
- ppage->dirty = 1;
+ // redirect right page pointer in right parent to us
- // if our parent now empty
- // remove it from the tree
+ for( idx = 0; idx++ < rparent->page->cnt; )
+ if( !slotptr(rparent->page, idx)->dead )
+ break;
- if( ppage->act-- == 1 )
- if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
- return bt->err;
+ if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+ return bt->err = BTERR_struct;
-rmergexit:
- bt_unlockpage (BtLockWrite, pset);
- bt_unpinlatch (pset);
- bt_unpinpool (ppool);
+ bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
+ bt_unlockpage (BtLockWrite, rparent->latch);
+ bt_unpinlatch (rparent->latch);
+ bt_unpinpool (rparent->pool);
- bt->found = 1;
- return bt->err = 0;
-}
+ // free the right page
+
+ bt_lockpage (BtLockDelete, right->latch);
+ bt_lockpage (BtLockWrite, right->latch);
+ bt_freepage (bt, right);
+
+ // save parent page fence value
+
+ memcpy (pagefence, parent->page->fence, 256);
+ bt_unlockpage (BtLockWrite, parent->latch);
-// remove empty page from the B-tree
-// try merging left first. If no left
-// sibling, then merge right.
+ return bt_removepage (bt, parent, lvl, pagefence);
+}
-// call with page loaded and locked,
-// return with page locked.
+// remove page from btree
+// call with page unlocked
+// returns with page on free list
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
{
-unsigned char fencekey[256], postkey[256];
-uint slot, idx, postfence = 0;
-BtLatchSet *lset, *pset;
-BtPool *lpool, *ppool;
-BtPage lpage, ppage;
-uid left, parent;
+BtPageSet parent[1], sibling[1], rparent[1];
+unsigned char newfence[256];
+uint slot, idx;
BtKey ptr;
- ptr = keyptr(page, page->cnt);
- memcpy(fencekey, ptr, ptr->len + 1);
- bt_unlockpage (BtLockWrite, set);
-
// load and lock our parent
-retry:
- if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+ while( 1 ) {
+ if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
return bt->err;
- parent = bt->page_no;
- ppage = bt->page;
- ppool = bt->pool;
- pset = bt->set;
+ // do we show up in our parent yet?
- // wait until we are posted in our parent
-
- if( !bt->parent ) {
- bt_unlockpage (BtLockWrite, pset);
- bt_unpinlatch (pset);
- bt_unpinpool (ppool);
-#ifdef unix
+ if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
+#ifdef linux
sched_yield();
#else
SwitchToThread();
#endif
- goto retry;
+ continue;
}
- // find our left neighbor in our parent page
+ // can we do a simple merge entirely
+ // between siblings on the parent page?
- for( idx = slot; --idx; )
- if( !slotptr(ppage, idx)->dead )
- break;
+ if( slot < parent->page->cnt ) {
+ // find our right neighbor
+ // right must exist because the stopper prevents
+ // the rightmost page from deleting
- // if no left neighbor, do right merge
+ for( idx = slot; idx++ < parent->page->cnt; )
+ if( !slotptr(parent->page, idx)->dead )
+ break;
- if( !idx )
- return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
+ sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
- // lock and map our left neighbor's page
+ bt_lockpage (BtLockDelete, set->latch);
+ bt_lockpage (BtLockWrite, set->latch);
- left = bt_getid (slotptr(ppage, idx)->id);
+ // merge right if sibling shows up in
+ // our parent and is not being killed
- if( lpool = bt_pinpool (bt, left) )
- lpage = bt_page (bt, lpool, left);
- else
- return bt->err;
+ if( sibling->page_no == bt_getid (set->page->right) ) {
+ sibling->latch = bt_pinlatch (bt, sibling->page_no);
+ bt_lockpage (BtLockParent, sibling->latch);
+ bt_lockpage (BtLockDelete, sibling->latch);
+ bt_lockpage (BtLockWrite, sibling->latch);
- lset = bt_pinlatch (bt, left);
- bt_lockpage(BtLockWrite, lset);
+ if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+ sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+ else
+ return bt->err;
- // wait until sibling is in our parent
+ if( !sibling->page->kill )
+ return bt_mergeright(bt, set, parent, sibling, slot, idx);
- if( bt_getid (lpage->right) != page_no ) {
- bt_unlockpage (BtLockWrite, pset);
- bt_unpinlatch (pset);
- bt_unpinpool (ppool);
- bt_unlockpage (BtLockWrite, lset);
- bt_unpinlatch (lset);
- bt_unpinpool (lpool);
+ // try again later
+
+ bt_unlockpage (BtLockWrite, sibling->latch);
+ bt_unlockpage (BtLockParent, sibling->latch);
+ bt_unlockpage (BtLockDelete, sibling->latch);
+ bt_unpinlatch (sibling->latch);
+ bt_unpinpool (sibling->pool);
+ }
+
+ bt_unlockpage (BtLockDelete, set->latch);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
#ifdef linux
sched_yield();
#else
SwitchToThread();
#endif
- goto retry;
+ continue;
}
- // since our page will have no more pointers to it,
- // obtain Delete lock and wait for write locks to clear
+ // find our left neighbor in our parent page
- bt_lockpage(BtLockDelete, set);
- bt_lockpage(BtLockWrite, set);
+ for( idx = slot; --idx; )
+ if( !slotptr(parent->page, idx)->dead )
+ break;
- // if we aren't dead yet,
- // get ready for exit
+ // if no left neighbor, delete ourselves and our parent
- if( page->act ) {
- bt_unlockpage(BtLockDelete, set);
- bt_unlockpage(BtLockWrite, lset);
- bt_unpinlatch (lset);
- bt_unpinpool (lpool);
- goto lmergexit;
- }
+ if( !idx ) {
+ bt_lockpage (BtLockAccess, set->latch);
+ bt_lockpage (BtLockWrite, set->latch);
+ bt_unlockpage (BtLockAccess, set->latch);
- // are we are the fence key for our parent?
- // if so, grab our old fence key
+ rparent->page_no = bt_getid (parent->page->right);
+ rparent->latch = bt_pinlatch (bt, rparent->page_no);
- if( postfence = slot == ppage->cnt ) {
- ptr = keyptr (ppage, ppage->cnt);
- memcpy(fencekey, ptr, ptr->len + 1);
- memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+ bt_lockpage (BtLockAccess, rparent->latch);
+ bt_lockpage (BtLockWrite, rparent->latch);
+ bt_unlockpage (BtLockAccess, rparent->latch);
- // clear out other dead slots
+ if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
+ rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
+ else
+ return bt->err;
- while( --ppage->cnt )
- if( slotptr(ppage, ppage->cnt)->dead )
- memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
- else
- break;
+ if( !rparent->page->kill ) {
+ sibling->page_no = bt_getid (set->page->right);
+ sibling->latch = bt_pinlatch (bt, sibling->page_no);
- ptr = keyptr (ppage, ppage->cnt);
- memcpy(postkey, ptr, ptr->len + 1);
- } else
- slotptr(ppage,slot)->dead = 1;
+ bt_lockpage (BtLockAccess, sibling->latch);
+ bt_lockpage (BtLockWrite, sibling->latch);
+ bt_unlockpage (BtLockAccess, sibling->latch);
- ppage->dirty = 1;
- ppage->act--;
+ if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+ sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+ else
+ return bt->err;
- // push our right neighbor pointer to our left
+ if( !sibling->page->kill )
+ return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
- memcpy (lpage->right, page->right, BtId);
+ // try again later
- // add ourselves to free chain
- // lock latch mgr
+ bt_unlockpage (BtLockWrite, sibling->latch);
+ bt_unpinlatch (sibling->latch);
+ bt_unpinpool (sibling->pool);
+ }
- bt_spinwritelock(bt->mgr->latchmgr->lock);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unlockpage (BtLockWrite, rparent->latch);
+ bt_unpinlatch (rparent->latch);
+ bt_unpinpool (rparent->pool);
- // store free chain in allocation page second right
- bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
- bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
+#ifdef linux
+ sched_yield();
+#else
+ SwitchToThread();
+#endif
+ continue;
+ }
- // unlock latch mgr and pages
+ // redirect parent to our left sibling
+ // lock and map our left sibling's page
- bt_spinreleasewrite(bt->mgr->latchmgr->lock);
- bt_unlockpage(BtLockWrite, lset);
- bt_unpinlatch (lset);
- bt_unpinpool (lpool);
+ sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+ sibling->latch = bt_pinlatch (bt, sibling->page_no);
- // release our node's delete lock
+ // wait our turn on fence key maintenance
- bt_unlockpage(BtLockDelete, set);
+ bt_lockpage(BtLockParent, sibling->latch);
+ bt_lockpage(BtLockAccess, sibling->latch);
+ bt_lockpage(BtLockWrite, sibling->latch);
+ bt_unlockpage(BtLockAccess, sibling->latch);
-lmergexit:
- bt_unlockpage (BtLockWrite, pset);
- bt_unpinpool (ppool);
+ if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+ sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+ else
+ return bt->err;
- // do we need to post parent's fence key in its parent?
+ // wait until left sibling is in our parent
- if( !postfence || parent == ROOT_page ) {
- bt_unpinlatch (pset);
- bt->found = 1;
- return bt->err = 0;
+ if( bt_getid (sibling->page->right) != set->page_no ) {
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unlockpage (BtLockWrite, sibling->latch);
+ bt_unlockpage (BtLockParent, sibling->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
+ bt_unpinlatch (sibling->latch);
+ bt_unpinpool (sibling->pool);
+#ifdef linux
+ sched_yield();
+#else
+ SwitchToThread();
+#endif
+ continue;
}
- // interlock parent fence post
+ // delete our left sibling from parent
- bt_lockpage (BtLockParent, pset);
+ slotptr(parent->page,idx)->dead = 1;
+ parent->page->dirty = 1;
+ parent->page->act--;
- // load parent's parent page
-posttry:
- if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
- return bt->err;
+ // redirect our parent slot to our left sibling
+
+ bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+ memcpy (sibling->page->right, set->page->right, BtId);
+
+ // collapse dead slots from parent
+
+ while( idx = parent->page->cnt - 1 )
+ if( slotptr(parent->page, idx)->dead ) {
+ *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
+ memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+ } else
+ break;
+
+ // free our original page
+
+ bt_lockpage (BtLockDelete, set->latch);
+ bt_lockpage (BtLockWrite, set->latch);
+ bt_freepage (bt, set);
+
+ // go down the left node's fence keys to the leaf level
+ // and update the fence keys in each page
+
+ memcpy (newfence, parent->page->fence, 256);
- if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
- if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+ if( bt_fixfences (bt, sibling, newfence) )
return bt->err;
- else
- goto posttry;
- page = bt->page;
+ // promote sibling as new root?
- page->min -= *postkey + 1;
- ((unsigned char *)page)[page->min] = *postkey;
- memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
- slotptr(page, slot)->off = page->min;
+ if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+ if( sibling->page->lvl ) {
+ sibling->latch = bt_pinlatch (bt, sibling->page_no);
+ bt_lockpage (BtLockDelete, sibling->latch);
+ bt_lockpage (BtLockWrite, sibling->latch);
- bt_unlockpage (BtLockParent, pset);
- bt_unpinlatch (pset);
+ if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+ sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+ else
+ return bt->err;
- bt_unlockpage (BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ return bt_removeroot (bt, parent, sibling);
+ }
- bt->found = 1;
- return bt->err = 0;
+ bt_unlockpage (BtLockWrite, parent->latch);
+ bt_unpinlatch (parent->latch);
+ bt_unpinpool (parent->pool);
+
+ return 0;
+ }
}
// find and delete key on page by marking delete flag bit
BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
{
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+unsigned char pagefence[256];
+uint slot, idx, found;
+BtPageSet set[1];
BtKey ptr;
-uint slot;
- if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+ if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
+ ptr = keyptr(set->page, slot);
+ else
return bt->err;
- page_no = bt->page_no;
- page = bt->page;
- pool = bt->pool;
- set = bt->set;
-
// if key is found delete it, otherwise ignore request
- ptr = keyptr(page, slot);
+ if( found = slot <= set->page->cnt )
+ if( found = !keycmp (ptr, key, len) )
+ if( found = slotptr(set->page, slot)->dead == 0 ) {
+ slotptr(set->page,slot)->dead = 1;
+ set->page->dirty = 1;
+ set->page->act--;
+
+ // collapse empty slots
- if( bt->found = !keycmp (ptr, key, len) )
- if( bt->found = slotptr(page, slot)->dead == 0 ) {
- slotptr(page,slot)->dead = 1;
- if( slot < page->cnt )
- page->dirty = 1;
- if( !--page->act )
- if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
- return bt->err;
+ while( idx = set->page->cnt - 1 )
+ if( slotptr(set->page, idx)->dead ) {
+ *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+ memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+ } else
+ break;
}
- bt_unlockpage(BtLockWrite, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
- return bt->err = 0;
+ if( set->page->act ) {
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return bt->found = found, 0;
+ }
+
+ memcpy (pagefence, set->page->fence, 256);
+ set->page->kill = 1;
+
+ bt_unlockpage (BtLockWrite, set->latch);
+
+ if( bt_removepage (bt, set, 0, pagefence) )
+ return bt->err;
+
+ bt->found = found;
+ return 0;
}
// find key in leaf level and return row-id
uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
{
+BtPageSet set[1];
uint slot;
+uid id = 0;
BtKey ptr;
-uid id;
- if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
- ptr = keyptr(bt->page, slot);
+ if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+ ptr = keyptr(set->page, slot);
else
return 0;
// if key exists, return row-id
// otherwise return 0
- if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
- id = bt_getid(slotptr(bt->page,slot)->id);
- else
- id = 0;
+ if( slot <= set->page->cnt )
+ if( !keycmp (ptr, key, len) )
+ id = bt_getid(slotptr(set->page,slot)->id);
- bt_unlockpage (BtLockRead, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ bt_unlockpage (BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
return id;
}
uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
{
-uint nxt = bt->mgr->page_size;
+uint nxt = bt->mgr->page_size, off;
uint cnt = 0, idx = 0;
uint max = page->cnt;
uint newslot = max;
page->act = 0;
// try cleaning up page first
-
- // always leave fence key in the array
- // otherwise, remove deleted key
+ // by removing deleted keys
while( cnt++ < max ) {
if( cnt == slot )
newslot = idx + 1;
- if( cnt < max && slotptr(bt->frame,cnt)->dead )
+ if( slotptr(bt->frame,cnt)->dead )
continue;
- // copy key
+ // if its not the fence key,
+ // copy the key across
- key = keyptr(bt->frame, cnt);
- nxt -= key->len + 1;
- memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+ off = slotptr(bt->frame,cnt)->off;
+
+ if( off >= sizeof(*page) ) {
+ key = keyptr(bt->frame, cnt);
+ off = nxt -= key->len + 1;
+ memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+ }
// copy slot
+
memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
- if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
- page->act++;
slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
- slotptr(page, idx)->off = nxt;
+ slotptr(page, idx)->off = off;
+ page->act++;
}
page->min = nxt;
return 0;
}
-// add key to current page
-// page must already be writelocked
+// split the root and raise the height of the btree
-void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
-{
-uint idx;
-
- // find next available dead slot and copy key onto page
-
- for( idx = slot; idx < page->cnt; idx++ )
- if( slotptr(page, idx)->dead )
- break;
-
- if( idx == page->cnt )
- idx++, page->cnt++;
-
- page->act++;
-
- // now insert key into array before slot
-
- while( idx > slot )
- *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
-
- page->min -= len + 1;
- ((unsigned char *)page)[page->min] = len;
- memcpy ((unsigned char *)page + page->min +1, key, len );
-
- bt_putid(slotptr(page,slot)->id, id);
- slotptr(page, slot)->off = page->min;
- slotptr(page, slot)->tod = tod;
- slotptr(page, slot)->dead = 0;
-}
-
-BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
{
uint nxt = bt->mgr->page_size;
-BtPage root = bt->page;
+unsigned char leftkey[256];
uid new_page;
// Obtain an empty page to use, and copy the current
- // root contents into it
+ // root contents into it, e.g. lower keys
- if( !(new_page = bt_newpage(bt, root)) )
+ memcpy (leftkey, root->page->fence, 256);
+ root->page->posted = 1;
+
+ if( !(new_page = bt_newpage(bt, root->page)) )
return bt->err;
// preserve the page info at the bottom
- // and set rest to zero
+ // of higher keys and set rest to zero
- memset(root+1, 0, bt->mgr->page_size - sizeof(*root));
+ memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
+ memset(root->page->fence, 0, 256);
+ root->page->fence[0] = 2;
+ root->page->fence[1] = 0xff;
+ root->page->fence[2] = 0xff;
- // insert first key on newroot page
+ // insert lower keys page fence key on newroot page
nxt -= *leftkey + 1;
- memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
- bt_putid(slotptr(root, 1)->id, new_page);
- slotptr(root, 1)->off = nxt;
+ memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+ bt_putid(slotptr(root->page, 1)->id, new_page);
+ slotptr(root->page, 1)->off = nxt;
- // insert second key (stopper key) on newroot page
+ // insert stopper key on newroot page
// and increase the root height
- nxt -= 3;
- *((unsigned char *)root + nxt) = 2;
- memset ((unsigned char *)root + nxt + 1, 0xff, 2);
- bt_putid(slotptr(root, 2)->id, page_no2);
- slotptr(root, 2)->off = nxt;
+ bt_putid(slotptr(root->page, 2)->id, page_no2);
+ slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
- bt_putid(root->right, 0);
- root->min = nxt; // reset lowest used offset and key count
- root->cnt = 2;
- root->act = 2;
- root->lvl++;
+ bt_putid(root->page->right, 0);
+ root->page->min = nxt; // reset lowest used offset and key count
+ root->page->cnt = 2;
+ root->page->act = 2;
+ root->page->lvl++;
- // release and unpin root (bt->page)
+ // release and unpin root
- bt_unlockpage(BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ bt_unlockpage(BtLockWrite, root->latch);
+ bt_unpinlatch (root->latch);
+ bt_unpinpool (root->pool);
return 0;
}
// split already locked full node
-// return unlocked and unpinned.
+// return unlocked.
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
+BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
{
-uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
-unsigned char rightkey[256], leftkey[256];
-uint tod = time(NULL);
-uint lvl = page->lvl;
-uid new_page;
+uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
+unsigned char fencekey[256];
+uint lvl = set->page->lvl;
+uid right;
BtKey key;
- // initialize frame buffer for right node
+ // split higher half of keys to bt->frame
memset (bt->frame, 0, bt->mgr->page_size);
- max = page->cnt;
+ max = set->page->cnt;
cnt = max / 2;
idx = 0;
- // split higher half of keys to bt->frame
-
while( cnt++ < max ) {
- key = keyptr(page, cnt);
- nxt -= key->len + 1;
- memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
- memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
- if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
- bt->frame->act++;
- slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
- slotptr(bt->frame, idx)->off = nxt;
+ if( !lvl || cnt < max ) {
+ key = keyptr(set->page, cnt);
+ off = nxt -= key->len + 1;
+ memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+ } else
+ off = offsetof(struct BtPage_, fence);
+
+ memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
+ slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
+ slotptr(bt->frame, idx)->off = off;
+ bt->frame->act++;
}
- // transfer right link node to new right node
-
- if( page_no > ROOT_page )
- memcpy (bt->frame->right, page->right, BtId);
+ if( set->page_no == ROOT_page )
+ bt->frame->posted = 1;
+ memcpy (bt->frame->fence, set->page->fence, 256);
bt->frame->bits = bt->mgr->page_bits;
bt->frame->min = nxt;
bt->frame->cnt = idx;
bt->frame->lvl = lvl;
- // get new free page and write right frame to it.
+ // link right node
- if( !(new_page = bt_newpage(bt, bt->frame)) )
- return bt->err;
+ if( set->page_no > ROOT_page )
+ memcpy (bt->frame->right, set->page->right, BtId);
- // remember fence key for new right page to add
- // as right sibling to the left node
+ // get new free page and write higher keys to it.
- key = keyptr(bt->frame, idx);
- memcpy (rightkey, key, key->len + 1);
+ if( !(right = bt_newpage(bt, bt->frame)) )
+ return bt->err;
// update lower keys to continue in old page
- memcpy (bt->frame, page, bt->mgr->page_size);
- memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
+ memcpy (bt->frame, set->page, bt->mgr->page_size);
+ memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
nxt = bt->mgr->page_size;
- page->dirty = 0;
- page->act = 0;
+ set->page->posted = 0;
+ set->page->dirty = 0;
+ set->page->act = 0;
cnt = 0;
idx = 0;
// assemble page of smaller keys
- // to remain in the old page
while( cnt++ < max / 2 ) {
key = keyptr(bt->frame, cnt);
- nxt -= key->len + 1;
- memcpy ((unsigned char *)page + nxt, key, key->len + 1);
- memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
- if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
- page->act++;
- slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
- slotptr(page, idx)->off = nxt;
- }
- // finalize left page and save fence key
+ if( !lvl || cnt < max / 2 ) {
+ off = nxt -= key->len + 1;
+ memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+ } else
+ off = offsetof(struct BtPage_, fence);
- memcpy(leftkey, key, key->len + 1);
- page->min = nxt;
- page->cnt = idx;
+ memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+ slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
+ slotptr(set->page, idx)->off = off;
+ set->page->act++;
+ }
+
+ // install fence key for smaller key page
- // link new right page
+ memset(set->page->fence, 0, 256);
+ memcpy(set->page->fence, key, key->len + 1);
- bt_putid (page->right, new_page);
+ bt_putid(set->page->right, right);
+ set->page->min = nxt;
+ set->page->cnt = idx;
// if current page is the root page, split it
- if( page_no == ROOT_page )
- return bt_splitroot (bt, leftkey, new_page);
+ if( set->page_no == ROOT_page )
+ return bt_splitroot (bt, set, right);
- // obtain ParentModification lock for current page
+ bt_unlockpage (BtLockWrite, set->latch);
- bt_lockpage (BtLockParent, set);
+ // insert new fences in their parent pages
- // release wr lock on our page.
- // this will keep out another SMO
+ while( 1 ) {
+ bt_lockpage (BtLockParent, set->latch);
+ bt_lockpage (BtLockWrite, set->latch);
- bt_unlockpage (BtLockWrite, set);
+ memcpy (fencekey, set->page->fence, 256);
+ right = bt_getid (set->page->right);
- // insert key for old page (lower keys)
+ if( set->page->posted ) {
+ bt_unlockpage (BtLockParent, set->latch);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+ }
- if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
- return bt->err;
+ set->page->posted = 1;
+ bt_unlockpage (BtLockWrite, set->latch);
- // switch old parent key from us to our right page
+ if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
+ return bt->err;
- if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
- return bt->err;
+ bt_unlockpage (BtLockParent, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
- // unlock and unpin
+ if( !(set->page_no = right) )
+ break;
+
+ set->latch = bt_pinlatch (bt, right);
+
+ if( set->pool = bt_pinpool (bt, right) )
+ set->page = bt_page (bt, set->pool, right);
+ else
+ return bt->err;
+ }
- bt_unlockpage (BtLockParent, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
return 0;
}
BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
{
+BtPageSet set[1];
uint slot, idx;
-BtPage page;
BtKey ptr;
while( 1 ) {
- if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
- ptr = keyptr(bt->page, slot);
+ if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
+ ptr = keyptr(set->page, slot);
else
{
if ( !bt->err )
// if key already exists, update id and return
- page = bt->page;
-
- if( !keycmp (ptr, key, len) ) {
- if( slotptr(page, slot)->dead )
- page->act++;
- slotptr(page, slot)->dead = 0;
- slotptr(page, slot)->tod = tod;
- bt_putid(slotptr(page,slot)->id, id);
- bt_unlockpage(BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
- return bt->err;
+ if( slot <= set->page->cnt )
+ if( !keycmp (ptr, key, len) ) {
+ if( slotptr(set->page, slot)->dead )
+ set->page->act++;
+ slotptr(set->page, slot)->dead = 0;
+ slotptr(set->page, slot)->tod = tod;
+ bt_putid(slotptr(set->page,slot)->id, id);
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
}
// check if page has enough space
- if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+ if( slot = bt_cleanpage (bt, set->page, len, slot) )
break;
- if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+ if( bt_splitpage (bt, set) )
return bt->err;
}
- bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
+ // calculate next available slot and copy key into page
+
+ set->page->min -= len + 1; // reset lowest used offset
+ ((unsigned char *)set->page)[set->page->min] = len;
+ memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
+
+ for( idx = slot; idx <= set->page->cnt; idx++ )
+ if( slotptr(set->page, idx)->dead )
+ break;
+
+ // now insert key into array before slot
+
+ if( idx > set->page->cnt )
+ set->page->cnt++;
+
+ set->page->act++;
+
+ while( idx > slot )
+ *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+
+ bt_putid(slotptr(set->page,slot)->id, id);
+ slotptr(set->page, slot)->off = set->page->min;
+ slotptr(set->page, slot)->tod = tod;
+ slotptr(set->page, slot)->dead = 0;
- bt_unlockpage (BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
return 0;
}
uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
{
+BtPageSet set[1];
uint slot;
// cache page for retrieval
- if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
- memcpy (bt->cursor, bt->page, bt->mgr->page_size);
- bt->cursor_page = bt->page_no;
+ if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+ memcpy (bt->cursor, set->page, bt->mgr->page_size);
+ else
+ return 0;
- bt_unlockpage(BtLockRead, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ bt->cursor_page = set->page_no;
+
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
return slot;
}
uint bt_nextkey (BtDb *bt, uint slot)
{
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
+BtPageSet set[1];
uid right;
do {
while( slot++ < bt->cursor->cnt )
if( slotptr(bt->cursor,slot)->dead )
continue;
- else if( right || (slot < bt->cursor->cnt) )
+ else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
return slot;
else
break;
break;
bt->cursor_page = right;
- if( pool = bt_pinpool (bt, right) )
- page = bt_page (bt, pool, right);
+
+ if( set->pool = bt_pinpool (bt, right) )
+ set->page = bt_page (bt, set->pool, right);
else
return 0;
- set = bt_pinlatch (bt, right);
- bt_lockpage(BtLockRead, set);
+ set->latch = bt_pinlatch (bt, right);
+ bt_lockpage(BtLockRead, set->latch);
- memcpy (bt->cursor, page, bt->mgr->page_size);
+ memcpy (bt->cursor, set->page, bt->mgr->page_size);
- bt_unlockpage(BtLockRead, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
slot = 0;
} while( 1 );
void bt_latchaudit (BtDb *bt)
{
ushort idx, hashidx;
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+BtPageSet set[1];
#ifdef unix
for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
- set = bt->mgr->latchsets + idx;
- if( set->pin ) {
- fprintf(stderr, "latchset %d pinned\n", idx);
- set->pin = 0;
+ set->latch = bt->mgr->latchsets + idx;
+ if( set->latch->pin ) {
+ fprintf(stderr, "latchset %d pinned for page %.6x\n", idx, set->latch->page_no);
+ set->latch->pin = 0;
}
}
for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
- if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
- fprintf(stderr, "latchmgr locked\n");
if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
- set = bt->mgr->latchsets + idx;
- if( set->hash != hashidx )
+ set->latch = bt->mgr->latchsets + idx;
+ if( set->latch->hash != hashidx )
fprintf(stderr, "latchset %d wrong hashidx\n", idx);
- if( set->pin )
- fprintf(stderr, "latchset %d pinned\n", idx);
- } while( idx = set->next );
+ if( set->latch->pin )
+ fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, set->latch->page_no);
+ } while( idx = set->latch->next );
}
- page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
-
- while( page_no ) {
- fprintf(stderr, "free: %.6x\n", (uint)page_no);
- pool = bt_pinpool (bt, page_no);
- page = bt_page (bt, pool, page_no);
- page_no = bt_getid(page->right);
- bt_unpinpool (pool);
+
+ set->page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
+
+ while( set->page_no ) {
+ fprintf(stderr, "free: %.6x\n", (uint)set->page_no);
+
+ if( set->pool = bt_pinpool (bt, set->page_no) )
+ set->page = bt_page (bt, set->pool, set->page_no);
+ else
+ return;
+
+ set->page_no = bt_getid(set->page->right);
+ bt_unpinpool (set->pool);
}
#endif
}
unsigned char key[256];
ThreadArg *args = arg;
int ch, len = 0, slot;
-BtLatchSet *set;
+BtPageSet set[1];
time_t tod[1];
-BtPool *pool;
-BtPage page;
BtKey ptr;
BtDb *bt;
FILE *in;
break;
case 's':
- len = key[0] = 0;
-
- fprintf(stderr, "started reading\n");
-
- if( slot = bt_startkey (bt, key, len) )
- slot--;
- else
- fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
- while( slot = bt_nextkey (bt, slot) ) {
- ptr = bt_key(bt, slot);
- fwrite (ptr->key, ptr->len, 1, stdout);
- fputc ('\n', stdout);
- }
-
- break;
-
- case 'c':
- fprintf(stderr, "started reading\n");
-
+ fprintf(stderr, "started scanning\n");
do {
- if( pool = bt_pinpool (bt, page_no) )
- page = bt_page (bt, pool, page_no);
+ if( set->pool = bt_pinpool (bt, page_no) )
+ set->page = bt_page (bt, set->pool, page_no);
else
break;
- set = bt_pinlatch (bt, page_no);
- bt_lockpage (BtLockRead, set);
- cnt += page->act;
- next = bt_getid (page->right);
- bt_unlockpage (BtLockRead, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
+ set->latch = bt_pinlatch (bt, page_no);
+ bt_lockpage (BtLockRead, set->latch);
+ next = bt_getid (set->page->right);
+ cnt += set->page->act;
+
+ for( slot = 0; slot++ < set->page->cnt; )
+ if( next || slot < set->page->cnt )
+ if( !slotptr(set->page, slot)->dead ) {
+ ptr = keyptr(set->page, slot);
+ fwrite (ptr->key, ptr->len, 1, stdout);
+ fputc ('\n', stdout);
+ }
+
+ bt_unlockpage (BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
} while( page_no = next );
+ cnt--; // remove stopper key
+ fprintf(stderr, " Total keys read %d\n", cnt);
+ break;
+
+ case 'c':
+ fprintf(stderr, "started counting\n");
+ next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
+ page_no = LEAF_page;
+
+ while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
+ pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
+ if( !bt->frame->free && !bt->frame->lvl )
+ cnt += bt->frame->act;
+ if( page_no > LEAF_page )
+ next = page_no + 1;
+ page_no = next;
+ }
+
cnt--; // remove stopper key
fprintf(stderr, " Total keys read %d\n", cnt);
break;