]> pd.if.org Git - btree/blob - btree2t.c
incorporate new bt_deletekey code to file I/O version
[btree] / btree2t.c
1 // btree version 2t
2 // 15 FEB 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stddef.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <stddef.h>
49 #include <memory.h>
50 #include <string.h>
51
52 typedef unsigned long long      uid;
53
54 #ifndef unix
55 typedef unsigned long long      off64_t;
56 typedef unsigned short          ushort;
57 typedef unsigned int            uint;
58 #endif
59
60 #define BT_ro 0x6f72    // ro
61 #define BT_rw 0x7772    // rw
62 #define BT_fl 0x6c66    // fl
63
64 #define BT_maxbits              24                                      // maximum page size in bits
65 #define BT_minbits              9                                       // minimum page size in bits
66 #define BT_minpage              (1 << BT_minbits)       // minimum page size
67
68 /*
69 There are five lock types for each node in three independent sets: 
70 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
71 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
72 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
73 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
74 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
75 */
76
77 typedef enum{
78         BtLockAccess,
79         BtLockDelete,
80         BtLockRead,
81         BtLockWrite,
82         BtLockParent
83 }BtLock;
84
85 //      Define the length of the page and key pointers
86
87 #define BtId 6
88
89 //      Page key slot definition.
90
91 //      If BT_maxbits is 15 or less, you can save 2 bytes
92 //      for each key stored by making the first two uints
93 //      into ushorts.  You can also save 4 bytes by removing
94 //      the tod field from the key.
95
96 //      Keys are marked dead, but remain on the page until
97 //      cleanup is called.
98
99 typedef struct {
100         uint off:BT_maxbits;            // page offset for key start
101         uint dead:1;                            // set for deleted key
102         uint tod;                                       // time-stamp for key
103         unsigned char id[BtId];         // id associated with key
104 } BtSlot;
105
106 //      The key structure occupies space at the upper end of
107 //      each page.  It's a length byte followed by the value
108 //      bytes.
109
110 typedef struct {
111         unsigned char len;
112         unsigned char key[0];
113 } *BtKey;
114
115 //      The first part of an index page.
116 //      It is immediately followed
117 //      by the BtSlot array of keys.
118
119 typedef struct BtPage_ {
120         uint cnt;                                       // count of keys in page
121         uint act;                                       // count of active keys
122         uint min;                                       // next key offset
123         unsigned char bits:7;           // page size in bits
124         unsigned char free:1;           // page is on free list
125         unsigned char lvl:4;            // level of page
126         unsigned char kill:1;           // page is empty
127         unsigned char dirty:1;          // page is dirty
128         unsigned char posted:1;         // page fence is posted
129         unsigned char goright:1;        // continue to right link
130         unsigned char right[BtId];      // page number to right
131         unsigned char fence[256];       // page fence key
132 } *BtPage;
133
134 //  The loadpage interface object
135
136 typedef struct {
137         uid page_no;
138         BtPage page;
139 } BtPageSet;
140
141 //      The memory mapping hash table entry
142
143 typedef struct {
144         BtPage page;            // mapped page pointer
145         uid  page_no;           // mapped page number
146         void *lruprev;          // least recently used previous cache block
147         void *lrunext;          // lru next cache block
148         void *hashprev;         // previous cache block for the same hash idx
149         void *hashnext;         // next cache block for the same hash idx
150 #ifndef unix
151         HANDLE hmap;
152 #endif
153 } BtHash;
154
155 //      The object structure for Btree access
156
157 typedef struct _BtDb {
158         uint page_size;         // each page size       
159         uint page_bits;         // each page size in bits       
160         uint seg_bits;          // segment size in pages in bits
161         uid page_no;            // current page number  
162         uid cursor_page;        // current cursor page number   
163         int  err;
164         uint mode;                      // read-write mode
165         uint mapped_io;         // use memory mapping
166         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
167         BtPage temp2;           // temporary frame buffer (memory mapped/file IO)
168         BtPage parent;          // current page's parent node (memory mapped/file IO)
169         BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
170         BtPage cursor;          // cached frame for start/next (never mapped)
171         BtPage frame;           // spare frame for the page split (never mapped)
172         BtPage zero;            // zeroes frame buffer (never mapped)
173         BtPage page;            // temporary page (memory mapped/file IO)
174 #ifdef unix
175         int idx;
176 #else
177         HANDLE idx;
178 #endif
179         unsigned char *mem;     // frame, cursor, page memory buffer
180         int nodecnt;            // highest page cache segment in use
181         int nodemax;            // highest page cache segment allocated
182         int hashmask;           // number of pages in segments - 1
183         int hashsize;           // size of hash table
184         int posted;                     // last loadpage found posted key
185         int found;                      // last insert/delete found key
186         BtHash *lrufirst;       // lru list head
187         BtHash *lrulast;        // lru list tail
188         ushort *cache;          // hash table for cached segments
189         BtHash nodes[1];        // segment cache follows
190 } BtDb;
191
192 typedef enum {
193 BTERR_ok = 0,
194 BTERR_struct,
195 BTERR_ovflw,
196 BTERR_lock,
197 BTERR_map,
198 BTERR_wrt,
199 BTERR_hash
200 } BTERR;
201
202 // B-Tree functions
203 extern void bt_close (BtDb *bt);
204 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
205 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
206 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
207 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
208 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
209 extern uint bt_nextkey   (BtDb *bt, uint slot);
210
211 //      Internal functions
212
213 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence);
214
215 //  Helper functions to return slot values
216
217 extern BtKey bt_key (BtDb *bt, uint slot);
218 extern uid bt_uid (BtDb *bt, uint slot);
219 extern uint bt_tod (BtDb *bt, uint slot);
220
221 //  BTree page number constants
222 #define ALLOC_page              0
223 #define ROOT_page               1
224 #define LEAF_page               2
225
226 //      Number of levels to create in a new BTree
227
228 #define MIN_lvl                 2
229
230 //  The page is allocated from low and hi ends.
231 //  The key offsets and row-id's are allocated
232 //  from the bottom, while the text of the key
233 //  is allocated from the top.  When the two
234 //  areas meet, the page is split into two.
235
236 //  A key consists of a length byte, two bytes of
237 //  index number (0 - 65534), and up to 253 bytes
238 //  of key value.  Duplicate keys are discarded.
239 //  Associated with each key is a 48 bit row-id.
240
241 //  The b-tree root is always located at page 1.
242 //      The first leaf page of level zero is always
243 //      located on page 2.
244
245 //      The b-tree pages are linked with right
246 //      pointers to facilitate enumerators,
247 //      and provide for concurrency.
248
249 //      When to root page fills, it is split in two and
250 //      the tree height is raised by a new root at page
251 //      one with two keys.
252
253 //      Deleted keys are marked with a dead bit until
254 //      page cleanup
255
256 //  Groups of pages from the btree are optionally
257 //  cached with memory mapping. A hash table is used to keep
258 //  track of the cached pages.  This behaviour is controlled
259 //  by the number of cache blocks parameter and pages per block
260 //      given to bt_open.
261
262 //  To achieve maximum concurrency one page is locked at a time
263 //  as the tree is traversed to find leaf key in question. The right
264 //  page numbers are used in cases where the page is being split,
265 //      or consolidated.
266
267 //  Page 0 is dedicated to lock for new page extensions,
268 //      and chains empty pages together for reuse.
269
270 //      Parent locks are obtained to prevent resplitting or deleting a node
271 //      before its fence is posted into its upper level.
272
273 //      Empty nodes are chained together through the ALLOC page and reused.
274
275 //      A special open mode of BT_fl is provided to safely access files on
276 //      WIN32 networks. WIN32 network operations should not use memory mapping.
277 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
278 //      to prevent local caching of network file contents.
279
280 //      Access macros to address slot and key values from the page.
281 //      Page slots use 1 based indexing.
282
283 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
284 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
285
286 void bt_putid(unsigned char *dest, uid id)
287 {
288 int i = BtId;
289
290         while( i-- )
291                 dest[i] = (unsigned char)id, id >>= 8;
292 }
293
294 uid bt_getid(unsigned char *src)
295 {
296 uid id = 0;
297 int i;
298
299         for( i = 0; i < BtId; i++ )
300                 id <<= 8, id |= *src++; 
301
302         return id;
303 }
304
305 // place write, read, or parent lock on requested page_no.
306
307 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
308 {
309 off64_t off = page_no << bt->page_bits;
310 #ifdef unix
311 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
312 struct flock lock[1];
313 #else
314 uint flags = 0, len;
315 OVERLAPPED ovl[1];
316 #endif
317
318         if( mode == BtLockRead || mode == BtLockWrite )
319                 off +=  1 * sizeof(*bt->page);  // use second segment
320
321         if( mode == BtLockParent )
322                 off +=  2 * sizeof(*bt->page);  // use third segment
323
324 #ifdef unix
325         memset (lock, 0, sizeof(lock));
326
327         lock->l_start = off;
328         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
329         lock->l_len = sizeof(*bt->page);
330         lock->l_whence = 0;
331
332         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
333                 return bt->err = BTERR_lock;
334
335         return 0;
336 #else
337         memset (ovl, 0, sizeof(ovl));
338         ovl->OffsetHigh = (uint)(off >> 32);
339         ovl->Offset = (uint)off;
340         len = sizeof(*bt->page);
341
342         //      use large offsets to
343         //      simulate advisory locking
344
345         ovl->OffsetHigh |= 0x80000000;
346
347         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
348                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
349
350         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
351                 return bt->err = 0;
352
353         return bt->err = BTERR_lock;
354 #endif 
355 }
356
357 // remove write, read, or parent lock on requested page_no.
358
359 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
360 {
361 off64_t off = page_no << bt->page_bits;
362 #ifdef unix
363 struct flock lock[1];
364 #else
365 OVERLAPPED ovl[1];
366 uint len;
367 #endif
368
369         if( mode == BtLockRead || mode == BtLockWrite )
370                 off +=  1 * sizeof(*bt->page);  // use second segment
371
372         if( mode == BtLockParent )
373                 off +=  2 * sizeof(*bt->page);  // use third segment
374
375 #ifdef unix
376         memset (lock, 0, sizeof(lock));
377
378         lock->l_start = off;
379         lock->l_type = F_UNLCK;
380         lock->l_len = sizeof(*bt->page);
381         lock->l_whence = 0;
382
383         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
384                 return bt->err = BTERR_lock;
385 #else
386         memset (ovl, 0, sizeof(ovl));
387         ovl->OffsetHigh = (uint)(off >> 32);
388         ovl->Offset = (uint)off;
389         len = sizeof(*bt->page);
390
391         //      use large offsets to
392         //      simulate advisory locking
393
394         ovl->OffsetHigh |= 0x80000000;
395
396         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
397                 return GetLastError(), bt->err = BTERR_lock;
398 #endif
399
400         return bt->err = 0;
401 }
402
403 //      close and release memory
404
405 void bt_close (BtDb *bt)
406 {
407 BtHash *hash;
408 #ifdef unix
409         // release mapped pages
410
411         if( hash = bt->lrufirst )
412                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
413                 while(hash = hash->lrunext);
414
415         if ( bt->mem )
416                 free (bt->mem);
417         close (bt->idx);
418         free (bt->cache);
419         free (bt);
420 #else
421         if( hash = bt->lrufirst )
422           do
423           {
424                 FlushViewOfFile(hash->page, 0);
425                 UnmapViewOfFile(hash->page);
426                 CloseHandle(hash->hmap);
427           } while(hash = hash->lrunext);
428
429         if ( bt->mem)
430                 VirtualFree (bt->mem, 0, MEM_RELEASE);
431         FlushFileBuffers(bt->idx);
432         CloseHandle(bt->idx);
433         GlobalFree (bt->cache);
434         GlobalFree (bt);
435 #endif
436 }
437
438 //  open/create new btree
439 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
440 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
441
442 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
443 {
444 uint lvl, attr, cacheblk, last;
445 BtLock lockmode = BtLockWrite;
446 BtPage alloc;
447 off64_t size;
448 uint amt[1];
449 BtDb* bt;
450
451 #ifndef unix
452 SYSTEM_INFO sysinfo[1];
453 #endif
454
455 #ifdef unix
456         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
457         memset (bt, 0, sizeof(BtDb));
458
459         switch (mode & 0x7fff)
460         {
461         case BT_fl:
462         case BT_rw:
463                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
464                 break;
465
466         case BT_ro:
467         default:
468                 bt->idx = open ((char*)name, O_RDONLY);
469                 lockmode = BtLockRead;
470                 break;
471         }
472         if( bt->idx == -1 )
473                 return free(bt), NULL;
474         
475         if( nodemax )
476                 cacheblk = 4096;        // page size for unix
477         else
478                 cacheblk = 0;
479
480 #else
481         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
482         attr = FILE_ATTRIBUTE_NORMAL;
483         switch (mode & 0x7fff)
484         {
485         case BT_fl:
486                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
487
488         case BT_rw:
489                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
490                 break;
491
492         case BT_ro:
493         default:
494                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
495                 lockmode = BtLockRead;
496                 break;
497         }
498         if( bt->idx == INVALID_HANDLE_VALUE )
499                 return GlobalFree(bt), NULL;
500
501         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
502         GetSystemInfo(sysinfo);
503
504         if( nodemax )
505                 cacheblk = sysinfo->dwAllocationGranularity;
506         else
507                 cacheblk = 0;
508 #endif
509
510         // determine sanity of page size
511
512         if( bits > BT_maxbits )
513                 bits = BT_maxbits;
514         else if( bits < BT_minbits )
515                 bits = BT_minbits;
516
517         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
518                 return bt_close (bt), NULL;
519
520 #ifdef unix
521         *amt = 0;
522
523         // read minimum page size to get root info
524
525         if( size = lseek (bt->idx, 0L, 2) ) {
526                 alloc = malloc (BT_minpage);
527                 pread(bt->idx, alloc, BT_minpage, 0);
528                 bits = alloc->bits;
529                 free (alloc);
530         } else if( mode == BT_ro )
531                 return bt_close (bt), NULL;
532 #else
533         size = GetFileSize(bt->idx, amt);
534
535         if( size || *amt ) {
536                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
537                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
538                         return bt_close (bt), NULL;
539                 bits = alloc->bits;
540                 VirtualFree (alloc, 0, MEM_RELEASE);
541         } else if( mode == BT_ro )
542                 return bt_close (bt), NULL;
543 #endif
544
545         bt->page_size = 1 << bits;
546         bt->page_bits = bits;
547
548         bt->nodemax = nodemax;
549         bt->mode = mode;
550
551         // setup cache mapping
552
553         if( cacheblk ) {
554                 if( cacheblk < bt->page_size )
555                         cacheblk = bt->page_size;
556
557                 bt->hashsize = nodemax / 8;
558                 bt->hashmask = (cacheblk >> bits) - 1;
559                 bt->mapped_io = 1;
560         }
561
562         //      requested number of pages per memmap segment
563
564         if( cacheblk )
565           if( (1 << pgblk) > bt->hashmask )
566                 bt->hashmask = (1 << pgblk) - 1;
567
568         bt->seg_bits = 0;
569
570         while( (1 << bt->seg_bits) <= bt->hashmask )
571                 bt->seg_bits++;
572
573 #ifdef unix
574         bt->mem = malloc (8 *bt->page_size);
575         bt->cache = calloc (bt->hashsize, sizeof(ushort));
576 #else
577         bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
578         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
579 #endif
580         bt->frame = (BtPage)bt->mem;
581         bt->cursor = (BtPage)(bt->mem + bt->page_size);
582         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
583         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
584         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
585         bt->temp2 = (BtPage)(bt->mem + 5 * bt->page_size);
586         bt->parent = (BtPage)(bt->mem + 6 * bt->page_size);
587         bt->zero = (BtPage)(bt->mem + 7 * bt->page_size);
588
589         if( size || *amt ) {
590                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
591                         return bt_close (bt), NULL;
592
593                 return bt;
594         }
595
596         // initializes an empty b-tree with root page and page of leaves
597
598         memset (bt->alloc, 0, bt->page_size);
599         bt_putid(bt->alloc->right, MIN_lvl+1);
600         bt->alloc->bits = bt->page_bits;
601
602 #ifdef unix
603         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
604                 return bt_close (bt), NULL;
605 #else
606         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
607                 return bt_close (bt), NULL;
608
609         if( *amt < bt->page_size )
610                 return bt_close (bt), NULL;
611 #endif
612
613         memset (bt->frame, 0, bt->page_size);
614         bt->frame->bits = bt->page_bits;
615         bt->frame->posted = 1;
616
617         for( lvl=MIN_lvl; lvl--; ) {
618                 slotptr(bt->frame, 1)->off = offsetof(struct BtPage_, fence);
619                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
620                 bt->frame->fence[0] = 2;
621                 bt->frame->fence[1] = 0xff;
622                 bt->frame->fence[2] = 0xff;
623                 bt->frame->min = bt->page_size;
624                 bt->frame->lvl = lvl;
625                 bt->frame->cnt = 1;
626                 bt->frame->act = 1;
627 #ifdef unix
628                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
629                         return bt_close (bt), NULL;
630 #else
631                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
632                         return bt_close (bt), NULL;
633
634                 if( *amt < bt->page_size )
635                         return bt_close (bt), NULL;
636 #endif
637         }
638
639         // create empty page area by writing last page of first
640         // cache area (other pages are zeroed by O/S)
641
642         if( bt->mapped_io && bt->hashmask ) {
643                 memset(bt->frame, 0, bt->page_size);
644                 last = bt->hashmask;
645
646                 while( last < MIN_lvl + 1 )
647                         last += bt->hashmask + 1;
648 #ifdef unix
649                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
650 #else
651                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
652                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
653                         return bt_close (bt), NULL;
654                 if( *amt < bt->page_size )
655                         return bt_close (bt), NULL;
656 #endif
657         }
658
659         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
660                 return bt_close (bt), NULL;
661
662         return bt;
663 }
664
665 //  compare two keys, returning > 0, = 0, or < 0
666 //  as the comparison value
667
668 int keycmp (BtKey key1, unsigned char *key2, uint len2)
669 {
670 uint len1 = key1->len;
671 int ans;
672
673         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
674                 return ans;
675
676         if( len1 > len2 )
677                 return 1;
678         if( len1 < len2 )
679                 return -1;
680
681         return 0;
682 }
683
684 //  Update current page of btree by writing file contents
685 //      or flushing mapped area to disk.
686
687 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
688 {
689 off64_t off = page_no << bt->page_bits;
690
691 #ifdef unix
692     if ( !bt->mapped_io )
693          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
694                  return bt->err = BTERR_wrt;
695 #else
696 uint amt[1];
697         if ( !bt->mapped_io )
698         {
699                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
700                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
701                         return GetLastError(), bt->err = BTERR_wrt;
702
703                 if( *amt < bt->page_size )
704                         return GetLastError(), bt->err = BTERR_wrt;
705         } 
706         else if ( bt->mode == BT_fl ) {
707                         FlushViewOfFile(page, bt->page_size);
708                         FlushFileBuffers(bt->idx);
709         }
710 #endif
711         return 0;
712 }
713
714 // find page in cache 
715
716 BtHash *bt_findhash(BtDb *bt, uid page_no)
717 {
718 BtHash *hash;
719 uint idx;
720
721         // compute cache block first page and hash idx 
722
723         page_no &= ~bt->hashmask;
724         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
725
726         if( bt->cache[idx] ) 
727                 hash = bt->nodes + bt->cache[idx];
728         else
729                 return NULL;
730
731         do if( hash->page_no == page_no )
732                  break;
733         while(hash = hash->hashnext );
734
735         return hash;
736 }
737
738 // add page cache entry to hash index
739
740 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
741 {
742 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
743 BtHash *hash;
744
745         if( bt->cache[idx] ) {
746                 node->hashnext = hash = bt->nodes + bt->cache[idx];
747                 hash->hashprev = node;
748         }
749
750         node->hashprev = NULL;
751         bt->cache[idx] = (ushort)(node - bt->nodes);
752 }
753
754 // remove cache entry from hash table
755
756 void bt_unlinkhash(BtDb *bt, BtHash *node)
757 {
758 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
759 BtHash *hash;
760
761         // unlink node
762         if( hash = node->hashprev )
763                 hash->hashnext = node->hashnext;
764         else if( hash = node->hashnext )
765                 bt->cache[idx] = (ushort)(hash - bt->nodes);
766         else
767                 bt->cache[idx] = 0;
768
769         if( hash = node->hashnext )
770                 hash->hashprev = node->hashprev;
771 }
772
773 // add cache page to lru chain and map pages
774
775 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
776 {
777 int flag;
778 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
779 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
780 BtHash *node;
781
782         memset(hash, 0, sizeof(BtHash));
783         hash->page_no = (page_no & ~bt->hashmask);
784         bt_linkhash(bt, hash, page_no);
785
786         if( node = hash->lrunext = bt->lrufirst )
787                 node->lruprev = hash;
788         else
789                 bt->lrulast = hash;
790
791         bt->lrufirst = hash;
792
793 #ifdef unix
794         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
795         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
796         if( hash->page == MAP_FAILED )
797                 return bt->err = BTERR_map, (BtPage)NULL;
798
799 #else
800         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
801         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
802         if( !hash->hmap )
803                 return bt->err = BTERR_map, NULL;
804
805         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
806         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
807         if( !hash->page )
808                 return bt->err = BTERR_map, NULL;
809 #endif
810
811         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
812 }
813
814 //      find or place requested page in page-cache
815 //      return memory address where page is located.
816
817 BtPage bt_hashpage(BtDb *bt, uid page_no)
818 {
819 BtHash *hash, *node, *next;
820 BtPage page;
821
822         // find page in cache and move to top of lru list  
823
824         if( hash = bt_findhash(bt, page_no) ) {
825                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
826                 // swap node in lru list
827                 if( node = hash->lruprev ) {
828                         if( next = node->lrunext = hash->lrunext )
829                                 next->lruprev = node;
830                         else
831                                 bt->lrulast = node;
832
833                         if( next = hash->lrunext = bt->lrufirst )
834                                 next->lruprev = hash;
835                         else
836                                 return bt->err = BTERR_hash, (BtPage)NULL;
837
838                         hash->lruprev = NULL;
839                         bt->lrufirst = hash;
840                 }
841                 return page;
842         }
843
844         // map pages and add to cache entry
845
846         if( bt->nodecnt < bt->nodemax ) {
847                 hash = bt->nodes + ++bt->nodecnt;
848                 return bt_linklru(bt, hash, page_no);
849         }
850
851         // hash table is already full, replace last lru entry from the cache
852
853         if( hash = bt->lrulast ) {
854                 // unlink from lru list
855                 if( node = bt->lrulast = hash->lruprev )
856                         node->lrunext = NULL;
857                 else
858                         return bt->err = BTERR_hash, (BtPage)NULL;
859
860 #ifdef unix
861                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
862 #else
863                 FlushViewOfFile(hash->page, 0);
864                 UnmapViewOfFile(hash->page);
865                 CloseHandle(hash->hmap);
866 #endif
867                 // unlink from hash table
868
869                 bt_unlinkhash(bt, hash);
870
871                 // map and add to cache
872
873                 return bt_linklru(bt, hash, page_no);
874         }
875
876         return bt->err = BTERR_hash, (BtPage)NULL;
877 }
878
879 //  map a btree page onto current page
880
881 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
882 {
883 off64_t off = page_no << bt->page_bits;
884 #ifndef unix
885 int amt[1];
886 #endif
887         
888         if( bt->mapped_io ) {
889                 bt->err = 0;
890                 *page = bt_hashpage(bt, page_no);
891                 return bt->err;
892         }
893 #ifdef unix
894         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
895                 return bt->err = BTERR_map;
896 #else
897         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
898
899         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
900                 return bt->err = BTERR_map;
901
902         if( *amt <  bt->page_size )
903                 return bt->err = BTERR_map;
904 #endif
905         return 0;
906 }
907
908 //      allocate a new page and write page into it
909
910 uid bt_newpage(BtDb *bt, BtPage page)
911 {
912 uid new_page;
913 char *pmap;
914 int reuse;
915
916         // lock page zero
917
918         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
919                 return 0;
920
921         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
922                 return 0;
923
924         // use empty chain first
925         // else allocate empty page
926
927         if( new_page = bt_getid(bt->alloc[1].right) ) {
928                 if( bt_mappage (bt, &bt->temp, new_page) )
929                         return 0;       // don't unlock on error
930                 memcpy(bt->alloc[1].right, bt->temp->right, BtId);
931                 reuse = 1;
932         } else {
933                 new_page = bt_getid(bt->alloc->right);
934                 bt_putid(bt->alloc->right, new_page+1);
935                 reuse = 0;
936         }
937
938         if( bt_update(bt, bt->alloc, ALLOC_page) )
939                 return 0;       // don't unlock on error
940
941         if( !bt->mapped_io ) {
942                 if( bt_update(bt, page, new_page) )
943                         return 0;       //don't unlock on error
944
945                 // unlock page zero 
946
947                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
948                         return 0;
949
950                 return new_page;
951         }
952
953 #ifdef unix
954         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
955                 return bt->err = BTERR_wrt, 0;
956
957         // if writing first page of hash block, zero last page in the block
958
959         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
960         {
961                 // use temp buffer to write zeros
962                 memset(bt->zero, 0, bt->page_size);
963                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
964                         return bt->err = BTERR_wrt, 0;
965         }
966 #else
967         //      bring new page into page-cache and copy page.
968         //      this will extend the file into the new pages.
969
970         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
971                 return 0;
972
973         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
974 #endif
975
976         // unlock page zero 
977
978         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
979                 return 0;
980
981         return new_page;
982 }
983
984 //  find slot in page for given key at a given level
985 //      return 0 if beyond fence value
986
987 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
988 {
989 uint diff, higher = set->page->cnt, low = 1, slot;
990
991         //      make stopper key an infinite fence value
992
993         if( bt_getid (set->page->right) )
994                 higher++;
995
996         //      low is the lowest candidate, higher is already
997         //      tested as .ge. the given key, loop ends when they meet
998
999         while( diff = higher - low ) {
1000                 slot = low + ( diff >> 1 );
1001                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1002                         low = slot + 1;
1003                 else
1004                         higher = slot;
1005         }
1006
1007         if( higher <= set->page->cnt )
1008                 return higher;
1009
1010         //      if leaf page, compare against fence value
1011
1012         //      return zero if key is on right link page
1013         //      or return slot beyond last key
1014
1015         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1016                 return 0;
1017
1018         return higher;
1019 }
1020
1021 //  find and load page at given level for given key
1022 //      leave page rd or wr locked as requested
1023
1024 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1025 {
1026 uid page_no = ROOT_page, prevpage = 0;
1027 uint drill = 0xff, slot;
1028 uint mode, prevmode;
1029
1030   //  start at root of btree and drill down
1031
1032   do {
1033         // determine lock mode of drill level
1034         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1035
1036         set->page_no = page_no;
1037
1038         // obtain access lock using lock chaining
1039
1040         if( page_no > ROOT_page )
1041           if( bt_lockpage(bt, page_no, BtLockAccess) )
1042                 return 0;                                                                       
1043
1044         if( prevpage )
1045           if( bt_unlockpage(bt, prevpage, prevmode) )
1046                 return 0;
1047
1048         // obtain read lock using lock chaining
1049
1050         if( bt_lockpage(bt, page_no, mode) )
1051                 return 0;                                                                       
1052
1053         if( page_no > ROOT_page )
1054           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1055                 return 0;                                                                       
1056
1057         //      map/obtain page contents
1058
1059         if( bt_mappage (bt, &set->page, page_no) )
1060                 return 0;
1061
1062         // re-read and re-lock root after determining actual level of root
1063
1064         if( set->page->lvl != drill) {
1065                 if ( page_no != ROOT_page )
1066                         return bt->err = BTERR_struct, 0;
1067                         
1068                 drill = set->page->lvl;
1069
1070                 if( lock == BtLockWrite && drill == lvl )
1071                   if( bt_unlockpage(bt, page_no, mode) )
1072                         return 0;
1073                   else
1074                         continue;
1075         }
1076
1077         prevpage = page_no;
1078         prevmode = mode;
1079
1080         //      if page is being deleted and we should continue right
1081
1082         if( set->page->kill && set->page->goright ) {
1083                 page_no = bt_getid (set->page->right);
1084                 continue;
1085         }
1086
1087         //      otherwise, wait for deleted node to clear
1088
1089         if( set->page->kill ) {
1090                 if( bt_unlockpage(bt, set->page_no, mode) )
1091                         return bt->err;
1092                 page_no = ROOT_page;
1093                 prevpage = 0;
1094                 drill = 0xff;
1095 #ifdef unix
1096                 sched_yield();
1097 #else
1098                 SwitchToThread();
1099 #endif
1100                 continue;
1101         }
1102
1103         //  find key on page at this level
1104         //  and descend to requested level
1105
1106         if( slot = bt_findslot (set, key, len) ) {
1107           if( drill == lvl )
1108                 return slot;
1109
1110           if( slot > set->page->cnt )
1111                 return bt->err = BTERR_struct, 0;
1112
1113         //  if drilling down, find next active key
1114
1115           while( slotptr(set->page, slot)->dead )
1116                 if( slot++ < set->page->cnt )
1117                         continue;
1118                 else
1119                         return bt->err = BTERR_struct, 0;
1120
1121           page_no = bt_getid(slotptr(set->page, slot)->id);
1122           drill--;
1123           continue;
1124         }
1125
1126         //  or slide right into next page
1127         //  (slide left from deleted page)
1128
1129         page_no = bt_getid(set->page->right);
1130
1131   } while( page_no );
1132
1133   // return error on end of right chain
1134
1135   bt->err = BTERR_struct;
1136   return 0;     // return error
1137 }
1138
1139 // drill down fixing fence values for left sibling tree
1140
1141 //  call with set write locked mapped to bt->temp
1142 //      return with set unlocked & unpinned.
1143
1144 BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
1145 {
1146 unsigned char oldfence[256];
1147 BtPageSet next[1];
1148 uid right;
1149 int chk;
1150
1151   next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
1152   memcpy (oldfence, set->page->fence, 256);
1153   next->page = bt->temp2;
1154   bt->temp2 = bt->temp;
1155   bt->temp = next->page;
1156
1157   while( !set->page->kill && set->page->lvl ) {
1158         if( bt_lockpage (bt, next->page_no, BtLockParent) )
1159                 return bt->err;
1160         if( bt_lockpage (bt, next->page_no, BtLockAccess) )
1161                 return bt->err;
1162         if( bt_lockpage (bt, next->page_no, BtLockWrite) )
1163                 return bt->err;
1164         if( bt_unlockpage (bt, next->page_no, BtLockAccess) )
1165                 return bt->err;
1166
1167         if( bt_mappage (bt, &next->page, next->page_no) )
1168                 return bt->err;
1169
1170         chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
1171
1172         if( chk < 0 ) {
1173           right = bt_getid (next->page->right);
1174           if( bt_unlockpage (bt, next->page_no, BtLockWrite) )
1175                 return bt->err;
1176           if( bt_unlockpage (bt, next->page_no, BtLockParent) )
1177                 return bt->err;
1178           next->page_no = right;
1179           continue;
1180         }
1181
1182         if( chk > 0 )
1183           return bt->err = BTERR_struct;
1184
1185         if( bt_fixfences (bt, next, newfence) )
1186           return bt->err;
1187
1188         break;
1189   }
1190
1191   set->page = bt->temp;
1192
1193   if( bt_mappage (bt, &set->page, set->page_no) )
1194         return bt->err;
1195
1196   memcpy (set->page->fence, newfence, 256);
1197
1198   if( bt_update(bt, set->page, set->page_no) )
1199         return bt->err;
1200   if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1201         return bt->err;
1202   if( bt_unlockpage (bt, set->page_no, BtLockParent) )
1203         return bt->err;
1204   return 0;
1205 }
1206
1207
1208 //      return page to free list
1209 //      page must be delete & write locked
1210
1211 BTERR bt_freepage (BtDb *bt, BtPageSet *set)
1212 {
1213         //      lock & map allocation page
1214
1215         if( bt_lockpage (bt, ALLOC_page, BtLockWrite) )
1216                 return bt->err;
1217
1218         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
1219                 return bt->err;
1220
1221         //      store chain in second right
1222         bt_putid(set->page->right, bt_getid(bt->alloc[1].right));
1223         bt_putid(bt->alloc[1].right, set->page_no);
1224         set->page->free = 1;
1225
1226         if( bt_update(bt, bt->alloc, ALLOC_page) )
1227                 return bt->err;
1228         if( bt_update(bt, set->page, set->page_no) )
1229                 return bt->err;
1230
1231         // unlock page zero 
1232
1233         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1234                 return bt->err;
1235
1236         //  remove write lock on deleted node
1237
1238         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1239                 return bt->err;
1240
1241         return bt_unlockpage (bt, set->page_no, BtLockDelete);
1242 }
1243
1244 //      remove the root level by promoting its only child
1245
1246 BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
1247 {
1248 uid next = 0;
1249
1250   do {
1251         if( next ) {
1252           if( bt_lockpage (bt, next, BtLockDelete) )
1253                 return bt->err;
1254           if( bt_lockpage (bt, next, BtLockWrite) )
1255                 return bt->err;
1256
1257           if( bt_mappage (bt, &child->page, next) )
1258                 return bt->err;
1259
1260           child->page_no = next;
1261         }
1262
1263         memcpy (root->page, child->page, bt->page_size);
1264         next = bt_getid (slotptr(child->page, child->page->cnt)->id);
1265
1266         if( bt_freepage (bt, child) )
1267                 return bt->err;
1268   } while( root->page->lvl > 1 && root->page->cnt == 1 );
1269
1270   if( bt_update (bt, root->page, ROOT_page) )
1271         return bt->err;
1272
1273   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
1274 }
1275
1276 //  pull right page over ourselves in simple merge
1277
1278 BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
1279 {
1280         //  install ourselves as child page
1281         //      and delete ourselves from parent
1282
1283         bt_putid (slotptr(parent->page, idx)->id, set->page_no);
1284         slotptr(parent->page, slot)->dead = 1;
1285         parent->page->act--;
1286
1287         //      collapse any empty slots
1288
1289         while( idx = parent->page->cnt - 1 )
1290           if( slotptr(parent->page, idx)->dead ) {
1291                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1292                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1293           } else
1294                   break;
1295
1296         memcpy (set->page, right->page, bt->page_size);
1297
1298         if( bt_unlockpage (bt, right->page_no, BtLockParent) )
1299                 return bt->err;
1300
1301         if( bt_freepage (bt, right) )
1302                 return bt->err;
1303
1304         //      do we need to remove a btree level?
1305         //      (leave the first page of leaves alone)
1306
1307         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1308           if( set->page->lvl )
1309                 return bt_removeroot (bt, parent, set);
1310
1311         if( bt_update (bt, parent->page, parent->page_no) )
1312                 return bt->err;
1313
1314         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1315                 return bt->err;
1316
1317         if( bt_update (bt, set->page, set->page_no) )
1318                 return bt->err;
1319
1320         if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1321                 return bt->err;
1322
1323         if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
1324                 return bt->err;
1325
1326         return 0;
1327 }
1328
1329 //      remove both child and parent from the btree
1330 //      from the fence position in the parent
1331
1332 BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet  *right, BtPageSet *rparent, uint lvl)
1333 {
1334 unsigned char pagefence[256];
1335 uint idx;
1336
1337         //  pull right sibling over ourselves and unlock
1338
1339         memcpy (child->page, right->page, bt->page_size);
1340
1341         if( bt_update(bt, child->page, child->page_no) )
1342                 return bt->err;
1343
1344         if( bt_unlockpage (bt, child->page_no, BtLockWrite) )
1345                 return bt->err;
1346
1347         //  install ourselves into right link of old right page
1348
1349         bt_putid (right->page->right, child->page_no);
1350         right->page->goright = 1;               // tell bt_loadpage to go right to us
1351         right->page->kill = 1;
1352
1353         if( bt_update(bt, right->page, right->page_no) )
1354                 return bt->err;
1355
1356         if( bt_unlockpage (bt, right->page_no, BtLockWrite) )
1357                 return bt->err;
1358
1359         //      remove our slot from our parent
1360         //      signal to move right
1361
1362         parent->page->goright = 1;              // tell bt_findslot to go right to rparent
1363         parent->page->kill = 1;
1364         parent->page->act--;
1365
1366         //      redirect right page pointer in right parent to us
1367
1368         for( idx = 0; idx++ < rparent->page->cnt; )
1369           if( !slotptr(rparent->page, idx)->dead )
1370                 break;
1371
1372         if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
1373                 return bt->err = BTERR_struct;
1374
1375         bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
1376
1377         if( bt_update (bt, rparent->page, rparent->page_no) )
1378                 return bt->err;
1379
1380         if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
1381                 return bt->err;
1382
1383         //      save parent page fence value
1384
1385         memcpy (pagefence, parent->page->fence, 256);
1386
1387         if( bt_update (bt, parent->page, parent->page_no) )
1388                 return bt->err;
1389         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1390                 return bt->err;
1391
1392         return bt_removepage (bt, parent->page_no, lvl, pagefence);
1393 }
1394
1395 //      remove page from btree
1396 //      call with page unlocked
1397 //      returns with page on free list
1398
1399 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
1400 {
1401 BtPageSet parent[1], rparent[1], sibling[1], set[1];
1402 unsigned char newfence[256];
1403 uint slot, idx;
1404 BtKey ptr;
1405
1406   parent->page = bt->parent;
1407   set->page_no = page_no;
1408   set->page = bt->page;
1409
1410         //      load and lock our parent
1411
1412   while( 1 ) {
1413         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1414                 return bt->err;
1415
1416         //      wait until we are posted in our parent
1417
1418         if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
1419                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1420                         return bt->err;
1421 #ifdef unix
1422                 sched_yield();
1423 #else
1424                 SwitchToThread();
1425 #endif
1426                 continue;
1427         }
1428
1429         //      can we do a simple merge entirely
1430         //      between siblings on the parent page?
1431
1432         if( slot < parent->page->cnt ) {
1433                 // find our right neighbor
1434                 //      right must exist because the stopper prevents
1435                 //      the rightmost page from deleting
1436
1437                 for( idx = slot; idx++ < parent->page->cnt; )
1438                   if( !slotptr(parent->page, idx)->dead )
1439                         break;
1440
1441                 sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
1442
1443                 if( bt_lockpage (bt, set->page_no, BtLockDelete) )
1444                         return bt->err;
1445
1446                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1447                         return bt->err;
1448
1449                 if( bt_mappage (bt, &set->page, set->page_no) )
1450                         return bt->err;
1451
1452                 //      merge right if sibling shows up in
1453                 //  our parent and is not being killed
1454
1455                 if( sibling->page_no == bt_getid (set->page->right) ) {
1456                   if( bt_lockpage (bt, sibling->page_no, BtLockParent) )
1457                         return bt->err;
1458
1459                   if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
1460                         return bt->err;
1461
1462                   if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1463                         return bt->err;
1464
1465                   sibling->page = bt->temp;
1466
1467                   if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1468                         return bt->err;
1469
1470                   if( !sibling->page->kill )
1471                         return bt_mergeright(bt, set, parent, sibling, slot, idx);
1472
1473                   //  try again later
1474
1475                   if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1476                         return bt->err;
1477                 }
1478
1479                 if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
1480                         return bt->err;
1481
1482                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1483                         return bt->err;
1484
1485                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1486                         return bt->err;
1487 #ifdef linux
1488                 sched_yield ();
1489 #else
1490                 SwitchToThread();
1491 #endif
1492                 continue;
1493         }
1494
1495         //  find our left neighbor in our parent page
1496
1497         for( idx = slot; --idx; )
1498           if( !slotptr(parent->page, idx)->dead )
1499                 break;
1500
1501         //      if no left neighbor, delete ourselves and our parent
1502
1503         if( !idx ) {
1504                 if( bt_lockpage (bt, set->page_no, BtLockAccess) )
1505                         return bt->err;
1506
1507                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1508                         return bt->err;
1509
1510                 if( bt_unlockpage (bt, set->page_no, BtLockAccess) )
1511                         return bt->err;
1512
1513                 if( bt_mappage (bt, &set->page, set->page_no) )
1514                         return bt->err;
1515
1516                 rparent->page_no = bt_getid (parent->page->right);
1517                 rparent->page = bt->temp;
1518
1519                 if( bt_lockpage (bt, rparent->page_no, BtLockAccess) )
1520                         return bt->err;
1521
1522                 if( bt_lockpage (bt, rparent->page_no, BtLockWrite) )
1523                         return bt->err;
1524
1525                 if( bt_unlockpage (bt, rparent->page_no, BtLockAccess) )
1526                         return bt->err;
1527
1528                 if( bt_mappage (bt, &rparent->page, rparent->page_no) )
1529                         return bt->err;
1530
1531                 if( !rparent->page->kill ) {
1532                   sibling->page_no = bt_getid (set->page->right);
1533
1534                   if( bt_lockpage (bt, sibling->page_no, BtLockAccess) )
1535                         return bt->err;
1536
1537                   if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1538                         return bt->err;
1539
1540                   if( bt_unlockpage (bt, sibling->page_no, BtLockAccess) )
1541                         return bt->err;
1542
1543                   sibling->page = bt->temp2;
1544
1545                   if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1546                         return bt->err;
1547
1548                   if( !sibling->page->kill )
1549                         return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
1550
1551                   //  try again later
1552
1553                   if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1554                         return bt->err;
1555                 }
1556
1557                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1558                         return bt->err;
1559
1560                 if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
1561                         return bt->err;
1562
1563                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1564                         return bt->err;
1565 #ifdef linux
1566                 sched_yield();
1567 #else
1568                 SwitchToThread();
1569 #endif
1570                 continue;
1571         }
1572
1573         // redirect parent to our left sibling
1574         // lock and map our left sibling's page
1575
1576         sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
1577         sibling->page = bt->temp;
1578
1579         //      wait our turn on fence key maintenance
1580
1581         if( bt_lockpage(bt, sibling->page_no, BtLockParent) )
1582                 return bt->err;
1583
1584         if( bt_lockpage(bt, sibling->page_no, BtLockAccess) )
1585                 return bt->err;
1586
1587         if( bt_lockpage(bt, sibling->page_no, BtLockWrite) )
1588                 return bt->err;
1589
1590         if( bt_unlockpage(bt, sibling->page_no, BtLockAccess) )
1591                 return bt->err;
1592
1593         if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1594                 return bt->err;
1595
1596         //  wait until sibling is in our parent
1597
1598         if( bt_getid (sibling->page->right) != set->page_no ) {
1599                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1600                         return bt->err;
1601                 if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1602                         return bt->err;
1603                 if( bt_unlockpage (bt, sibling->page_no, BtLockParent) )
1604                         return bt->err;
1605 #ifdef linux
1606                 sched_yield();
1607 #else
1608                 SwitchToThread();
1609 #endif
1610                 continue;
1611         }
1612
1613         //      map page being killed
1614
1615         if( bt_lockpage (bt, set->page_no, BtLockDelete) )
1616                 return bt->err;
1617
1618         if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1619                 return bt->err;
1620
1621         if( bt_mappage (bt, &set->page, set->page_no) )
1622                 return bt->err;
1623
1624         //      delete our left sibling from parent
1625
1626         slotptr(parent->page,idx)->dead = 1;
1627         parent->page->dirty = 1;
1628         parent->page->act--;
1629
1630         //      redirect our parent slot to our left sibling
1631
1632         bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
1633         memcpy (sibling->page->right, set->page->right, BtId);
1634
1635         if( bt_update (bt, sibling->page, sibling->page_no) )
1636                 return bt->err;
1637
1638         //      collapse dead slots from parent
1639
1640         while( idx = parent->page->cnt - 1 )
1641           if( slotptr(parent->page, idx)->dead ) {
1642                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1643                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1644           } else
1645                   break;
1646
1647         //  update parent page
1648
1649         if( bt_update (bt, parent->page, parent->page_no) )
1650                 return bt->err;
1651
1652         //  free our original page
1653
1654         if( bt_freepage (bt, set) )
1655                 return bt->err;
1656
1657         //      go down the left node's fence keys to the leaf level
1658         //      and update the fence keys in each page
1659
1660         memcpy (newfence, parent->page->fence, 256);
1661
1662         if( bt_fixfences (bt, sibling, newfence) )
1663                 return bt->err;
1664
1665         //      promote sibling as new root?
1666
1667         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1668          if( sibling->page->lvl ) {
1669           if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
1670                 return bt->err;
1671
1672           if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1673                 return bt->err;
1674
1675           if( bt_mappage (bt, &sibling->page, set->page_no) )
1676                 return bt->err;
1677
1678           return bt_removeroot (bt, parent, sibling);
1679          }
1680
1681         return bt_unlockpage (bt, parent->page_no, BtLockWrite);
1682   }
1683 }
1684
1685 //  find and delete key on page by marking delete flag bit
1686 //  when page becomes empty, delete it
1687
1688 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1689 {
1690 unsigned char pagefence[256];
1691 uint slot, found, idx;
1692 BtPageSet set[1];
1693 BtKey ptr;
1694
1695         set->page = bt->page;
1696
1697         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
1698                 ptr = keyptr(set->page, slot);
1699         else
1700                 return bt->err;
1701
1702         // if key is found delete it, otherwise ignore request
1703
1704         if( found = slot <= set->page->cnt )
1705           if( found = !keycmp (ptr, key, len) )
1706                 if( found = slotptr(set->page, slot)->dead == 0 ) {
1707                   slotptr(set->page,slot)->dead = 1;
1708                   set->page->dirty = 1;
1709                   set->page->act--;
1710
1711                   //    collapse empty slots
1712
1713                   while( idx = set->page->cnt - 1 )
1714                         if( slotptr(set->page, idx)->dead ) {
1715                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1716                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1717                         } else
1718                                 break;
1719                 }
1720
1721         if( set->page->act ) {
1722                 if( bt_update(bt, set->page, set->page_no) )
1723                         return bt->err;
1724                 bt->found = found;
1725                 return bt_unlockpage (bt, set->page_no, BtLockWrite);
1726         }
1727
1728         // delete page when empty
1729
1730         memcpy (pagefence, set->page->fence, 256);
1731         set->page->kill = 1;
1732
1733         if( bt_update(bt, set->page, set->page_no) )
1734                 return bt->err;
1735
1736         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1737                 return bt->err;
1738
1739         if( bt_removepage (bt, set->page_no, 0, pagefence) )
1740                 return bt->err;
1741
1742         bt->found = found;
1743         return 0;
1744 }
1745
1746 //      find key in leaf level and return row-id
1747
1748 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1749 {
1750 BtPageSet set[1];
1751 uint  slot;
1752 uid id = 0;
1753 BtKey ptr;
1754
1755         set->page = bt->page;
1756
1757         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
1758                 ptr = keyptr(set->page, slot);
1759         else
1760                 return 0;
1761
1762         // if key exists, return row-id
1763         //      otherwise return 0
1764
1765         if( slot <= set->page->cnt )
1766           if( !keycmp (ptr, key, len) )
1767                 id = bt_getid(slotptr(set->page,slot)->id);
1768
1769         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
1770                 return 0;
1771
1772         return id;
1773 }
1774
1775 //      check page for space available,
1776 //      clean if necessary and return
1777 //      0 - page needs splitting
1778 //      >0 - new slot value
1779  
1780 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1781 {
1782 uint nxt = bt->page_size, off;
1783 uint cnt = 0, idx = 0;
1784 uint max = page->cnt;
1785 uint newslot = max;
1786 BtKey key;
1787
1788         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1789                 return slot;
1790
1791         //      skip cleanup if nothing to reclaim
1792
1793         if( !page->dirty )
1794                 return 0;
1795
1796         memcpy (bt->frame, page, bt->page_size);
1797
1798         // skip page info and set rest of page to zero
1799
1800         memset (page+1, 0, bt->page_size - sizeof(*page));
1801         page->dirty = 0;
1802         page->act = 0;
1803
1804         while( cnt++ < max ) {
1805                 if( cnt == slot )
1806                         newslot = idx + 1;
1807                 if( slotptr(bt->frame,cnt)->dead )
1808                         continue;
1809
1810                 off = slotptr(bt->frame,cnt)->off;
1811
1812                 // copy key
1813
1814                 if( off >= sizeof(*page) ) {
1815                         key = keyptr(bt->frame, cnt);
1816                         off = nxt -= key->len + 1;
1817                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1818                 }
1819
1820                 // copy slot
1821
1822                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1823                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1824                 slotptr(page, idx)->off = off;
1825                 page->act++;
1826         }
1827         page->min = nxt;
1828         page->cnt = idx;
1829
1830         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1831                 return newslot;
1832
1833         return 0;
1834 }
1835
1836 // split the root and raise the height of the btree
1837
1838 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
1839 {
1840 unsigned char leftkey[256];
1841 uint nxt = bt->page_size;
1842 uid new_page;
1843
1844         //  Obtain an empty page to use, and copy the current
1845         //  root contents into it, e.g. lower keys
1846
1847         memcpy (leftkey, root->page->fence, 256);
1848         root->page->posted = 1;
1849
1850         if( !(new_page = bt_newpage(bt, root->page)) )
1851                 return bt->err;
1852
1853         // preserve the page info at the bottom
1854         // of higher keys and set rest to zero
1855
1856         memset(root->page+1, 0, bt->page_size - sizeof(*root->page));
1857         memset(root->page->fence, 0, 256);
1858         root->page->fence[0] = 2;
1859         root->page->fence[1] = 0xff;
1860         root->page->fence[2] = 0xff;
1861
1862         // insert new page fence key on newroot page
1863
1864         nxt -= *leftkey + 1;
1865         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1866         bt_putid(slotptr(root->page, 1)->id, new_page);
1867         slotptr(root->page, 1)->off = nxt;
1868         
1869         // insert stopper key on newroot page
1870         // and increase the root height
1871
1872         bt_putid(slotptr(root->page, 2)->id, page_no2);
1873         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
1874
1875         bt_putid(root->page->right, 0);
1876         root->page->min = nxt;          // reset lowest used offset and key count
1877         root->page->cnt = 2;
1878         root->page->act = 2;
1879         root->page->lvl++;
1880
1881         // update and release root
1882
1883         if( bt_update(bt, root->page, root->page_no) )
1884                 return bt->err;
1885
1886         return bt_unlockpage(bt, root->page_no, BtLockWrite);
1887 }
1888
1889 //  split already locked full node
1890 //      return unlocked.
1891
1892 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1893 {
1894 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
1895 unsigned char fencekey[256];
1896 uint lvl = set->page->lvl;
1897 uid right;
1898 BtKey key;
1899
1900         //  split higher half of keys to bt->frame
1901
1902         memset (bt->frame, 0, bt->page_size);
1903         max = set->page->cnt;
1904         cnt = max / 2;
1905         idx = 0;
1906
1907         while( cnt++ < max ) {
1908                 if( !lvl || cnt < max ) {
1909                         key = keyptr(set->page, cnt);
1910                         off = nxt -= key->len + 1;
1911                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1912                 } else
1913                         off = offsetof(struct BtPage_, fence);
1914
1915                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
1916                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
1917                 slotptr(bt->frame, idx)->off = off;
1918                 bt->frame->act++;
1919         }
1920
1921         if( set->page_no == ROOT_page )
1922                 bt->frame->posted = 1;
1923
1924         memcpy (bt->frame->fence, set->page->fence, 256);
1925         bt->frame->bits = bt->page_bits;
1926         bt->frame->min = nxt;
1927         bt->frame->cnt = idx;
1928         bt->frame->lvl = lvl;
1929
1930         // link right node
1931
1932         if( set->page_no > ROOT_page )
1933                 memcpy (bt->frame->right, set->page->right, BtId);
1934
1935         //      get new free page and write higher keys to it.
1936
1937         if( !(right = bt_newpage(bt, bt->frame)) )
1938                 return bt->err;
1939
1940         //      update lower keys to continue in old page
1941
1942         memcpy (bt->frame, set->page, bt->page_size);
1943         memset (set->page+1, 0, bt->page_size - sizeof(*set->page));
1944         nxt = bt->page_size;
1945         set->page->posted = 0;
1946         set->page->dirty = 0;
1947         set->page->act = 0;
1948         cnt = 0;
1949         idx = 0;
1950
1951         //  assemble page of smaller keys
1952
1953         while( cnt++ < max / 2 ) {
1954                 key = keyptr(bt->frame, cnt);
1955
1956                 if( !lvl || cnt < max / 2 ) {
1957                         off = nxt -= key->len + 1;
1958                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
1959                 } else 
1960                         off = offsetof(struct BtPage_, fence);
1961
1962                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1963                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1964                 slotptr(set->page, idx)->off = off;
1965                 set->page->act++;
1966         }
1967
1968         // install fence key for smaller key page
1969
1970         memset(set->page->fence, 0, 256);
1971         memcpy(set->page->fence, key, key->len + 1);
1972
1973         bt_putid(set->page->right, right);
1974         set->page->min = nxt;
1975         set->page->cnt = idx;
1976
1977         // if current page is the root page, split it
1978
1979         if( set->page_no == ROOT_page )
1980                 return bt_splitroot (bt, set, right);
1981
1982         if( bt_update (bt, set->page, set->page_no) )
1983                 return bt->err; 
1984
1985         if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1986                 return bt->err;
1987
1988         // insert new fences in their parent pages
1989
1990         while( 1 ) {
1991                 if( bt_lockpage (bt, set->page_no, BtLockParent) )
1992                         return bt->err;
1993
1994                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1995                         return bt->err;
1996
1997                 if( bt_mappage (bt, &set->page, set->page_no) )
1998                         return bt->err;
1999
2000                 memcpy (fencekey, set->page->fence, 256);
2001                 right = bt_getid (set->page->right);
2002
2003                 if( set->page->posted ) {
2004                   if( bt_unlockpage (bt, set->page_no, BtLockParent) )
2005                         return bt->err;
2006                         
2007                   return bt_unlockpage (bt, set->page_no, BtLockWrite);
2008                 }
2009
2010                 set->page->posted = 1;
2011
2012                 if( bt_update (bt, set->page, set->page_no) )
2013                         return bt->err; 
2014
2015                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
2016                         return bt->err;
2017
2018                 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) )
2019                         return bt->err;
2020
2021                 if( bt_unlockpage (bt, set->page_no, BtLockParent) )
2022                         return bt->err;
2023
2024                 if( !(set->page_no = right) )
2025                         break;
2026         }
2027
2028         return 0;
2029 }
2030
2031 //  Insert new key into the btree at requested level.
2032
2033 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
2034 {
2035 BtPageSet set[1];
2036 uint slot, idx;
2037 BtKey ptr;
2038
2039   set->page = bt->page;
2040
2041   while( 1 ) {
2042         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2043                 ptr = keyptr(set->page, slot);
2044         else
2045         {
2046                 if ( !bt->err )
2047                         bt->err = BTERR_ovflw;
2048                 return bt->err;
2049         }
2050
2051         // if key already exists, update id and return
2052
2053         if( slot <= set->page->cnt )
2054           if( !keycmp (ptr, key, len) ) {
2055                 if( slotptr(set->page, slot)->dead )
2056                         set->page->act++;
2057
2058                 slotptr(set->page, slot)->dead = 0;
2059                 slotptr(set->page, slot)->tod = tod;
2060                 bt_putid(slotptr(set->page,slot)->id, id);
2061
2062                 if ( bt_update(bt, set->page, set->page_no) )
2063                         return bt->err;
2064
2065                 return bt_unlockpage(bt, set->page_no, BtLockWrite);
2066           }
2067
2068         // check if page has enough space
2069
2070         if( slot = bt_cleanpage (bt, set->page, len, slot) )
2071                 break;
2072
2073         if( bt_splitpage (bt, set) )
2074                 return bt->err;
2075   }
2076
2077   // calculate next available slot and copy key into page
2078
2079   set->page->min -= len + 1; // reset lowest used offset
2080   ((unsigned char *)set->page)[set->page->min] = len;
2081   memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
2082
2083   for( idx = slot; idx <= set->page->cnt; idx++ )
2084         if( slotptr(set->page, idx)->dead )
2085                 break;
2086
2087   // now insert key into array before slot
2088
2089   if( idx > set->page->cnt )
2090         set->page->cnt++;
2091
2092   set->page->act++;
2093
2094   while( idx > slot )
2095         *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2096
2097   bt_putid(slotptr(set->page,slot)->id, id);
2098   slotptr(set->page, slot)->off = set->page->min;
2099   slotptr(set->page, slot)->tod = tod;
2100   slotptr(set->page, slot)->dead = 0;
2101
2102   if( bt_update(bt, set->page, set->page_no) )
2103         return bt->err;
2104
2105   return bt_unlockpage(bt, set->page_no, BtLockWrite);
2106 }
2107
2108 //  cache page of keys into cursor and return starting slot for given key
2109
2110 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2111 {
2112 BtPageSet set[1];
2113 uint slot;
2114
2115         set->page = bt->page;
2116
2117         // cache page for retrieval
2118         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2119                 memcpy (bt->cursor, set->page, bt->page_size);
2120
2121         bt->cursor_page = set->page_no;
2122
2123         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
2124                 return 0;
2125
2126         return slot;
2127 }
2128
2129 //  return next slot for cursor page
2130 //  or slide cursor right into next page
2131
2132 uint bt_nextkey (BtDb *bt, uint slot)
2133 {
2134 BtPageSet set[1];
2135 uid right;
2136
2137   do {
2138         right = bt_getid(bt->cursor->right);
2139         while( slot++ < bt->cursor->cnt )
2140           if( slotptr(bt->cursor,slot)->dead )
2141                 continue;
2142           else if( right || (slot < bt->cursor->cnt)) // skip infinite stopper
2143                 return slot;
2144           else
2145                 break;
2146
2147         if( !right )
2148                 break;
2149
2150         bt->cursor_page = right;
2151         set->page = bt->page;
2152
2153     if( bt_lockpage(bt, right, BtLockRead) )
2154                 return 0;
2155
2156         if( bt_mappage (bt, &set->page, right) )
2157                 break;
2158
2159         memcpy (bt->cursor, set->page, bt->page_size);
2160
2161         if( bt_unlockpage(bt, right, BtLockRead) )
2162                 return 0;
2163
2164         slot = 0;
2165   } while( 1 );
2166
2167   return bt->err = 0;
2168 }
2169
2170 BtKey bt_key(BtDb *bt, uint slot)
2171 {
2172         return keyptr(bt->cursor, slot);
2173 }
2174
2175 uid bt_uid(BtDb *bt, uint slot)
2176 {
2177         return bt_getid(slotptr(bt->cursor,slot)->id);
2178 }
2179
2180 uint bt_tod(BtDb *bt, uint slot)
2181 {
2182         return slotptr(bt->cursor,slot)->tod;
2183 }
2184
2185
2186 #ifdef STANDALONE
2187 //  standalone program to index file of keys
2188 //  then list them onto std-out
2189
2190 int main (int argc, char **argv)
2191 {
2192 uint slot, line = 0, off = 0, found = 0;
2193 int dead, ch, cnt = 0, bits = 12;
2194 unsigned char key[256];
2195 clock_t done, start;
2196 uint pgblk = 0;
2197 time_t tod[1];
2198 uint scan = 0;
2199 uint len = 0;
2200 uint map = 0;
2201 BtKey ptr;
2202 BtDb *bt;
2203 FILE *in;
2204
2205         if( argc < 4 ) {
2206                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2207                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2208                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2209                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2210                 exit(0);
2211         }
2212
2213         start = clock();
2214         time(tod);
2215
2216         if( argc > 4 )
2217                 bits = atoi(argv[4]);
2218
2219         if( argc > 5 )
2220                 map = atoi(argv[5]);
2221
2222         if( map > 65536 )
2223                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2224
2225         if( map && map < 8 )
2226                 fprintf (stderr, "Buffer_pool too small\n");
2227
2228         if( argc > 6 )
2229                 pgblk = atoi(argv[6]);
2230
2231         if( bits + pgblk > 30 )
2232                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2233
2234         if( argc > 7 )
2235                 off = atoi(argv[7]);
2236
2237         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
2238
2239         if( !bt ) {
2240                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2241                 exit (1);
2242         }
2243
2244         switch(argv[3][0]| 0x20)
2245         {
2246         case 'w':
2247                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2248                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2249                   while( ch = getc(in), ch != EOF )
2250                         if( ch == '\n' )
2251                         {
2252                           if( off )
2253                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2254
2255                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2256                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2257                           len = 0;
2258                         }
2259                         else if( len < 245 )
2260                                 key[len++] = ch;
2261                 fprintf(stderr, "finished adding keys, %d \n", line);
2262                 break;
2263
2264         case 'd':
2265                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2266                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2267                   while( ch = getc(in), ch != EOF )
2268                         if( ch == '\n' )
2269                         {
2270                           if( off )
2271                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2272                           line++;
2273                           if( bt_deletekey (bt, key, len) )
2274                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2275                           len = 0;
2276                         }
2277                         else if( len < 245 )
2278                                 key[len++] = ch;
2279                 fprintf(stderr, "finished deleting keys, %d \n", line);
2280                 break;
2281
2282         case 'f':
2283                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2284                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2285                   while( ch = getc(in), ch != EOF )
2286                         if( ch == '\n' )
2287                         {
2288                           if( off )
2289                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2290                           line++;
2291                           if( bt_findkey (bt, key, len) )
2292                                 found++;
2293                           else if( bt->err )
2294                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2295                           len = 0;
2296                         }
2297                         else if( len < 245 )
2298                                 key[len++] = ch;
2299                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2300                 break;
2301
2302         case 's':
2303                 scan++;
2304                 break;
2305
2306         }
2307
2308         done = clock();
2309         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2310
2311         dead = cnt = 0;
2312         len = key[0] = 0;
2313
2314         fprintf(stderr, "started reading\n");
2315
2316         if( slot = bt_startkey (bt, key, len) )
2317           slot--;
2318         else
2319           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2320
2321         while( slot = bt_nextkey (bt, slot) )
2322           if( cnt++, scan ) {
2323                         ptr = bt_key(bt, slot);
2324                         fwrite (ptr->key, ptr->len, 1, stdout);
2325                         fputc ('\n', stdout);
2326           }
2327
2328         fprintf(stderr, " Total keys read %d\n", cnt);
2329         return 0;
2330 }
2331
2332 #endif  //STANDALONE