uint cnt; // count of keys in page
uint act; // count of active keys
uint min; // next key offset
- unsigned char bits:7; // page size in bits
+ unsigned char bits:6; // page size in bits
unsigned char free:1; // page is on free list
+ unsigned char dirty:1; // page is dirty in cache
unsigned char lvl:6; // level of page
unsigned char kill:1; // page is being deleted
- unsigned char dirty:1; // page is dirty
+ unsigned char clean:1; // page needs cleaning
unsigned char right[BtId]; // page number to right
} *BtPage;
uint nlatchpage; // number of latch pages at BT_latch
uint latchtotal; // number of page latch entries
uint latchhash; // number of latch hash table slots
- uint latchvictim; // next latch entry to examine
+ uint latchvictim; // next latch hash entry to examine
} BtLatchMgr;
// latch hash table entries
// latch manager table structure
typedef struct {
+ volatile uid page_no; // latch set page number on disk
BtSpinLatch readwr[1]; // read/write page lock
BtSpinLatch access[1]; // Access Intent/Page delete
BtSpinLatch parent[1]; // Posting of fence key in parent
- BtSpinLatch busy[1]; // slot is being moved between chains
volatile uint next; // next entry in hash table chain
volatile uint prev; // prev entry in hash table chain
- volatile uint hash; // hash slot entry is under
- volatile ushort dirty; // page is dirty in cache
- volatile ushort pin; // number of outstanding pins
- volatile uid page_no; // latch set page number on disk
+ volatile uint pin; // number of outstanding pins
} BtLatchSet;
// The object structure for Btree access
uid cursor_page; // current cursor page number
int err;
uint mode; // read-write mode
- BtPage alloc; // frame buffer for alloc page ( page 0 )
BtPage cursor; // cached frame for start/next (never mapped)
BtPage frame; // spare frame for the page split (never mapped)
- BtPage zero; // zeroes frame buffer (never mapped)
- BtPage page; // current page
+ BtPage page; // current mapped page in buffer pool
BtLatchSet *latch; // current page latch
BtLatchMgr *latchmgr; // mapped latch page from allocation page
BtLatchSet *latchsets; // mapped latch set from latch pages
- unsigned char *latchpool; // cached page pool set
+ unsigned char *pagepool; // cached page pool set
BtHashEntry *table; // the hash table
#ifdef unix
int idx;
HANDLE idx;
HANDLE halloc; // allocation and latch table handle
#endif
- unsigned char *mem; // frame, cursor, page memory buffer
+ unsigned char *mem; // frame, cursor, memory buffers
uint found; // last deletekey found key
} BtDb;
extern uint bt_nextkey (BtDb *bt, uint slot);
// internal functions
-BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch);
+void bt_update (BtDb *bt, BtPage page);
BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
// Helper functions to return slot values
#endif
}
-// link latch table entry into head of latch hash table
+// read page from permanent location in Btree file
-BTERR bt_latchlink (BtDb *bt, uint hashidx, uint victim, uid page_no)
+BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
{
-BtPage page = (BtPage)(victim * bt->page_size + bt->latchpool);
-BtLatchSet *latch = bt->latchsets + victim;
off64_t off = page_no << bt->page_bits;
-uint amt[1];
- if( latch->next = bt->table[hashidx].slot )
- bt->latchsets[latch->next].prev = victim;
-
- bt->table[hashidx].slot = victim;
- latch->page_no = page_no;
- latch->hash = hashidx;
- latch->dirty = 0;
- latch->prev = 0;
#ifdef unix
- if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) )
+ if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
+ fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
return bt->err = BTERR_read;
+ }
#else
- SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
- if( !ReadFile(bt->idx, page, bt->page_size, amt, NULL))
+OVERLAPPED ovl[1];
+uint amt[1];
+
+ memset (ovl, 0, sizeof(OVERLAPPED));
+ ovl->Offset = off;
+ ovl->OffsetHigh = off >> 32;
+
+ if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
+ fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
return bt->err = BTERR_read;
- if( *amt < bt->page_size )
+ }
+ if( *amt < bt->page_size ) {
+ fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
return bt->err = BTERR_read;
+ }
#endif
return 0;
}
+// write page to permanent location in Btree file
+// clear the dirty bit
+
+BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
+{
+off64_t off = page_no << bt->page_bits;
+
+#ifdef unix
+ page->dirty = 0;
+
+ if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
+ return bt->err = BTERR_wrt;
+#else
+OVERLAPPED ovl[1];
+uint amt[1];
+
+ memset (ovl, 0, sizeof(OVERLAPPED));
+ ovl->Offset = off;
+ ovl->OffsetHigh = off >> 32;
+ page->dirty = 0;
+
+ if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
+ return bt->err = BTERR_wrt;
+
+ if( *amt < bt->page_size )
+ return bt->err = BTERR_wrt;
+#endif
+ return 0;
+}
+
+// link latch table entry into head of latch hash table
+
+BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
+{
+BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
+BtLatchSet *latch = bt->latchsets + slot;
+
+ if( latch->next = bt->table[hashidx].slot )
+ bt->latchsets[latch->next].prev = slot;
+
+ bt->table[hashidx].slot = slot;
+ latch->page_no = page_no;
+ latch->prev = 0;
+ latch->pin = 1;
+
+ return bt_readpage (bt, page, page_no);
+}
+
// release latch pin
// NOTE(review): opening brace was elided by the diff view; reconstructed.

