X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=btree2u.c;h=aaffee30d65fdafb176738bf864ab368dd3cda8c;hp=0c344e87ceeb57a27d377576d6db298ebb0c4853;hb=HEAD;hpb=5542a281b95fcbb9afdd96c49f06a05844c485be diff --git a/btree2u.c b/btree2u.c index 0c344e8..aaffee3 100644 --- a/btree2u.c +++ b/btree2u.c @@ -1,6 +1,7 @@ -// btree version 2u +// btree version 2u sched_yield locks // with combined latch & pool manager -// 26 FEB 2014 +// and phase-fair reader writer lock +// 12 MAR 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -43,6 +44,7 @@ REDISTRIBUTION OF THIS SOFTWARE. #include #include #include +#include #endif #include @@ -65,6 +67,17 @@ typedef unsigned int uint; #define BT_minpage (1 << BT_minbits) // minimum page size #define BT_maxpage (1 << BT_maxbits) // maximum page size +// BTree page number constants +#define ALLOC_page 0 +#define ROOT_page 1 +#define LEAF_page 2 +#define LATCH_page 3 + +// Number of levels to create in a new BTree + +#define MIN_lvl 2 +#define MAX_lvl 15 + /* There are five lock types for each node in three independent sets: 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. @@ -84,17 +97,29 @@ typedef enum{ // definition for latch implementation -// exclusive is set for write access -// share is count of read accessors -// grant write lock when share == 0 - volatile typedef struct { - unsigned char mutex[1]; - unsigned char exclusive:1; - unsigned char pending:1; - ushort share; + ushort lock[1]; } BtSpinLatch; +#define XCL 1 +#define PEND 2 +#define BOTH 3 +#define SHARE 4 + +volatile typedef struct { + ushort rin[1]; // readers in count + ushort rout[1]; // readers out count + ushort serving[1]; // writers out count + ushort ticket[1]; // writers in count +} RWLock; + +// define bits at bottom of rin + +#define PHID 0x1 // writer phase (0/1) +#define PRES 0x2 // writer present +#define MASK 0x3 // both write bits +#define RINC 0x4 // reader increment + // Define the length of the page and key pointers #define BtId 6 @@ -136,22 +161,25 @@ typedef struct BtPage_ { uint cnt; // count of keys in page uint act; // count of active keys uint min; // next key offset - unsigned char bits:7; // page size in bits + unsigned char bits:6; // page size in bits unsigned char free:1; // page is on free list + unsigned char dirty:1; // page is dirty in cache unsigned char lvl:6; // level of page unsigned char kill:1; // page is being deleted - unsigned char dirty:1; // page is dirty + unsigned char clean:1; // page needs cleaning unsigned char right[BtId]; // page number to right } *BtPage; typedef struct { struct BtPage_ alloc[2]; // next & free page_nos in right ptr BtSpinLatch lock[1]; // allocation area lite latch - uint latchdeployed; // highest number of latch entries deployed - uint nlatchpage; // number of latch pages at BT_latch - uint latchtotal; // number of page latch entries - uint latchhash; // number of latch hash table slots - uint latchvictim; // next latch entry to examine + volatile uint latchdeployed;// highest number of latch entries deployed + volatile uint nlatchpage; // number of latch pages at BT_latch + volatile uint latchtotal; // number of page latch entries + volatile uint latchhash; // number of latch hash table slots + volatile uint latchvictim; // next latch hash entry to examine + volatile uint safelevel; // safe page level in cache + volatile uint cache[MAX_lvl];// cache census counts by btree level } BtLatchMgr; // latch hash table entries @@ -164,18 +192,21 @@ typedef struct { // latch manager table structure typedef struct { - BtSpinLatch readwr[1]; // read/write page lock - BtSpinLatch access[1]; // Access Intent/Page delete - BtSpinLatch parent[1]; // Posting of fence key in parent - BtSpinLatch busy[1]; // slot is being moved between chains - volatile uint next; // next entry in hash table chain - volatile uint prev; // prev entry in hash table chain - volatile uint hash; // hash slot entry is under - volatile ushort dirty; // page is dirty in cache - volatile ushort pin; // number of outstanding pins - volatile uid page_no; // latch set page number on disk + volatile uid page_no; // latch set page number on disk + RWLock readwr[1]; // read/write page lock + RWLock access[1]; // Access Intent/Page delete + RWLock parent[1]; // Posting of fence key in parent + volatile ushort pin; // number of pins/level/clock bits + volatile uint next; // next entry in hash table chain + volatile uint prev; // prev entry in hash table chain } BtLatchSet; +#define CLOCK_mask 0xe000 +#define CLOCK_unit 0x2000 +#define PIN_mask 0x07ff +#define LVL_mask 0x1800 +#define LVL_shift 11 + // The object structure for Btree access typedef struct _BtDb { @@ -185,15 +216,13 @@ typedef struct _BtDb { uid cursor_page; // current cursor page number int err; uint mode; // read-write mode - BtPage alloc; // frame buffer for alloc page ( page 0 ) BtPage cursor; // cached frame for start/next (never mapped) BtPage frame; // spare frame for the page split (never mapped) - BtPage zero; // zeroes frame buffer (never mapped) - BtPage page; // current page + BtPage page; // current mapped page in buffer pool BtLatchSet *latch; // current page latch BtLatchMgr *latchmgr; // mapped latch page from allocation page BtLatchSet *latchsets; // mapped latch set from latch pages - unsigned char *latchpool; // cached page pool set + unsigned char *pagepool; // cached page pool set BtHashEntry *table; // the hash table #ifdef unix int idx; @@ -201,7 +230,7 @@ typedef struct _BtDb { HANDLE idx; HANDLE halloc; // allocation and latch table handle #endif - unsigned char *mem; // frame, cursor, page memory buffer + unsigned char *mem; // frame, cursor, memory buffers uint found; // last deletekey found key } BtDb; @@ -229,7 +258,7 @@ extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); // internal functions -BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch); +void bt_update (BtDb *bt, BtPage page); BtPage bt_mappage (BtDb *bt, BtLatchSet *latch); // Helper functions to return slot values @@ -239,16 +268,6 @@ extern uid bt_uid (BtDb *bt, uint slot); extern uint bt_tod (BtDb *bt, uint slot); #endif -// BTree page number constants -#define ALLOC_page 0 -#define ROOT_page 1 -#define LEAF_page 2 -#define LATCH_page 3 - -// Number of levels to create in a new BTree - -#define MIN_lvl 2 - // The page is allocated from low and hi ends. // The key offsets and row-id's are allocated // from the bottom, while the text of the key @@ -332,7 +351,77 @@ BtKey ptr; return bt->err = err; } -// Latch Manager +// Phase-Fair reader/writer lock implementation + +void WriteLock (RWLock *lock) +{ +ushort w, r, tix; + +#ifdef unix + tix = __sync_fetch_and_add (lock->ticket, 1); +#else + tix = _InterlockedExchangeAdd16 (lock->ticket, 1); +#endif + // wait for our ticket to come up + + while( tix != lock->serving[0] ) +#ifdef unix + sched_yield(); +#else + SwitchToThread (); +#endif + + w = PRES | (tix & PHID); +#ifdef unix + r = __sync_fetch_and_add (lock->rin, w); +#else + r = _InterlockedExchangeAdd16 (lock->rin, w); +#endif + while( r != *lock->rout ) +#ifdef unix + sched_yield(); +#else + SwitchToThread(); +#endif +} + +void WriteRelease (RWLock *lock) +{ +#ifdef unix + __sync_fetch_and_and (lock->rin, ~MASK); +#else + _InterlockedAnd16 (lock->rin, ~MASK); +#endif + lock->serving[0]++; +} + +void ReadLock (RWLock *lock) +{ +ushort w; +#ifdef unix + w = __sync_fetch_and_add (lock->rin, RINC) & MASK; +#else + w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK; +#endif + if( w ) + while( w == (*lock->rin & MASK) ) +#ifdef unix + sched_yield (); +#else + SwitchToThread (); +#endif +} + +void ReadRelease (RWLock *lock) +{ +#ifdef unix + __sync_fetch_and_add (lock->rout, RINC); +#else + _InterlockedExchangeAdd16 (lock->rout, RINC); +#endif +} + +// Spin Latch Manager // wait until write lock mode is clear // and add 1 to the share count @@ -342,28 +431,20 @@ void bt_spinreadlock(BtSpinLatch *latch) ushort prev; do { - // obtain latch mutex #ifdef unix - if( __sync_lock_test_and_set(latch->mutex, 1) ) - continue; + prev = __sync_fetch_and_add (latch->lock, SHARE); #else - if( _InterlockedExchange8(latch->mutex, 1) ) - continue; + prev = _InterlockedExchangeAdd16(latch->lock, SHARE); #endif // see if exclusive request is granted or pending - if( prev = !(latch->exclusive | latch->pending) ) - latch->share++; - + if( !(prev & BOTH) ) + return; #ifdef unix - *latch->mutex = 0; + prev = __sync_fetch_and_add (latch->lock, -SHARE); #else - _InterlockedExchange8(latch->mutex, 0); + prev = _InterlockedExchangeAdd16(latch->lock, -SHARE); #endif - - if( prev ) - return; - #ifdef unix } while( sched_yield(), 1 ); #else @@ -375,27 +456,23 @@ ushort prev; void bt_spinwritelock(BtSpinLatch *latch) { -uint prev; +ushort prev; do { #ifdef unix - if( __sync_lock_test_and_set(latch->mutex, 1) ) - continue; + prev = __sync_fetch_and_or(latch->lock, PEND | XCL); #else - if( _InterlockedExchange8(latch->mutex, 1) ) - continue; + prev = _InterlockedOr16(latch->lock, PEND | XCL); #endif - if( prev = !(latch->share | latch->exclusive) ) - latch->exclusive = 1, latch->pending = 0; - else - latch->pending = 1; + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return; + else #ifdef unix - *latch->mutex = 0; + __sync_fetch_and_and (latch->lock, ~XCL); #else - _InterlockedExchange8(latch->mutex, 0); + _InterlockedAnd16(latch->lock, ~XCL); #endif - if( prev ) - return; #ifdef unix } while( sched_yield(), 1 ); #else @@ -410,26 +487,25 @@ uint prev; int bt_spinwritetry(BtSpinLatch *latch) { -uint prev; +ushort prev; -#ifdef unix - if( __sync_lock_test_and_set(latch->mutex, 1) ) - return 0; +#ifdef unix + prev = __sync_fetch_and_or(latch->lock, XCL); #else - if( _InterlockedExchange8(latch->mutex, 1) ) - return 0; + prev = _InterlockedOr16(latch->lock, XCL); #endif // take write access if all bits are clear - if( prev = !(latch->exclusive | latch->share) ) - latch->exclusive = 1; - + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return 1; + else #ifdef unix - *latch->mutex = 0; + __sync_fetch_and_and (latch->lock, ~XCL); #else - _InterlockedExchange8(latch->mutex, 0); + _InterlockedAnd16(latch->lock, ~XCL); #endif - return prev; + return 0; } // clear write mode @@ -437,17 +513,9 @@ uint prev; void bt_spinreleasewrite(BtSpinLatch *latch) { #ifdef unix - while( __sync_lock_test_and_set(latch->mutex, 1) ) - sched_yield(); -#else - while( _InterlockedExchange8(latch->mutex, 1) ) - SwitchToThread(); -#endif - latch->exclusive = 0; -#ifdef unix - *latch->mutex = 0; + __sync_fetch_and_and(latch->lock, ~BOTH); #else - _InterlockedExchange8(latch->mutex, 0); + _InterlockedAnd16(latch->lock, ~BOTH); #endif } @@ -456,48 +524,104 @@ void bt_spinreleasewrite(BtSpinLatch *latch) void bt_spinreleaseread(BtSpinLatch *latch) { #ifdef unix - while( __sync_lock_test_and_set(latch->mutex, 1) ) - sched_yield(); + __sync_fetch_and_add(latch->lock, -SHARE); #else - while( _InterlockedExchange8(latch->mutex, 1) ) - SwitchToThread(); + _InterlockedExchangeAdd16(latch->lock, -SHARE); #endif - latch->share--; +} + +// read page from permanent location in Btree file + +BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no) +{ +off64_t off = page_no << bt->page_bits; + #ifdef unix - *latch->mutex = 0; + if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) { + fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno); + return bt->err = BTERR_read; + } #else - _InterlockedExchange8(latch->mutex, 0); +OVERLAPPED ovl[1]; +uint amt[1]; + + memset (ovl, 0, sizeof(OVERLAPPED)); + ovl->Offset = off; + ovl->OffsetHigh = off >> 32; + + if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) { + fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError()); + return bt->err = BTERR_read; + } + if( *amt < bt->page_size ) { + fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError()); + return bt->err = BTERR_read; + } #endif + return 0; } -// link latch table entry into head of latch hash table +// write page to permanent location in Btree file +// clear the dirty bit -BTERR bt_latchlink (BtDb *bt, uint hashidx, uint victim, uid page_no) +BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no) { -BtPage page = (BtPage)(victim * bt->page_size + bt->latchpool); -BtLatchSet *latch = bt->latchsets + victim; off64_t off = page_no << bt->page_bits; + +#ifdef unix + page->dirty = 0; + + if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size ) + return bt->err = BTERR_wrt; +#else +OVERLAPPED ovl[1]; uint amt[1]; + memset (ovl, 0, sizeof(OVERLAPPED)); + ovl->Offset = off; + ovl->OffsetHigh = off >> 32; + page->dirty = 0; + + if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) ) + return bt->err = BTERR_wrt; + + if( *amt < bt->page_size ) + return bt->err = BTERR_wrt; +#endif + return 0; +} + +// link latch table entry into head of latch hash table + +BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no) +{ +BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool); +BtLatchSet *latch = bt->latchsets + slot; +int lvl; + if( latch->next = bt->table[hashidx].slot ) - bt->latchsets[latch->next].prev = victim; + bt->latchsets[latch->next].prev = slot; - bt->table[hashidx].slot = victim; + bt->table[hashidx].slot = slot; latch->page_no = page_no; - latch->hash = hashidx; - latch->dirty = 0; latch->prev = 0; + latch->pin = 1; + + if( bt_readpage (bt, page, page_no) ) + return bt->err; + + lvl = page->lvl << LVL_shift; + if( lvl > LVL_mask ) + lvl = LVL_mask; + latch->pin |= lvl; // store lvl + latch->pin |= lvl << 3; // initialize clock + #ifdef unix - if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) ) - return bt->err = BTERR_read; + __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1); #else - SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); - if( !ReadFile(bt->idx, page, bt->page_size, amt, NULL)) - return bt->err = BTERR_read; - if( *amt < bt->page_size ) - return bt->err = BTERR_read; + _InterlockedExchangeAdd(&bt->latchmgr->cache[page->lvl], 1); #endif - return 0; + return bt->err = 0; } // release latch pin @@ -517,13 +641,14 @@ void bt_unpinlatch (BtLatchSet *latch) BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no) { uint hashidx = page_no % bt->latchmgr->latchhash; -uint slot, victim, idx; BtLatchSet *latch; +uint slot, idx; +uint lvl, cnt; off64_t off; uint amt[1]; BtPage page; - // try to find unpinned entry + // try to find our entry bt_spinwritelock(bt->table[hashidx].latch); @@ -535,100 +660,111 @@ BtPage page; } while( slot = latch->next ); // found our entry + // increment clock if( slot ) { latch = bt->latchsets + slot; + lvl = (latch->pin & LVL_mask) >> LVL_shift; + lvl *= CLOCK_unit * 2; + lvl |= CLOCK_unit; #ifdef unix __sync_fetch_and_add(&latch->pin, 1); + __sync_fetch_and_or(&latch->pin, lvl); #else _InterlockedIncrement16 (&latch->pin); + _InterlockedOr16 (&latch->pin, lvl); #endif bt_spinreleasewrite(bt->table[hashidx].latch); return latch; } - // see if there are any unused entries + // see if there are any unused pool entries #ifdef unix - victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1; + slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1; #else - victim = _InterlockedIncrement (&bt->latchmgr->latchdeployed); + slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed); #endif - if( victim < bt->latchmgr->latchtotal ) { - latch = bt->latchsets + victim; -#ifdef unix - __sync_fetch_and_add(&latch->pin, 1); -#else - _InterlockedIncrement16 (&latch->pin); -#endif - bt_latchlink (bt, hashidx, victim, page_no); + if( slot < bt->latchmgr->latchtotal ) { + latch = bt->latchsets + slot; + if( bt_latchlink (bt, hashidx, slot, page_no) ) + return NULL; bt_spinreleasewrite (bt->table[hashidx].latch); return latch; } #ifdef unix - victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1); + __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1); #else - victim = _InterlockedDecrement (&bt->latchmgr->latchdeployed); + _InterlockedDecrement (&bt->latchmgr->latchdeployed); #endif - // find and reuse previous lock entry + // find and reuse previous entry on victim while( 1 ) { #ifdef unix - victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1); + slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1); #else - victim = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1; + slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1; #endif - // we don't use slot zero + // try to get write lock on hash chain + // skip entry if not obtained + // or has outstanding pins - if( victim %= bt->latchmgr->latchtotal ) - latch = bt->latchsets + victim; - else - continue; + slot %= bt->latchmgr->latchtotal; - // take control of our slot - // from other threads + // on slot wraparound, check census + // count and increment safe level - if( latch->pin || !bt_spinwritetry (latch->busy) ) - continue; + cnt = bt->latchmgr->cache[bt->latchmgr->safelevel]; - idx = latch->hash; + if( !slot ) { + if( cnt < bt->latchmgr->latchtotal / 10 ) +#ifdef unix + __sync_fetch_and_add(&bt->latchmgr->safelevel, 1); +#else + _InterlockedIncrement (&bt->latchmgr->safelevel); +#endif + continue; + } - // try to get write lock on hash chain - // skip entry if not obtained - // or has outstanding locks + latch = bt->latchsets + slot; + idx = latch->page_no % bt->latchmgr->latchhash; + lvl = (latch->pin & LVL_mask) >> LVL_shift; + + // see if we are evicting this level yet + // or if we are on same chain as hashidx - if( !bt_spinwritetry (bt->table[idx].latch) ) { - bt_spinreleasewrite (latch->busy); + if( idx == hashidx || lvl > bt->latchmgr->safelevel ) continue; - } - if( latch->pin ) { - bt_spinreleasewrite (latch->busy); - bt_spinreleasewrite (bt->table[idx].latch); + if( !bt_spinwritetry (bt->table[idx].latch) ) continue; + + if( latch->pin & ~LVL_mask ) { + if( latch->pin & CLOCK_mask ) +#ifdef unix + __sync_fetch_and_add(&latch->pin, -CLOCK_unit); +#else + _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit); +#endif + bt_spinreleasewrite (bt->table[idx].latch); + continue; } // update permanent page area in btree - page = (BtPage)(victim * bt->page_size + bt->latchpool); - off = latch->page_no << bt->page_bits; + page = (BtPage)((uid)slot * bt->page_size + bt->pagepool); #ifdef unix - if( latch->dirty ) - if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size ) - return bt->err = BTERR_wrt, NULL; + posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED); + __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1); #else - if( latch->dirty ) { - SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); - - if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) ) - return bt->err = BTERR_wrt, NULL; - - if( *amt < bt->page_size ) - return bt->err = BTERR_wrt, NULL; - } + _InterlockedExchangeAdd(&bt->latchmgr->cache[page->lvl], -1); #endif - // unlink our available victim from its hash chain + if( page->dirty ) + if( bt_writepage (bt, page, latch->page_no) ) + return NULL; + + // unlink our available slot from its hash chain if( latch->prev ) bt->latchsets[latch->prev].next = latch->next; @@ -639,14 +775,11 @@ BtPage page; bt->latchsets[latch->next].prev = latch->prev; bt_spinreleasewrite (bt->table[idx].latch); -#ifdef unix - __sync_fetch_and_add(&latch->pin, 1); -#else - _InterlockedIncrement16 (&latch->pin); -#endif - bt_latchlink (bt, hashidx, victim, page_no); + + if( bt_latchlink (bt, hashidx, slot, page_no) ) + return NULL; + bt_spinreleasewrite (bt->table[hashidx].latch); - bt_spinreleasewrite (latch->busy); return latch; } } @@ -705,45 +838,55 @@ struct flock lock[1]; else if( bits < BT_minbits ) bits = BT_minbits; + if( mode == BT_ro ) { + fprintf(stderr, "ReadOnly mode not supported: %s\n", name); + return NULL; + } #ifdef unix bt = calloc (1, sizeof(BtDb)); bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666); - - if( bt->idx == -1 ) + posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM); + + if( bt->idx == -1 ) { + fprintf(stderr, "unable to open %s\n", name); return free(bt), NULL; + } #else bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb)); attr = FILE_ATTRIBUTE_NORMAL; bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL); - if( bt->idx == INVALID_HANDLE_VALUE ) + if( bt->idx == INVALID_HANDLE_VALUE ) { + fprintf(stderr, "unable to open %s\n", name); return GlobalFree(bt), NULL; + } #endif #ifdef unix memset (lock, 0, sizeof(lock)); - - lock->l_type = F_WRLCK; lock->l_len = sizeof(struct BtPage_); - lock->l_whence = 0; + lock->l_type = F_WRLCK; - if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) + if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) { + fprintf(stderr, "unable to lock record zero %s\n", name); return bt_close (bt), NULL; + } #else memset (ovl, 0, sizeof(ovl)); - len = sizeof(struct BtPage_); // use large offsets to // simulate advisory locking ovl->OffsetHigh |= 0x80000000; - if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, len, 0L, ovl) ) + if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) { + fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError()); return bt_close (bt), NULL; + } #endif #ifdef unix - latchmgr = malloc (BT_maxpage); + latchmgr = valloc (BT_maxpage); *amt = 0; // read minimum page size to get root info @@ -751,20 +894,22 @@ struct flock lock[1]; if( size = lseek (bt->idx, 0L, 2) ) { if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage ) bits = latchmgr->alloc->bits; - else + else { + fprintf(stderr, "Unable to read page zero\n"); return free(bt), free(latchmgr), NULL; - } else if( mode == BT_ro ) - return free(latchmgr), bt_close (bt), NULL; + } + } #else latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE); size = GetFileSize(bt->idx, amt); if( size || *amt ) { - if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) + if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) { + fprintf(stderr, "Unable to read page zero\n"); return bt_close (bt), NULL; - bits = latchmgr->alloc->bits; - } else if( mode == BT_ro ) - return bt_close (bt), NULL; + } else + bits = latchmgr->alloc->bits; + } #endif bt->page_size = 1 << bits; @@ -772,8 +917,15 @@ struct flock lock[1]; bt->mode = mode; - if( size || *amt ) + if( size || *amt ) { + nlatchpage = latchmgr->nlatchpage; goto btlatch; + } + + if( nodemax < 16 ) { + fprintf(stderr, "Buffer pool too small: %d\n", nodemax); + return bt_close(bt), NULL; + } // initialize an empty b-tree with latch page, root page, page of leaves // and page(s) of latches and page pool cache @@ -783,7 +935,7 @@ struct flock lock[1]; // calculate number of latch hash table entries - nlatchpage = (nodemax/8 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size; + nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size; latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry); nlatchpage += nodemax; // size of the buffer pool in pages @@ -793,110 +945,104 @@ struct flock lock[1]; latchmgr->nlatchpage = nlatchpage; latchmgr->latchtotal = nodemax; latchmgr->latchhash = latchhash; -#ifdef unix - if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size ) - return bt_close (bt), NULL; -#else - if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) - return bt_close (bt), NULL; - if( *amt < bt->page_size ) + if( bt_writepage (bt, latchmgr->alloc, 0) ) { + fprintf (stderr, "Unable to create btree page zero\n"); return bt_close (bt), NULL; -#endif + } + memset (latchmgr, 0, 1 << bits); latchmgr->alloc->bits = bt->page_bits; for( lvl=MIN_lvl; lvl--; ) { + last = MIN_lvl - lvl; // page number slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3; - bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number + bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0); key = keyptr(latchmgr->alloc, 1); key->len = 2; // create stopper key key->key[0] = 0xff; key->key[1] = 0xff; + latchmgr->alloc->min = bt->page_size - 3; latchmgr->alloc->lvl = lvl; latchmgr->alloc->cnt = 1; latchmgr->alloc->act = 1; -#ifdef unix - if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size ) - return bt_close (bt), NULL; -#else - if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) - return bt_close (bt), NULL; - if( *amt < bt->page_size ) + if( bt_writepage (bt, latchmgr->alloc, last) ) { + fprintf (stderr, "Unable to create btree page %.8x\n", last); return bt_close (bt), NULL; -#endif + } } - // clear out latch manager pages + // clear out buffer pool pages memset(latchmgr, 0, bt->page_size); - last = MIN_lvl + 1; + last = MIN_lvl + nlatchpage; - while( last < ((MIN_lvl + 1 + nlatchpage) ) ) { - off = (uid)last << bt->page_bits; + if( bt_writepage (bt, latchmgr->alloc, last) ) { + fprintf (stderr, "Unable to write buffer pool page %.8x\n", last); + return bt_close (bt), NULL; + } + #ifdef unix - pwrite(bt->idx, latchmgr, bt->page_size, off); + free (latchmgr); #else - SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); - if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) - return bt_close (bt), NULL; - if( *amt < bt->page_size ) - return bt_close (bt), NULL; + VirtualFree (latchmgr, 0, MEM_RELEASE); #endif - last++; - } btlatch: #ifdef unix lock->l_type = F_UNLCK; - if( fcntl (bt->idx, F_SETLK, lock) < 0 ) + if( fcntl (bt->idx, F_SETLK, lock) < 0 ) { + fprintf (stderr, "Unable to unlock page zero\n"); return bt_close (bt), NULL; + } #else - if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) + if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) { + fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError()); return bt_close (bt), NULL; + } #endif #ifdef unix flag = PROT_READ | PROT_WRITE; bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size); - if( bt->latchmgr == MAP_FAILED ) + if( bt->latchmgr == MAP_FAILED ) { + fprintf (stderr, "Unable to mmap page zero, errno = %d", errno); return bt_close (bt), NULL; - bt->table = (void *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size); - if( bt->table == MAP_FAILED ) + } + bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size); + if( bt->table == MAP_FAILED ) { + fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno); return bt_close (bt), NULL; + } + madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED); #else flag = PAGE_READWRITE; - bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size, NULL); - if( !bt->halloc ) + bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL); + if( !bt->halloc ) { + fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError()); return bt_close (bt), NULL; + } flag = FILE_MAP_WRITE; - bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size); - if( !bt->latchmgr ) - return GetLastError(), bt_close (bt), NULL; + bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size); + if( !bt->latchmgr ) { + fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError()); + return bt_close (bt), NULL; + } bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size); #endif - bt->latchpool = (unsigned char *)bt->table + (bt->latchmgr->nlatchpage - bt->latchmgr->latchtotal) * bt->page_size; - bt->latchsets = (BtLatchSet *)(bt->latchpool - bt->latchmgr->latchtotal * sizeof(BtLatchSet)); - -#ifdef unix - free (latchmgr); -#else - VirtualFree (latchmgr, 0, MEM_RELEASE); -#endif + bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size; + bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet)); #ifdef unix - bt->mem = malloc (3 * bt->page_size); + bt->mem = valloc (2 * bt->page_size); #else - bt->mem = VirtualAlloc(NULL, 3 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); + bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); #endif bt->frame = (BtPage)bt->mem; bt->cursor = (BtPage)(bt->mem + bt->page_size); - bt->zero = (BtPage)(bt->mem + 2 * bt->page_size); - - memset (bt->zero, 0, bt->page_size); return bt; } @@ -906,19 +1052,19 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: - bt_spinreadlock (latch->readwr); + ReadLock (latch->readwr); break; case BtLockWrite: - bt_spinwritelock (latch->readwr); + WriteLock (latch->readwr); break; case BtLockAccess: - bt_spinreadlock (latch->access); + ReadLock (latch->access); break; case BtLockDelete: - bt_spinwritelock (latch->access); + WriteLock (latch->access); break; case BtLockParent: - bt_spinwritelock (latch->parent); + WriteLock (latch->parent); break; } } @@ -929,19 +1075,19 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: - bt_spinreleaseread (latch->readwr); + ReadRelease (latch->readwr); break; case BtLockWrite: - bt_spinreleasewrite (latch->readwr); + WriteRelease (latch->readwr); break; case BtLockAccess: - bt_spinreleaseread (latch->access); + ReadRelease (latch->access); break; case BtLockDelete: - bt_spinreleasewrite (latch->access); + WriteRelease (latch->access); break; case BtLockParent: - bt_spinreleasewrite (latch->parent); + WriteRelease (latch->parent); break; } } @@ -953,9 +1099,6 @@ uid bt_newpage(BtDb *bt, BtPage page) BtLatchSet *latch; uid new_page; BtPage temp; -off64_t off; -uint amt[1]; -int reuse; // lock allocation page @@ -965,36 +1108,28 @@ int reuse; // else allocate empty page if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) { - latch = bt_pinlatch (bt, new_page); - temp = bt_mappage (bt, latch); + if( latch = bt_pinlatch (bt, new_page) ) + temp = bt_mappage (bt, latch); + else + return 0; bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right)); bt_spinreleasewrite(bt->latchmgr->lock); memcpy (temp, page, bt->page_size); - if( bt_update (bt, temp, latch) ) - return 0; - + bt_update (bt, temp); bt_unpinlatch (latch); + return new_page; } else { new_page = bt_getid(bt->latchmgr->alloc->right); bt_putid(bt->latchmgr->alloc->right, new_page+1); bt_spinreleasewrite(bt->latchmgr->lock); - off = new_page << bt->page_bits; -#ifdef unix - if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size ) - return bt->err = BTERR_wrt, 0; -#else - SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); - - if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) ) - return bt->err = BTERR_wrt, 0; - if( *amt < bt->page_size ) - return bt->err = BTERR_wrt, 0; -#endif + if( bt_writepage (bt, page, new_page) ) + return 0; } + bt_update (bt, bt->latchmgr->alloc); return new_page; } @@ -1019,23 +1154,23 @@ int ans; // Update current page of btree by // flushing mapped area to disk backing of cache pool. +// mark page as dirty for rewrite to permanent location -BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch) +void bt_update (BtDb *bt, BtPage page) { #ifdef unix msync (page, bt->page_size, MS_ASYNC); #else - FlushViewOfFile (page, bt->page_size); +// FlushViewOfFile (page, bt->page_size); #endif - latch->dirty = 1; - return 0; + page->dirty = 1; } // map the btree cached page onto current page BtPage bt_mappage (BtDb *bt, BtLatchSet *latch) { - return (BtPage)((latch - bt->latchsets) * bt->page_size + bt->latchpool); + return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool); } // deallocate a deleted page @@ -1053,10 +1188,9 @@ BtPage page = bt_mappage (bt, latch); // store chain in second right bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right)); bt_putid(bt->latchmgr->alloc[1].right, page_no); - page->free = 1; - if( bt_update(bt, page, latch) ) - return bt->err; + page->free = 1; + bt_update(bt, page); // unlock released page @@ -1067,6 +1201,7 @@ BtPage page = bt_mappage (bt, latch); // unlock allocation page bt_spinreleasewrite (bt->latchmgr->lock); + bt_update (bt, bt->latchmgr->alloc); return 0; } @@ -1116,8 +1251,10 @@ uint mode, prevmode; // determine lock mode of drill level mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; - bt->latch = bt_pinlatch(bt, page_no); - bt->page_no = page_no; + if( bt->latch = bt_pinlatch(bt, page_no) ) + bt->page_no = page_no; + else + return 0; // obtain access lock using lock chaining @@ -1207,14 +1344,12 @@ BtKey ptr; memcpy(rightkey, ptr, ptr->len + 1); memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot)); - bt->page->dirty = 1; + bt->page->clean = 1; ptr = keyptr(bt->page, bt->page->cnt); memcpy(leftkey, ptr, ptr->len + 1); - if( bt_update (bt, bt->page, latch) ) - return bt->err; - + bt_update (bt, bt->page); bt_lockpage (BtLockParent, latch); bt_unlockpage (BtLockWrite, latch); @@ -1253,16 +1388,16 @@ uint idx; break; child = bt_getid (slotptr(root, idx)->id); - latch = bt_pinlatch (bt, child); + if( latch = bt_pinlatch (bt, child) ) + temp = bt_mappage (bt, latch); + else + return bt->err; bt_lockpage (BtLockDelete, latch); bt_lockpage (BtLockWrite, latch); - - temp = bt_mappage (bt, latch); memcpy (root, temp, bt->page_size); - if( bt_update (bt, root, bt->latch) ) - return bt->err; + bt_update (bt, root); if( bt_freepage (bt, child, latch) ) return bt->err; @@ -1300,7 +1435,7 @@ BtKey ptr; if( found = !keycmp (ptr, key, len) ) if( found = slotptr(bt->page, slot)->dead == 0 ) { dirty = slotptr(bt->page,slot)->dead = 1; - bt->page->dirty = 1; + bt->page->clean = 1; bt->page->act--; // collapse empty slots @@ -1344,8 +1479,7 @@ BtKey ptr; // return if page is not empty if( bt->page->act ) { - if( bt_update(bt, bt->page, latch) ) - return bt->err; + bt_update(bt, bt->page); bt_unlockpage(BtLockWrite, latch); bt_unpinlatch (latch); return bt->found = found, 0; @@ -1359,10 +1493,12 @@ BtKey ptr; // obtain lock on right page - rlatch = bt_pinlatch (bt, right); - bt_lockpage(BtLockWrite, rlatch); + if( rlatch = bt_pinlatch (bt, right) ) + temp = bt_mappage (bt, rlatch); + else + return bt->err; - temp = bt_mappage (bt, rlatch); + bt_lockpage(BtLockWrite, rlatch); if( temp->kill ) { bt_abort(bt, temp, right, 0); @@ -1384,11 +1520,8 @@ BtKey ptr; bt_putid(temp->right, page_no); temp->kill = 1; - if( bt_update(bt, bt->page, latch) ) - return bt->err; - - if( bt_update(bt, temp, rlatch) ) - return bt->err; + bt_update(bt, bt->page); + bt_update(bt, temp); bt_lockpage(BtLockParent, latch); bt_unlockpage(BtLockWrite, latch); @@ -1467,7 +1600,7 @@ int ret; // skip cleanup if nothing to reclaim - if( !page->dirty ) + if( !page->clean ) return 0; memcpy (bt->frame, page, bt->page_size); @@ -1552,8 +1685,7 @@ uid right; // update and release root (bt->page) - if( bt_update(bt, root, bt->latch) ) - return bt->err; + bt_update(bt, root); bt_unlockpage(BtLockWrite, bt->latch); bt_unpinlatch(bt->latch); @@ -1620,7 +1752,7 @@ BtKey key; memcpy (bt->frame, page, bt->page_size); memset (page+1, 0, bt->page_size - sizeof(*page)); nxt = bt->page_size; - page->dirty = 0; + page->clean = 0; page->act = 0; cnt = 0; idx = 0; @@ -1655,13 +1787,14 @@ BtKey key; // lock right page - rlatch = bt_pinlatch (bt, right); - bt_lockpage (BtLockParent, rlatch); + if( rlatch = bt_pinlatch (bt, right) ) + bt_lockpage (BtLockParent, rlatch); + else + return bt->err; // update left (containing) node - if( bt_update(bt, page, latch) ) - return bt->err; + bt_update(bt, page); bt_lockpage (BtLockParent, latch); bt_unlockpage (BtLockWrite, latch); @@ -1715,8 +1848,7 @@ BtKey ptr; slotptr(page, slot)->tod = tod; #endif bt_putid(slotptr(page,slot)->id, id); - if( bt_update(bt, bt->page, bt->latch) ) - return bt->err; + bt_update(bt, bt->page); bt_unlockpage(BtLockWrite, bt->latch); bt_unpinlatch (bt->latch); return 0; @@ -1759,8 +1891,7 @@ BtKey ptr; #endif slotptr(page, slot)->dead = 0; - if( bt_update(bt, bt->page, bt->latch) ) - return bt->err; + bt_update(bt, bt->page); bt_unlockpage(BtLockWrite, bt->latch); bt_unpinlatch(bt->latch); @@ -1809,8 +1940,11 @@ off64_t right; break; bt->cursor_page = right; - latch = bt_pinlatch (bt, right); - bt_lockpage(BtLockRead, latch); + + if( latch = bt_pinlatch (bt, right) ) + bt_lockpage(BtLockRead, latch); + else + return 0; bt->page = bt_mappage (bt, latch); memcpy (bt->cursor, bt->page, bt->page_size); @@ -1846,17 +1980,21 @@ uint bt_audit (BtDb *bt) uint idx, hashidx; uid next, page_no; BtLatchSet *latch; +uint blks[64]; uint cnt = 0; BtPage page; -off64_t off; uint amt[1]; BtKey ptr; #ifdef unix + posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL); +#endif if( *(ushort *)(bt->latchmgr->lock) ) fprintf(stderr, "Alloc page locked\n"); *(ushort *)(bt->latchmgr->lock) = 0; + memset (blks, 0, sizeof(blks)); + for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) { latch = bt->latchsets + idx; if( *(ushort *)latch->readwr ) @@ -1871,53 +2009,36 @@ BtKey ptr; fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); *(ushort *)latch->parent = 0; - if( latch->pin ) { + if( latch->pin & PIN_mask ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); latch->pin = 0; } - page = (BtPage)(idx * bt->page_size + bt->latchpool); - off = latch->page_no << bt->page_bits; -#ifdef unix - if( latch->dirty ) - if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size ) - fprintf(stderr, "Page %.8x Write Error\n", latch->page_no); -#else - if( latch->dirty ) { - SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); + page = (BtPage)((uid)idx * bt->page_size + bt->pagepool); + blks[page->lvl]++; - if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) ) + if( page->dirty ) + if( bt_writepage (bt, page, latch->page_no) ) fprintf(stderr, "Page %.8x Write Error\n", latch->page_no); - - if( *amt < bt->page_size ) - fprintf(stderr, "Page %.8x Write Error\n", latch->page_no); - } -#endif - latch->dirty = 0; } + for( idx = 0; blks[idx]; idx++ ) + fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx); + for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) { if( *(ushort *)(bt->table[hashidx].latch) ) fprintf(stderr, "hash entry %d locked\n", hashidx); *(ushort *)(bt->table[hashidx].latch) = 0; - - if( idx = bt->table[hashidx].slot ) do { - latch = bt->latchsets + idx; - if( *(ushort *)latch->busy ) - fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no); - *(ushort *)latch->busy = 0; - if( latch->hash != hashidx ) - fprintf(stderr, "latchset %d wrong hashidx\n", idx); - if( latch->pin ) - fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); - } while( idx = latch->next ); } + memset (blks, 0, sizeof(blks)); + next = bt->latchmgr->nlatchpage + LATCH_page; page_no = LEAF_page; while( page_no < bt_getid(bt->latchmgr->alloc->right) ) { - pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits); + if( bt_readpage (bt, bt->frame, page_no) ) + fprintf(stderr, "page %.8x unreadable\n", page_no); if( !bt->frame->free ) { for( idx = 0; idx++ < bt->frame->cnt - 1; ) { ptr = keyptr(bt->frame, idx+1); @@ -1926,16 +2047,18 @@ BtKey ptr; } if( !bt->frame->lvl ) cnt += bt->frame->act; + blks[bt->frame->lvl]++; } if( page_no > LEAF_page ) next = page_no + 1; page_no = next; } + + for( idx = 0; blks[idx]; idx++ ) + fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx); + return cnt - 1; -#else - return 0; -#endif } #ifndef unix @@ -2009,15 +2132,20 @@ int ch, cnt = 0, bits = 12, idx; unsigned char key[256]; double done, start; uid next, page_no; +BtLatchSet *latch; float elapsed; time_t tod[1]; uint scan = 0; uint len = 0; uint map = 0; +BtPage page; BtKey ptr; BtDb *bt; FILE *in; +#ifdef WIN32 + _setmode (1, _O_BINARY); +#endif if( argc < 4 ) { fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]); fprintf (stderr, " page_bits: size of btree page in bits\n"); @@ -2046,15 +2174,27 @@ FILE *in; switch(argv[3][0]| 0x20) { - case 'a': - fprintf(stderr, "started audit for %s\n", argv[2]); + case 'p': // display page + if( latch = bt_pinlatch (bt, off) ) + page = bt_mappage (bt, latch); + else + fprintf(stderr, "unable to read page %.8x\n", off); + + write (1, page, bt->page_size); + break; + + case 'a': // buffer pool audit + fprintf(stderr, "started audit for %s\n", argv[1]); cnt = bt_audit (bt); - fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt); + fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt); break; - case 'w': + case 'w': // write keys fprintf(stderr, "started indexing for %s\n", argv[2]); - if( argc > 2 && (in = fopen (argv[2], "rb")) ) + if( argc > 2 && (in = fopen (argv[2], "rb")) ) { +#ifdef unix + posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE); +#endif while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { @@ -2067,12 +2207,16 @@ FILE *in; } else if( len < 245 ) key[len++] = ch; + } fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line); break; - case 'd': + case 'd': // delete keys fprintf(stderr, "started deleting keys for %s\n", argv[2]); - if( argc > 2 && (in = fopen (argv[2], "rb")) ) + if( argc > 2 && (in = fopen (argv[2], "rb")) ) { +#ifdef unix + posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE); +#endif while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { @@ -2085,12 +2229,16 @@ FILE *in; } else if( len < 245 ) key[len++] = ch; + } fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line); break; - case 'f': + case 'f': // find keys fprintf(stderr, "started finding keys for %s\n", argv[2]); - if( argc > 2 && (in = fopen (argv[2], "rb")) ) + if( argc > 2 && (in = fopen (argv[2], "rb")) ) { +#ifdef unix + posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE); +#endif while( ch = getc(in), ch != EOF ) if( ch == '\n' ) { @@ -2105,13 +2253,30 @@ FILE *in; } else if( len < 245 ) key[len++] = ch; + } fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found); break; - case 's': - scan++; + case 's': // scan and print keys + fprintf(stderr, "started scaning\n"); + cnt = len = key[0] = 0; + + if( slot = bt_startkey (bt, key, len) ) + slot--; + else + fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0); + + while( slot = bt_nextkey (bt, slot) ) { + ptr = bt_key(bt, slot); + fwrite (ptr->key, ptr->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } + + fprintf(stderr, " Total keys read %d\n", cnt - 1); + break; - case 'c': + case 'c': // count keys fprintf(stderr, "started counting\n"); cnt = 0; @@ -2119,24 +2284,23 @@ FILE *in; page_no = LEAF_page; while( page_no < bt_getid(bt->latchmgr->alloc->right) ) { - uid off = page_no << bt->page_bits; -#ifdef unix - pread (bt->idx, bt->frame, bt->page_size, off); -#else - DWORD amt[1]; - - SetFilePointer (bt->idx, (long)off, NULL, FILE_BEGIN); - - if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL)) - fprintf (stderr, "unable to read page %.8x", page_no); - - if( *amt < bt->page_size ) - fprintf (stderr, "unable to read page %.8x", page_no); -#endif - if( !bt->frame->free && !bt->frame->lvl ) - cnt += bt->frame->act; + if( latch = bt_pinlatch (bt, page_no) ) + page = bt_mappage (bt, latch); + if( !page->free && !page->lvl ) + cnt += page->act; if( page_no > LEAF_page ) next = page_no + 1; + if( scan ) + for( idx = 0; idx++ < page->cnt; ) { + if( slotptr(page, idx)->dead ) + continue; + ptr = keyptr(page, idx); + if( idx != page->cnt && bt_getid (page->right) ) { + fwrite (ptr->key, ptr->len, 1, stdout); + fputc ('\n', stdout); + } + } + bt_unpinlatch (latch); page_no = next; }