typedef struct {
union {
struct {
- volatile uint xlock:1; // one writer has exclusive lock
- volatile uint wrt:31; // count of other writers waiting
+ volatile ushort xlock; // one writer has exclusive lock
+ volatile ushort wrt; // count of other writers waiting
} bits[1];
uint value[1];
};
} BtMutexLatch;
#define XCL 1
-#define WRT 2
+#define WRT 65536
// definition for phase-fair reader/writer lock implementation
volatile ushort rout[1];
volatile ushort ticket[1];
volatile ushort serving[1];
- volatile ushort tid[1];
- volatile ushort dup[1];
} RWLock;
// write only lock
unsigned char key[BT_keyarray]; // last found complete key
} BtDb;
+// atomic txn structures
+
+typedef struct {
+ logseqno reqlsn; // redo log seq no required
+ uint entry; // latch table entry number
+ uint slot:31; // page slot number
+ uint reuse:1; // reused previous page
+} AtomicTxn;
+
+typedef struct {
+ uid page_no; // page number for split leaf
+ void *next; // next key to insert
+ uint entry:29; // latch table entry number
+ uint type:2; // 0 == insert, 1 == delete, 2 == free
+ uint nounlock:1; // don't unlock ParentModification
+ unsigned char leafkey[BT_keyarray];
+} AtomicKey;
+
// Catastrophic errors
typedef enum {
extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
-// transaction functions
+// atomic transaction functions
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
BTERR bt_txnpromote (BtDb *bt);
// The page is allocated from low and hi ends.
while( 1 ) {
*prev->value = __sync_fetch_and_or(latch->value, XCL);
- if( !prev->bits->xlock ) { // did we set XCL bit?
+ if( !prev->bits->xlock ) { // did we set XCL?
if( slept )
__sync_fetch_and_sub(latch->value, WRT);
return;
__sync_fetch_and_add(latch->value, WRT);
}
- sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+ sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
slept = 1;
}
}
*prev->value = __sync_fetch_and_or(latch->value, XCL);
- // take write access if exclusive bit is clear
+ // take write access if exclusive bit was clear
return !prev->bits->xlock;
}
*prev->value = __sync_fetch_and_and(latch->value, ~XCL);
if( prev->bits->wrt )
- sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+ sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
}
// Write-Only Queue Lock
void WriteOLock (WOLock *lock, ushort tid)
{
+ while( 1 ) {
+ bt_mutexlock(lock->xcl);
+
if( *lock->tid == tid ) {
*lock->dup += 1;
+ bt_releasemutex(lock->xcl);
return;
}
-
- bt_mutexlock(lock->xcl);
- *lock->tid = tid;
+ if( !*lock->tid ) {
+ *lock->tid = tid;
+ bt_releasemutex(lock->xcl);
+ return;
+ }
+ bt_releasemutex(lock->xcl);
+ sched_yield();
+ }
}
void WriteORelease (WOLock *lock)
}
*lock->tid = 0;
- bt_releasemutex(lock->xcl);
}
// Phase-Fair reader/writer lock implementation
{
ushort w, r, tix;
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return;
- }
#ifdef unix
tix = __sync_fetch_and_add (lock->ticket, 1);
#else
#else
SwitchToThread();
#endif
- *lock->tid = tid;
}
void WriteRelease (RWLock *lock)
{
- if( *lock->dup ) {
- *lock->dup -= 1;
- return;
- }
-
- *lock->tid = 0;
#ifdef unix
__sync_fetch_and_and (lock->rin, ~MASK);
#else
{
ushort w;
- // OK if write lock already held by same thread
-
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return 1;
- }
#ifdef unix
w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
#else
{
ushort w;
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return;
- }
#ifdef unix
w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
#else
void ReadRelease (RWLock *lock)
{
- if( *lock->dup ) {
- *lock->dup -= 1;
- return;
- }
-
#ifdef unix
__sync_fetch_and_add (lock->rout, RINC);
#else
last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- if( end - last + sizeof(BtLogHdr) >= 8192 )
+ if( end - last + sizeof(BtLogHdr) >= 32768 )
if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
else
last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- if( end - last + sizeof(BtLogHdr) >= 8192 )
+ if( end - last + sizeof(BtLogHdr) >= 32768 )
if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
else
return lsn;
}
+// sync a single btree page to disk
+
+BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
+{
+uint segment = latch->page_no >> 16;
+BtPage perm;
+
+ if( bt_writepage (mgr, page, latch->page_no) )
+ return mgr->err;
+
+ perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
+
+ if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+ latch->dirty = 0;
+ return 0;
+}
+
// read page into buffer pool from permanent location in Btree file
BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
BtSlot *node;
BtKey *ptr;
BtVal *val;
+int rate;
// if found slot > desired slot and previous slot
// is a librarian slot, use it
if( slotptr(set->page, idx)->dead )
break;
- // now insert key into array before slot
+ // now insert key into array before slot,
+ // adding as many librarian slots as
+ // makes sense.
- if( idx == set->page->cnt )
- idx += 2, set->page->cnt += 2, librarian = 2;
- else
- librarian = 1;
+ if( idx == set->page->cnt ) {
+ int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
- set->latch->dirty = 1;
- set->page->act++;
+ librarian = ++idx - slot;
+ avail /= sizeof(BtSlot);
- while( idx > slot + librarian - 1 )
- *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
+ if( avail < 0 )
+ avail = 0;
- // add librarian slot
+ if( librarian > avail )
+ librarian = avail;
- if( librarian > 1 ) {
- node = slotptr(set->page, slot++);
- node->off = set->page->min;
- node->type = Librarian;
- node->dead = 1;
+ if( librarian ) {
+ rate = (idx - slot) / librarian;
+ set->page->cnt += librarian;
+ idx += librarian;
+ } else
+ rate = 0;
+ } else
+ librarian = 0, rate = 0;
+
+ while( idx > slot ) {
+ // transfer slot
+ *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
+ idx--;
+
+ // add librarian slot per rate
+
+ if( librarian )
+ if( (idx - slot + 1)/2 <= librarian * rate ) {
+// if( rate && !(idx % rate) ) {
+ node = slotptr(set->page, idx--);
+ node->off = node[1].off;
+ node->type = Librarian;
+ node->dead = 1;
+ librarian--;
+ }
}
+if(librarian)
+abort();
+ set->latch->dirty = 1;
+ set->page->act++;
// fill in new slot
return 0;
}
-typedef struct {
- logseqno reqlsn; // redo log seq no required
- logseqno lsn; // redo log sequence number
- uint entry; // latch table entry number
- uint slot:31; // page slot number
- uint reuse:1; // reused previous page
-} AtomicTxn;
-
-typedef struct {
- uid page_no; // page number for split leaf
- void *next; // next key to insert
- uint entry:29; // latch table entry number
- uint type:2; // 0 == insert, 1 == delete, 2 == free
- uint nounlock:1; // don't unlock ParentModification
- unsigned char leafkey[BT_keyarray];
-} AtomicKey;
-
// determine actual page where key is located
// return slot number
return 0;
}
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
{
BtKey *key = keyptr(source, src);
BtVal *val = valptr(source, src);
if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
return mgr->err;
- set->page->lsn = locks[src].lsn;
+ set->page->lsn = lsn;
return 0;
}
// perform delete from smaller btree
// insert a delete slot if not found there
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
{
BtKey *key = keyptr(source, src);
BtPageSet set[1];
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
set->latch->dirty = 1;
- set->page->lsn = locks[src].lsn;
+ set->page->lsn = lsn;
set->page->act--;
node->dead = 0;
BTERR bt_atomictxn (BtDb *bt, BtPage source)
{
uint src, idx, slot, samepage, entry, que = 0;
-AtomicKey *head, *tail, *leaf;
-BtPageSet set[1], prev[1];
-unsigned char value[BtId];
BtKey *key, *ptr, *key2;
-BtLatchSet *latch;
-AtomicTxn *locks;
int result = 0;
BtSlot temp[1];
logseqno lsn;
-BtPage page;
-BtVal *val;
-uid right;
int type;
- locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
- head = NULL;
- tail = NULL;
-
// stable sort the list of keys into order to
// prevent deadlocks between threads.
// add entries to redo log
if( bt->mgr->pagezero->redopages )
- if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
- for( src = 0; src++ < source->cnt; )
- locks[src].lsn = lsn;
- else
- return bt->mgr->err;
+ lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
+ else
+ lsn = 0;
+
+ // perform the individual actions in the transaction
+
+ if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+ return bt->mgr->err;
+
+ // if number of active pages
+ // is greater than the buffer pool
+ // promote page into larger btree
+
+ if( bt->main )
+ while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+ if( bt_txnpromote (bt) )
+ return bt->mgr->err;
+
+ // return success
+
+ return 0;
+}
+
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+{
+uint src, idx, slot, samepage, entry, que = 0;
+AtomicKey *head, *tail, *leaf;
+BtPageSet set[1], prev[1];
+unsigned char value[BtId];
+BtLatchSet *latch;
+AtomicTxn *locks;
+BtKey *key, *ptr;
+BtPage page;
+BtVal *val;
+uid right;
+
+ locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
+
+ head = NULL;
+ tail = NULL;
// Load the leaf page for each key
// group same page references with reuse bit
bt_unlockpage(BtLockRead, set->latch);
if( !slot )
- if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
+ if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) )
set->latch->split = 0;
else
- return bt->mgr->err;
+ return mgr->err;
if( slotptr(set->page, slot)->type == Librarian )
ptr = keyptr(set->page, ++slot);
ptr = keyptr(set->page, slot);
if( !samepage ) {
- locks[src].entry = set->latch - bt->mgr->latchsets;
+ locks[src].entry = set->latch - mgr->latchsets;
locks[src].slot = slot;
locks[src].reuse = 0;
} else {
if( locks[src].reuse )
continue;
- set->latch = bt->mgr->latchsets + locks[src].entry;
- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+ set->latch = mgr->latchsets + locks[src].entry;
+ bt_lockpage (BtLockWrite, set->latch, thread_no);
}
// insert or delete each key
for( idx = src; idx < samepage; idx++ )
switch( slotptr(source,idx)->type ) {
case Delete:
- if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
+ return mgr->err;
break;
case Duplicate:
case Unique:
- if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
+ return mgr->err;
+ break;
+
+ default:
+ bt_atomicpage (mgr, source, locks, idx, set);
break;
}
// after the same page operations have finished,
// process master page for splits or deletion.
- latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
- prev->page = bt_mappage (bt->mgr, prev->latch);
+ latch = prev->latch = mgr->latchsets + locks[src].entry;
+ prev->page = bt_mappage (mgr, prev->latch);
samepage = src;
// pick-up all splits from master page
entry = prev->latch->split;
while( entry ) {
- set->latch = bt->mgr->latchsets + entry;
- set->page = bt_mappage (bt->mgr, set->latch);
+ set->latch = mgr->latchsets + entry;
+ set->page = bt_mappage (mgr, set->latch);
entry = set->latch->split;
// delete empty master page by undoing its split
if( !prev->page->act ) {
memcpy (set->page->left, prev->page->left, BtId);
- memcpy (prev->page, set->page, bt->mgr->page_size);
- bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
- bt_freepage (bt->mgr, set);
+ memcpy (prev->page, set->page, mgr->page_size);
+ bt_lockpage (BtLockDelete, set->latch, thread_no);
+ bt_freepage (mgr, set);
prev->latch->split = set->latch->split;
prev->latch->dirty = 1;
if( !set->page->act ) {
memcpy (prev->page->right, set->page->right, BtId);
prev->latch->split = set->latch->split;
- bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
- bt_freepage (bt->mgr, set);
+ bt_lockpage (BtLockDelete, set->latch, thread_no);
+ bt_freepage (mgr, set);
continue;
}
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->entry = prev->latch - bt->mgr->latchsets;
+ leaf->entry = prev->latch - mgr->latchsets;
leaf->page_no = prev->latch->page_no;
leaf->type = 0;
// splice in the left link into the split page
bt_putid (set->page->left, prev->latch->page_no);
- bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+ bt_lockpage(BtLockParent, prev->latch, thread_no);
bt_unlockpage(BtLockWrite, prev->latch);
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
*prev = *set;
}
// far right sibling or set rightmost page in page zero
if( right = bt_getid (prev->page->right) ) {
- if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
+ if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
else
- return bt->mgr->err;
+ return mgr->err;
- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+ bt_lockpage (BtLockWrite, set->latch, thread_no);
bt_putid (set->page->left, prev->latch->page_no);
set->latch->dirty = 1;
bt_unlockpage (BtLockWrite, set->latch);
- bt_unpinlatch (bt->mgr, set->latch);
+ bt_unpinlatch (mgr, set->latch);
} else { // prev is rightmost page
- bt_mutexlock (bt->mgr->lock);
- bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_releasemutex(bt->mgr->lock);
+ bt_mutexlock (mgr->lock);
+ bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+ bt_releasemutex(mgr->lock);
}
// Process last page split in chain
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->entry = prev->latch - bt->mgr->latchsets;
+ leaf->entry = prev->latch - mgr->latchsets;
leaf->page_no = prev->latch->page_no;
leaf->type = 0;
tail = leaf;
- bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+ bt_lockpage(BtLockParent, prev->latch, thread_no);
bt_unlockpage(BtLockWrite, prev->latch);
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
+
// remove atomic lock on master page
bt_unlockpage(BtLockAtomic, latch);
if( prev->page->act ) {
bt_unlockpage(BtLockWrite, latch);
bt_unlockpage(BtLockAtomic, latch);
- bt_unpinlatch(bt->mgr, latch);
+
+ if( lsm )
+ bt_syncpage (mgr, prev->page, latch);
+
+ bt_unpinlatch(mgr, latch);
continue;
}
ptr = keyptr(prev->page,prev->page->cnt);
- if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
+ return mgr->err;
// perform the remainder of the delete
// from the FIFO queue
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->entry = prev->latch - bt->mgr->latchsets;
+ leaf->entry = prev->latch - mgr->latchsets;
leaf->page_no = prev->latch->page_no;
leaf->nounlock = 1;
leaf->type = 2;
// deletion completes in next phase.
bt_unlockpage(BtLockWrite, prev->latch);
+
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
+
}
// add & delete keys for any pages split or merged during transaction
if( leaf = head )
do {
- set->latch = bt->mgr->latchsets + leaf->entry;
- set->page = bt_mappage (bt->mgr, set->latch);
+ set->latch = mgr->latchsets + leaf->entry;
+ set->page = bt_mappage (mgr, set->latch);
bt_putid (value, leaf->page_no);
ptr = (BtKey *)leaf->leafkey;
switch( leaf->type ) {
case 0: // insert key
- if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+ return mgr->err;
break;
case 1: // delete key
- if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
+ return mgr->err;
break;
case 2: // free page
- if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_atomicfree (mgr, set, thread_no) )
+ return mgr->err;
break;
}
if( !leaf->nounlock )
bt_unlockpage (BtLockParent, set->latch);
- bt_unpinlatch (bt->mgr, set->latch);
+ bt_unpinlatch (mgr, set->latch);
tail = leaf->next;
free (leaf);
} while( leaf = tail );
- // if number of active pages
- // is greater than the buffer pool
- // promote page into larger btree
-
- if( bt->main )
- while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
- if( bt_txnpromote (bt) )
- return bt->mgr->err;
-
- // return success
-
free (locks);
return 0;
}
continue;
}
- bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
+ bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
- bt_unlockpage(BtLockAccess, set->latch);
// entry interiour node or being or was killed
if( set->page->lvl || set->page->free || set->page->kill ) {
bt_releasemutex(set->latch->modify);
+ bt_unlockpage(BtLockAtomic, set->latch);
bt_unlockpage(BtLockWrite, set->latch);
continue;
}
// transfer slots in our selected page to larger btree
if( !(set->latch->page_no % 100) )
-fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
-
- for( slot = 0; slot++ < set->page->cnt; ) {
- ptr = keyptr (set->page, slot);
- val = valptr (set->page, slot);
- node = slotptr(set->page, slot);
-
- switch( node->type ) {
- case Duplicate:
- case Unique:
- if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
- return bt->main->err;
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
- continue;
-
- case Delete:
- if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
- return bt->main->err;
+ if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) )
+ return bt->main->err;
- continue;
- }
- }
+ bt_unlockpage(BtLockAtomic, set->latch);
// now delete the page