X-Git-Url: https://pd.if.org/git/?a=blobdiff_plain;f=threadskv8.c;h=91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5;hb=97a2992f299b35ede35012314c35857ff28d174e;hp=3bae89e89b861f8d99806ef2937a5fada4805f80;hpb=139a1d55e5bd4a7fe1f605c8865fab586ae64a0f;p=btree diff --git a/threadskv8.c b/threadskv8.c index 3bae89e..91ec929 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -3,10 +3,11 @@ // phase-fair reader writer lock, // librarian page split code, // duplicate key management +// bi-directional cursors // traditional buffer pool manager -// and ACID batched key updates +// and ACID batched key-value updates -// 21 SEP 2014 +// 26 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -111,6 +112,13 @@ typedef struct { ushort serving[1]; } RWLock; +// write only queue lock + +typedef struct { + ushort ticket[1]; + ushort serving[1]; +} WOLock; + #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -146,19 +154,14 @@ typedef struct { uid page_no; // latch set page number RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete - RWLock parent[1]; // Posting of fence key in parent - RWLock atomic[1]; // Atomic update in progress + WOLock parent[1]; // Posting of fence key in parent + WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert uint entry; // entry slot in latch table uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain volatile ushort pin; // number of outstanding threads ushort dirty:1; // page in cache is dirty -#ifdef unix - pthread_t atomictid; // pid holding atomic lock -#else - uint atomictid; // pid holding atomic lock -#endif } BtLatchSet; // Define the length of the page record numbers @@ -188,7 +191,8 @@ typedef enum { Unique, Librarian, Duplicate, - Delete + Delete, + Update } BtSlotType; typedef struct { @@ -235,6 +239,8 @@ typedef struct BtPage_ { unsigned char lvl:7; // level of page unsigned char kill:1; // page is being deleted unsigned char right[BtId]; // page number to right + unsigned char left[BtId]; // page number to left + unsigned char filler[2]; // padding to multiple of 8 } *BtPage; // The loadpage interface object @@ -401,6 +407,31 @@ uid bt_newdup (BtDb *bt) #endif } +// Write-Only Queue Lock + +void WriteOLock (WOLock *lock) +{ +ushort tix; +#ifdef unix + tix = __sync_fetch_and_add (lock->ticket, 1); +#else + tix = _InterlockedExchangeAdd16 (lock->ticket, 1); +#endif + // wait for our ticket to come up + + while( tix != lock->serving[0] ) +#ifdef unix + sched_yield(); +#else + SwitchToThread (); +#endif +} + +void WriteORelease (WOLock *lock) +{ + lock->serving[0]++; +} + // Phase-Fair reader/writer lock implementation void WriteLock (RWLock *lock) @@ -649,7 +680,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot; bt->mgr->latchsets[latch->next].prev = slot; bt->mgr->hashtable[hashidx].slot = slot; - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); latch->page_no = page_no; latch->entry = slot; latch->split = 0; @@ -977,6 +1007,11 @@ BtVal *val; pagezero->alloc->bits = mgr->page_bits; bt_putid(pagezero->alloc->right, MIN_lvl+1); + // initialize left-most LEAF page in + // alloc->left. + + bt_putid (pagezero->alloc->left, LEAF_page); + if( bt_writepage (mgr, pagezero->alloc, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; @@ -1126,14 +1161,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteLock (latch->parent); + WriteOLock (latch->parent); break; case BtLockAtomic: - WriteLock (latch->atomic); - break; - case BtLockAtomic | BtLockRead: - WriteLock (latch->atomic); - ReadLock (latch->readwr); + WriteOLock (latch->atomic); break; } } @@ -1156,16 +1187,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteRelease (latch->access); break; case BtLockParent: - WriteRelease (latch->parent); + WriteORelease (latch->parent); break; case BtLockAtomic: - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); - WriteRelease (latch->atomic); - break; - case BtLockAtomic | BtLockRead: - ReadRelease (latch->readwr); - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); - WriteRelease (latch->atomic); + WriteORelease (latch->atomic); break; } } @@ -1277,7 +1302,7 @@ uint mode, prevmode; set->page = bt_mappage (bt, set->latch); - // release & unpin parent page + // release & unpin parent or left sibling page if( prevpage ) { bt_unlockpage(prevmode, prevlatch); @@ -1285,20 +1310,10 @@ uint mode, prevmode; prevpage = 0; } - // skip Atomic lock on leaf page if already held - - if( !drill ) - if( mode & BtLockAtomic ) - if( pthread_equal (set->latch->atomictid, pthread_self()) ) - mode &= ~BtLockAtomic; - // obtain mode lock using lock chaining through AccessLock bt_lockpage(mode, set->latch); - if( mode & BtLockAtomic ) - set->latch->atomictid = pthread_self(); - if( set->page->free ) return bt->err = BTERR_struct, 0; @@ -1334,11 +1349,13 @@ uint mode, prevmode; if( drill == lvl ) return slot; + // find next non-dead slot -- the fence key if nothing else + while( slotptr(set->page, slot)->dead ) if( slot++ < set->page->cnt ) - continue; + continue; else - goto slideright; + return bt->err = BTERR_struct, 0; page_no = bt_getid(valptr(set->page, slot)->value); drill--; @@ -1378,7 +1395,6 @@ void bt_freepage (BtDb *bt, BtPageSet *set) bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (set->latch); // unlock allocation page @@ -1460,6 +1476,7 @@ uint idx; root->latch->dirty = 1; bt_freepage (bt, child); + bt_unpinlatch (child->latch); } while( root->page->lvl > 1 && root->page->act == 1 ); @@ -1546,6 +1563,7 @@ BtKey *ptr; bt_lockpage (BtLockDelete, right->latch); bt_lockpage (BtLockWrite, right->latch); bt_freepage (bt, right); + bt_unpinlatch (right->latch); bt_unlockpage (BtLockParent, set->latch); bt_unpinlatch (set->latch); @@ -2242,20 +2260,96 @@ uint type; typedef struct { uint entry; // latch table entry number - uint slot:30; // page slot number + uint slot:31; // page slot number uint reuse:1; // reused previous page - uint emptied:1; // page was emptied } AtomicMod; typedef struct { uid page_no; // page number for split leaf void *next; // next key to insert - uint entry; // latch table entry number + uint entry:29; // latch table entry number + uint type:2; // 0 == insert, 1 == delete, 2 == free + uint nounlock:1; // don't unlock ParentModification unsigned char leafkey[BT_keyarray]; } AtomicKey; +// find and load leaf page for given key +// leave page Atomic locked and Read locked. + +int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) +{ +BtLatchSet *prevlatch; +uid page_no; +uint slot; + + // find level zero page + + if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) + return 0; + + // find next non-dead entry on this page + // it will be the fence key if nothing else + + while( slotptr(set->page, slot)->dead ) + if( slot++ < set->page->cnt ) + continue; + else + return bt->err = BTERR_struct, 0; + + page_no = bt_getid(valptr(set->page, slot)->value); + prevlatch = set->latch; + + while( page_no ) { + if( set->latch = bt_pinlatch (bt, page_no, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + if( set->page->free || set->page->lvl ) + return bt->err = BTERR_struct, 0; + + // obtain read lock using lock chaining with Access mode + // release & unpin parent/left sibling page + + bt_lockpage(BtLockAccess, set->latch); + + bt_unlockpage(BtLockRead, prevlatch); + bt_unpinlatch (prevlatch); + + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + // find key on page at this level + // and descend to requested level + + if( !set->page->kill ) + if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { + bt_unlockpage(BtLockRead, set->latch); + bt_lockpage(BtLockAtomic, set->latch); + bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + if( slot = bt_findslot (set->page, key, len) ) + return slot; + + bt_unlockpage(BtLockAtomic, set->latch); + } + + // slide right into next page + + page_no = bt_getid(set->page->right); + prevlatch = set->latch; + } + + // return error on end of right chain + + bt->err = BTERR_struct; + return 0; // return error +} + // determine actual page where key is located -// return slot number with set page locked +// return slot number uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) { @@ -2338,29 +2432,21 @@ BtVal *val; val = valptr(set->page, slot); set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); - set->page->act--; - - // collapse empty slots beneath the fence - - while( idx = set->page->cnt - 1 ) - if( slotptr(set->page, idx)->dead ) { - *slotptr(set->page, idx) = *slotptr(set->page, idx + 1); - memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); - } else - break; - set->latch->dirty = 1; + set->page->act--; return 0; } -// qsort comparison function - -int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page) +uint bt_parentmatch (AtomicKey *head, uint entry) { -BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off); -BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off); +uint cnt = 0; + + if( head ) + do if( head->entry == entry ) + head->nounlock = 1, cnt++; + while( head = head->next ); - return keycmp (key1, key2->key, key2->len); + return cnt; } // atomic modification of a batch of keys. @@ -2370,9 +2456,9 @@ BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off); // causing the key constraint violation // or zero on successful completion. -int bt_atomicmods (BtDb *bt, BtPage source) +int bt_atomictxn (BtDb *bt, BtPage source) { -uint src, idx, slot, samepage, entry, next, split; +uint src, idx, slot, samepage, entry; AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; @@ -2383,16 +2469,16 @@ int result = 0; BtSlot temp[1]; BtPage page; BtVal *val; +uid right; int type; locks = calloc (source->cnt + 1, sizeof(AtomicMod)); head = NULL; tail = NULL; - // first sort the list of keys into order to + // stable sort the list of keys into order to // prevent deadlocks between threads. - qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source); -/* + for( src = 1; src++ < source->cnt; ) { *temp = *slotptr(source,src); key = keyptr (source,src); @@ -2406,13 +2492,13 @@ int type; break; } } -*/ - // Load the leafpage for each key + + // Load the leaf page for each key + // group same page references with reuse bit // and determine any constraint violations for( src = 0; src++ < source->cnt; ) { key = keyptr(source, src); - val = valptr(source, src); slot = 0; // first determine if this modification falls @@ -2420,13 +2506,13 @@ int type; // note that the far right leaf page is a special case if( samepage = src > 1 ) - if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 ) + if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); else // release read on previous page bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) ) + if( slot = bt_atomicload(bt, set, key->key, key->len) ) set->latch->split = 0; else return -1; @@ -2451,50 +2537,26 @@ int type; case Unique: if( !slotptr(set->page, slot)->dead ) if( slot < set->page->cnt || bt_getid (set->page->right) ) - if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) { + if( !keycmp (ptr, key->key, key->len) ) { // return constraint violation if key already exists + bt_unlockpage(BtLockRead, set->latch); result = src; while( src ) { if( locks[src].entry ) { set->latch = bt->mgr->latchsets + locks[src].entry; bt_unlockpage(BtLockAtomic, set->latch); - bt_unlockpage(BtLockRead, set->latch); bt_unpinlatch (set->latch); } src--; } - + free (locks); return result; } - break; - - case Delete: - if( !slotptr(set->page, slot)->dead ) - if( ptr->len == key->len ) - if( !memcmp (ptr->key, key->key, key->len) ) - break; - - // Key to delete doesn't exist - - result = src; - - while( src ) { - if( locks[src].entry ) { - set->latch = bt->mgr->latchsets + locks[src].entry; - bt_unlockpage(BtLockAtomic, set->latch); - bt_unlockpage(BtLockRead, set->latch); - bt_unpinlatch (set->latch); - } - src--; - } - - return result; } - } // unlock last loadpage lock @@ -2502,77 +2564,96 @@ int type; if( source->cnt > 1 ) bt_unlockpage(BtLockRead, set->latch); - // obtain write lock for each page + // obtain write lock for each master page for( src = 0; src++ < source->cnt; ) - if( locks[src].entry ) + if( locks[src].reuse ) + continue; + else bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry); // insert or delete each key + // process any splits or merges + // release Write & Atomic latches + // set ParentModifications and build + // queue of keys to insert for split pages + // or delete for deleted pages. - for( src = 0; src++ < source->cnt; ) - if( slotptr(source,src)->type == Delete ) - if( bt_atomicdelete (bt, source, locks, src) ) - return -1; - else - continue; - else - if( bt_atomicinsert (bt, source, locks, src) ) - return -1; - else - continue; + // run through txn list backwards - // set ParentModification and release WriteLock - // leave empty pages locked for later processing - // build queue of keys to insert for split pages + samepage = source->cnt + 1; - for( src = 0; src++ < source->cnt; ) { + for( src = source->cnt; src; src-- ) { if( locks[src].reuse ) continue; - prev->latch = bt->mgr->latchsets + locks[src].entry; - prev->page = bt_mappage (bt, prev->latch); + // perform the txn operations + // from smaller to larger on + // the same page - // pick-up all splits from original page + for( idx = src; idx < samepage; idx++ ) + switch( slotptr(source,idx)->type ) { + case Delete: + if( bt_atomicdelete (bt, source, locks, idx) ) + return -1; + break; - split = next = prev->latch->split; + case Duplicate: + case Unique: + if( bt_atomicinsert (bt, source, locks, idx) ) + return -1; + break; + } + + // after the same page operations have finished, + // process master page for splits or deletion. + + latch = prev->latch = bt->mgr->latchsets + locks[src].entry; + prev->page = bt_mappage (bt, prev->latch); + samepage = src; + + // pick-up all splits from master page - if( !prev->page->act ) - locks[src].emptied = 1; + entry = prev->latch->split; - while( entry = next ) { + while( entry ) { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); - next = set->latch->split; - set->latch->split = 0; + entry = set->latch->split; - // delete empty previous page + // delete empty master page if( !prev->page->act ) { + memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); + bt_unpinlatch (set->latch); prev->latch->dirty = 1; + continue; + } - if( prev->page->act ) - locks[src].emptied = 0; + // remove empty page from the split chain + if( !set->page->act ) { + memcpy (prev->page->right, set->page->right, BtId); + prev->latch->split = set->latch->split; + bt_lockpage (BtLockDelete, set->latch); + bt_freepage (bt, set); + bt_unpinlatch (set->latch); continue; } - // prev page is not emptied - locks[src].emptied = 0; - - // schedule previous fence key update + // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; + leaf->type = 0; if( tail ) tail->next = leaf; @@ -2581,40 +2662,85 @@ int type; tail = leaf; - // remove empty block from the split chain + bt_putid (set->page->left, prev->latch->page_no); + bt_lockpage(BtLockParent, prev->latch); + bt_unlockpage(BtLockWrite, prev->latch); + *prev = *set; + } - if( !set->page->act ) { - memcpy (prev->page->right, set->page->right, BtId); - bt_lockpage (BtLockDelete, set->latch); - bt_freepage (bt, set); - continue; + // update left pointer in next right page from last split page + // (if all splits were reversed, latch->split == 0) + + if( latch->split ) { + // fix left pointer in master's original (now split) right sibling + // or set rightmost page in page zero + + if( right = bt_getid (prev->page->right) ) { + if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) ) + set->page = bt_mappage (bt, set->latch); + else + return -1; + + bt_lockpage (BtLockWrite, set->latch); + bt_putid (set->page->left, prev->latch->page_no); + set->latch->dirty = 1; + bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + } else { // prev is rightmost page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); } + // Process last page split in chain + + ptr = keyptr(prev->page,prev->page->cnt); + leaf = calloc (sizeof(AtomicKey), 1); + + memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->page_no = prev->latch->page_no; + leaf->entry = prev->latch->entry; + leaf->type = 0; + + if( tail ) + tail->next = leaf; + else + head = leaf; + + tail = leaf; + bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); - *prev = *set; - } - // was entire chain emptied? + // remove atomic lock on master page - if( !prev->page->act ) - continue; + bt_unlockpage(BtLockAtomic, latch); + continue; + } - if( !split ) { - bt_unlockpage(BtLockWrite, prev->latch); - continue; + // finished if prev page occupied (either master or final split) + + if( prev->page->act ) { + bt_unlockpage(BtLockWrite, latch); + bt_unlockpage(BtLockAtomic, latch); + bt_unpinlatch(latch); + continue; } - // Process last page split in chain + // any and all splits were reversed, and the + // master page located in prev is empty, delete it + // by pulling over master's right sibling. + + // Delete empty master's fence key ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; - + leaf->type = 1; + if( tail ) tail->next = leaf; else @@ -2624,48 +2750,117 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); - } + + // grab master's right sibling + // note that the far right page never empties because + // it always contains (at least) the infinite stopper key + // and that all pages that don't contain any keys have + // been deleted, or are being deleted under write lock. + + if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + set->page = bt_mappage (bt, set->latch); + else + return -1; + + bt_lockpage(BtLockWrite, set->latch); + + // and pull contents over empty page + // while preserving master's left link + + memcpy (set->page->left, prev->page->left, BtId); + memcpy (prev->page, set->page, bt->mgr->page_size); + + // forward seekers to old right sibling + // to new page location in prev + + bt_putid (set->page->right, prev->latch->page_no); + set->latch->dirty = 1; + set->page->kill = 1; + + // add new fence key for new master page contents, delete + // right sibling after parent posts the new master fence. + + ptr = keyptr(set->page,set->page->cnt); + leaf = calloc (sizeof(AtomicKey), 1); + + memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->page_no = prev->latch->page_no; + leaf->entry = set->latch->entry; + leaf->type = 2; - // remove Atomic locks and - // process any empty pages + // see if right sibling key update is in the FIFO, + // and ParentLock if not there. - for( src = source->cnt; src; src-- ) { - if( locks[src].reuse ) - continue; + if( !bt_parentmatch (head, leaf->entry) ) + bt_lockpage(BtLockParent, set->latch); - set->latch = bt->mgr->latchsets + locks[src].entry; - set->page = bt_mappage (bt, set->latch); + bt_unlockpage(BtLockWrite, set->latch); - // clear original page split field + if( tail ) + tail->next = leaf; + else + head = leaf; - split = set->latch->split; - set->latch->split = 0; - bt_unlockpage (BtLockAtomic, set->latch); + tail = leaf; - // delete page emptied by our atomic action + // fix new master's right sibling's left pointer - if( locks[src].emptied ) - if( bt_deletepage (bt, set) ) - return bt->err; - else - continue; + if( right = bt_getid (set->page->right) ) { + if( set->latch = bt_pinlatch (bt, right, 1) ) + set->page = bt_mappage (bt, set->latch); - if( !split ) - bt_unpinlatch (set->latch); - } + bt_lockpage (BtLockWrite, set->latch); + bt_putid (set->page->left, prev->latch->page_no); + set->latch->dirty = 1; + + bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + } else { // master is now the far right page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } - // add keys for any pages split during action + bt_unlockpage(BtLockAtomic, latch); + } + + // add & delete keys for any pages split or merged during transaction if( leaf = head ) do { - ptr = (BtKey *)leaf->leafkey; + set->latch = bt->mgr->latchsets + leaf->entry; + set->page = bt_mappage (bt, set->latch); + bt_putid (value, leaf->page_no); + ptr = (BtKey *)leaf->leafkey; - if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) - return bt->err; + switch( leaf->type ) { + case 0: // insert key + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return -1; + + break; + + case 1: // delete key + if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) + return -1; - bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry); - bt_unpinlatch (bt->mgr->latchsets + leaf->entry); + break; + + case 2: // insert key & free + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return -1; + + bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (BtLockWrite, set->latch); + bt_freepage (bt, set); + break; + } + + if( !leaf->nounlock ) + bt_unlockpage (BtLockParent, set->latch); + + bt_unpinlatch (set->latch); tail = leaf->next; free (leaf); } while( leaf = tail ); @@ -2676,6 +2871,70 @@ int type; return 0; } +// set cursor to highest slot on highest page + +uint bt_lastkey (BtDb *bt) +{ +uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); +BtPageSet set[1]; + + if( set->latch = bt_pinlatch (bt, page_no, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + bt_lockpage(BtLockRead, set->latch); + memcpy (bt->cursor, set->page, bt->mgr->page_size); + bt_unlockpage(BtLockRead, set->latch); + bt_unpinlatch (set->latch); + + bt->cursor_page = page_no; + return bt->cursor->cnt; +} + +// return previous slot on cursor page + +uint bt_prevkey (BtDb *bt, uint slot) +{ +uid ourright, next, us = bt->cursor_page; +BtPageSet set[1]; + + if( --slot ) + return slot; + + ourright = bt_getid(bt->cursor->right); + +goleft: + if( !(next = bt_getid(bt->cursor->left)) ) + return 0; + +findourself: + bt->cursor_page = next; + + if( set->latch = bt_pinlatch (bt, next, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + bt_lockpage(BtLockRead, set->latch); + memcpy (bt->cursor, set->page, bt->mgr->page_size); + bt_unlockpage(BtLockRead, set->latch); + bt_unpinlatch (set->latch); + + next = bt_getid (bt->cursor->right); + + if( bt->cursor->kill ) + goto findourself; + + if( next != us ) + if( next == ourright ) + goto goleft; + else + goto findourself; + + return bt->cursor->cnt; +} + // return next slot on cursor page // or slide cursor right into next page @@ -2828,7 +3087,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -2861,7 +3120,7 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -2930,12 +3189,12 @@ void *index_file (void *arg) uint __stdcall index_file (void *arg) #endif { -int line = 0, found = 0, cnt = 0, unique; +int line = 0, found = 0, cnt = 0, idx; uid next, page_no = LEAF_page; // start on first page of leaves +int ch, len = 0, slot, type = 0; unsigned char key[BT_maxkey]; unsigned char txn[65536]; ThreadArg *args = arg; -int ch, len = 0, slot; BtPageSet set[1]; uint nxt = 65536; BtPage page; @@ -2947,9 +3206,12 @@ FILE *in; bt = bt_open (args->mgr); page = (BtPage)txn; - unique = (args->type[1] | 0x20) != 'd'; + if( args->idx < strlen (args->type) ) + ch = args->type[args->idx]; + else + ch = args->type[strlen(args->type) - 1]; - switch(args->type[0] | 0x20) + switch(ch | 0x20) { case 'a': fprintf(stderr, "started latch mgr audit\n"); @@ -2957,13 +3219,37 @@ FILE *in; fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt); break; - case 't': - fprintf(stderr, "started TXN pennysort for %s\n", args->infile); + case 'd': + type = Delete; + + case 'p': + if( !type ) + type = Unique; + + if( args->num ) + if( type == Delete ) + fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile); + else + if( type == Delete ) + fprintf(stderr, "started pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started pennysort insert for %s\n", args->infile); + if( in = fopen (args->infile, "rb") ) while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { line++; + + if( !args->num ) { + if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) ) + fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + len = 0; + continue; + } + nxt -= len - 10; memcpy (txn + nxt, key + 10, len - 10); nxt -= 1; @@ -2973,19 +3259,19 @@ FILE *in; nxt -= 1; txn[nxt] = 10; slotptr(page,++cnt)->off = nxt; - slotptr(page,cnt)->type = Unique; + slotptr(page,cnt)->type = type; len = 0; - if( cnt < 25 ) + if( cnt < args->num ) continue; page->cnt = cnt; page->act = cnt; page->min = nxt; - if( bt_atomicmods (bt, page) ) + if( bt_atomictxn (bt, page) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - nxt = 65536; + nxt = sizeof(txn); cnt = 0; } @@ -2994,23 +3280,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'p': - fprintf(stderr, "started pennysort for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - - if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); - break; - case 'w': fprintf(stderr, "started indexing for %s\n", args->infile); if( in = fopen (args->infile, "r") ) @@ -3019,13 +3288,7 @@ FILE *in; { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) ) + if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -3034,33 +3297,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'd': - fprintf(stderr, "started deleting keys for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_findkey (bt, key, len, NULL, 0) < 0 ) - fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0); - ptr = (BtKey*)(bt->key); - found++; - - if( bt_deletekey (bt, ptr->key, ptr->len, 0) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes); - break; - case 'f': fprintf(stderr, "started finding keys for %s\n", args->infile); if( in = fopen (args->infile, "rb") ) @@ -3068,12 +3304,6 @@ FILE *in; if( ch == '\n' ) { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) @@ -3118,6 +3348,29 @@ FILE *in; fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; + case 'r': + fprintf(stderr, "started reverse scan\n"); + if( slot = bt_lastkey (bt) ) + while( slot = bt_prevkey (bt, slot) ) { + if( slotptr(bt->cursor, slot)->dead ) + continue; + + ptr = keyptr(bt->cursor, slot); + len = ptr->len; + + if( slotptr(bt->cursor, slot)->type == Duplicate ) + len -= BtId; + + fwrite (ptr->key, len, 1, stdout); + val = valptr(bt->cursor, slot); + fwrite (val->value, val->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } + + fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + break; + case 'c': #ifdef unix posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); @@ -3137,7 +3390,7 @@ FILE *in; } cnt--; // remove stopper key - fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; } @@ -3171,10 +3424,12 @@ BtKey *ptr; BtDb *bt; if( argc < 3 ) { - fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]); - fprintf (stderr, " where page_bits is the page size in bits\n"); + fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]); + fprintf (stderr, " where idx_file is the name of the btree file\n"); + fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n"); + fprintf (stderr, " page_bits is the page size in bits\n"); fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n"); - fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n"); + fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n"); fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n"); exit(0); }