memset (&latch->atomictid, 0, sizeof(latch->atomictid));
latch->page_no = page_no;
latch->entry = slot;
+ latch->split = 0;
latch->prev = 0;
latch->pin = 1;
// place write, read, or parent lock on requested page_no.
-void bt_lockpage(BtLock mode, BtLatchSet *set)
+void bt_lockpage(BtLock mode, BtLatchSet *latch)
{
switch( mode ) {
case BtLockRead:
- ReadLock (set->readwr);
+ ReadLock (latch->readwr);
break;
case BtLockWrite:
- WriteLock (set->readwr);
+ WriteLock (latch->readwr);
break;
case BtLockAccess:
- ReadLock (set->access);
+ ReadLock (latch->access);
break;
case BtLockDelete:
- WriteLock (set->access);
+ WriteLock (latch->access);
break;
case BtLockParent:
- WriteLock (set->parent);
+ WriteLock (latch->parent);
break;
case BtLockAtomic:
- WriteLock (set->atomic);
+ WriteLock (latch->atomic);
break;
case BtLockAtomic | BtLockRead:
- WriteLock (set->atomic);
- ReadLock (set->readwr);
+ WriteLock (latch->atomic);
+ ReadLock (latch->readwr);
break;
}
}
// remove write, read, or parent lock on requested page
-void bt_unlockpage(BtLock mode, BtLatchSet *set)
+void bt_unlockpage(BtLock mode, BtLatchSet *latch)
{
switch( mode ) {
case BtLockRead:
- ReadRelease (set->readwr);
+ ReadRelease (latch->readwr);
break;
case BtLockWrite:
- WriteRelease (set->readwr);
+ WriteRelease (latch->readwr);
break;
case BtLockAccess:
- ReadRelease (set->access);
+ ReadRelease (latch->access);
break;
case BtLockDelete:
- WriteRelease (set->access);
+ WriteRelease (latch->access);
break;
case BtLockParent:
- WriteRelease (set->parent);
+ WriteRelease (latch->parent);
break;
case BtLockAtomic:
- memset (&set->atomictid, 0, sizeof(set->atomictid));
- WriteRelease (set->atomic);
+ memset (&latch->atomictid, 0, sizeof(latch->atomictid));
+ WriteRelease (latch->atomic);
break;
case BtLockAtomic | BtLockRead:
- ReadRelease (set->readwr);
- memset (&set->atomictid, 0, sizeof(set->atomictid));
- WriteRelease (set->atomic);
+ ReadRelease (latch->readwr);
+ memset (&latch->atomictid, 0, sizeof(latch->atomictid));
+ WriteRelease (latch->atomic);
break;
}
}
// find slot in page for given key at a given level
-int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
+int bt_findslot (BtPage page, unsigned char *key, uint len)
{
-uint diff, higher = set->page->cnt, low = 1, slot;
+uint diff, higher = page->cnt, low = 1, slot;
uint good = 0;
// make stopper key an infinite fence value
- if( bt_getid (set->page->right) )
+ if( bt_getid (page->right) )
higher++;
else
good++;
while( diff = higher - low ) {
slot = low + ( diff >> 1 );
- if( keycmp (keyptr(set->page, slot), key, len) < 0 )
+ if( keycmp (keyptr(page, slot), key, len) < 0 )
low = slot + 1;
else
higher = slot, good++;
if( set->page->kill )
goto slideright;
- if( slot = bt_findslot (set, key, len) ) {
+ if( slot = bt_findslot (set->page, key, len) ) {
if( drill == lvl )
return slot;
uint entry;
if( src > 1 && locks[src].reuse )
- entry = locks[src-1].entry, locks[src].entry = entry, slot = 0;
+ entry = locks[src-1].entry, slot = 0;
else
entry = locks[src].entry;
set->latch = bt->mgr->latchsets + entry;
set->page = bt_mappage (bt, set->latch);
- if( slot = bt_findslot(set, key->key, key->len) )
- return slot;
-
+ if( slot = bt_findslot(set->page, key->key, key->len) ) {
+ if( locks[src].reuse )
+ locks[src].entry = entry;
+ return slot;
+ }
} while( entry = set->latch->split );
bt->err = BTERR_atomic;
BtVal *val = valptr(source, src);
BtLatchSet *latch;
BtPageSet set[1];
-uint slot, entry;
+uint entry;
+
+ while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
+ if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
+ return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
- while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
- if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
- return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
if( entry = bt_splitpage (bt, set) )
latch = bt->mgr->latchsets + entry;
else
return bt->err;
+
+ // splice right page into split chain
+ // and WriteLock it.
+
latch->split = set->latch->split;
set->latch->split = entry;
bt_lockpage(BtLockWrite, latch);
if( samepage = src > 1 )
if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
- slot = bt_findslot(set, key->key, key->len);
+ slot = bt_findslot(set->page, key->key, key->len);
else // release read on previous page
bt_unlockpage(BtLockRead, set->latch);
- if( !slot ) {
- slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead);
- set->latch->split = 0;
- }
-
if( !slot )
- return -1;
+ if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+ set->latch->split = 0;
+ else
+ return -1;
+
+ if( slotptr(set->page, slot)->type == Librarian )
+ ptr = keyptr(set->page, ++slot);
+ else
+ ptr = keyptr(set->page, slot);
if( !samepage ) {
- for(idx = 0; idx++ < source->cnt; )
- if( locks[idx].entry == set->latch->entry )
- abort();
locks[src].entry = set->latch->entry;
locks[src].slot = slot;
locks[src].reuse = 0;
locks[src].reuse = 1;
}
- if( slotptr(set->page, slot)->type == Librarian )
- ptr = keyptr(set->page, ++slot);
- else
- ptr = keyptr(set->page, slot);
-
switch( slotptr(source, src)->type ) {
case Duplicate:
case Unique:
return result;
}
+
}
// unlock last loadpage lock
return -1;
else
continue;
- else if( bt_atomicinsert (bt, source, locks, src) )
+ else
+ if( bt_atomicinsert (bt, source, locks, src) )
return -1;
else
continue;
prev->latch = bt->mgr->latchsets + locks[src].entry;
prev->page = bt_mappage (bt, prev->latch);
- // pick-up all splits from first entry
- // save page that points to this entry
+ // pick-up all splits from original page
split = next = prev->latch->split;
set->latch = bt->mgr->latchsets + entry;
set->page = bt_mappage (bt, set->latch);
next = set->latch->split;
+ set->latch->split = 0;
// delete empty previous page
continue;
}
- // prev page is not emptied
-
+ // prev page is not emptied
locks[src].emptied = 0;
// schedule previous fence key update
else
head = leaf;
- latch = prev->latch;
tail = leaf;
// remove empty block from the split chain
memcpy (prev->page->right, set->page->right, BtId);
bt_lockpage (BtLockDelete, set->latch);
bt_freepage (bt, set);
- } else
- *prev = *set;
+ continue;
+ }
- bt_lockpage(BtLockParent, latch);
- bt_unlockpage(BtLockWrite, latch);
+ bt_lockpage(BtLockParent, prev->latch);
+ bt_unlockpage(BtLockWrite, prev->latch);
+ *prev = *set;
}
+ // was entire chain emptied?
+
if( !prev->page->act )
continue;
if( locks[src].reuse )
continue;
- // delete page emptied by our atomic action
-
set->latch = bt->mgr->latchsets + locks[src].entry;
set->page = bt_mappage (bt, set->latch);
- if( locks[src].emptied ) {
- bt_unlockpage (BtLockAtomic, set->latch);
- if( bt_deletepage (bt, set) )
- return bt->err;
- continue;
- }
+ // clear original page split field
+ split = set->latch->split;
+ set->latch->split = 0;
bt_unlockpage (BtLockAtomic, set->latch);
- if( !set->latch->split )
+ // delete page emptied by our atomic action
+
+ if( locks[src].emptied )
+ if( bt_deletepage (bt, set) )
+ return bt->err;
+ else
+ continue;
+
+ if( !split )
bt_unpinlatch (set->latch);
}