Unique,
Librarian,
Duplicate,
- Delete
+ Delete,
+ Update
} BtSlotType;
typedef struct {
typedef struct {
uid page_no; // page number for split leaf
void *next; // next key to insert
- uint entry:30; // latch table entry number
+ uint entry:29; // latch table entry number
uint type:2; // 0 == insert, 1 == delete, 2 == free
+ uint nounlock:1; // don't unlock ParentModification
unsigned char leafkey[BT_keyarray];
} AtomicKey;
return 0;
}
+uint bt_parentmatch (AtomicKey *head, uint entry)
+{
+uint cnt = 0;
+
+ if( head )
+ do if( head->entry == entry )
+ head->nounlock = 1, cnt++;
+ while( head = head->next );
+
+ return cnt;
+}
+
// atomic modification of a batch of keys.
// return -1 if BTERR is set
// schedule prev fence key update
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 0;
if( tail )
// Process last page split in chain
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 0;
if( tail )
// Delete empty master's fence
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 1;
if( tail )
bt_lockpage(BtLockParent, prev->latch);
bt_unlockpage(BtLockWrite, prev->latch);
+ // grab master's right sibling
+
if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
set->page = bt_mappage (bt, set->latch);
else
return -1;
- // add page to our transaction
-
- bt_lockpage(BtLockAtomic, set->latch);
bt_lockpage(BtLockWrite, set->latch);
// pull contents over empty page
set->latch->dirty = 1;
set->page->kill = 1;
- // add new parent key for new master page contents
- // delete it after parent posts the new master fence.
+ // add new fence key for new master page contents, delete
+ // right sibling after parent posts the new master fence.
ptr = keyptr(set->page,set->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = set->latch->entry;
- leaf->next = NULL;
leaf->type = 2;
+ // see if right sibling is not yet in the FIFO
+
+ if( !bt_parentmatch (head, leaf->entry) )
+ bt_lockpage(BtLockParent, set->latch);
+
+ bt_unlockpage(BtLockWrite, set->latch);
+
if( tail )
tail->next = leaf;
else
tail = leaf;
-// bt_lockpage(BtLockParent, set->latch);
- bt_unlockpage(BtLockWrite, set->latch);
-
// fix master's far right sibling's left pointer
if( right = bt_getid (set->page->right) ) {
if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
return -1;
- bt_unlockpage (BtLockParent, set->latch);
break;
case 1: // delete key
if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
return -1;
- bt_unlockpage (BtLockParent, set->latch);
break;
case 2: // insert key & free
if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
return -1;
- bt_unlockpage (BtLockAtomic, set->latch);
bt_lockpage (BtLockDelete, set->latch);
bt_lockpage (BtLockWrite, set->latch);
bt_freepage (bt, set);
break;
}
+ if( !leaf->nounlock )
+ bt_unlockpage (BtLockParent, set->latch);
+
bt_unpinlatch (set->latch);
tail = leaf->next;
free (leaf);
nxt -= 1;
txn[nxt] = 10;
slotptr(page,++cnt)->off = nxt;
- slotptr(page,cnt)->type = Unique;
+ slotptr(page,cnt)->type = Delete;
len = 0;
if( cnt < args->num )