void bt_unpinlatch (BtLatchSet *latch)
{
#ifdef unix
	__sync_fetch_and_add(&latch->pin, -1);
#else
	// 32-bit intrinsic matches pin's widened volatile uint type
	_InterlockedDecrement (&latch->pin);
#endif
}
BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
{
uint hashidx = page_no % bt->latchmgr->latchhash;
-uint slot, victim, idx;
BtLatchSet *latch;
+uint slot, idx;
off64_t off;
uint amt[1];
BtPage page;
break;
} while( slot = latch->next );
- // found our entry
+ // found our entry, bring to front of hash chain
if( slot ) {
latch = bt->latchsets + slot;
#ifdef unix
__sync_fetch_and_add(&latch->pin, 1);
#else
- _InterlockedIncrement16 (&latch->pin);
+ _InterlockedIncrement (&latch->pin);
#endif
+ // unlink our entry from its hash chain position
+
+ if( latch->prev )
+ bt->latchsets[latch->prev].next = latch->next;
+ else
+ bt->table[hashidx].slot = latch->next;
+
+ if( latch->next )
+ bt->latchsets[latch->next].prev = latch->prev;
+
+ // now link into head of the hash chain
+
+ if( latch->next = bt->table[hashidx].slot )
+ bt->latchsets[latch->next].prev = slot;
+
+ bt->table[hashidx].slot = slot;
+ latch->prev = 0;
+
bt_spinreleasewrite(bt->table[hashidx].latch);
return latch;
}
- // see if there are any unused entries
+ // see if there are any unused pool entries
#ifdef unix
- victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
+ slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
#else
- victim = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
+ slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
#endif
- if( victim < bt->latchmgr->latchtotal ) {
- latch = bt->latchsets + victim;
-#ifdef unix
- __sync_fetch_and_add(&latch->pin, 1);
-#else
- _InterlockedIncrement16 (&latch->pin);
-#endif
- bt_latchlink (bt, hashidx, victim, page_no);
+ if( slot < bt->latchmgr->latchtotal ) {
+ latch = bt->latchsets + slot;
+ if( bt_latchlink (bt, hashidx, slot, page_no) )
+ return NULL;
bt_spinreleasewrite (bt->table[hashidx].latch);
return latch;
}
#ifdef unix
- victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
+ __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
#else
- victim = _InterlockedDecrement (&bt->latchmgr->latchdeployed);
+ _InterlockedDecrement (&bt->latchmgr->latchdeployed);
#endif
- // find and reuse previous lock entry
+ // find and reuse previous lru lock entry on victim hash chain
while( 1 ) {
#ifdef unix
- victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
+ idx = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
#else
- victim = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
+ idx = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
#endif
- // we don't use slot zero
-
- if( victim %= bt->latchmgr->latchtotal )
- latch = bt->latchsets + victim;
- else
- continue;
-
- // take control of our slot
- // from other threads
-
- if( latch->pin || !bt_spinwritetry (latch->busy) )
- continue;
-
- idx = latch->hash;
-
// try to get write lock on hash chain
// skip entry if not obtained
// or has outstanding locks
- if( !bt_spinwritetry (bt->table[idx].latch) ) {
- bt_spinreleasewrite (latch->busy);
+ idx %= bt->latchmgr->latchhash;
+
+ if( !bt_spinwritetry (bt->table[idx].latch) )
continue;
- }
- if( latch->pin ) {
- bt_spinreleasewrite (latch->busy);
+ if( slot = bt->table[idx].slot )
+ while( 1 ) {
+ latch = bt->latchsets + slot;
+ if( !latch->next )
+ break;
+ slot = latch->next;
+ }
+
+ if( !slot || latch->pin ) {
bt_spinreleasewrite (bt->table[idx].latch);
continue;
}
// update permanent page area in btree
- page = (BtPage)(victim * bt->page_size + bt->latchpool);
- off = latch->page_no << bt->page_bits;
-#ifdef unix
- if( latch->dirty )
- if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
- return bt->err = BTERR_wrt, NULL;
-#else
- if( latch->dirty ) {
- SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
+ page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
- if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
- return bt->err = BTERR_wrt, NULL;
+ if( page->dirty )
+ if( bt_writepage (bt, page, latch->page_no) )
+ return NULL;
- if( *amt < bt->page_size )
- return bt->err = BTERR_wrt, NULL;
- }
-#endif
- // unlink our available victim from its hash chain
+ // unlink our available slot from its hash chain
if( latch->prev )
bt->latchsets[latch->prev].next = latch->next;
bt->latchsets[latch->next].prev = latch->prev;
bt_spinreleasewrite (bt->table[idx].latch);
-#ifdef unix
- __sync_fetch_and_add(&latch->pin, 1);
-#else
- _InterlockedIncrement16 (&latch->pin);
-#endif
- bt_latchlink (bt, hashidx, victim, page_no);
+
+ if( bt_latchlink (bt, hashidx, slot, page_no) )
+ return NULL;
+
bt_spinreleasewrite (bt->table[hashidx].latch);
- bt_spinreleasewrite (latch->busy);
return latch;
}
}
else if( bits < BT_minbits )
bits = BT_minbits;
+ if( mode == BT_ro ) {
+ fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
+ return NULL;
+ }
#ifdef unix
bt = calloc (1, sizeof(BtDb));
bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
- if( bt->idx == -1 )
+ if( bt->idx == -1 ) {
+ fprintf(stderr, "unable to open %s\n", name);
return free(bt), NULL;
+ }
#else
bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
attr = FILE_ATTRIBUTE_NORMAL;
bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
- if( bt->idx == INVALID_HANDLE_VALUE )
+ if( bt->idx == INVALID_HANDLE_VALUE ) {
+ fprintf(stderr, "unable to open %s\n", name);
return GlobalFree(bt), NULL;
+ }
#endif
#ifdef unix
memset (lock, 0, sizeof(lock));
-
- lock->l_type = F_WRLCK;
lock->l_len = sizeof(struct BtPage_);
- lock->l_whence = 0;
+ lock->l_type = F_WRLCK;
- if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
+ if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
+ fprintf(stderr, "unable to lock record zero %s\n", name);
return bt_close (bt), NULL;
+ }
#else
memset (ovl, 0, sizeof(ovl));
- len = sizeof(struct BtPage_);
// use large offsets to
// simulate advisory locking
ovl->OffsetHigh |= 0x80000000;
- if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, len, 0L, ovl) )
+ if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
+ fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
return bt_close (bt), NULL;
+ }
#endif
#ifdef unix
if( size = lseek (bt->idx, 0L, 2) ) {
if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
bits = latchmgr->alloc->bits;
- else
+ else {
+ fprintf(stderr, "Unable to read page zero\n");
return free(bt), free(latchmgr), NULL;
- } else if( mode == BT_ro )
- return free(latchmgr), bt_close (bt), NULL;
+ }
+ }
#else
latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
size = GetFileSize(bt->idx, amt);
if( size || *amt ) {
- if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
+ if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
+ fprintf(stderr, "Unable to read page zero\n");
return bt_close (bt), NULL;
- bits = latchmgr->alloc->bits;
- } else if( mode == BT_ro )
- return bt_close (bt), NULL;
+ } else
+ bits = latchmgr->alloc->bits;
+ }
#endif
bt->page_size = 1 << bits;
bt->mode = mode;
- if( size || *amt )
+ if( size || *amt ) {
+ nlatchpage = latchmgr->nlatchpage;
goto btlatch;
+ }
+
+ if( nodemax < 16 ) {
+ fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
+ return bt_close(bt), NULL;
+ }
// initialize an empty b-tree with latch page, root page, page of leaves
// and page(s) of latches and page pool cache
// calculate number of latch hash table entries
- nlatchpage = (nodemax/8 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
+ nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
nlatchpage += nodemax; // size of the buffer pool in pages
latchmgr->nlatchpage = nlatchpage;
latchmgr->latchtotal = nodemax;
latchmgr->latchhash = latchhash;
-#ifdef unix
- if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
- return bt_close (bt), NULL;
-#else
- if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
- return bt_close (bt), NULL;
- if( *amt < bt->page_size )
+ if( bt_writepage (bt, latchmgr->alloc, 0) ) {
+ fprintf (stderr, "Unable to create btree page zero\n");
return bt_close (bt), NULL;
-#endif
+ }
+
memset (latchmgr, 0, 1 << bits);
latchmgr->alloc->bits = bt->page_bits;
for( lvl=MIN_lvl; lvl--; ) {
+ last = MIN_lvl - lvl; // page number
slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
- bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number
+ bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
key = keyptr(latchmgr->alloc, 1);
key->len = 2; // create stopper key
key->key[0] = 0xff;
key->key[1] = 0xff;
+
latchmgr->alloc->min = bt->page_size - 3;
latchmgr->alloc->lvl = lvl;
latchmgr->alloc->cnt = 1;
latchmgr->alloc->act = 1;
-#ifdef unix
- if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
- return bt_close (bt), NULL;
-#else
- if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
- return bt_close (bt), NULL;
- if( *amt < bt->page_size )
+ if( bt_writepage (bt, latchmgr->alloc, last) ) {
+ fprintf (stderr, "Unable to create btree page %.8x\n", last);
return bt_close (bt), NULL;
-#endif
+ }
}
- // clear out latch manager pages
+ // clear out buffer pool pages
memset(latchmgr, 0, bt->page_size);
- last = MIN_lvl + 1;
+ last = MIN_lvl;
- while( last < ((MIN_lvl + 1 + nlatchpage) ) ) {
- off = (uid)last << bt->page_bits;
+ while( ++last < ((MIN_lvl + 1 + nlatchpage) ) )
+ if( bt_writepage (bt, latchmgr->alloc, last) ) {
+ fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
+ return bt_close (bt), NULL;
+ }
+
#ifdef unix
- pwrite(bt->idx, latchmgr, bt->page_size, off);
+ free (latchmgr);
#else
- SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
- if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
- return bt_close (bt), NULL;
- if( *amt < bt->page_size )
- return bt_close (bt), NULL;
+ VirtualFree (latchmgr, 0, MEM_RELEASE);
#endif
- last++;
- }
btlatch:
#ifdef unix
lock->l_type = F_UNLCK;
- if( fcntl (bt->idx, F_SETLK, lock) < 0 )
+ if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
+ fprintf (stderr, "Unable to unlock page zero\n");
return bt_close (bt), NULL;
+ }
#else
- if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
+ if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
+ fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
return bt_close (bt), NULL;
+ }
#endif
#ifdef unix
flag = PROT_READ | PROT_WRITE;
bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
- if( bt->latchmgr == MAP_FAILED )
+ if( bt->latchmgr == MAP_FAILED ) {
+ fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
return bt_close (bt), NULL;
- bt->table = (void *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
- if( bt->table == MAP_FAILED )
+ }
+ bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
+ if( bt->table == MAP_FAILED ) {
+ fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
return bt_close (bt), NULL;
+ }
#else
flag = PAGE_READWRITE;
- bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size, NULL);
- if( !bt->halloc )
+ bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
+ if( !bt->halloc ) {
+ fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
return bt_close (bt), NULL;
+ }
flag = FILE_MAP_WRITE;
- bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size);
- if( !bt->latchmgr )
- return GetLastError(), bt_close (bt), NULL;
+ bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
+ if( !bt->latchmgr ) {
+ fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
+ return bt_close (bt), NULL;
+ }
bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
#endif
- bt->latchpool = (unsigned char *)bt->table + (bt->latchmgr->nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
- bt->latchsets = (BtLatchSet *)(bt->latchpool - bt->latchmgr->latchtotal * sizeof(BtLatchSet));
+ bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
+ bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
#ifdef unix
- free (latchmgr);
+ bt->mem = malloc (2 * bt->page_size);
#else
- VirtualFree (latchmgr, 0, MEM_RELEASE);
-#endif
-
-#ifdef unix
- bt->mem = malloc (3 * bt->page_size);
-#else
- bt->mem = VirtualAlloc(NULL, 3 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
+ bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
#endif
bt->frame = (BtPage)bt->mem;
bt->cursor = (BtPage)(bt->mem + bt->page_size);
- bt->zero = (BtPage)(bt->mem + 2 * bt->page_size);
-
- memset (bt->zero, 0, bt->page_size);
return bt;
}
BtLatchSet *latch;
uid new_page;
BtPage temp;
-off64_t off;
-uint amt[1];
-int reuse;
// lock allocation page
// else allocate empty page
if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
- latch = bt_pinlatch (bt, new_page);
- temp = bt_mappage (bt, latch);
+ if( latch = bt_pinlatch (bt, new_page) )
+ temp = bt_mappage (bt, latch);
+ else
+ return 0;
bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
bt_spinreleasewrite(bt->latchmgr->lock);
memcpy (temp, page, bt->page_size);
- if( bt_update (bt, temp, latch) )
- return 0;
-
+ bt_update (bt, temp);
bt_unpinlatch (latch);
+ return new_page;
} else {
new_page = bt_getid(bt->latchmgr->alloc->right);
bt_putid(bt->latchmgr->alloc->right, new_page+1);
bt_spinreleasewrite(bt->latchmgr->lock);
- off = new_page << bt->page_bits;
-#ifdef unix
- if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
- return bt->err = BTERR_wrt, 0;
-#else
- SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
- if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
- return bt->err = BTERR_wrt, 0;
-
- if( *amt < bt->page_size )
- return bt->err = BTERR_wrt, 0;
-#endif
+ if( bt_writepage (bt, page, new_page) )
+ return 0;
}
+ bt_update (bt, bt->latchmgr->alloc);
return new_page;
}
// Update current page of btree by
// flushing mapped area to disk backing of cache pool.
+// mark page as dirty for rewrite to permanent location
-BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch)
+void bt_update (BtDb *bt, BtPage page)
{
#ifdef unix
msync (page, bt->page_size, MS_ASYNC);
#else
- FlushViewOfFile (page, bt->page_size);
+// FlushViewOfFile (page, bt->page_size);
#endif
- latch->dirty = 1;
- return 0;
+ page->dirty = 1;
}
// map the btree cached page onto current page:
// the latch's index into latchsets is also its frame
// index into the buffer pool

BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
{
	// uid cast keeps the byte offset 64-bit on large pools
	return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
}
// deallocate a deleted page
// store chain in second right
bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
bt_putid(bt->latchmgr->alloc[1].right, page_no);
- page->free = 1;
- if( bt_update(bt, page, latch) )
- return bt->err;
+ page->free = 1;
+ bt_update(bt, page);
// unlock released page
// unlock allocation page
bt_spinreleasewrite (bt->latchmgr->lock);
+ bt_update (bt, bt->latchmgr->alloc);
return 0;
}
// determine lock mode of drill level
mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead;
- bt->latch = bt_pinlatch(bt, page_no);
- bt->page_no = page_no;
+ if( bt->latch = bt_pinlatch(bt, page_no) )
+ bt->page_no = page_no;
+ else
+ return 0;
// obtain access lock using lock chaining
memcpy(rightkey, ptr, ptr->len + 1);
memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
- bt->page->dirty = 1;
+ bt->page->clean = 1;
ptr = keyptr(bt->page, bt->page->cnt);
memcpy(leftkey, ptr, ptr->len + 1);
- if( bt_update (bt, bt->page, latch) )
- return bt->err;
-
+ bt_update (bt, bt->page);
bt_lockpage (BtLockParent, latch);
bt_unlockpage (BtLockWrite, latch);
break;
child = bt_getid (slotptr(root, idx)->id);
- latch = bt_pinlatch (bt, child);
+ if( latch = bt_pinlatch (bt, child) )
+ temp = bt_mappage (bt, latch);
+ else
+ return bt->err;
bt_lockpage (BtLockDelete, latch);
bt_lockpage (BtLockWrite, latch);
-
- temp = bt_mappage (bt, latch);
memcpy (root, temp, bt->page_size);
- if( bt_update (bt, root, bt->latch) )
- return bt->err;
+ bt_update (bt, root);
if( bt_freepage (bt, child, latch) )
return bt->err;
if( found = !keycmp (ptr, key, len) )
if( found = slotptr(bt->page, slot)->dead == 0 ) {
dirty = slotptr(bt->page,slot)->dead = 1;
- bt->page->dirty = 1;
+ bt->page->clean = 1;
bt->page->act--;
// collapse empty slots
// return if page is not empty
if( bt->page->act ) {
- if( bt_update(bt, bt->page, latch) )
- return bt->err;
+ bt_update(bt, bt->page);
bt_unlockpage(BtLockWrite, latch);
bt_unpinlatch (latch);
return bt->found = found, 0;
// obtain lock on right page
- rlatch = bt_pinlatch (bt, right);
- bt_lockpage(BtLockWrite, rlatch);
+ if( rlatch = bt_pinlatch (bt, right) )
+ temp = bt_mappage (bt, rlatch);
+ else
+ return bt->err;
- temp = bt_mappage (bt, rlatch);
+ bt_lockpage(BtLockWrite, rlatch);
if( temp->kill ) {
bt_abort(bt, temp, right, 0);
bt_putid(temp->right, page_no);
temp->kill = 1;
- if( bt_update(bt, bt->page, latch) )
- return bt->err;
-
- if( bt_update(bt, temp, rlatch) )
- return bt->err;
+ bt_update(bt, bt->page);
+ bt_update(bt, temp);
bt_lockpage(BtLockParent, latch);
bt_unlockpage(BtLockWrite, latch);
// skip cleanup if nothing to reclaim
- if( !page->dirty )
+ if( !page->clean )
return 0;
memcpy (bt->frame, page, bt->page_size);
// update and release root (bt->page)
- if( bt_update(bt, root, bt->latch) )
- return bt->err;
+ bt_update(bt, root);
bt_unlockpage(BtLockWrite, bt->latch);
bt_unpinlatch(bt->latch);
memcpy (bt->frame, page, bt->page_size);
memset (page+1, 0, bt->page_size - sizeof(*page));
nxt = bt->page_size;
- page->dirty = 0;
+ page->clean = 0;
page->act = 0;
cnt = 0;
idx = 0;
// lock right page
- rlatch = bt_pinlatch (bt, right);
- bt_lockpage (BtLockParent, rlatch);
+ if( rlatch = bt_pinlatch (bt, right) )
+ bt_lockpage (BtLockParent, rlatch);
+ else
+ return bt->err;
// update left (containing) node
- if( bt_update(bt, page, latch) )
- return bt->err;
+ bt_update(bt, page);
bt_lockpage (BtLockParent, latch);
bt_unlockpage (BtLockWrite, latch);
slotptr(page, slot)->tod = tod;
#endif
bt_putid(slotptr(page,slot)->id, id);
- if( bt_update(bt, bt->page, bt->latch) )
- return bt->err;
+ bt_update(bt, bt->page);
bt_unlockpage(BtLockWrite, bt->latch);
bt_unpinlatch (bt->latch);
return 0;
#endif
slotptr(page, slot)->dead = 0;
- if( bt_update(bt, bt->page, bt->latch) )
- return bt->err;
+ bt_update(bt, bt->page);
bt_unlockpage(BtLockWrite, bt->latch);
bt_unpinlatch(bt->latch);
break;
bt->cursor_page = right;
- latch = bt_pinlatch (bt, right);
- bt_lockpage(BtLockRead, latch);
+
+ if( latch = bt_pinlatch (bt, right) )
+ bt_lockpage(BtLockRead, latch);
+ else
+ return 0;
bt->page = bt_mappage (bt, latch);
memcpy (bt->cursor, bt->page, bt->page_size);
BtLatchSet *latch;
uint cnt = 0;
BtPage page;
-off64_t off;
uint amt[1];
BtKey ptr;
-#ifdef unix
if( *(ushort *)(bt->latchmgr->lock) )
fprintf(stderr, "Alloc page locked\n");
*(ushort *)(bt->latchmgr->lock) = 0;
fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
latch->pin = 0;
}
- page = (BtPage)(idx * bt->page_size + bt->latchpool);
- off = latch->page_no << bt->page_bits;
-#ifdef unix
- if( latch->dirty )
- if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
- fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
-#else
- if( latch->dirty ) {
- SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
-
- if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
- fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
+ page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
- if( *amt < bt->page_size )
+ if( page->dirty )
+ if( bt_writepage (bt, page, latch->page_no) )
fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
- }
-#endif
- latch->dirty = 0;
}
for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
if( idx = bt->table[hashidx].slot ) do {
latch = bt->latchsets + idx;
- if( *(ushort *)latch->busy )
- fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
- *(ushort *)latch->busy = 0;
- if( latch->hash != hashidx )
- fprintf(stderr, "latchset %d wrong hashidx\n", idx);
if( latch->pin )
fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
} while( idx = latch->next );
page_no = LEAF_page;
while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
- pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits);
+ if( bt_readpage (bt, bt->frame, page_no) )
+ fprintf(stderr, "page %.8x unreadable\n", page_no);
if( !bt->frame->free ) {
for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
ptr = keyptr(bt->frame, idx+1);
page_no = next;
}
return cnt - 1;
-#else
- return 0;
-#endif
}
#ifndef unix
break;
case 's':
- scan++;
+ fprintf(stderr, "started scaning\n");
+ cnt = len = key[0] = 0;
+
+ if( slot = bt_startkey (bt, key, len) )
+ slot--;
+ else
+ fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
+
+ while( slot = bt_nextkey (bt, slot) ) {
+ ptr = bt_key(bt, slot);
+ fwrite (ptr->key, ptr->len, 1, stdout);
+ fputc ('\n', stdout);
+ cnt++;
+ }
+
+ fprintf(stderr, " Total keys read %d\n", cnt - 1);
+ break;
case 'c':
fprintf(stderr, "started counting\n");
page_no = LEAF_page;
while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
- uid off = page_no << bt->page_bits;
-#ifdef unix
- pread (bt->idx, bt->frame, bt->page_size, off);
-#else
- DWORD amt[1];
-
- SetFilePointer (bt->idx, (long)off, NULL, FILE_BEGIN);
-
- if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
- fprintf (stderr, "unable to read page %.8x", page_no);
-
- if( *amt < bt->page_size )
- fprintf (stderr, "unable to read page %.8x", page_no);
-#endif
- if( !bt->frame->free && !bt->frame->lvl )
- cnt += bt->frame->act;
+ BtLatchSet *latch;
+ BtPage page;
+ if( latch = bt_pinlatch (bt, page_no) )
+ page = bt_mappage (bt, latch);
+ if( !page->free && !page->lvl )
+ cnt += page->act;
if( page_no > LEAF_page )
next = page_no + 1;
+ if( scan )
+ for( idx = 0; idx++ < page->cnt; ) {
+ if( slotptr(page, idx)->dead )
+ continue;
+ ptr = keyptr(page, idx);
+ if( idx != page->cnt && bt_getid (page->right) ) {
+ fwrite (ptr->key, ptr->len, 1, stdout);
+ fputc ('\n', stdout);
+ }
+ }
+ bt_unpinlatch (latch);
page_no = next;
}