X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=btree2t.c;h=1a544e956f88970c9157ffb25c2a9d58104a951e;hp=900cc1f9c8d855e5612617a4cf389999319b1183;hb=HEAD;hpb=6272c4d064d8c01849a4a17ccc492913c849fb3d diff --git a/btree2t.c b/btree2t.c index 900cc1f..1a544e9 100644 --- a/btree2t.c +++ b/btree2t.c @@ -1,5 +1,6 @@ -// btree version 2t -// 15 FEB 2014 +// btree version 2t sched_yield version of spinlocks +// with reworked bt_deletekey code +// 12 MAR 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -29,7 +30,6 @@ REDISTRIBUTION OF THIS SOFTWARE. #ifdef unix #include -#include #include #include #include @@ -45,7 +45,6 @@ REDISTRIBUTION OF THIS SOFTWARE. #include #endif -#include #include #include @@ -57,6 +56,8 @@ typedef unsigned short ushort; typedef unsigned int uint; #endif +#define BT_latchtable 8192 // number of latch manager slots + #define BT_ro 0x6f72 // ro #define BT_rw 0x7772 // rw #define BT_fl 0x6c66 // fl @@ -64,6 +65,7 @@ typedef unsigned int uint; #define BT_maxbits 24 // maximum page size in bits #define BT_minbits 9 // minimum page size in bits #define BT_minpage (1 << BT_minbits) // minimum page size +#define BT_maxpage (1 << BT_maxbits) // maximum page size /* There are five lock types for each node in three independent sets: @@ -82,6 +84,44 @@ typedef enum{ BtLockParent }BtLock; +// definition for latch implementation + +// exclusive is set for write access +// share is count of read accessors +// grant write lock when share == 0 + +volatile typedef struct { + ushort exclusive:1; + ushort pending:1; + ushort share:14; +} BtSpinLatch; + +#define XCL 1 +#define PEND 2 +#define BOTH 3 +#define SHARE 4 + +// hash table entries + +typedef struct { + BtSpinLatch latch[1]; + volatile ushort slot; // Latch table entry at head of chain +} BtHashEntry; + +// latch manager table structure + +typedef struct { + BtSpinLatch readwr[1]; // read/write page lock + BtSpinLatch access[1]; // Access Intent/Page delete + BtSpinLatch parent[1]; // Posting of fence key in parent + BtSpinLatch busy[1]; // slot is being moved between chains + volatile ushort next; // next entry in hash table chain + volatile ushort prev; // prev entry in hash table chain + volatile ushort pin; // number of outstanding locks + volatile ushort hash; // hash slot entry is under + volatile uid page_no; // latch set page number +} BtLatchSet; + // Define the length of the page and key pointers #define BtId 6 @@ -94,7 +134,8 @@ typedef enum{ // the tod field from the key. // Keys are marked dead, but remain on the page until -// cleanup is called. +// cleanup is called. The fence key (highest key) for +// the page is always present, even if dead. typedef struct { uint off:BT_maxbits; // page offset for key start @@ -122,22 +163,12 @@ typedef struct BtPage_ { uint min; // next key offset unsigned char bits:7; // page size in bits unsigned char free:1; // page is on free list - unsigned char lvl:4; // level of page - unsigned char kill:1; // page is empty + unsigned char lvl:6; // level of page + unsigned char kill:1; // page is being deleted unsigned char dirty:1; // page is dirty - unsigned char posted:1; // page fence is posted - unsigned char goright:1; // continue to right link unsigned char right[BtId]; // page number to right - unsigned char fence[256]; // page fence key } *BtPage; -// The loadpage interface object - -typedef struct { - uid page_no; - BtPage page; -} BtPageSet; - // The memory mapping hash table entry typedef struct { @@ -150,7 +181,18 @@ typedef struct { #ifndef unix HANDLE hmap; #endif -} BtHash; +}BtHash; + +typedef struct { + struct BtPage_ alloc[2]; // next & free page_nos in right ptr + BtSpinLatch lock[1]; // allocation area lite latch + ushort latchdeployed; // highest number of latch entries deployed + ushort nlatchpage; // number of latch pages at BT_latch + ushort latchtotal; // number of page latch entries + ushort latchhash; // number of latch hash table slots + ushort latchvictim; // next latch entry to examine + BtHashEntry table[0]; // the hash table +} BtLatchMgr; // The object structure for Btree access @@ -164,54 +206,57 @@ typedef struct _BtDb { uint mode; // read-write mode uint mapped_io; // use memory mapping BtPage temp; // temporary frame buffer (memory mapped/file IO) - BtPage temp2; // temporary frame buffer (memory mapped/file IO) - BtPage parent; // current page's parent node (memory mapped/file IO) - BtPage alloc; // frame for alloc page (memory mapped/file IO) + BtPage alloc; // frame buffer for alloc page ( page 0 ) BtPage cursor; // cached frame for start/next (never mapped) BtPage frame; // spare frame for the page split (never mapped) BtPage zero; // zeroes frame buffer (never mapped) - BtPage page; // temporary page (memory mapped/file IO) + BtPage page; // current page + BtLatchSet *latch; // current page latch + BtLatchMgr *latchmgr; // mapped latch page from allocation page + BtLatchSet *latchsets; // mapped latch set from latch pages #ifdef unix int idx; #else HANDLE idx; + HANDLE halloc; // allocation and latch table handle #endif unsigned char *mem; // frame, cursor, page memory buffer int nodecnt; // highest page cache segment in use int nodemax; // highest page cache segment allocated int hashmask; // number of pages in segments - 1 int hashsize; // size of hash table - int posted; // last loadpage found posted key - int found; // last insert/delete found key + int found; // last deletekey found key BtHash *lrufirst; // lru list head BtHash *lrulast; // lru list tail ushort *cache; // hash table for cached segments - BtHash nodes[1]; // segment cache follows + BtHash *nodes; // segment cache } BtDb; typedef enum { BTERR_ok = 0, +BTERR_notfound, BTERR_struct, BTERR_ovflw, BTERR_lock, +BTERR_hash, +BTERR_kill, BTERR_map, BTERR_wrt, -BTERR_hash +BTERR_eof } BTERR; // B-Tree functions extern void bt_close (BtDb *bt); -extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk); +extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk, uint hashsize); extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod); -extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len); +extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl); extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); -// Internal functions - -BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence); - +// internal functions +BTERR bt_update (BtDb *bt, BtPage page, uid page_no); +BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no); // Helper functions to return slot values extern BtKey bt_key (BtDb *bt, uint slot); @@ -222,6 +267,7 @@ extern uint bt_tod (BtDb *bt, uint slot); #define ALLOC_page 0 #define ROOT_page 1 #define LEAF_page 2 +#define LATCH_page 3 // Number of levels to create in a new BTree @@ -251,7 +297,11 @@ extern uint bt_tod (BtDb *bt, uint slot); // one with two keys. // Deleted keys are marked with a dead bit until -// page cleanup +// page cleanup The fence key for a node is always +// present, even after deletion and cleanup. + +// Deleted leaf pages are reclaimed on a free list. +// The upper levels of the btree are fixed on creation. // Groups of pages from the btree are optionally // cached with memory mapping. A hash table is used to keep @@ -264,14 +314,12 @@ extern uint bt_tod (BtDb *bt, uint slot); // page numbers are used in cases where the page is being split, // or consolidated. -// Page 0 is dedicated to lock for new page extensions, -// and chains empty pages together for reuse. +// Page 0 (ALLOC page) is dedicated to lock for new page extensions, +// and chains empty leaf pages together for reuse. // Parent locks are obtained to prevent resplitting or deleting a node // before its fence is posted into its upper level. -// Empty nodes are chained together through the ALLOC page and reused. - // A special open mode of BT_fl is provided to safely access files on // WIN32 networks. WIN32 network operations should not use memory mapping. // This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH @@ -302,102 +350,296 @@ int i; return id; } -// place write, read, or parent lock on requested page_no. +BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err) +{ +BtKey ptr; + + fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no); + fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act); + ptr = keyptr(page, page->cnt); + fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key); + fprintf(stderr, "right=%.8x\n", bt_getid(page->right)); + return bt->err = err; +} + +// Spin Latch Manager -BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode) +// wait until write lock mode is clear +// and add 1 to the share count + +void bt_spinreadlock(BtSpinLatch *latch) { -off64_t off = page_no << bt->page_bits; +ushort prev; + + do { #ifdef unix -int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE ); -struct flock lock[1]; + prev = __sync_fetch_and_add ((ushort *)latch, SHARE); #else -uint flags = 0, len; -OVERLAPPED ovl[1]; + prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE); #endif + // see if exclusive request is granted or pending - if( mode == BtLockRead || mode == BtLockWrite ) - off += 1 * sizeof(*bt->page); // use second segment + if( !(prev & BOTH) ) + return; +#ifdef unix + prev = __sync_fetch_and_add ((ushort *)latch, -SHARE); +#else + prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE); +#endif +#ifdef unix + } while( sched_yield(), 1 ); +#else + } while( SwitchToThread(), 1 ); +#endif +} - if( mode == BtLockParent ) - off += 2 * sizeof(*bt->page); // use third segment +// wait for other read and write latches to relinquish + +void bt_spinwritelock(BtSpinLatch *latch) +{ +ushort prev; + do { +#ifdef unix + prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL); +#else + prev = _InterlockedOr16((ushort *)latch, PEND | XCL); +#endif + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return; + else #ifdef unix - memset (lock, 0, sizeof(lock)); + __sync_fetch_and_and ((ushort *)latch, ~XCL); +#else + _InterlockedAnd16((ushort *)latch, ~XCL); +#endif +#ifdef unix + } while( sched_yield(), 1 ); +#else + } while( SwitchToThread(), 1 ); +#endif +} - lock->l_start = off; - lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK; - lock->l_len = sizeof(*bt->page); - lock->l_whence = 0; +// try to obtain write lock - if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) - return bt->err = BTERR_lock; +// return 1 if obtained, +// 0 otherwise + +int bt_spinwritetry(BtSpinLatch *latch) +{ +ushort prev; + +#ifdef unix + prev = __sync_fetch_and_or((ushort *)latch, XCL); +#else + prev = _InterlockedOr16((ushort *)latch, XCL); +#endif + // take write access if all bits are clear + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return 1; + else +#ifdef unix + __sync_fetch_and_and ((ushort *)latch, ~XCL); +#else + _InterlockedAnd16((ushort *)latch, ~XCL); +#endif return 0; +} + +// clear write mode + +void bt_spinreleasewrite(BtSpinLatch *latch) +{ +#ifdef unix + __sync_fetch_and_and((ushort *)latch, ~BOTH); #else - memset (ovl, 0, sizeof(ovl)); - ovl->OffsetHigh = (uint)(off >> 32); - ovl->Offset = (uint)off; - len = sizeof(*bt->page); + _InterlockedAnd16((ushort *)latch, ~BOTH); +#endif +} - // use large offsets to - // simulate advisory locking +// decrement reader count - ovl->OffsetHigh |= 0x80000000; +void bt_spinreleaseread(BtSpinLatch *latch) +{ +#ifdef unix + __sync_fetch_and_add((ushort *)latch, -SHARE); +#else + _InterlockedExchangeAdd16((ushort *)latch, -SHARE); +#endif +} - if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent ) - flags |= LOCKFILE_EXCLUSIVE_LOCK; +// link latch table entry into latch hash table - if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) ) - return bt->err = 0; +void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no) +{ +BtLatchSet *latch = bt->latchsets + victim; - return bt->err = BTERR_lock; -#endif + if( latch->next = bt->latchmgr->table[hashidx].slot ) + bt->latchsets[latch->next].prev = victim; + + bt->latchmgr->table[hashidx].slot = victim; + latch->page_no = page_no; + latch->hash = hashidx; + latch->prev = 0; } -// remove write, read, or parent lock on requested page_no. +// release latch pin -BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode) +void bt_unpinlatch (BtLatchSet *latch) { -off64_t off = page_no << bt->page_bits; #ifdef unix -struct flock lock[1]; + __sync_fetch_and_add(&latch->pin, -1); #else -OVERLAPPED ovl[1]; -uint len; + _InterlockedDecrement16 (&latch->pin); #endif +} - if( mode == BtLockRead || mode == BtLockWrite ) - off += 1 * sizeof(*bt->page); // use second segment +// find existing latchset or inspire new one +// return with latchset pinned - if( mode == BtLockParent ) - off += 2 * sizeof(*bt->page); // use third segment +BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no) +{ +ushort hashidx = page_no % bt->latchmgr->latchhash; +ushort slot, avail = 0, victim, idx; +BtLatchSet *latch; + + // obtain read lock on hash table entry + + bt_spinreadlock(bt->latchmgr->table[hashidx].latch); + + if( slot = bt->latchmgr->table[hashidx].slot ) do + { + latch = bt->latchsets + slot; + if( page_no == latch->page_no ) + break; + } while( slot = latch->next ); + if( slot ) { #ifdef unix - memset (lock, 0, sizeof(lock)); + __sync_fetch_and_add(&latch->pin, 1); +#else + _InterlockedIncrement16 (&latch->pin); +#endif + } - lock->l_start = off; - lock->l_type = F_UNLCK; - lock->l_len = sizeof(*bt->page); - lock->l_whence = 0; + bt_spinreleaseread (bt->latchmgr->table[hashidx].latch); - if( fcntl (bt->idx, F_SETLK, lock) < 0 ) - return bt->err = BTERR_lock; + if( slot ) + return latch; + + // try again, this time with write lock + + bt_spinwritelock(bt->latchmgr->table[hashidx].latch); + + if( slot = bt->latchmgr->table[hashidx].slot ) do + { + latch = bt->latchsets + slot; + if( page_no == latch->page_no ) + break; + if( !latch->pin && !avail ) + avail = slot; + } while( slot = latch->next ); + + // found our entry, or take over an unpinned one + + if( slot || (slot = avail) ) { + latch = bt->latchsets + slot; +#ifdef unix + __sync_fetch_and_add(&latch->pin, 1); #else - memset (ovl, 0, sizeof(ovl)); - ovl->OffsetHigh = (uint)(off >> 32); - ovl->Offset = (uint)off; - len = sizeof(*bt->page); + _InterlockedIncrement16 (&latch->pin); +#endif + latch->page_no = page_no; + bt_spinreleasewrite(bt->latchmgr->table[hashidx].latch); + return latch; + } - // use large offsets to - // simulate advisory locking + // see if there are any unused entries +#ifdef unix + victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1; +#else + victim = _InterlockedIncrement16 (&bt->latchmgr->latchdeployed); +#endif - ovl->OffsetHigh |= 0x80000000; + if( victim < bt->latchmgr->latchtotal ) { + latch = bt->latchsets + victim; +#ifdef unix + __sync_fetch_and_add(&latch->pin, 1); +#else + _InterlockedIncrement16 (&latch->pin); +#endif + bt_latchlink (bt, hashidx, victim, page_no); + bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch); + return latch; + } + +#ifdef unix + victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1); +#else + victim = _InterlockedDecrement16 (&bt->latchmgr->latchdeployed); +#endif + // find and reuse previous lock entry - if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) ) - return GetLastError(), bt->err = BTERR_lock; + while( 1 ) { +#ifdef unix + victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1); +#else + victim = _InterlockedIncrement16 (&bt->latchmgr->latchvictim) - 1; #endif + // we don't use slot zero - return bt->err = 0; + if( victim %= bt->latchmgr->latchtotal ) + latch = bt->latchsets + victim; + else + continue; + + // take control of our slot + // from other threads + + if( latch->pin || !bt_spinwritetry (latch->busy) ) + continue; + + idx = latch->hash; + + // try to get write lock on hash chain + // skip entry if not obtained + // or has outstanding locks + + if( !bt_spinwritetry (bt->latchmgr->table[idx].latch) ) { + bt_spinreleasewrite (latch->busy); + continue; + } + + if( latch->pin ) { + bt_spinreleasewrite (latch->busy); + bt_spinreleasewrite (bt->latchmgr->table[idx].latch); + continue; + } + + // unlink our available victim from its hash chain + + if( latch->prev ) + bt->latchsets[latch->prev].next = latch->next; + else + bt->latchmgr->table[idx].slot = latch->next; + + if( latch->next ) + bt->latchsets[latch->next].prev = latch->prev; + + bt_spinreleasewrite (bt->latchmgr->table[idx].latch); +#ifdef unix + __sync_fetch_and_add(&latch->pin, 1); +#else + _InterlockedIncrement16 (&latch->pin); +#endif + bt_latchlink (bt, hashidx, victim, page_no); + bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch); + bt_spinreleasewrite (latch->busy); + return latch; + } } // close and release memory @@ -405,6 +647,14 @@ uint len; void bt_close (BtDb *bt) { BtHash *hash; +#ifdef unix + munmap (bt->latchsets, bt->latchmgr->nlatchpage * bt->page_size); + munmap (bt->latchmgr, bt->page_size); +#else + FlushViewOfFile(bt->latchmgr, 0); + UnmapViewOfFile(bt->latchmgr); + CloseHandle(bt->halloc); +#endif #ifdef unix // release mapped pages @@ -412,7 +662,7 @@ BtHash *hash; do munmap (hash->page, (bt->hashmask+1) << bt->page_bits); while(hash = hash->lrunext); - if ( bt->mem ) + if( bt->mem ) free (bt->mem); close (bt->idx); free (bt->cache); @@ -426,7 +676,7 @@ BtHash *hash; CloseHandle(hash->hmap); } while(hash = hash->lrunext); - if ( bt->mem) + if( bt->mem) VirtualFree (bt->mem, 0, MEM_RELEASE); FlushFileBuffers(bt->idx); CloseHandle(bt->idx); @@ -434,110 +684,105 @@ BtHash *hash; GlobalFree (bt); #endif } - // open/create new btree + // call with file_name, BT_openmode, bits in page size (e.g. 16), -// size of mapped page cache (e.g. 8192) or zero for no mapping. +// size of mapped page pool (e.g. 8192) -BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk) +BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint segsize, uint hashsize) { -uint lvl, attr, cacheblk, last; -BtLock lockmode = BtLockWrite; -BtPage alloc; +uint lvl, attr, cacheblk, last, slot, idx; +uint nlatchpage, latchhash; +BtLatchMgr *latchmgr; off64_t size; uint amt[1]; +BtKey key; BtDb* bt; +int flag; #ifndef unix SYSTEM_INFO sysinfo[1]; +OVERLAPPED ovl[1]; +uint len, flags; +#else +struct flock lock[1]; #endif + // determine sanity of page size and buffer pool + + if( bits > BT_maxbits ) + bits = BT_maxbits; + else if( bits < BT_minbits ) + bits = BT_minbits; + #ifdef unix - bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash)); - memset (bt, 0, sizeof(BtDb)); + bt = calloc (1, sizeof(BtDb)); - switch (mode & 0x7fff) - { - case BT_fl: - case BT_rw: - bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666); - break; + bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666); - case BT_ro: - default: - bt->idx = open ((char*)name, O_RDONLY); - lockmode = BtLockRead; - break; - } if( bt->idx == -1 ) return free(bt), NULL; - if( nodemax ) - cacheblk = 4096; // page size for unix - else - cacheblk = 0; + cacheblk = 4096; // minimum mmap segment size for unix #else - bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash)); + bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb)); attr = FILE_ATTRIBUTE_NORMAL; - switch (mode & 0x7fff) - { - case BT_fl: - attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING; + bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL); - case BT_rw: - bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL); - break; - - case BT_ro: - default: - bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL); - lockmode = BtLockRead; - break; - } if( bt->idx == INVALID_HANDLE_VALUE ) return GlobalFree(bt), NULL; // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity GetSystemInfo(sysinfo); - - if( nodemax ) - cacheblk = sysinfo->dwAllocationGranularity; - else - cacheblk = 0; + cacheblk = sysinfo->dwAllocationGranularity; #endif - // determine sanity of page size +#ifdef unix + memset (lock, 0, sizeof(lock)); - if( bits > BT_maxbits ) - bits = BT_maxbits; - else if( bits < BT_minbits ) - bits = BT_minbits; + lock->l_type = F_WRLCK; + lock->l_len = sizeof(struct BtPage_); + lock->l_whence = 0; - if ( bt_lockpage(bt, ALLOC_page, lockmode) ) + if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) return bt_close (bt), NULL; +#else + memset (ovl, 0, sizeof(ovl)); + len = sizeof(struct BtPage_); + + // use large offsets to + // simulate advisory locking + + ovl->OffsetHigh |= 0x80000000; + + if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent ) + flags |= LOCKFILE_EXCLUSIVE_LOCK; + if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) ) + return bt_close (bt), NULL; +#endif #ifdef unix + latchmgr = malloc (BT_maxpage); *amt = 0; // read minimum page size to get root info if( size = lseek (bt->idx, 0L, 2) ) { - alloc = malloc (BT_minpage); - pread(bt->idx, alloc, BT_minpage, 0); - bits = alloc->bits; - free (alloc); + if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage ) + bits = latchmgr->alloc->bits; + else + return free(bt), free(latchmgr), NULL; } else if( mode == BT_ro ) - return bt_close (bt), NULL; + return free(latchmgr), bt_close (bt), NULL; #else + latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE); size = GetFileSize(bt->idx, amt); if( size || *amt ) { - alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE); - if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) ) + if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) return bt_close (bt), NULL; - bits = alloc->bits; - VirtualFree (alloc, 0, MEM_RELEASE); + bits = latchmgr->alloc->bits; } else if( mode == BT_ro ) return bt_close (bt), NULL; #endif @@ -545,90 +790,101 @@ SYSTEM_INFO sysinfo[1]; bt->page_size = 1 << bits; bt->page_bits = bits; - bt->nodemax = nodemax; bt->mode = mode; - // setup cache mapping + if( cacheblk < bt->page_size ) + cacheblk = bt->page_size; - if( cacheblk ) { - if( cacheblk < bt->page_size ) - cacheblk = bt->page_size; + // mask for partial memmaps - bt->hashsize = nodemax / 8; - bt->hashmask = (cacheblk >> bits) - 1; - bt->mapped_io = 1; - } + bt->hashmask = (cacheblk >> bits) - 1; - // requested number of pages per memmap segment + // see if requested size of pages per memmap is greater - if( cacheblk ) - if( (1 << pgblk) > bt->hashmask ) - bt->hashmask = (1 << pgblk) - 1; + if( (1 << segsize) > bt->hashmask ) + bt->hashmask = (1 << segsize) - 1; bt->seg_bits = 0; while( (1 << bt->seg_bits) <= bt->hashmask ) bt->seg_bits++; + bt->hashsize = hashsize; + + if( bt->nodemax = nodemax++ ) { #ifdef unix - bt->mem = malloc (8 *bt->page_size); - bt->cache = calloc (bt->hashsize, sizeof(ushort)); + bt->nodes = calloc (nodemax, sizeof(BtHash)); + bt->cache = calloc (hashsize, sizeof(ushort)); #else - bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); - bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort)); + bt->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, nodemax * sizeof(BtHash)); + bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort)); #endif - bt->frame = (BtPage)bt->mem; - bt->cursor = (BtPage)(bt->mem + bt->page_size); - bt->page = (BtPage)(bt->mem + 2 * bt->page_size); - bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size); - bt->temp = (BtPage)(bt->mem + 4 * bt->page_size); - bt->temp2 = (BtPage)(bt->mem + 5 * bt->page_size); - bt->parent = (BtPage)(bt->mem + 6 * bt->page_size); - bt->zero = (BtPage)(bt->mem + 7 * bt->page_size); + bt->mapped_io = 1; + } if( size || *amt ) { - if ( bt_unlockpage(bt, ALLOC_page, lockmode) ) - return bt_close (bt), NULL; - - return bt; + goto btlatch; } - // initializes an empty b-tree with root page and page of leaves + // initialize an empty b-tree with latch page, root page, page of leaves + // and page(s) of latches + + memset (latchmgr, 0, 1 << bits); + + nlatchpage = BT_latchtable; + if( nlatchpage > nodemax ) + nlatchpage = nodemax; + nlatchpage *= sizeof(BtLatchSet); + nlatchpage += bt->page_size - 1; + nlatchpage /= bt->page_size; + + bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage); + latchmgr->alloc->bits = bt->page_bits; + + latchmgr->nlatchpage = nlatchpage; + latchmgr->latchtotal = nlatchpage * bt->page_size / sizeof(BtLatchSet); + + // initialize latch manager - memset (bt->alloc, 0, bt->page_size); - bt_putid(bt->alloc->right, MIN_lvl+1); - bt->alloc->bits = bt->page_bits; + latchhash = (bt->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry); + + // size of hash table = total number of latchsets + + if( latchhash > latchmgr->latchtotal ) + latchhash = latchmgr->latchtotal; + + latchmgr->latchhash = latchhash; #ifdef unix - if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size ) + if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size ) return bt_close (bt), NULL; #else - if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) ) + if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) return bt_close (bt), NULL; if( *amt < bt->page_size ) return bt_close (bt), NULL; #endif - memset (bt->frame, 0, bt->page_size); - bt->frame->bits = bt->page_bits; - bt->frame->posted = 1; + memset (latchmgr, 0, 1 << bits); + latchmgr->alloc->bits = bt->page_bits; for( lvl=MIN_lvl; lvl--; ) { - slotptr(bt->frame, 1)->off = offsetof(struct BtPage_, fence); - bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number - bt->frame->fence[0] = 2; - bt->frame->fence[1] = 0xff; - bt->frame->fence[2] = 0xff; - bt->frame->min = bt->page_size; - bt->frame->lvl = lvl; - bt->frame->cnt = 1; - bt->frame->act = 1; + slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3; + bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number + key = keyptr(latchmgr->alloc, 1); + key->len = 2; // create stopper key + key->key[0] = 0xff; + key->key[1] = 0xff; + latchmgr->alloc->min = bt->page_size - 3; + latchmgr->alloc->lvl = lvl; + latchmgr->alloc->cnt = 1; + latchmgr->alloc->act = 1; #ifdef unix - if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size ) + if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size ) return bt_close (bt), NULL; #else - if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) ) + if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) return bt_close (bt), NULL; if( *amt < bt->page_size ) @@ -636,41 +892,190 @@ SYSTEM_INFO sysinfo[1]; #endif } - // create empty page area by writing last page of first - // cache area (other pages are zeroed by O/S) + // clear out latch manager locks + // and rest of pages to round out segment - if( bt->mapped_io && bt->hashmask ) { - memset(bt->frame, 0, bt->page_size); - last = bt->hashmask; + memset(latchmgr, 0, bt->page_size); + last = MIN_lvl + 1; - while( last < MIN_lvl + 1 ) - last += bt->hashmask + 1; + while( last <= ((MIN_lvl + 1 + nlatchpage) | bt->hashmask) ) { #ifdef unix - pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits); + pwrite(bt->idx, latchmgr, bt->page_size, last << bt->page_bits); #else SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN); - if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) ) + if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) ) return bt_close (bt), NULL; if( *amt < bt->page_size ) return bt_close (bt), NULL; #endif + last++; } - if( bt_unlockpage(bt, ALLOC_page, lockmode) ) +btlatch: +#ifdef unix + lock->l_type = F_UNLCK; + if( fcntl (bt->idx, F_SETLK, lock) < 0 ) return bt_close (bt), NULL; +#else + if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) + return bt_close (bt), NULL; +#endif +#ifdef unix + flag = PROT_READ | PROT_WRITE; + bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size); + if( bt->latchmgr == MAP_FAILED ) + return bt_close (bt), NULL; + bt->latchsets = (BtLatchSet *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size); + if( bt->latchsets == MAP_FAILED ) + return bt_close (bt), NULL; +#else + flag = PAGE_READWRITE; + bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size, NULL); + if( !bt->halloc ) + return bt_close (bt), NULL; + + flag = FILE_MAP_WRITE; + bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size); + if( !bt->latchmgr ) + return GetLastError(), bt_close (bt), NULL; + + bt->latchsets = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size); +#endif + +#ifdef unix + free (latchmgr); +#else + VirtualFree (latchmgr, 0, MEM_RELEASE); +#endif + +#ifdef unix + bt->mem = malloc (6 * bt->page_size); +#else + bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); +#endif + bt->frame = (BtPage)bt->mem; + bt->cursor = (BtPage)(bt->mem + bt->page_size); + bt->page = (BtPage)(bt->mem + 2 * bt->page_size); + bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size); + bt->temp = (BtPage)(bt->mem + 4 * bt->page_size); + bt->zero = (BtPage)(bt->mem + 5 * bt->page_size); + memset (bt->zero, 0, bt->page_size); return bt; } -// compare two keys, returning > 0, = 0, or < 0 -// as the comparison value +// place write, read, or parent lock on requested page_no. -int keycmp (BtKey key1, unsigned char *key2, uint len2) +void bt_lockpage(BtLock mode, BtLatchSet *latch) { -uint len1 = key1->len; -int ans; - - if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) ) + switch( mode ) { + case BtLockRead: + bt_spinreadlock (latch->readwr); + break; + case BtLockWrite: + bt_spinwritelock (latch->readwr); + break; + case BtLockAccess: + bt_spinreadlock (latch->access); + break; + case BtLockDelete: + bt_spinwritelock (latch->access); + break; + case BtLockParent: + bt_spinwritelock (latch->parent); + break; + } +} + +// remove write, read, or parent lock on requested page + +void bt_unlockpage(BtLock mode, BtLatchSet *latch) +{ + switch( mode ) { + case BtLockRead: + bt_spinreleaseread (latch->readwr); + break; + case BtLockWrite: + bt_spinreleasewrite (latch->readwr); + break; + case BtLockAccess: + bt_spinreleaseread (latch->access); + break; + case BtLockDelete: + bt_spinreleasewrite (latch->access); + break; + case BtLockParent: + bt_spinreleasewrite (latch->parent); + break; + } +} + +// allocate a new page and write page into it + +uid bt_newpage(BtDb *bt, BtPage page) +{ +uid new_page; +int reuse; + + // lock allocation page + + bt_spinwritelock(bt->latchmgr->lock); + + // use empty chain first + // else allocate empty page + + if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) { + if( bt_mappage (bt, &bt->temp, new_page) ) + return 0; + bt_putid(bt->latchmgr->alloc[1].right, bt_getid(bt->temp->right)); + reuse = 1; + } else { + new_page = bt_getid(bt->latchmgr->alloc->right); + bt_putid(bt->latchmgr->alloc->right, new_page+1); + reuse = 0; + } + + bt_spinreleasewrite(bt->latchmgr->lock); + + if( !bt->mapped_io ) + if( bt_update(bt, page, new_page) ) + return 0; //don't unlock on error + else + return new_page; + +#ifdef unix + if( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size ) + return bt->err = BTERR_wrt, 0; + + // if writing first page of pool block, zero last page in the block + + if( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 ) + { + // use zero buffer to write zeros + if( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size ) + return bt->err = BTERR_wrt, 0; + } +#else + // bring new page into pool and copy page. + // this will extend the file into the new pages. + + if( bt_mappage (bt, &bt->temp, new_page) ) + return 0; + + memcpy(bt->temp, page, bt->page_size); +#endif + return new_page; +} + +// compare two keys, returning > 0, = 0, or < 0 +// as the comparison value + +int keycmp (BtKey key1, unsigned char *key2, uint len2) +{ +uint len1 = key1->len; +int ans; + + if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) ) return ans; if( len1 > len2 ) @@ -689,12 +1094,12 @@ BTERR bt_update (BtDb *bt, BtPage page, uid page_no) off64_t off = page_no << bt->page_bits; #ifdef unix - if ( !bt->mapped_io ) - if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size ) + if( !bt->mapped_io ) + if( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size ) return bt->err = BTERR_wrt; #else uint amt[1]; - if ( !bt->mapped_io ) + if( !bt->mapped_io ) { SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) ) @@ -703,7 +1108,7 @@ uint amt[1]; if( *amt < bt->page_size ) return GetLastError(), bt->err = BTERR_wrt; } - else if ( bt->mode == BT_fl ) { + else if( bt->mode == BT_fl ) { FlushViewOfFile(page, bt->page_size); FlushFileBuffers(bt->idx); } @@ -775,12 +1180,12 @@ BtHash *hash; BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no) { int flag; -off64_t off = (page_no & ~bt->hashmask) << bt->page_bits; +off64_t off = (page_no & ~(uid)bt->hashmask) << bt->page_bits; off64_t limit = off + ((bt->hashmask+1) << bt->page_bits); BtHash *node; memset(hash, 0, sizeof(BtHash)); - hash->page_no = (page_no & ~bt->hashmask); + hash->page_no = (page_no & ~(uid)bt->hashmask); bt_linkhash(bt, hash, page_no); if( node = hash->lrunext = bt->lrufirst ) @@ -860,7 +1265,7 @@ BtPage page; #ifdef unix munmap (hash->page, (bt->hashmask+1) << bt->page_bits); #else - FlushViewOfFile(hash->page, 0); +// FlushViewOfFile(hash->page, 0); UnmapViewOfFile(hash->page); CloseHandle(hash->hmap); #endif @@ -891,7 +1296,7 @@ int amt[1]; return bt->err; } #ifdef unix - if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size ) + if( pread(bt->idx, *page, bt->page_size, off) < bt->page_size ) return bt->err = BTERR_map; #else SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); @@ -905,841 +1310,389 @@ int amt[1]; return 0; } -// allocate a new page and write page into it - -uid bt_newpage(BtDb *bt, BtPage page) -{ -uid new_page; -char *pmap; -int reuse; - - // lock page zero - - if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) ) - return 0; - - if( bt_mappage (bt, &bt->alloc, ALLOC_page) ) - return 0; - - // use empty chain first - // else allocate empty page - - if( new_page = bt_getid(bt->alloc[1].right) ) { - if( bt_mappage (bt, &bt->temp, new_page) ) - return 0; // don't unlock on error - memcpy(bt->alloc[1].right, bt->temp->right, BtId); - reuse = 1; - } else { - new_page = bt_getid(bt->alloc->right); - bt_putid(bt->alloc->right, new_page+1); - reuse = 0; - } - - if( bt_update(bt, bt->alloc, ALLOC_page) ) - return 0; // don't unlock on error - - if( !bt->mapped_io ) { - if( bt_update(bt, page, new_page) ) - return 0; //don't unlock on error - - // unlock page zero - - if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) ) - return 0; - - return new_page; - } - -#ifdef unix - if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size ) - return bt->err = BTERR_wrt, 0; - - // if writing first page of hash block, zero last page in the block - - if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 ) - { - // use temp buffer to write zeros - memset(bt->zero, 0, bt->page_size); - if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size ) - return bt->err = BTERR_wrt, 0; - } -#else - // bring new page into page-cache and copy page. - // this will extend the file into the new pages. - - if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) ) - return 0; - - memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size); -#endif - - // unlock page zero - - if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) ) - return 0; - - return new_page; -} - -// find slot in page for given key at a given level -// return 0 if beyond fence value - -int bt_findslot (BtPageSet *set, unsigned char *key, uint len) -{ -uint diff, higher = set->page->cnt, low = 1, slot; - - // make stopper key an infinite fence value - - if( bt_getid (set->page->right) ) - higher++; - - // low is the lowest candidate, higher is already - // tested as .ge. the given key, loop ends when they meet - - while( diff = higher - low ) { - slot = low + ( diff >> 1 ); - if( keycmp (keyptr(set->page, slot), key, len) < 0 ) - low = slot + 1; - else - higher = slot; - } - - if( higher <= set->page->cnt ) - return higher; - - // if leaf page, compare against fence value - - // return zero if key is on right link page - // or return slot beyond last key - - if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 ) - return 0; - - return higher; -} - -// find and load page at given level for given key -// leave page rd or wr locked as requested +// deallocate a deleted page +// place on free chain out of allocator page +// call with page latched for Writing and Deleting -int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock) +BTERR bt_freepage(BtDb *bt, uid page_no, BtPage page, BtLatchSet *latch) { -uid page_no = ROOT_page, prevpage = 0; -uint drill = 0xff, slot; -uint mode, prevmode; - - // start at root of btree and drill down - - do { - // determine lock mode of drill level - mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; - - set->page_no = page_no; - - // obtain access lock using lock chaining - - if( page_no > ROOT_page ) - if( bt_lockpage(bt, page_no, BtLockAccess) ) - return 0; - - if( prevpage ) - if( bt_unlockpage(bt, prevpage, prevmode) ) - return 0; - - // obtain read lock using lock chaining - - if( bt_lockpage(bt, page_no, mode) ) - return 0; - - if( page_no > ROOT_page ) - if( bt_unlockpage(bt, page_no, BtLockAccess) ) - return 0; - - // map/obtain page contents - - if( bt_mappage (bt, &set->page, page_no) ) - return 0; - - // re-read and re-lock root after determining actual level of root - - if( set->page->lvl != drill) { - if ( page_no != ROOT_page ) - return bt->err = BTERR_struct, 0; - - drill = set->page->lvl; - - if( lock == BtLockWrite && drill == lvl ) - if( bt_unlockpage(bt, page_no, mode) ) - return 0; - else - continue; - } - - prevpage = page_no; - prevmode = mode; - - // if page is being deleted and we should continue right - - if( set->page->kill && set->page->goright ) { - page_no = bt_getid (set->page->right); - continue; - } - - // otherwise, wait for deleted node to clear - - if( set->page->kill ) { - if( bt_unlockpage(bt, set->page_no, mode) ) - return bt->err; - page_no = ROOT_page; - prevpage = 0; - drill = 0xff; -#ifdef unix - sched_yield(); -#else - SwitchToThread(); -#endif - continue; - } - - // find key on page at this level - // and descend to requested level - - if( slot = bt_findslot (set, key, len) ) { - if( drill == lvl ) - return slot; - - if( slot > set->page->cnt ) - return bt->err = BTERR_struct, 0; - - // if drilling down, find next active key - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(slotptr(set->page, slot)->id); - drill--; - continue; - } - - // or slide right into next page - // (slide left from deleted page) - - page_no = bt_getid(set->page->right); - - } while( page_no ); - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - -// drill down fixing fence values for left sibling tree - -// call with set write locked mapped to bt->temp -// return with set unlocked & unpinned. - -BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence) -{ -unsigned char oldfence[256]; -BtPageSet next[1]; -uid right; -int chk; - - next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); - memcpy (oldfence, set->page->fence, 256); - next->page = bt->temp2; - bt->temp2 = bt->temp; - bt->temp = next->page; - - while( !set->page->kill && set->page->lvl ) { - if( bt_lockpage (bt, next->page_no, BtLockParent) ) - return bt->err; - if( bt_lockpage (bt, next->page_no, BtLockAccess) ) - return bt->err; - if( bt_lockpage (bt, next->page_no, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, next->page_no, BtLockAccess) ) - return bt->err; - - if( bt_mappage (bt, &next->page, next->page_no) ) - return bt->err; - - chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence); - - if( chk < 0 ) { - right = bt_getid (next->page->right); - if( bt_unlockpage (bt, next->page_no, BtLockWrite) ) + if( bt_mappage (bt, &page, page_no) ) return bt->err; - if( bt_unlockpage (bt, next->page_no, BtLockParent) ) - return bt->err; - next->page_no = right; - continue; - } - if( chk > 0 ) - return bt->err = BTERR_struct; + // lock allocation page - if( bt_fixfences (bt, next, newfence) ) - return bt->err; - - break; - } - - set->page = bt->temp; - - if( bt_mappage (bt, &set->page, set->page_no) ) - return bt->err; - - memcpy (set->page->fence, newfence, 256); - - if( bt_update(bt, set->page, set->page_no) ) - return bt->err; - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, set->page_no, BtLockParent) ) - return bt->err; - return 0; -} - - -// return page to free list -// page must be delete & write locked - -BTERR bt_freepage (BtDb *bt, BtPageSet *set) -{ - // lock & map allocation page - - if( bt_lockpage (bt, ALLOC_page, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &bt->alloc, ALLOC_page) ) - return bt->err; + bt_spinwritelock (bt->latchmgr->lock); // store chain in second right - bt_putid(set->page->right, bt_getid(bt->alloc[1].right)); - bt_putid(bt->alloc[1].right, set->page_no); - set->page->free = 1; + bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right)); + bt_putid(bt->latchmgr->alloc[1].right, page_no); + page->free = 1; - if( bt_update(bt, bt->alloc, ALLOC_page) ) - return bt->err; - if( bt_update(bt, set->page, set->page_no) ) + if( bt_update(bt, page, page_no) ) return bt->err; - // unlock page zero + // unlock released page - if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) ) - return bt->err; - - // remove write lock on deleted node - - if( bt_unlockpage(bt, set->page_no, BtLockWrite) ) - return bt->err; - - return bt_unlockpage (bt, set->page_no, BtLockDelete); -} - -// remove the root level by promoting its only child - -BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child) -{ -uid next = 0; - - do { - if( next ) { - if( bt_lockpage (bt, next, BtLockDelete) ) - return bt->err; - if( bt_lockpage (bt, next, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &child->page, next) ) - return bt->err; - - child->page_no = next; - } - - memcpy (root->page, child->page, bt->page_size); - next = bt_getid (slotptr(child->page, child->page->cnt)->id); - - if( bt_freepage (bt, child) ) - return bt->err; - } while( root->page->lvl > 1 && root->page->cnt == 1 ); - - if( bt_update (bt, root->page, ROOT_page) ) - return bt->err; + bt_unlockpage (BtLockDelete, latch); + bt_unlockpage (BtLockWrite, latch); + bt_unpinlatch (latch); - return bt_unlockpage (bt, ROOT_page, BtLockWrite); -} - -// pull right page over ourselves in simple merge - -BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx) -{ - // install ourselves as child page - // and delete ourselves from parent - - bt_putid (slotptr(parent->page, idx)->id, set->page_no); - slotptr(parent->page, slot)->dead = 1; - parent->page->act--; - - // collapse any empty slots - - while( idx = parent->page->cnt - 1 ) - if( slotptr(parent->page, idx)->dead ) { - *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1); - memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot)); - } else - break; - - memcpy (set->page, right->page, bt->page_size); - - if( bt_unlockpage (bt, right->page_no, BtLockParent) ) - return bt->err; - - if( bt_freepage (bt, right) ) - return bt->err; - - // do we need to remove a btree level? - // (leave the first page of leaves alone) - - if( parent->page_no == ROOT_page && parent->page->cnt == 1 ) - if( set->page->lvl ) - return bt_removeroot (bt, parent, set); - - if( bt_update (bt, parent->page, parent->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; - - if( bt_update (bt, set->page, set->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; - - if( bt_unlockpage (bt, set->page_no, BtLockDelete) ) - return bt->err; + // unlock allocation page + bt_spinreleasewrite (bt->latchmgr->lock); return 0; } -// remove both child and parent from the btree -// from the fence position in the parent - -BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl) -{ -unsigned char pagefence[256]; -uint idx; - - // pull right sibling over ourselves and unlock - - memcpy (child->page, right->page, bt->page_size); - - if( bt_update(bt, child->page, child->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, child->page_no, BtLockWrite) ) - return bt->err; - - // install ourselves into right link of old right page - - bt_putid (right->page->right, child->page_no); - right->page->goright = 1; // tell bt_loadpage to go right to us - right->page->kill = 1; - - if( bt_update(bt, right->page, right->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, right->page_no, BtLockWrite) ) - return bt->err; - - // remove our slot from our parent - // signal to move right - - parent->page->goright = 1; // tell bt_findslot to go right to rparent - parent->page->kill = 1; - parent->page->act--; - - // redirect right page pointer in right parent to us - - for( idx = 0; idx++ < rparent->page->cnt; ) - if( !slotptr(rparent->page, idx)->dead ) - break; - - if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no ) - return bt->err = BTERR_struct; - - bt_putid (slotptr(rparent->page, idx)->id, child->page_no); - - if( bt_update (bt, rparent->page, rparent->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) ) - return bt->err; - - // save parent page fence value - - memcpy (pagefence, parent->page->fence, 256); - - if( bt_update (bt, parent->page, parent->page_no) ) - return bt->err; - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; - - return bt_removepage (bt, parent->page_no, lvl, pagefence); -} - -// remove page from btree -// call with page unlocked -// returns with page on free list +// find slot in page for given key at a given level -BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence) +int bt_findslot (BtDb *bt, unsigned char *key, uint len) { -BtPageSet parent[1], rparent[1], sibling[1], set[1]; -unsigned char newfence[256]; -uint slot, idx; -BtKey ptr; - - parent->page = bt->parent; - set->page_no = page_no; - set->page = bt->page; +uint diff, higher = bt->page->cnt, low = 1, slot; +uint good = 0; - // load and lock our parent + // make stopper key an infinite fence value - while( 1 ) { - if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) ) - return bt->err; + if( bt_getid (bt->page->right) ) + higher++; + else + good++; - // wait until we are posted in our parent + // low is the lowest candidate, higher is already + // tested as .ge. the given key, loop ends when they meet - if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) { - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; -#ifdef unix - sched_yield(); -#else - SwitchToThread(); -#endif - continue; + while( diff = higher - low ) { + slot = low + ( diff >> 1 ); + if( keycmp (keyptr(bt->page, slot), key, len) < 0 ) + low = slot + 1; + else + higher = slot, good++; } - // can we do a simple merge entirely - // between siblings on the parent page? - - if( slot < parent->page->cnt ) { - // find our right neighbor - // right must exist because the stopper prevents - // the rightmost page from deleting + // return zero if key is on right link page - for( idx = slot; idx++ < parent->page->cnt; ) - if( !slotptr(parent->page, idx)->dead ) - break; + return good ? higher : 0; +} - sibling->page_no = bt_getid (slotptr (parent->page, idx)->id); +// find and load page at given level for given key +// leave page rd or wr locked as requested - if( bt_lockpage (bt, set->page_no, BtLockDelete) ) - return bt->err; +int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock) +{ +uid page_no = ROOT_page, prevpage = 0; +uint drill = 0xff, slot; +BtLatchSet *prevlatch; +uint mode, prevmode; - if( bt_lockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; + // start at root of btree and drill down - if( bt_mappage (bt, &set->page, set->page_no) ) - return bt->err; + do { + // determine lock mode of drill level + mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; - // merge right if sibling shows up in - // our parent and is not being killed + bt->latch = bt_pinlatch(bt, page_no); + bt->page_no = page_no; - if( sibling->page_no == bt_getid (set->page->right) ) { - if( bt_lockpage (bt, sibling->page_no, BtLockParent) ) - return bt->err; + // obtain access lock using lock chaining - if( bt_lockpage (bt, sibling->page_no, BtLockDelete) ) - return bt->err; + if( page_no > ROOT_page ) + bt_lockpage(BtLockAccess, bt->latch); - if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; + if( prevpage ) { + bt_unlockpage(prevmode, prevlatch); + bt_unpinlatch(prevlatch); + prevpage = 0; + } - sibling->page = bt->temp; + // obtain read lock using lock chaining - if( bt_mappage (bt, &sibling->page, sibling->page_no) ) - return bt->err; + bt_lockpage(mode, bt->latch); - if( !sibling->page->kill ) - return bt_mergeright(bt, set, parent, sibling, slot, idx); + if( page_no > ROOT_page ) + bt_unlockpage(BtLockAccess, bt->latch); - // try again later + // map/obtain page contents - if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; - } + if( bt_mappage (bt, &bt->page, page_no) ) + return 0; - if( bt_unlockpage (bt, set->page_no, BtLockDelete) ) - return bt->err; + // re-read and re-lock root after determining actual level of root - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; + if( bt->page->lvl != drill) { + if( bt->page_no != ROOT_page ) + return bt->err = BTERR_struct, 0; + + drill = bt->page->lvl; - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; -#ifdef linux - sched_yield (); -#else - SwitchToThread(); -#endif - continue; + if( lock != BtLockRead && drill == lvl ) { + bt_unlockpage(mode, bt->latch); + bt_unpinlatch(bt->latch); + continue; + } } - // find our left neighbor in our parent page + prevpage = bt->page_no; + prevlatch = bt->latch; + prevmode = mode; + + // find key on page at this level + // and descend to requested level - for( idx = slot; --idx; ) - if( !slotptr(parent->page, idx)->dead ) - break; + if( !bt->page->kill ) + if( slot = bt_findslot (bt, key, len) ) { + if( drill == lvl ) + return slot; + + while( slotptr(bt->page, slot)->dead ) + if( slot++ < bt->page->cnt ) + continue; + else + goto slideright; - // if no left neighbor, delete ourselves and our parent + page_no = bt_getid(slotptr(bt->page, slot)->id); + drill--; + continue; + } - if( !idx ) { - if( bt_lockpage (bt, set->page_no, BtLockAccess) ) - return bt->err; + // or slide right into next page - if( bt_lockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; +slideright: + page_no = bt_getid(bt->page->right); - if( bt_unlockpage (bt, set->page_no, BtLockAccess) ) - return bt->err; + } while( page_no ); - if( bt_mappage (bt, &set->page, set->page_no) ) - return bt->err; + // return error on end of right chain - rparent->page_no = bt_getid (parent->page->right); - rparent->page = bt->temp; + bt->err = BTERR_eof; + return 0; // return error +} - if( bt_lockpage (bt, rparent->page_no, BtLockAccess) ) - return bt->err; +// a fence key was deleted from a page +// push new fence value upwards - if( bt_lockpage (bt, rparent->page_no, BtLockWrite) ) - return bt->err; +BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl) +{ +unsigned char leftkey[256], rightkey[256]; +BtLatchSet *latch = bt->latch; +BtKey ptr; - if( bt_unlockpage (bt, rparent->page_no, BtLockAccess) ) - return bt->err; + // remove deleted key, the old fence value - if( bt_mappage (bt, &rparent->page, rparent->page_no) ) - return bt->err; + ptr = keyptr(bt->page, bt->page->cnt); + memcpy(rightkey, ptr, ptr->len + 1); - if( !rparent->page->kill ) { - sibling->page_no = bt_getid (set->page->right); + memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot)); + bt->page->dirty = 1; - if( bt_lockpage (bt, sibling->page_no, BtLockAccess) ) - return bt->err; + ptr = keyptr(bt->page, bt->page->cnt); + memcpy(leftkey, ptr, ptr->len + 1); - if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; + if( bt_update (bt, bt->page, page_no) ) + return bt->err; - if( bt_unlockpage (bt, sibling->page_no, BtLockAccess) ) - return bt->err; + bt_lockpage (BtLockParent, latch); + bt_unlockpage (BtLockWrite, latch); - sibling->page = bt->temp2; + // insert new (now smaller) fence key - if( bt_mappage (bt, &sibling->page, sibling->page_no) ) - return bt->err; + if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) ) + return bt->err; - if( !sibling->page->kill ) - return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1); + // remove old (larger) fence key - // try again later + if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) ) + return bt->err; - if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; - } + bt_unlockpage (BtLockParent, latch); + bt_unpinlatch (latch); + return 0; +} - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; +// root has a single child +// collapse a level from the btree +// call with root locked in bt->page - if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) ) - return bt->err; +BTERR bt_collapseroot (BtDb *bt, BtPage root) +{ +BtLatchSet *latch; +uid child; +uint idx; - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; -#ifdef linux - sched_yield(); -#else - SwitchToThread(); -#endif - continue; - } + // find the child entry + // and promote to new root - // redirect parent to our left sibling - // lock and map our left sibling's page + do { + for( idx = 0; idx++ < root->cnt; ) + if( !slotptr(root, idx)->dead ) + break; - sibling->page_no = bt_getid (slotptr(parent->page, idx)->id); - sibling->page = bt->temp; + child = bt_getid (slotptr(root, idx)->id); + latch = bt_pinlatch (bt, child); - // wait our turn on fence key maintenance + bt_lockpage (BtLockDelete, latch); + bt_lockpage (BtLockWrite, latch); - if( bt_lockpage(bt, sibling->page_no, BtLockParent) ) + if( bt_mappage (bt, &bt->temp, child) ) return bt->err; - if( bt_lockpage(bt, sibling->page_no, BtLockAccess) ) - return bt->err; + memcpy (root, bt->temp, bt->page_size); - if( bt_lockpage(bt, sibling->page_no, BtLockWrite) ) + if( bt_update (bt, root, ROOT_page) ) return bt->err; - if( bt_unlockpage(bt, sibling->page_no, BtLockAccess) ) + if( bt_freepage (bt, child, bt->temp, latch) ) return bt->err; - if( bt_mappage (bt, &sibling->page, sibling->page_no) ) - return bt->err; + } while( root->lvl > 1 && root->act == 1 ); - // wait until sibling is in our parent + bt_unlockpage (BtLockWrite, bt->latch); + bt_unpinlatch (bt->latch); + return 0; +} - if( bt_getid (sibling->page->right) != set->page_no ) { - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, sibling->page_no, BtLockParent) ) - return bt->err; -#ifdef linux - sched_yield(); -#else - SwitchToThread(); -#endif - continue; - } +// find and delete key on page by marking delete flag bit +// when page becomes empty, delete it - // map page being killed +BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) +{ +unsigned char lowerkey[256], higherkey[256]; +uint slot, dirty = 0, idx, fence, found; +BtLatchSet *latch, *rlatch; +uid page_no, right; +BtKey ptr; - if( bt_lockpage (bt, set->page_no, BtLockDelete) ) + if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) + ptr = keyptr(bt->page, slot); + else return bt->err; - if( bt_lockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; + // are we deleting a fence slot? - if( bt_mappage (bt, &set->page, set->page_no) ) - return bt->err; + fence = slot == bt->page->cnt; + + // if key is found delete it, otherwise ignore request - // delete our left sibling from parent + if( found = !keycmp (ptr, key, len) ) + if( found = slotptr(bt->page, slot)->dead == 0 ) { + dirty = slotptr(bt->page,slot)->dead = 1; + bt->page->dirty = 1; + bt->page->act--; - slotptr(parent->page,idx)->dead = 1; - parent->page->dirty = 1; - parent->page->act--; + // collapse empty slots - // redirect our parent slot to our left sibling + while( idx = bt->page->cnt - 1 ) + if( slotptr(bt->page, idx)->dead ) { + *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1); + memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot)); + } else + break; + } - bt_putid (slotptr(parent->page, slot)->id, sibling->page_no); - memcpy (sibling->page->right, set->page->right, BtId); + right = bt_getid(bt->page->right); + page_no = bt->page_no; + latch = bt->latch; - if( bt_update (bt, sibling->page, sibling->page_no) ) - return bt->err; + if( !dirty ) { + if( lvl ) + return bt_abort (bt, bt->page, page_no, BTERR_notfound); + bt_unlockpage(BtLockWrite, latch); + bt_unpinlatch (latch); + return bt->found = found, 0; + } - // collapse dead slots from parent + // did we delete a fence key in an upper level? - while( idx = parent->page->cnt - 1 ) - if( slotptr(parent->page, idx)->dead ) { - *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt); - memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot)); - } else - break; + if( lvl && bt->page->act && fence ) + if( bt_fixfence (bt, page_no, lvl) ) + return bt->err; + else + return bt->found = found, 0; - // update parent page + // is this a collapsed root? - if( bt_update (bt, parent->page, parent->page_no) ) + if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 ) + if( bt_collapseroot (bt, bt->page) ) return bt->err; + else + return bt->found = found, 0; - // free our original page + // return if page is not empty - if( bt_freepage (bt, set) ) + if( bt->page->act ) { + if( bt_update(bt, bt->page, page_no) ) return bt->err; + bt_unlockpage(BtLockWrite, latch); + bt_unpinlatch (latch); + return bt->found = found, 0; + } - // go down the left node's fence keys to the leaf level - // and update the fence keys in each page + // cache copy of fence key + // in order to find parent - memcpy (newfence, parent->page->fence, 256); + ptr = keyptr(bt->page, bt->page->cnt); + memcpy(lowerkey, ptr, ptr->len + 1); - if( bt_fixfences (bt, sibling, newfence) ) - return bt->err; + // obtain lock on right page - // promote sibling as new root? + rlatch = bt_pinlatch (bt, right); + bt_lockpage(BtLockWrite, rlatch); - if( parent->page_no == ROOT_page && parent->page->cnt == 1 ) - if( sibling->page->lvl ) { - if( bt_lockpage (bt, sibling->page_no, BtLockDelete) ) + if( bt_mappage (bt, &bt->temp, right) ) return bt->err; - if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) - return bt->err; + if( bt->temp->kill ) { + bt_abort(bt, bt->temp, right, 0); + return bt_abort(bt, bt->page, bt->page_no, BTERR_kill); + } - if( bt_mappage (bt, &sibling->page, set->page_no) ) - return bt->err; + // pull contents of next page into current empty page - return bt_removeroot (bt, parent, sibling); - } + memcpy (bt->page, bt->temp, bt->page_size); - return bt_unlockpage (bt, parent->page_no, BtLockWrite); - } -} + // cache copy of key to update -// find and delete key on page by marking delete flag bit -// when page becomes empty, delete it + ptr = keyptr(bt->temp, bt->temp->cnt); + memcpy(higherkey, ptr, ptr->len + 1); -BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len) -{ -unsigned char pagefence[256]; -uint slot, found, idx; -BtPageSet set[1]; -BtKey ptr; + // Mark right page as deleted and point it to left page + // until we can post updates at higher level. - set->page = bt->page; + bt_putid(bt->temp->right, page_no); + bt->temp->kill = 1; - if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) ) - ptr = keyptr(set->page, slot); - else + if( bt_update(bt, bt->page, page_no) ) return bt->err; - // if key is found delete it, otherwise ignore request - - if( found = slot <= set->page->cnt ) - if( found = !keycmp (ptr, key, len) ) - if( found = slotptr(set->page, slot)->dead == 0 ) { - slotptr(set->page,slot)->dead = 1; - set->page->dirty = 1; - set->page->act--; - - // collapse empty slots - - while( idx = set->page->cnt - 1 ) - if( slotptr(set->page, idx)->dead ) { - *slotptr(set->page, idx) = *slotptr(set->page, idx + 1); - memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); - } else - break; - } + if( bt_update(bt, bt->temp, right) ) + return bt->err; - if( set->page->act ) { - if( bt_update(bt, set->page, set->page_no) ) - return bt->err; - bt->found = found; - return bt_unlockpage (bt, set->page_no, BtLockWrite); - } + bt_lockpage(BtLockParent, latch); + bt_unlockpage(BtLockWrite, latch); - // delete page when empty + bt_lockpage(BtLockParent, rlatch); + bt_unlockpage(BtLockWrite, rlatch); - memcpy (pagefence, set->page->fence, 256); - set->page->kill = 1; + // redirect higher key directly to consolidated node - if( bt_update(bt, set->page, set->page_no) ) + if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) ) return bt->err; - if( bt_unlockpage(bt, set->page_no, BtLockWrite) ) + // delete old lower key to consolidated node + + if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) ) return bt->err; - if( bt_removepage (bt, set->page_no, 0, pagefence) ) + // obtain write & delete lock on deleted node + // add right block to free chain + + bt_lockpage(BtLockDelete, rlatch); + bt_lockpage(BtLockWrite, rlatch); + bt_unlockpage(BtLockParent, rlatch); + + if( bt_freepage (bt, right, bt->temp, rlatch) ) return bt->err; - bt->found = found; + bt_unlockpage(BtLockParent, latch); + bt_unpinlatch(latch); return 0; } @@ -1747,43 +1700,42 @@ BtKey ptr; uid bt_findkey (BtDb *bt, unsigned char *key, uint len) { -BtPageSet set[1]; uint slot; -uid id = 0; BtKey ptr; +uid id; - set->page = bt->page; - - if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) - ptr = keyptr(set->page, slot); + if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) + ptr = keyptr(bt->page, slot); else return 0; // if key exists, return row-id // otherwise return 0 - if( slot <= set->page->cnt ) - if( !keycmp (ptr, key, len) ) - id = bt_getid(slotptr(set->page,slot)->id); - - if ( bt_unlockpage(bt, set->page_no, BtLockRead) ) - return 0; + if( ptr->len == len && !memcmp (ptr->key, key, len) ) + id = bt_getid(slotptr(bt->page,slot)->id); + else + id = 0; + bt_unlockpage (BtLockRead, bt->latch); + bt_unpinlatch (bt->latch); return id; } // check page for space available, // clean if necessary and return // 0 - page needs splitting -// >0 - new slot value +// >0 - go ahead with new slot -uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot) +uint bt_cleanpage(BtDb *bt, uint amt, uint slot) { -uint nxt = bt->page_size, off; +uint nxt = bt->page_size; +BtPage page = bt->page; uint cnt = 0, idx = 0; uint max = page->cnt; -uint newslot = max; +uint newslot = slot; BtKey key; +int ret; if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) return slot; @@ -1798,36 +1750,32 @@ BtKey key; // skip page info and set rest of page to zero memset (page+1, 0, bt->page_size - sizeof(*page)); - page->dirty = 0; page->act = 0; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 1; - if( slotptr(bt->frame,cnt)->dead ) + // always leave fence key in list + if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; - off = slotptr(bt->frame,cnt)->off; - // copy key - - if( off >= sizeof(*page) ) { - key = keyptr(bt->frame, cnt); - off = nxt -= key->len + 1; - memcpy ((unsigned char *)page + nxt, key, key->len + 1); - } + key = keyptr(bt->frame, cnt); + nxt -= key->len + 1; + memcpy ((unsigned char *)page + nxt, key, key->len + 1); // copy slot - memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId); + if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) + page->act++; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; - slotptr(page, idx)->off = off; - page->act++; + slotptr(page, idx)->off = nxt; } + page->min = nxt; page->cnt = idx; - if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) + if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) return newslot; return 0; @@ -1835,93 +1783,94 @@ BtKey key; // split the root and raise the height of the btree -BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2) +BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2) { -unsigned char leftkey[256]; uint nxt = bt->page_size; -uid new_page; +BtPage root = bt->page; +uid right; // Obtain an empty page to use, and copy the current - // root contents into it, e.g. lower keys - - memcpy (leftkey, root->page->fence, 256); - root->page->posted = 1; + // root contents into it - if( !(new_page = bt_newpage(bt, root->page)) ) + if( !(right = bt_newpage(bt, root)) ) return bt->err; // preserve the page info at the bottom - // of higher keys and set rest to zero + // and set rest to zero - memset(root->page+1, 0, bt->page_size - sizeof(*root->page)); - memset(root->page->fence, 0, 256); - root->page->fence[0] = 2; - root->page->fence[1] = 0xff; - root->page->fence[2] = 0xff; + memset(root+1, 0, bt->page_size - sizeof(*root)); - // insert new page fence key on newroot page + // insert first key on newroot page nxt -= *leftkey + 1; - memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1); - bt_putid(slotptr(root->page, 1)->id, new_page); - slotptr(root->page, 1)->off = nxt; + memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1); + bt_putid(slotptr(root, 1)->id, right); + slotptr(root, 1)->off = nxt; - // insert stopper key on newroot page + // insert second key on newroot page // and increase the root height - bt_putid(slotptr(root->page, 2)->id, page_no2); - slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence); + nxt -= 3; + ((unsigned char *)root)[nxt] = 2; + ((unsigned char *)root)[nxt+1] = 0xff; + ((unsigned char *)root)[nxt+2] = 0xff; + bt_putid(slotptr(root, 2)->id, page_no2); + slotptr(root, 2)->off = nxt; - bt_putid(root->page->right, 0); - root->page->min = nxt; // reset lowest used offset and key count - root->page->cnt = 2; - root->page->act = 2; - root->page->lvl++; + bt_putid(root->right, 0); + root->min = nxt; // reset lowest used offset and key count + root->cnt = 2; + root->act = 2; + root->lvl++; - // update and release root + // update and release root (bt->page) - if( bt_update(bt, root->page, root->page_no) ) + if( bt_update(bt, root, bt->page_no) ) return bt->err; - return bt_unlockpage(bt, root->page_no, BtLockWrite); + bt_unlockpage(BtLockWrite, bt->latch); + bt_unpinlatch(bt->latch); + return 0; } // split already locked full node // return unlocked. -BTERR bt_splitpage (BtDb *bt, BtPageSet *set) +BTERR bt_splitpage (BtDb *bt) { -uint cnt = 0, idx = 0, max, nxt = bt->page_size, off; -unsigned char fencekey[256]; -uint lvl = set->page->lvl; -uid right; +uint cnt = 0, idx = 0, max, nxt = bt->page_size; +unsigned char fencekey[256], rightkey[256]; +uid page_no = bt->page_no, right; +BtLatchSet *latch, *rlatch; +BtPage page = bt->page; +uint lvl = page->lvl; BtKey key; + latch = bt->latch; + // split higher half of keys to bt->frame + // the last key (fence key) might be dead memset (bt->frame, 0, bt->page_size); - max = set->page->cnt; + max = page->cnt; cnt = max / 2; idx = 0; while( cnt++ < max ) { - if( !lvl || cnt < max ) { - key = keyptr(set->page, cnt); - off = nxt -= key->len + 1; - memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1); - } else - off = offsetof(struct BtPage_, fence); - - memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId); - slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod; - slotptr(bt->frame, idx)->off = off; - bt->frame->act++; + key = keyptr(page, cnt); + nxt -= key->len + 1; + memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1); + memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId); + if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) ) + bt->frame->act++; + slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod; + slotptr(bt->frame, idx)->off = nxt; } - if( set->page_no == ROOT_page ) - bt->frame->posted = 1; + // remember fence key for new right page + + memcpy (rightkey, key, key->len + 1); - memcpy (bt->frame->fence, set->page->fence, 256); bt->frame->bits = bt->page_bits; bt->frame->min = nxt; bt->frame->cnt = idx; @@ -1929,200 +1878,176 @@ BtKey key; // link right node - if( set->page_no > ROOT_page ) - memcpy (bt->frame->right, set->page->right, BtId); + if( page_no > ROOT_page ) + memcpy (bt->frame->right, page->right, BtId); - // get new free page and write higher keys to it. + // get new free page and write frame to it. if( !(right = bt_newpage(bt, bt->frame)) ) return bt->err; // update lower keys to continue in old page - memcpy (bt->frame, set->page, bt->page_size); - memset (set->page+1, 0, bt->page_size - sizeof(*set->page)); + memcpy (bt->frame, page, bt->page_size); + memset (page+1, 0, bt->page_size - sizeof(*page)); nxt = bt->page_size; - set->page->posted = 0; - set->page->dirty = 0; - set->page->act = 0; + page->dirty = 0; + page->act = 0; cnt = 0; idx = 0; // assemble page of smaller keys + // (they're all active keys) while( cnt++ < max / 2 ) { key = keyptr(bt->frame, cnt); - - if( !lvl || cnt < max / 2 ) { - off = nxt -= key->len + 1; - memcpy ((unsigned char *)set->page + nxt, key, key->len + 1); - } else - off = offsetof(struct BtPage_, fence); - - memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); - slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod; - slotptr(set->page, idx)->off = off; - set->page->act++; + nxt -= key->len + 1; + memcpy ((unsigned char *)page + nxt, key, key->len + 1); + memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; + slotptr(page, idx)->off = nxt; + page->act++; } - // install fence key for smaller key page + // remember fence key for smaller page - memset(set->page->fence, 0, 256); - memcpy(set->page->fence, key, key->len + 1); + memcpy (fencekey, key, key->len + 1); - bt_putid(set->page->right, right); - set->page->min = nxt; - set->page->cnt = idx; + bt_putid(page->right, right); + page->min = nxt; + page->cnt = idx; // if current page is the root page, split it - if( set->page_no == ROOT_page ) - return bt_splitroot (bt, set, right); - - if( bt_update (bt, set->page, set->page_no) ) - return bt->err; - - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; - - // insert new fences in their parent pages - - while( 1 ) { - if( bt_lockpage (bt, set->page_no, BtLockParent) ) - return bt->err; + if( page_no == ROOT_page ) + return bt_splitroot (bt, fencekey, right); - if( bt_lockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; + // lock right page - if( bt_mappage (bt, &set->page, set->page_no) ) - return bt->err; + rlatch = bt_pinlatch (bt, right); + bt_lockpage (BtLockParent, rlatch); - memcpy (fencekey, set->page->fence, 256); - right = bt_getid (set->page->right); + // update left (containing) node - if( set->page->posted ) { - if( bt_unlockpage (bt, set->page_no, BtLockParent) ) - return bt->err; - - return bt_unlockpage (bt, set->page_no, BtLockWrite); - } + if( bt_update(bt, page, page_no) ) + return bt->err; - set->page->posted = 1; + bt_lockpage (BtLockParent, latch); + bt_unlockpage (BtLockWrite, latch); - if( bt_update (bt, set->page, set->page_no) ) - return bt->err; + // insert new fence for reformulated left block - if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) - return bt->err; + if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) ) + return bt->err; - if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) ) - return bt->err; + // switch fence for right block of larger keys to new right page - if( bt_unlockpage (bt, set->page_no, BtLockParent) ) - return bt->err; + if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) ) + return bt->err; - if( !(set->page_no = right) ) - break; - } + bt_unlockpage (BtLockParent, latch); + bt_unlockpage (BtLockParent, rlatch); + bt_unpinlatch (rlatch); + bt_unpinlatch (latch); return 0; } // Insert new key into the btree at requested level. +// Pages are unlocked at exit. BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod) { -BtPageSet set[1]; uint slot, idx; +BtPage page; BtKey ptr; - set->page = bt->page; - while( 1 ) { - if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) ) - ptr = keyptr(set->page, slot); + if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) + ptr = keyptr(bt->page, slot); else { - if ( !bt->err ) + if( !bt->err ) bt->err = BTERR_ovflw; return bt->err; } // if key already exists, update id and return - if( slot <= set->page->cnt ) - if( !keycmp (ptr, key, len) ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - - slotptr(set->page, slot)->dead = 0; - slotptr(set->page, slot)->tod = tod; - bt_putid(slotptr(set->page,slot)->id, id); + page = bt->page; - if ( bt_update(bt, set->page, set->page_no) ) - return bt->err; - - return bt_unlockpage(bt, set->page_no, BtLockWrite); - } + if( !keycmp (ptr, key, len) ) { + if( slotptr(page, slot)->dead ) + page->act++; + slotptr(page, slot)->dead = 0; + slotptr(page, slot)->tod = tod; + bt_putid(slotptr(page,slot)->id, id); + if( bt_update(bt, bt->page, bt->page_no) ) + return bt->err; + bt_unlockpage(BtLockWrite, bt->latch); + bt_unpinlatch (bt->latch); + return 0; + } // check if page has enough space - if( slot = bt_cleanpage (bt, set->page, len, slot) ) + if( slot = bt_cleanpage (bt, len, slot) ) break; - if( bt_splitpage (bt, set) ) + if( bt_splitpage (bt) ) return bt->err; } // calculate next available slot and copy key into page - set->page->min -= len + 1; // reset lowest used offset - ((unsigned char *)set->page)[set->page->min] = len; - memcpy ((unsigned char *)set->page + set->page->min +1, key, len ); + page->min -= len + 1; // reset lowest used offset + ((unsigned char *)page)[page->min] = len; + memcpy ((unsigned char *)page + page->min +1, key, len ); - for( idx = slot; idx <= set->page->cnt; idx++ ) - if( slotptr(set->page, idx)->dead ) + for( idx = slot; idx < page->cnt; idx++ ) + if( slotptr(page, idx)->dead ) break; // now insert key into array before slot + // preserving the fence slot - if( idx > set->page->cnt ) - set->page->cnt++; + if( idx == page->cnt ) + idx++, page->cnt++; - set->page->act++; + page->act++; while( idx > slot ) - *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--; + *slotptr(page, idx) = *slotptr(page, idx -1), idx--; - bt_putid(slotptr(set->page,slot)->id, id); - slotptr(set->page, slot)->off = set->page->min; - slotptr(set->page, slot)->tod = tod; - slotptr(set->page, slot)->dead = 0; + bt_putid(slotptr(page,slot)->id, id); + slotptr(page, slot)->off = page->min; + slotptr(page, slot)->tod = tod; + slotptr(page, slot)->dead = 0; - if( bt_update(bt, set->page, set->page_no) ) - return bt->err; + if( bt_update(bt, bt->page, bt->page_no) ) + return bt->err; - return bt_unlockpage(bt, set->page_no, BtLockWrite); + bt_unlockpage(BtLockWrite, bt->latch); + bt_unpinlatch(bt->latch); + return 0; } // cache page of keys into cursor and return starting slot for given key uint bt_startkey (BtDb *bt, unsigned char *key, uint len) { -BtPageSet set[1]; uint slot; - set->page = bt->page; - // cache page for retrieval - if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) - memcpy (bt->cursor, set->page, bt->page_size); - bt->cursor_page = set->page_no; - - if ( bt_unlockpage(bt, set->page_no, BtLockRead) ) - return 0; + if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) + memcpy (bt->cursor, bt->page, bt->page_size); + else + return 0; + bt_unlockpage(BtLockRead, bt->latch); + bt->cursor_page = bt->page_no; + bt_unpinlatch (bt->latch); return slot; } @@ -2131,15 +2056,16 @@ uint slot; uint bt_nextkey (BtDb *bt, uint slot) { -BtPageSet set[1]; -uid right; +BtLatchSet *latch; +off64_t right; do { right = bt_getid(bt->cursor->right); + while( slot++ < bt->cursor->cnt ) if( slotptr(bt->cursor,slot)->dead ) continue; - else if( right || (slot < bt->cursor->cnt)) // skip infinite stopper + else if( right || (slot < bt->cursor->cnt)) return slot; else break; @@ -2148,19 +2074,15 @@ uid right; break; bt->cursor_page = right; - set->page = bt->page; - - if( bt_lockpage(bt, right, BtLockRead) ) - return 0; - - if( bt_mappage (bt, &set->page, right) ) - break; - - memcpy (bt->cursor, set->page, bt->page_size); + latch = bt_pinlatch (bt, right); + bt_lockpage(BtLockRead, latch); - if( bt_unlockpage(bt, right, BtLockRead) ) + if( bt_mappage (bt, &bt->page, right) ) return 0; + memcpy (bt->cursor, bt->page, bt->page_size); + bt_unlockpage(BtLockRead, latch); + bt_unpinlatch (latch); slot = 0; } while( 1 ); @@ -2184,16 +2106,168 @@ uint bt_tod(BtDb *bt, uint slot) #ifdef STANDALONE + +uint bt_audit (BtDb *bt) +{ +ushort idx, hashidx; +uid next, page_no; +BtLatchSet *latch; +uint cnt = 0; +BtKey ptr; + +#ifdef unix + posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL); +#endif + if( *(ushort *)(bt->latchmgr->lock) ) + fprintf(stderr, "Alloc page locked\n"); + *(ushort *)(bt->latchmgr->lock) = 0; + + for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) { + latch = bt->latchsets + idx; + if( *(ushort *)latch->readwr ) + fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no); + *(ushort *)latch->readwr = 0; + + if( *(ushort *)latch->access ) + fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); + *(ushort *)latch->access = 0; + + if( *(ushort *)latch->parent ) + fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); + *(ushort *)latch->parent = 0; + + if( latch->pin ) { + fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); + latch->pin = 0; + } + } + + for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) { + if( *(ushort *)(bt->latchmgr->table[hashidx].latch) ) + fprintf(stderr, "hash entry %d locked\n", hashidx); + + *(ushort *)(bt->latchmgr->table[hashidx].latch) = 0; + + if( idx = bt->latchmgr->table[hashidx].slot ) do { + latch = bt->latchsets + idx; + if( *(ushort *)latch->busy ) + fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no); + *(ushort *)latch->busy = 0; + if( latch->hash != hashidx ) + fprintf(stderr, "latchset %d wrong hashidx\n", idx); + if( latch->pin ) + fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); + } while( idx = latch->next ); + } + + next = bt->latchmgr->nlatchpage + LATCH_page; + page_no = LEAF_page; + + while( page_no < bt_getid(bt->latchmgr->alloc->right) ) { + off64_t off = page_no << bt->page_bits; +#ifdef unix + pread (bt->idx, bt->frame, bt->page_size, off); +#else + DWORD amt[1]; + + SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); + + if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL)) + fprintf(stderr, "page %.8x unable to read\n", page_no); + + if( *amt < bt->page_size ) + fprintf(stderr, "page %.8x unable to read\n", page_no); +#endif + if( !bt->frame->free ) { + for( idx = 0; idx++ < bt->frame->cnt - 1; ) { + ptr = keyptr(bt->frame, idx+1); + if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 ) + fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx); + } + if( !bt->frame->lvl ) + cnt += bt->frame->act; + } + + if( page_no > LEAF_page ) + next = page_no + 1; + page_no = next; + } + return cnt - 1; +} + +#ifndef unix +double getCpuTime(int type) +{ +FILETIME crtime[1]; +FILETIME xittime[1]; +FILETIME systime[1]; +FILETIME usrtime[1]; +SYSTEMTIME timeconv[1]; +double ans = 0; + + memset (timeconv, 0, sizeof(SYSTEMTIME)); + + switch( type ) { + case 0: + GetSystemTimeAsFileTime (xittime); + FileTimeToSystemTime (xittime, timeconv); + ans = (double)timeconv->wDayOfWeek * 3600 * 24; + break; + case 1: + GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime); + FileTimeToSystemTime (usrtime, timeconv); + break; + case 2: + GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime); + FileTimeToSystemTime (systime, timeconv); + break; + } + + ans += (double)timeconv->wHour * 3600; + ans += (double)timeconv->wMinute * 60; + ans += (double)timeconv->wSecond; + ans += (double)timeconv->wMilliseconds / 1000; + return ans; +} +#else +#include +#include + +double getCpuTime(int type) +{ +struct rusage used[1]; +struct timeval tv[1]; + + switch( type ) { + case 0: + gettimeofday(tv, NULL); + return (double)tv->tv_sec + (double)tv->tv_usec / 1000000; + + case 1: + getrusage(RUSAGE_SELF, used); + return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000; + + case 2: + getrusage(RUSAGE_SELF, used); + return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000; + } + + return 0; +} +#endif + // standalone program to index file of keys // then list them onto std-out int main (int argc, char **argv) { uint slot, line = 0, off = 0, found = 0; -int dead, ch, cnt = 0, bits = 12; +int ch, cnt = 0, bits = 12; unsigned char key[256]; -clock_t done, start; +double done, start; +uid next, page_no; uint pgblk = 0; +float elapsed; time_t tod[1]; uint scan = 0; uint len = 0; @@ -2210,7 +2284,7 @@ FILE *in; exit(0); } - start = clock(); + start = getCpuTime(0); time(tod); if( argc > 4 ) @@ -2234,7 +2308,7 @@ FILE *in; if( argc > 7 ) off = atoi(argv[7]); - bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk); + bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk, map / 8); if( !bt ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); @@ -2243,6 +2317,12 @@ FILE *in; switch(argv[3][0]| 0x20) { + case 'a': + fprintf(stderr, "started audit for %s\n", argv[2]); + cnt = bt_audit (bt); + fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt); + break; + case 'w': fprintf(stderr, "started indexing for %s\n", argv[2]); if( argc > 2 && (in = fopen (argv[2], "rb")) ) @@ -2258,7 +2338,7 @@ FILE *in; } else if( len < 245 ) key[len++] = ch; - fprintf(stderr, "finished adding keys, %d \n", line); + fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line); break; case 'd': @@ -2270,13 +2350,13 @@ FILE *in; if( off ) sprintf((char *)key+len, "%.9d", line + off), len += 9; line++; - if( bt_deletekey (bt, key, len) ) + if( bt_deletekey (bt, key, len, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } else if( len < 245 ) key[len++] = ch; - fprintf(stderr, "finished deleting keys, %d \n", line); + fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line); break; case 'f': @@ -2296,36 +2376,69 @@ FILE *in; } else if( len < 245 ) key[len++] = ch; - fprintf(stderr, "finished search of %d keys, found %d\n", line, found); + fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found); break; case 's': - scan++; + fprintf(stderr, "started scaning\n"); + cnt = len = key[0] = 0; + + if( slot = bt_startkey (bt, key, len) ) + slot--; + else + fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0); + + while( slot = bt_nextkey (bt, slot) ) { + ptr = bt_key(bt, slot); + fwrite (ptr->key, ptr->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } + + fprintf(stderr, " Total keys read %d\n", cnt - 1); break; - } + case 'c': + fprintf(stderr, "started counting\n"); - done = clock(); - fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC); + next = bt->latchmgr->nlatchpage + LATCH_page; + page_no = LEAF_page; + cnt = 0; - dead = cnt = 0; - len = key[0] = 0; + while( page_no < bt_getid(bt->latchmgr->alloc->right) ) { + uid off = page_no << bt->page_bits; +#ifdef unix + pread (bt->idx, bt->frame, bt->page_size, off); +#else + DWORD amt[1]; - fprintf(stderr, "started reading\n"); + SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); - if( slot = bt_startkey (bt, key, len) ) - slot--; - else - fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0); + if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL)) + fprintf (stderr, "unable to read page %.8x", page_no); - while( slot = bt_nextkey (bt, slot) ) - if( cnt++, scan ) { - ptr = bt_key(bt, slot); - fwrite (ptr->key, ptr->len, 1, stdout); - fputc ('\n', stdout); + if( *amt < bt->page_size ) + fprintf (stderr, "unable to read page %.8x", page_no); +#endif + if( !bt->frame->free && !bt->frame->lvl ) + cnt += bt->frame->act; + if( page_no > LEAF_page ) + next = page_no + 1; + page_no = next; } + + cnt--; // remove stopper key + fprintf(stderr, " Total keys read %d\n", cnt); + break; + } - fprintf(stderr, " Total keys read %d\n", cnt); + done = getCpuTime(0); + elapsed = (float)(done - start); + fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); + elapsed = getCpuTime(1); + fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); + elapsed = getCpuTime(2); + fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); return 0; }