uint prev; // prev entry in hash table chain
volatile ushort pin; // number of outstanding threads
ushort dirty:1; // page in cache is dirty
-#ifdef unix
- pthread_t atomictid; // pid holding atomic lock
-#else
- uint atomictid; // pid holding atomic lock
-#endif
} BtLatchSet;
// Define the length of the page record numbers
Unique,
Librarian,
Duplicate,
- Delete
+ Delete,
+ Update
} BtSlotType;
typedef struct {
bt->mgr->latchsets[latch->next].prev = slot;
bt->mgr->hashtable[hashidx].slot = slot;
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
latch->page_no = page_no;
latch->entry = slot;
latch->split = 0;
case BtLockAtomic:
WriteLock (latch->atomic);
break;
- case BtLockAtomic | BtLockRead:
- WriteLock (latch->atomic);
- ReadLock (latch->readwr);
- break;
}
}
WriteRelease (latch->parent);
break;
case BtLockAtomic:
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
- WriteRelease (latch->atomic);
- break;
- case BtLockAtomic | BtLockRead:
- ReadRelease (latch->readwr);
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
WriteRelease (latch->atomic);
break;
}
prevpage = 0;
}
- // skip Atomic lock on leaf page we need to slide over
-
- if( !drill )
- if( mode & BtLockAtomic )
- if( pthread_equal (set->latch->atomictid, pthread_self()) )
- mode &= ~BtLockAtomic;
-
// obtain mode lock using lock chaining through AccessLock
bt_lockpage(mode, set->latch);
- if( mode & BtLockAtomic )
- set->latch->atomictid = pthread_self();
-
if( set->page->free )
return bt->err = BTERR_struct, 0;
if( drill == lvl )
return slot;
+ // find next non-dead slot -- the fence key if nothing else
+
while( slotptr(set->page, slot)->dead )
if( slot++ < set->page->cnt )
- continue;
+ continue;
else
- goto slideright;
+ return bt->err = BTERR_struct, 0;
page_no = bt_getid(valptr(set->page, slot)->value);
drill--;
typedef struct {
uid page_no; // page number for split leaf
void *next; // next key to insert
- uint entry:30; // latch table entry number
+ uint entry:29; // latch table entry number
uint type:2; // 0 == insert, 1 == delete, 2 == free
+ uint nounlock:1; // don't unlock ParentModification
unsigned char leafkey[BT_keyarray];
} AtomicKey;
+// find and load leaf page for given key
+// leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+ // find level zero page
+
+ if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+ return 0;
+
+ // find next non-dead entry on this page
+ // it will be the fence key if nothing else
+
+ while( slotptr(set->page, slot)->dead )
+ if( slot++ < set->page->cnt )
+ continue;
+ else
+ return bt->err = BTERR_struct, 0;
+
+ page_no = bt_getid(valptr(set->page, slot)->value);
+ prevlatch = set->latch;
+
+ while( page_no ) {
+ if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return 0;
+
+ if( set->page->free || set->page->lvl )
+ return bt->err = BTERR_struct, 0;
+
+ // obtain read lock using lock chaining with Access mode
+ // release & unpin parent/left sibling page
+
+ bt_lockpage(BtLockAccess, set->latch);
+
+ bt_unlockpage(BtLockRead, prevlatch);
+ bt_unpinlatch (prevlatch);
+
+ bt_lockpage(BtLockRead, set->latch);
+ bt_unlockpage(BtLockAccess, set->latch);
+
+ // find key on page at this level
+ // and descend to requested level
+
+ if( !set->page->kill )
+ if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_lockpage(BtLockAtomic, set->latch);
+ bt_lockpage(BtLockAccess, set->latch);
+ bt_lockpage(BtLockRead, set->latch);
+ bt_unlockpage(BtLockAccess, set->latch);
+
+ if( slot = bt_findslot (set->page, key, len) )
+ return slot;
+
+ bt_unlockpage(BtLockAtomic, set->latch);
+ }
+
+ // slide right into next page
+
+ page_no = bt_getid(set->page->right);
+ prevlatch = set->latch;
+ }
+
+ // return error on end of right chain
+
+ bt->err = BTERR_struct;
+ return 0; // return error
+}
+
// determine actual page where key is located
// return slot number
return 0;
}
+uint bt_parentmatch (AtomicKey *head, uint entry)
+{
+uint cnt = 0;
+
+ if( head )
+ do if( head->entry == entry )
+ head->nounlock = 1, cnt++;
+ while( head = head->next );
+
+ return cnt;
+}
+
// atomic modification of a batch of keys.
// return -1 if BTERR is set
bt_unlockpage(BtLockRead, set->latch);
if( !slot )
- if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+ if( slot = bt_atomicload(bt, set, key->key, key->len) )
set->latch->split = 0;
else
return -1;
// schedule prev fence key update
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 0;
if( tail )
// Process last page split in chain
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 0;
if( tail )
// Delete empty master's fence
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = prev->latch->entry;
- leaf->next = NULL;
leaf->type = 1;
if( tail )
bt_lockpage(BtLockParent, prev->latch);
bt_unlockpage(BtLockWrite, prev->latch);
+ // grab master's right sibling
+
if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
set->page = bt_mappage (bt, set->latch);
else
return -1;
- // add page to our transaction
-
- bt_lockpage(BtLockAtomic, set->latch);
bt_lockpage(BtLockWrite, set->latch);
// pull contents over empty page
set->latch->dirty = 1;
set->page->kill = 1;
- // add new parent key for new master page contents
- // delete it after parent posts the new master fence.
+ // add new fence key for new master page contents, delete
+ // right sibling after parent posts the new master fence.
ptr = keyptr(set->page,set->page->cnt);
- leaf = malloc (sizeof(AtomicKey));
+ leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
leaf->entry = set->latch->entry;
- leaf->next = NULL;
leaf->type = 2;
+ // see if right sibling is not yet in the FIFO
+
+ if( !bt_parentmatch (head, leaf->entry) )
+ bt_lockpage(BtLockParent, set->latch);
+
+ bt_unlockpage(BtLockWrite, set->latch);
+
if( tail )
tail->next = leaf;
else
tail = leaf;
-// bt_lockpage(BtLockParent, set->latch);
- bt_unlockpage(BtLockWrite, set->latch);
-
// fix master's far right sibling's left pointer
if( right = bt_getid (set->page->right) ) {
if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
return -1;
- bt_unlockpage (BtLockParent, set->latch);
break;
case 1: // delete key
if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
return -1;
- bt_unlockpage (BtLockParent, set->latch);
break;
case 2: // insert key & free
if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
return -1;
- bt_unlockpage (BtLockAtomic, set->latch);
bt_lockpage (BtLockDelete, set->latch);
bt_lockpage (BtLockWrite, set->latch);
bt_freepage (bt, set);
break;
}
+ if( !leaf->nounlock )
+ bt_unlockpage (BtLockParent, set->latch);
+
bt_unpinlatch (set->latch);
tail = leaf->next;
free (leaf);
nxt -= 1;
txn[nxt] = 10;
slotptr(page,++cnt)->off = nxt;
- slotptr(page,cnt)->type = Unique;
+ slotptr(page,cnt)->type = Delete;
len = 0;
if( cnt < args->num )