From 97a2992f299b35ede35012314c35857ff28d174e Mon Sep 17 00:00:00 2001 From: unknown Date: Sat, 27 Sep 2014 11:02:29 -0700 Subject: [PATCH] Fix bug in reverse scan. New command line interface with a command per input file --- threadskv8.c | 231 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 140 insertions(+), 91 deletions(-) diff --git a/threadskv8.c b/threadskv8.c index f2a23c7..91ec929 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,6 +112,13 @@ typedef struct { ushort serving[1]; } RWLock; +// write only queue lock + +typedef struct { + ushort ticket[1]; + ushort serving[1]; +} WOLock; + #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -147,8 +154,8 @@ typedef struct { uid page_no; // latch set page number RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete - RWLock parent[1]; // Posting of fence key in parent - RWLock atomic[1]; // Atomic update in progress + WOLock parent[1]; // Posting of fence key in parent + WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert uint entry; // entry slot in latch table uint next; // next entry in hash table chain @@ -400,6 +407,31 @@ uid bt_newdup (BtDb *bt) #endif } +// Write-Only Queue Lock + +void WriteOLock (WOLock *lock) +{ +ushort tix; +#ifdef unix + tix = __sync_fetch_and_add (lock->ticket, 1); +#else + tix = _InterlockedExchangeAdd16 (lock->ticket, 1); +#endif + // wait for our ticket to come up + + while( tix != lock->serving[0] ) +#ifdef unix + sched_yield(); +#else + SwitchToThread (); +#endif +} + +void WriteORelease (WOLock *lock) +{ + lock->serving[0]++; +} + // Phase-Fair reader/writer lock implementation void WriteLock (RWLock *lock) @@ -1129,10 +1161,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteLock (latch->parent); + WriteOLock (latch->parent); break; case BtLockAtomic: - WriteLock (latch->atomic); + WriteOLock (latch->atomic); break; } } @@ -1155,10 +1187,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteRelease (latch->access); break; case BtLockParent: - WriteRelease (latch->parent); + WriteORelease (latch->parent); break; case BtLockAtomic: - WriteRelease (latch->atomic); + WriteORelease (latch->atomic); break; } } @@ -2636,11 +2668,11 @@ int type; *prev = *set; } - // update left pointer in next right page - // if we did any now non-empty splits + // update left pointer in next right page from last split page + // (if all splits were reversed, latch->split == 0) if( latch->split ) { - // fix left pointer in master's original right sibling + // fix left pointer in master's original (now split) right sibling // or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { @@ -2654,8 +2686,11 @@ int type; set->latch->dirty = 1; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); - } else + } else { // prev is rightmost page + bt_spinwritelock (bt->mgr->lock); bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } // Process last page split in chain @@ -2676,6 +2711,9 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); + + // remove atomic lock on master page + bt_unlockpage(BtLockAtomic, latch); continue; } @@ -2689,10 +2727,11 @@ int type; continue; } - // any splits were reversed, and the + // any and all splits were reversed, and the // master page located in prev is empty, delete it // by pulling over master's right sibling. - // Delete empty master's fence + + // Delete empty master's fence key ptr = keyptr(prev->page,prev->page->cnt); leaf = calloc (sizeof(AtomicKey), 1); @@ -2713,6 +2752,10 @@ int type; bt_unlockpage(BtLockWrite, prev->latch); // grab master's right sibling + // note that the far right page never empties because + // it always contains (at least) the infinite stopper key + // and that all pages that don't contain any keys have + // been deleted, or are being deleted under write lock. if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) set->page = bt_mappage (bt, set->latch); @@ -2721,11 +2764,15 @@ int type; bt_lockpage(BtLockWrite, set->latch); - // pull contents over empty page + // and pull contents over empty page + // while preserving master's left link memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); + // forward seekers to old right sibling + // to new page location in prev + bt_putid (set->page->right, prev->latch->page_no); set->latch->dirty = 1; set->page->kill = 1; @@ -2741,7 +2788,8 @@ int type; leaf->entry = set->latch->entry; leaf->type = 2; - // see if right sibling is not yet in the FIFO + // see if right sibling key update is in the FIFO, + // and ParentLock if not there. if( !bt_parentmatch (head, leaf->entry) ) bt_lockpage(BtLockParent, set->latch); @@ -2755,7 +2803,7 @@ int type; tail = leaf; - // fix master's far right sibling's left pointer + // fix new master's right sibling's left pointer if( right = bt_getid (set->page->right) ) { if( set->latch = bt_pinlatch (bt, right, 1) ) @@ -2767,6 +2815,10 @@ int type; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); + } else { // master is now the far right page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); } bt_unlockpage(BtLockAtomic, latch); @@ -2825,7 +2877,6 @@ uint bt_lastkey (BtDb *bt) { uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); BtPageSet set[1]; -uint slot; if( set->latch = bt_pinlatch (bt, page_no, 1) ) set->page = bt_mappage (bt, set->latch); @@ -2833,13 +2884,12 @@ uint slot; return 0; bt_lockpage(BtLockRead, set->latch); - memcpy (bt->cursor, set->page, bt->mgr->page_size); - slot = set->page->cnt; - bt_unlockpage(BtLockRead, set->latch); bt_unpinlatch (set->latch); - return slot; + + bt->cursor_page = page_no; + return bt->cursor->cnt; } // return previous slot on cursor page @@ -2873,6 +2923,9 @@ findourself: next = bt_getid (bt->cursor->right); + if( bt->cursor->kill ) + goto findourself; + if( next != us ) if( next == ourright ) goto goleft; @@ -3034,7 +3087,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3067,7 +3120,7 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3136,12 +3189,12 @@ void *index_file (void *arg) uint __stdcall index_file (void *arg) #endif { -int line = 0, found = 0, cnt = 0, unique; +int line = 0, found = 0, cnt = 0, idx; uid next, page_no = LEAF_page; // start on first page of leaves +int ch, len = 0, slot, type = 0; unsigned char key[BT_maxkey]; unsigned char txn[65536]; ThreadArg *args = arg; -int ch, len = 0, slot; BtPageSet set[1]; uint nxt = 65536; BtPage page; @@ -3153,9 +3206,12 @@ FILE *in; bt = bt_open (args->mgr); page = (BtPage)txn; - unique = (args->type[1] | 0x20) != 'd'; + if( args->idx < strlen (args->type) ) + ch = args->type[args->idx]; + else + ch = args->type[strlen(args->type) - 1]; - switch(args->type[0] | 0x20) + switch(ch | 0x20) { case 'a': fprintf(stderr, "started latch mgr audit\n"); @@ -3163,13 +3219,37 @@ FILE *in; fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt); break; - case 't': - fprintf(stderr, "started TXN pennysort for %s\n", args->infile); + case 'd': + type = Delete; + + case 'p': + if( !type ) + type = Unique; + + if( args->num ) + if( type == Delete ) + fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile); + else + if( type == Delete ) + fprintf(stderr, "started pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started pennysort insert for %s\n", args->infile); + if( in = fopen (args->infile, "rb") ) while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { line++; + + if( !args->num ) { + if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) ) + fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + len = 0; + continue; + } + nxt -= len - 10; memcpy (txn + nxt, key + 10, len - 10); nxt -= 1; @@ -3179,7 +3259,7 @@ FILE *in; nxt -= 1; txn[nxt] = 10; slotptr(page,++cnt)->off = nxt; - slotptr(page,cnt)->type = Delete; + slotptr(page,cnt)->type = type; len = 0; if( cnt < args->num ) @@ -3191,7 +3271,7 @@ FILE *in; if( bt_atomictxn (bt, page) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - nxt = 65536; + nxt = sizeof(txn); cnt = 0; } @@ -3200,23 +3280,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'p': - fprintf(stderr, "started pennysort for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - - if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); - break; - case 'w': fprintf(stderr, "started indexing for %s\n", args->infile); if( in = fopen (args->infile, "r") ) @@ -3225,13 +3288,7 @@ FILE *in; { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) ) + if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -3240,33 +3297,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'd': - fprintf(stderr, "started deleting keys for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_findkey (bt, key, len, NULL, 0) < 0 ) - fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0); - ptr = (BtKey*)(bt->key); - found++; - - if( bt_deletekey (bt, ptr->key, ptr->len, 0) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes); - break; - case 'f': fprintf(stderr, "started finding keys for %s\n", args->infile); if( in = fopen (args->infile, "rb") ) @@ -3274,12 +3304,6 @@ FILE *in; if( ch == '\n' ) { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) @@ -3324,6 +3348,29 @@ FILE *in; fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; + case 'r': + fprintf(stderr, "started reverse scan\n"); + if( slot = bt_lastkey (bt) ) + while( slot = bt_prevkey (bt, slot) ) { + if( slotptr(bt->cursor, slot)->dead ) + continue; + + ptr = keyptr(bt->cursor, slot); + len = ptr->len; + + if( slotptr(bt->cursor, slot)->type == Duplicate ) + len -= BtId; + + fwrite (ptr->key, len, 1, stdout); + val = valptr(bt->cursor, slot); + fwrite (val->value, val->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } + + fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + break; + case 'c': #ifdef unix posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); @@ -3343,7 +3390,7 @@ FILE *in; } cnt--; // remove stopper key - fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; } @@ -3377,10 +3424,12 @@ BtKey *ptr; BtDb *bt; if( argc < 3 ) { - fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]); - fprintf (stderr, " where page_bits is the page size in bits\n"); + fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]); + fprintf (stderr, " where idx_file is the name of the btree file\n"); + fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n"); + fprintf (stderr, " page_bits is the page size in bits\n"); fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n"); - fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n"); + fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n"); fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n"); exit(0); } -- 2.40.0