uint prev; // prev entry in hash table chain
volatile ushort pin; // number of outstanding threads
ushort dirty:1; // page in cache is dirty
-#ifdef unix
- pthread_t atomictid; // pid holding atomic lock
-#else
- uint atomictid; // pid holding atomic lock
-#endif
} BtLatchSet;
// Define the length of the page record numbers
bt->mgr->latchsets[latch->next].prev = slot;
bt->mgr->hashtable[hashidx].slot = slot;
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
latch->page_no = page_no;
latch->entry = slot;
latch->split = 0;
case BtLockAtomic:
WriteLock (latch->atomic);
break;
- case BtLockAtomic | BtLockRead:
- WriteLock (latch->atomic);
- ReadLock (latch->readwr);
- break;
}
}
WriteRelease (latch->parent);
break;
case BtLockAtomic:
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
- WriteRelease (latch->atomic);
- break;
- case BtLockAtomic | BtLockRead:
- ReadRelease (latch->readwr);
- memset (&latch->atomictid, 0, sizeof(latch->atomictid));
WriteRelease (latch->atomic);
break;
}
prevpage = 0;
}
- // skip Atomic lock on leaf page we need to slide over
-
- if( !drill )
- if( mode & BtLockAtomic )
- if( pthread_equal (set->latch->atomictid, pthread_self()) )
- mode &= ~BtLockAtomic;
-
// obtain mode lock using lock chaining through AccessLock
bt_lockpage(mode, set->latch);
- if( mode & BtLockAtomic )
- set->latch->atomictid = pthread_self();
-
if( set->page->free )
return bt->err = BTERR_struct, 0;
if( drill == lvl )
return slot;
+ // find next non-dead slot -- the fence key if nothing else
+
while( slotptr(set->page, slot)->dead )
if( slot++ < set->page->cnt )
- continue;
+ continue;
else
- goto slideright;
+ return bt->err = BTERR_struct, 0;
page_no = bt_getid(valptr(set->page, slot)->value);
drill--;
unsigned char leafkey[BT_keyarray];
} AtomicKey;
+// find and load leaf page for given key
+// leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+ // find level zero page
+
+ if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+ return 0;
+
+ // find next non-dead entry on this page
+ // it will be the fence key if nothing else
+
+ while( slotptr(set->page, slot)->dead )
+ if( slot++ < set->page->cnt )
+ continue;
+ else
+ return bt->err = BTERR_struct, 0;
+
+ page_no = bt_getid(valptr(set->page, slot)->value);
+ prevlatch = set->latch;
+
+ while( page_no ) {
+ if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ set->page = bt_mappage (bt, set->latch);
+ else
+ return 0;
+
+ if( set->page->free || set->page->lvl )
+ return bt->err = BTERR_struct, 0;
+
+ // obtain read lock using lock chaining with Access mode
+ // release & unpin parent/left sibling page
+
+ bt_lockpage(BtLockAccess, set->latch);
+
+ bt_unlockpage(BtLockRead, prevlatch);
+ bt_unpinlatch (prevlatch);
+
+ bt_lockpage(BtLockRead, set->latch);
+ bt_unlockpage(BtLockAccess, set->latch);
+
+ // find key on page at this level
+ // and descend to requested level
+
+ if( !set->page->kill )
+ if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+ bt_unlockpage(BtLockRead, set->latch);
+ bt_lockpage(BtLockAtomic, set->latch);
+ bt_lockpage(BtLockAccess, set->latch);
+ bt_lockpage(BtLockRead, set->latch);
+ bt_unlockpage(BtLockAccess, set->latch);
+
+ if( slot = bt_findslot (set->page, key, len) )
+ return slot;
+
+ bt_unlockpage(BtLockAtomic, set->latch);
+ }
+
+ // slide right into next page
+
+ page_no = bt_getid(set->page->right);
+ prevlatch = set->latch;
+ }
+
+ // return error on end of right chain
+
+ bt->err = BTERR_struct;
+ return 0; // return error
+}
+
// determine actual page where key is located
// return slot number
bt_unlockpage(BtLockRead, set->latch);
if( !slot )
- if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+ if( slot = bt_atomicload(bt, set, key->key, key->len) )
set->latch->split = 0;
else
return -1;