// phase-fair reader writer lock,
// librarian page split code,
// duplicate key management
+// bi-directional cursors
// traditional buffer pool manager
-// and ACID batched key updates
+// and ACID batched key-value updates
-// 21 SEP 2014
+// 26 SEP 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
unsigned char lvl:7; // level of page
unsigned char kill:1; // page is being deleted
unsigned char right[BtId]; // page number to right
+ unsigned char left[BtId]; // page number to left
+ unsigned char filler[2]; // padding to multiple of 8
} *BtPage;
// The loadpage interface object
pagezero->alloc->bits = mgr->page_bits;
bt_putid(pagezero->alloc->right, MIN_lvl+1);
+ // initialize left-most LEAF page in
+ // alloc->left.
+
+ bt_putid (pagezero->alloc->left, LEAF_page);
+
if( bt_writepage (mgr, pagezero->alloc, 0) ) {
fprintf (stderr, "Unable to create btree page zero\n");
return bt_mgrclose (mgr), NULL;
set->page = bt_mappage (bt, set->latch);
- // release & unpin parent page
+ // release & unpin parent or left sibling page
if( prevpage ) {
bt_unlockpage(prevmode, prevlatch);
prevpage = 0;
}
- // skip Atomic lock on leaf page if already held
+ // skip Atomic lock on leaf page we need to slide over
if( !drill )
if( mode & BtLockAtomic )
bt_unlockpage (BtLockDelete, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
- bt_unpinlatch (set->latch);
// unlock allocation page
root->latch->dirty = 1;
bt_freepage (bt, child);
+ bt_unpinlatch (child->latch);
} while( root->page->lvl > 1 && root->page->act == 1 );
bt_lockpage (BtLockDelete, right->latch);
bt_lockpage (BtLockWrite, right->latch);
bt_freepage (bt, right);
+ bt_unpinlatch (right->latch);
bt_unlockpage (BtLockParent, set->latch);
bt_unpinlatch (set->latch);
typedef struct {
uint entry; // latch table entry number
- uint slot:30; // page slot number
+ uint slot:31; // page slot number
uint reuse:1; // reused previous page
- uint emptied:1; // page was emptied
} AtomicMod;
typedef struct {
uid page_no; // page number for split leaf
void *next; // next key to insert
- uint entry; // latch table entry number
+ uint entry:30; // latch table entry number
+ uint type:2; // 0 == insert, 1 == delete, 2 == free
unsigned char leafkey[BT_keyarray];
} AtomicKey;
// determine actual page where key is located
-// return slot number with set page locked
+// return slot number
uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
{
val = valptr(set->page, slot);
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
- set->page->act--;
-
- // collapse empty slots beneath the fence
-
- while( idx = set->page->cnt - 1 )
- if( slotptr(set->page, idx)->dead ) {
- *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
- memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
- } else
- break;
-
set->latch->dirty = 1;
+ set->page->act--;
return 0;
}
-// qsort comparison function
-
-int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
-{
-BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
-BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
-
- return keycmp (key1, key2->key, key2->len);
-}
-
// atomic modification of a batch of keys.
// return -1 if BTERR is set
// causing the key constraint violation
// or zero on successful completion.
-int bt_atomicmods (BtDb *bt, BtPage source)
+int bt_atomictxn (BtDb *bt, BtPage source)
{
-uint src, idx, slot, samepage, entry, next, split;
+uint src, idx, slot, samepage, entry;
AtomicKey *head, *tail, *leaf;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
BtSlot temp[1];
BtPage page;
BtVal *val;
+uid right;
int type;
locks = calloc (source->cnt + 1, sizeof(AtomicMod));
head = NULL;
tail = NULL;
- // first sort the list of keys into order to
+ // stable sort the list of keys into order to
// prevent deadlocks between threads.
- qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
-/*
+
for( src = 1; src++ < source->cnt; ) {
*temp = *slotptr(source,src);
key = keyptr (source,src);
break;
}
}
-*/
- // Load the leafpage for each key
+
+ // Load the leaf page for each key
+ // group same page references with reuse bit
// and determine any constraint violations
for( src = 0; src++ < source->cnt; ) {
key = keyptr(source, src);
- val = valptr(source, src);
slot = 0;
// first determine if this modification falls
// note that the far right leaf page is a special case
if( samepage = src > 1 )
- if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
+ if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
slot = bt_findslot(set->page, key->key, key->len);
else // release read on previous page
bt_unlockpage(BtLockRead, set->latch);
case Unique:
if( !slotptr(set->page, slot)->dead )
if( slot < set->page->cnt || bt_getid (set->page->right) )
- if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
+ if( !keycmp (ptr, key->key, key->len) ) {
// return constraint violation if key already exists
+ bt_unlockpage(BtLockRead, set->latch);
result = src;
while( src ) {
if( locks[src].entry ) {
set->latch = bt->mgr->latchsets + locks[src].entry;
bt_unlockpage(BtLockAtomic, set->latch);
- bt_unlockpage(BtLockRead, set->latch);
bt_unpinlatch (set->latch);
}
src--;
}
-
+ free (locks);
return result;
}
-
break;
-
- case Delete:
- if( !slotptr(set->page, slot)->dead )
- if( ptr->len == key->len )
- if( !memcmp (ptr->key, key->key, key->len) )
- break;
-
- // Key to delete doesn't exist
-
- result = src;
-
- while( src ) {
- if( locks[src].entry ) {
- set->latch = bt->mgr->latchsets + locks[src].entry;
- bt_unlockpage(BtLockAtomic, set->latch);
- bt_unlockpage(BtLockRead, set->latch);
- bt_unpinlatch (set->latch);
- }
- src--;
- }
-
- return result;
}
-
}
// unlock last loadpage lock
if( source->cnt > 1 )
bt_unlockpage(BtLockRead, set->latch);
- // obtain write lock for each page
+ // obtain write lock for each master page
for( src = 0; src++ < source->cnt; )
- if( locks[src].entry )
+ if( locks[src].reuse )
+ continue;
+ else
bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
// insert or delete each key
+ // process any splits or merges
+ // release Write & Atomic latches
+ // set ParentModifications and build
+ // queue of keys to insert for split pages
+ // or delete for deleted pages.
- for( src = 0; src++ < source->cnt; )
- if( slotptr(source,src)->type == Delete )
- if( bt_atomicdelete (bt, source, locks, src) )
- return -1;
- else
- continue;
- else
- if( bt_atomicinsert (bt, source, locks, src) )
- return -1;
- else
- continue;
+ // run through txn list backwards
- // set ParentModification and release WriteLock
- // leave empty pages locked for later processing
- // build queue of keys to insert for split pages
+ samepage = source->cnt + 1;
- for( src = 0; src++ < source->cnt; ) {
+ for( src = source->cnt; src; src-- ) {
if( locks[src].reuse )
continue;
- prev->latch = bt->mgr->latchsets + locks[src].entry;
- prev->page = bt_mappage (bt, prev->latch);
+ // perform the txn operations
+ // from smaller to larger on
+ // the same page
+
+ for( idx = src; idx < samepage; idx++ )
+ switch( slotptr(source,idx)->type ) {
+ case Delete:
+ if( bt_atomicdelete (bt, source, locks, idx) )
+ return -1;
+ break;
- // pick-up all splits from original page
+ case Duplicate:
+ case Unique:
+ if( bt_atomicinsert (bt, source, locks, idx) )
+ return -1;
+ break;
+ }
+
+ // after the same page operations have finished,
+ // process master page for splits or deletion.
- split = next = prev->latch->split;
+ latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
+ prev->page = bt_mappage (bt, prev->latch);
+ samepage = src;
- if( !prev->page->act )
- locks[src].emptied = 1;
+ // pick-up all splits from master page
- while( entry = next ) {
+ entry = prev->latch->split;
+
+ while( entry ) {
set->latch = bt->mgr->latchsets + entry;
set->page = bt_mappage (bt, set->latch);
- next = set->latch->split;
- set->latch->split = 0;
+ entry = set->latch->split;
- // delete empty previous page
+ // delete empty master page
if( !prev->page->act ) {
+ memcpy (set->page->left, prev->page->left, BtId);
memcpy (prev->page, set->page, bt->mgr->page_size);
bt_lockpage (BtLockDelete, set->latch);
bt_freepage (bt, set);
+ bt_unpinlatch (set->latch);
prev->latch->dirty = 1;
+ continue;
+ }
- if( prev->page->act )
- locks[src].emptied = 0;
+ // remove empty page from the split chain
+ if( !set->page->act ) {
+ memcpy (prev->page->right, set->page->right, BtId);
+ prev->latch->split = set->latch->split;
+ bt_lockpage (BtLockDelete, set->latch);
+ bt_freepage (bt, set);
+ bt_unpinlatch (set->latch);
continue;
}
- // prev page is not emptied
- locks[src].emptied = 0;
-
- // schedule previous fence key update
+ // schedule prev fence key update
ptr = keyptr(prev->page,prev->page->cnt);
leaf = malloc (sizeof(AtomicKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
leaf->next = NULL;
+ leaf->type = 0;
if( tail )
tail->next = leaf;
tail = leaf;
- // remove empty block from the split chain
-
- if( !set->page->act ) {
- memcpy (prev->page->right, set->page->right, BtId);
- bt_lockpage (BtLockDelete, set->latch);
- bt_freepage (bt, set);
- continue;
- }
-
+ bt_putid (set->page->left, prev->latch->page_no);
bt_lockpage(BtLockParent, prev->latch);
bt_unlockpage(BtLockWrite, prev->latch);
*prev = *set;
}
- // was entire chain emptied?
+ // update left pointer in next right page
+ // if we did any now non-empty splits
- if( !prev->page->act )
- continue;
+ if( latch->split ) {
+ // fix left pointer in master's original right sibling
+ // or set rightmost page in page zero
- if( !split ) {
- bt_unlockpage(BtLockWrite, prev->latch);
- continue;
+ if( right = bt_getid (prev->page->right) ) {
+ if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return -1;
+
+ bt_lockpage (BtLockWrite, set->latch);
+ bt_putid (set->page->left, prev->latch->page_no);
+ set->latch->dirty = 1;
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ } else
+ bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+
+ // Process last page split in chain
+
+ ptr = keyptr(prev->page,prev->page->cnt);
+ leaf = malloc (sizeof(AtomicKey));
+
+ memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+ leaf->page_no = prev->latch->page_no;
+ leaf->entry = prev->latch->entry;
+ leaf->next = NULL;
+ leaf->type = 0;
+
+ if( tail )
+ tail->next = leaf;
+ else
+ head = leaf;
+
+ tail = leaf;
+
+ bt_lockpage(BtLockParent, prev->latch);
+ bt_unlockpage(BtLockWrite, prev->latch);
+ bt_unlockpage(BtLockAtomic, latch);
+ continue;
}
- // Process last page split in chain
+ // finished if prev page occupied (either master or final split)
+
+ if( prev->page->act ) {
+ bt_unlockpage(BtLockWrite, latch);
+ bt_unlockpage(BtLockAtomic, latch);
+ bt_unpinlatch(latch);
+ continue;
+ }
+
+ // any splits were reversed, and the
+ // master page located in prev is empty, delete it
+ // by pulling over master's right sibling.
+ // Delete empty master's fence
ptr = keyptr(prev->page,prev->page->cnt);
leaf = malloc (sizeof(AtomicKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
leaf->next = NULL;
-
+ leaf->type = 1;
+
if( tail )
tail->next = leaf;
else
bt_lockpage(BtLockParent, prev->latch);
bt_unlockpage(BtLockWrite, prev->latch);
- }
+
+ if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return -1;
+
+ // add page to our transaction
+
+ bt_lockpage(BtLockAtomic, set->latch);
+ bt_lockpage(BtLockWrite, set->latch);
+
+ // pull contents over empty page
+
+ memcpy (set->page->left, prev->page->left, BtId);
+ memcpy (prev->page, set->page, bt->mgr->page_size);
+
+ bt_putid (set->page->right, prev->latch->page_no);
+ set->latch->dirty = 1;
+ set->page->kill = 1;
+
+ // add new parent key for new master page contents
+ // delete it after parent posts the new master fence.
+
+ ptr = keyptr(set->page,set->page->cnt);
+ leaf = malloc (sizeof(AtomicKey));
+
+ memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+ leaf->page_no = prev->latch->page_no;
+ leaf->entry = set->latch->entry;
+ leaf->next = NULL;
+ leaf->type = 2;
- // remove Atomic locks and
- // process any empty pages
+ if( tail )
+ tail->next = leaf;
+ else
+ head = leaf;
- for( src = source->cnt; src; src-- ) {
- if( locks[src].reuse )
- continue;
+ tail = leaf;
- set->latch = bt->mgr->latchsets + locks[src].entry;
- set->page = bt_mappage (bt, set->latch);
+// bt_lockpage(BtLockParent, set->latch);
+ bt_unlockpage(BtLockWrite, set->latch);
- // clear original page split field
+ // fix master's far right sibling's left pointer
- split = set->latch->split;
- set->latch->split = 0;
- bt_unlockpage (BtLockAtomic, set->latch);
+ if( right = bt_getid (set->page->right) ) {
+ if( set->latch = bt_pinlatch (bt, right, 1) )
+ set->page = bt_mappage (bt, set->latch);
- // delete page emptied by our atomic action
+ bt_lockpage (BtLockWrite, set->latch);
+ bt_putid (set->page->left, prev->latch->page_no);
+ set->latch->dirty = 1;
- if( locks[src].emptied )
- if( bt_deletepage (bt, set) )
- return bt->err;
- else
- continue;
+ bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ }
- if( !split )
- bt_unpinlatch (set->latch);
+ bt_unlockpage(BtLockAtomic, latch);
}
-
- // add keys for any pages split during action
+
+ // add & delete keys for any pages split or merged during transaction
if( leaf = head )
do {
- ptr = (BtKey *)leaf->leafkey;
+ set->latch = bt->mgr->latchsets + leaf->entry;
+ set->page = bt_mappage (bt, set->latch);
+
bt_putid (value, leaf->page_no);
+ ptr = (BtKey *)leaf->leafkey;
- if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
- return bt->err;
+ switch( leaf->type ) {
+ case 0: // insert key
+ if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+ return -1;
+
+ bt_unlockpage (BtLockParent, set->latch);
+ break;
- bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
- bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
+ case 1: // delete key
+ if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
+ return -1;
+
+ bt_unlockpage (BtLockParent, set->latch);
+ break;
+
+ case 2: // insert key & free
+ if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+ return -1;
+
+ bt_unlockpage (BtLockAtomic, set->latch);
+ bt_lockpage (BtLockDelete, set->latch);
+ bt_lockpage (BtLockWrite, set->latch);
+ bt_freepage (bt, set);
+ break;
+ }
+
+ bt_unpinlatch (set->latch);
tail = leaf->next;
free (leaf);
} while( leaf = tail );
return 0;
}
+// set cursor to highest slot on highest page
+
+uint bt_lastkey (BtDb *bt)
+{
+uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+BtPageSet set[1];
+uint slot;
+
+ if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockRead, set->latch);
+
+ memcpy (bt->cursor, set->page, bt->mgr->page_size);
+ slot = set->page->cnt;
+
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+ return slot;
+}
+
+// return previous slot on cursor page
+
+uint bt_prevkey (BtDb *bt, uint slot)
+{
+uid ourright, next, us = bt->cursor_page;
+BtPageSet set[1];
+
+ if( --slot )
+ return slot;
+
+ ourright = bt_getid(bt->cursor->right);
+
+goleft:
+ if( !(next = bt_getid(bt->cursor->left)) )
+ return 0;
+
+findourself:
+ bt->cursor_page = next;
+
+ if( set->latch = bt_pinlatch (bt, next, 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockRead, set->latch);
+ memcpy (bt->cursor, set->page, bt->mgr->page_size);
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_unpinlatch (set->latch);
+
+ next = bt_getid (bt->cursor->right);
+
+ if( next != us )
+ if( next == ourright )
+ goto goleft;
+ else
+ goto findourself;
+
+ return bt->cursor->cnt;
+}
+
// return next slot on cursor page
// or slide cursor right into next page
slotptr(page,cnt)->type = Unique;
len = 0;
- if( cnt < 25 )
+ if( cnt < args->num )
continue;
page->cnt = cnt;
page->act = cnt;
page->min = nxt;
- if( bt_atomicmods (bt, page) )
+ if( bt_atomictxn (bt, page) )
fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
nxt = 65536;
cnt = 0;