X-Git-Url: https://pd.if.org/git/?a=blobdiff_plain;f=threadskv8.c;h=16ad09f024b44cef1ca8971a0e1b724bf52bd9e3;hb=75e9a39cab3db94c792cbb16d56ed223ef5523a3;hp=574bbd9b275f4a096f1de23e595af7df1b36602e;hpb=3f25e478bf0646bc6671869801218d0dab1516f0;p=btree diff --git a/threadskv8.c b/threadskv8.c index 574bbd9..16ad09f 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,6 +112,13 @@ typedef struct { ushort serving[1]; } RWLock; +// write only queue lock + +typedef struct { + ushort ticket[1]; + ushort serving[1]; +} WOLock; + #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -147,8 +154,8 @@ typedef struct { uid page_no; // latch set page number RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete - RWLock parent[1]; // Posting of fence key in parent - RWLock atomic[1]; // Atomic update in progress + WOLock parent[1]; // Posting of fence key in parent + WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert uint entry; // entry slot in latch table uint next; // next entry in hash table chain @@ -184,7 +191,8 @@ typedef enum { Unique, Librarian, Duplicate, - Delete + Delete, + Update } BtSlotType; typedef struct { @@ -399,6 +407,31 @@ uid bt_newdup (BtDb *bt) #endif } +// Write-Only Queue Lock + +void WriteOLock (WOLock *lock) +{ +ushort tix; +#ifdef unix + tix = __sync_fetch_and_add (lock->ticket, 1); +#else + tix = _InterlockedExchangeAdd16 (lock->ticket, 1); +#endif + // wait for our ticket to come up + + while( tix != lock->serving[0] ) +#ifdef unix + sched_yield(); +#else + SwitchToThread (); +#endif +} + +void WriteORelease (WOLock *lock) +{ + lock->serving[0]++; +} + // Phase-Fair reader/writer lock implementation void WriteLock (RWLock *lock) @@ -1128,10 +1161,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteLock (latch->parent); + WriteOLock (latch->parent); break; case BtLockAtomic: - WriteLock (latch->atomic); + WriteOLock (latch->atomic); break; } } @@ -1154,10 +1187,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteRelease (latch->access); break; case BtLockParent: - WriteRelease (latch->parent); + WriteORelease (latch->parent); break; case BtLockAtomic: - WriteRelease (latch->atomic); + WriteORelease (latch->atomic); break; } } @@ -1362,6 +1395,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set) bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); // unlock allocation page @@ -1443,7 +1477,6 @@ uint idx; root->latch->dirty = 1; bt_freepage (bt, child); - bt_unpinlatch (child->latch); } while( root->page->lvl > 1 && root->page->act == 1 ); @@ -1530,7 +1563,6 @@ BtKey *ptr; bt_lockpage (BtLockDelete, right->latch); bt_lockpage (BtLockWrite, right->latch); bt_freepage (bt, right); - bt_unpinlatch (right->latch); bt_unlockpage (BtLockParent, set->latch); bt_unpinlatch (set->latch); @@ -2234,8 +2266,9 @@ typedef struct { typedef struct { uid page_no; // page number for split leaf void *next; // next key to insert - uint entry:30; // latch table entry number + uint entry:29; // latch table entry number uint type:2; // 0 == insert, 1 == delete, 2 == free + uint nounlock:1; // don't unlock ParentModification unsigned char leafkey[BT_keyarray]; } AtomicKey; @@ -2271,9 +2304,6 @@ uint slot; else return 0; - if( set->page->free || set->page->lvl ) - return bt->err = BTERR_struct, 0; - // obtain read lock using lock chaining with Access mode // release & unpin parent/left sibling page @@ -2291,12 +2321,13 @@ uint slot; if( !set->page->kill ) if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { bt_unlockpage(BtLockRead, set->latch); - bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockRead, set->latch); bt_unlockpage(BtLockAccess, set->latch); - if( slot = bt_findslot (set->page, key, len) ) + if( !set->page->kill ) + if( slot = bt_findslot (set->page, key, len) ) return slot; bt_unlockpage(BtLockAtomic, set->latch); @@ -2403,6 +2434,90 @@ BtVal *val; return 0; } +// delete an empty master page for a transaction + +// note that the far right page never empties because +// it always contains (at least) the infinite stopper key +// and that all pages that don't contain any keys are +// deleted, or are being held under Atomic lock. + +BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev) +{ +BtPageSet right[1], temp[1]; +unsigned char value[BtId]; +uid right_page_no; +BtKey *ptr; + + bt_lockpage(BtLockWrite, prev->latch); + + // grab the right sibling + + if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + right->page = bt_mappage (bt, right->latch); + else + return bt->err; + + bt_lockpage(BtLockAtomic, right->latch); + bt_lockpage(BtLockWrite, right->latch); + + // and pull contents over empty page + // while preserving master's left link + + memcpy (right->page->left, prev->page->left, BtId); + memcpy (prev->page, right->page, bt->mgr->page_size); + + // forward seekers to old right sibling + // to new page location in set + + bt_putid (right->page->right, prev->latch->page_no); + right->latch->dirty = 1; + right->page->kill = 1; + + // remove pointer to right page for searchers by + // changing right fence key to point to the master page + + ptr = keyptr(right->page,right->page->cnt); + bt_putid (value, prev->latch->page_no); + + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return bt->err; + + // now that master page is in good shape we can + // remove its locks. + + bt_unlockpage (BtLockAtomic, prev->latch); + bt_unlockpage (BtLockWrite, prev->latch); + + // fix master's right sibling's left pointer + // to remove scanner's poiner to the right page + + if( right_page_no = bt_getid (prev->page->right) ) { + if( temp->latch = bt_pinlatch (bt, right_page_no, 1) ) + temp->page = bt_mappage (bt, temp->latch); + + bt_lockpage (BtLockWrite, temp->latch); + bt_putid (temp->page->left, prev->latch->page_no); + temp->latch->dirty = 1; + + bt_unlockpage (BtLockWrite, temp->latch); + bt_unpinlatch (temp->latch); + } else { // master is now the far right page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } + + // now that there are no pointers to the right page + // we can delete it after the last read access occurs + + bt_unlockpage (BtLockWrite, right->latch); + bt_unlockpage (BtLockAtomic, right->latch); + bt_lockpage (BtLockDelete, right->latch); + bt_lockpage (BtLockWrite, right->latch); + bt_freepage (bt, right); + return 0; +} + // atomic modification of a batch of keys. // return -1 if BTERR is set @@ -2567,6 +2682,7 @@ int type; samepage = src; // pick-up all splits from master page + // each one is already WriteLocked. entry = prev->latch->split; @@ -2575,14 +2691,15 @@ int type; set->page = bt_mappage (bt, set->latch); entry = set->latch->split; - // delete empty master page + // delete empty master page by undoing its split + // (this is potentially another empty page) + // note that there are no new left pointers yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); prev->latch->dirty = 1; continue; @@ -2595,19 +2712,17 @@ int type; prev->latch->split = set->latch->split; bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); continue; } // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2617,21 +2732,23 @@ int type; tail = leaf; + // splice in the left link into the split page + bt_putid (set->page->left, prev->latch->page_no); bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); *prev = *set; } - // update left pointer in next right page - // if we did any now non-empty splits + // update left pointer in next right page from last split page + // (if all splits were reversed, latch->split == 0) if( latch->split ) { - // fix left pointer in master's original right sibling - // or set rightmost page in page zero + // fix left pointer in master's original (now split) + // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) ) + if( set->latch = bt_pinlatch (bt, right, 1) ) set->page = bt_mappage (bt, set->latch); else return -1; @@ -2641,18 +2758,20 @@ int type; set->latch->dirty = 1; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); - } else + } else { // prev is rightmost page + bt_spinwritelock (bt->mgr->lock); bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } // Process last page split in chain ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2664,6 +2783,9 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); + + // remove atomic lock on master page + bt_unlockpage(BtLockAtomic, latch); continue; } @@ -2677,59 +2799,26 @@ int type; continue; } - // any splits were reversed, and the + // any and all splits were reversed, and the // master page located in prev is empty, delete it // by pulling over master's right sibling. - // Delete empty master's fence - - ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->next = NULL; - leaf->type = 1; - - if( tail ) - tail->next = leaf; - else - head = leaf; + // Remove empty master's fence key - tail = leaf; - - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); + ptr = keyptr(prev->page,prev->page->cnt); - if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) - set->page = bt_mappage (bt, set->latch); - else + if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - // add page to our transaction + // perform the remainder of the delete + // from the FIFO queue - bt_lockpage(BtLockAtomic, set->latch); - bt_lockpage(BtLockWrite, set->latch); - - // pull contents over empty page - - memcpy (set->page->left, prev->page->left, BtId); - memcpy (prev->page, set->page, bt->mgr->page_size); - - bt_putid (set->page->right, prev->latch->page_no); - set->latch->dirty = 1; - set->page->kill = 1; - - // add new parent key for new master page contents - // delete it after parent posts the new master fence. - - ptr = keyptr(set->page,set->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; - leaf->entry = set->latch->entry; - leaf->next = NULL; + leaf->entry = prev->latch->entry; + leaf->nounlock = 1; leaf->type = 2; if( tail ) @@ -2739,24 +2828,10 @@ int type; tail = leaf; -// bt_lockpage(BtLockParent, set->latch); - bt_unlockpage(BtLockWrite, set->latch); - - // fix master's far right sibling's left pointer - - if( right = bt_getid (set->page->right) ) { - if( set->latch = bt_pinlatch (bt, right, 1) ) - set->page = bt_mappage (bt, set->latch); - - bt_lockpage (BtLockWrite, set->latch); - bt_putid (set->page->left, prev->latch->page_no); - set->latch->dirty = 1; + // leave atomic lock in place until + // deletion completes in next phase. - bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - } - - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(BtLockWrite, prev->latch); } // add & delete keys for any pages split or merged during transaction @@ -2774,27 +2849,24 @@ int type; if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; case 1: // delete key if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; - case 2: // insert key & free - if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + case 2: // free page + if( bt_atomicfree (bt, set) ) return -1; - bt_unlockpage (BtLockAtomic, set->latch); - bt_lockpage (BtLockDelete, set->latch); - bt_lockpage (BtLockWrite, set->latch); - bt_freepage (bt, set); break; } + if( !leaf->nounlock ) + bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (set->latch); tail = leaf->next; free (leaf); @@ -2812,7 +2884,6 @@ uint bt_lastkey (BtDb *bt) { uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); BtPageSet set[1]; -uint slot; if( set->latch = bt_pinlatch (bt, page_no, 1) ) set->page = bt_mappage (bt, set->latch); @@ -2820,13 +2891,12 @@ uint slot; return 0; bt_lockpage(BtLockRead, set->latch); - memcpy (bt->cursor, set->page, bt->mgr->page_size); - slot = set->page->cnt; - bt_unlockpage(BtLockRead, set->latch); bt_unpinlatch (set->latch); - return slot; + + bt->cursor_page = page_no; + return bt->cursor->cnt; } // return previous slot on cursor page @@ -2860,6 +2930,9 @@ findourself: next = bt_getid (bt->cursor->right); + if( bt->cursor->kill ) + goto findourself; + if( next != us ) if( next == ourright ) goto goleft; @@ -3021,7 +3094,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3054,7 +3127,7 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->rin & MASK ) + if( *latch->parent->ticket != *latch->parent->serving ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3123,12 +3196,12 @@ void *index_file (void *arg) uint __stdcall index_file (void *arg) #endif { -int line = 0, found = 0, cnt = 0, unique; +int line = 0, found = 0, cnt = 0, idx; uid next, page_no = LEAF_page; // start on first page of leaves +int ch, len = 0, slot, type = 0; unsigned char key[BT_maxkey]; unsigned char txn[65536]; ThreadArg *args = arg; -int ch, len = 0, slot; BtPageSet set[1]; uint nxt = 65536; BtPage page; @@ -3140,9 +3213,12 @@ FILE *in; bt = bt_open (args->mgr); page = (BtPage)txn; - unique = (args->type[1] | 0x20) != 'd'; + if( args->idx < strlen (args->type) ) + ch = args->type[args->idx]; + else + ch = args->type[strlen(args->type) - 1]; - switch(args->type[0] | 0x20) + switch(ch | 0x20) { case 'a': fprintf(stderr, "started latch mgr audit\n"); @@ -3150,13 +3226,37 @@ FILE *in; fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt); break; - case 't': - fprintf(stderr, "started TXN pennysort for %s\n", args->infile); + case 'd': + type = Delete; + + case 'p': + if( !type ) + type = Unique; + + if( args->num ) + if( type == Delete ) + fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile); + else + if( type == Delete ) + fprintf(stderr, "started pennysort delete for %s\n", args->infile); + else + fprintf(stderr, "started pennysort insert for %s\n", args->infile); + if( in = fopen (args->infile, "rb") ) while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { line++; + + if( !args->num ) { + if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) ) + fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + len = 0; + continue; + } + nxt -= len - 10; memcpy (txn + nxt, key + 10, len - 10); nxt -= 1; @@ -3166,7 +3266,7 @@ FILE *in; nxt -= 1; txn[nxt] = 10; slotptr(page,++cnt)->off = nxt; - slotptr(page,cnt)->type = Unique; + slotptr(page,cnt)->type = type; len = 0; if( cnt < args->num ) @@ -3178,7 +3278,7 @@ FILE *in; if( bt_atomictxn (bt, page) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - nxt = 65536; + nxt = sizeof(txn); cnt = 0; } @@ -3187,23 +3287,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'p': - fprintf(stderr, "started pennysort for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - - if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); - break; - case 'w': fprintf(stderr, "started indexing for %s\n", args->infile); if( in = fopen (args->infile, "r") ) @@ -3212,13 +3295,7 @@ FILE *in; { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) ) + if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -3227,33 +3304,6 @@ FILE *in; fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); break; - case 'd': - fprintf(stderr, "started deleting keys for %s\n", args->infile); - if( in = fopen (args->infile, "rb") ) - while( ch = getc(in), ch != EOF ) - if( ch == '\n' ) - { - line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - - if( bt_findkey (bt, key, len, NULL, 0) < 0 ) - fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0); - ptr = (BtKey*)(bt->key); - found++; - - if( bt_deletekey (bt, ptr->key, ptr->len, 0) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); - len = 0; - } - else if( len < BT_maxkey ) - key[len++] = ch; - fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes); - break; - case 'f': fprintf(stderr, "started finding keys for %s\n", args->infile); if( in = fopen (args->infile, "rb") ) @@ -3261,12 +3311,6 @@ FILE *in; if( ch == '\n' ) { line++; - if( args->num == 1 ) - sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9; - - else if( args->num ) - sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) @@ -3311,6 +3355,29 @@ FILE *in; fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; + case 'r': + fprintf(stderr, "started reverse scan\n"); + if( slot = bt_lastkey (bt) ) + while( slot = bt_prevkey (bt, slot) ) { + if( slotptr(bt->cursor, slot)->dead ) + continue; + + ptr = keyptr(bt->cursor, slot); + len = ptr->len; + + if( slotptr(bt->cursor, slot)->type == Duplicate ) + len -= BtId; + + fwrite (ptr->key, len, 1, stdout); + val = valptr(bt->cursor, slot); + fwrite (val->value, val->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } + + fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + break; + case 'c': #ifdef unix posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); @@ -3330,7 +3397,7 @@ FILE *in; } cnt--; // remove stopper key - fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); + fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes); break; } @@ -3364,10 +3431,12 @@ BtKey *ptr; BtDb *bt; if( argc < 3 ) { - fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]); - fprintf (stderr, " where page_bits is the page size in bits\n"); + fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]); + fprintf (stderr, " where idx_file is the name of the btree file\n"); + fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n"); + fprintf (stderr, " page_bits is the page size in bits\n"); fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n"); - fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n"); + fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n"); fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n"); exit(0); }