]> pd.if.org Git - btree/blob - btree2t.c
incorporate reworked bt_deletekey into file I/O version
[btree] / btree2t.c
1 // btree version 2t
2 // 15 FEB 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stddef.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_ro 0x6f72    // ro
60 #define BT_rw 0x7772    // rw
61 #define BT_fl 0x6c66    // fl
62
63 #define BT_maxbits              24                                      // maximum page size in bits
64 #define BT_minbits              9                                       // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66
67 /*
68 There are five lock types for each node in three independent sets: 
69 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
70 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
71 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
72 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
73 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
74 */
75
76 typedef enum{
77         BtLockAccess,
78         BtLockDelete,
79         BtLockRead,
80         BtLockWrite,
81         BtLockParent
82 }BtLock;
83
84 //      Define the length of the page and key pointers
85
86 #define BtId 6
87
88 //      Page key slot definition.
89
90 //      If BT_maxbits is 15 or less, you can save 2 bytes
91 //      for each key stored by making the first two uints
92 //      into ushorts.  You can also save 4 bytes by removing
93 //      the tod field from the key.
94
95 //      Keys are marked dead, but remain on the page until
96 //      cleanup is called.
97
98 typedef struct {
99         uint off:BT_maxbits;            // page offset for key start
100         uint dead:1;                            // set for deleted key
101         uint tod;                                       // time-stamp for key
102         unsigned char id[BtId];         // id associated with key
103 } BtSlot;
104
105 //      The key structure occupies space at the upper end of
106 //      each page.  It's a length byte followed by the value
107 //      bytes.
108
109 typedef struct {
110         unsigned char len;
111         unsigned char key[0];
112 } *BtKey;
113
114 //      The first part of an index page.
115 //      It is immediately followed
116 //      by the BtSlot array of keys.
117
118 typedef struct BtPage_ {
119         uint cnt;                                       // count of keys in page
120         uint act;                                       // count of active keys
121         uint min;                                       // next key offset
122         unsigned char bits:7;           // page size in bits
123         unsigned char free:1;           // page is on free list
124         unsigned char lvl:4;            // level of page
125         unsigned char kill:1;           // page is empty
126         unsigned char dirty:1;          // page is dirty
127         unsigned char posted:1;         // page fence is posted
128         unsigned char goright:1;        // continue to right link
129         unsigned char right[BtId];      // page number to right
130         unsigned char fence[256];       // page fence key
131 } *BtPage;
132
133 //  The loadpage interface object
134
135 typedef struct {
136         uid page_no;
137         BtPage page;
138 } BtPageSet;
139
140 //      The memory mapping hash table entry
141
142 typedef struct {
143         BtPage page;            // mapped page pointer
144         uid  page_no;           // mapped page number
145         void *lruprev;          // least recently used previous cache block
146         void *lrunext;          // lru next cache block
147         void *hashprev;         // previous cache block for the same hash idx
148         void *hashnext;         // next cache block for the same hash idx
149 #ifndef unix
150         HANDLE hmap;
151 #endif
152 } BtHash;
153
154 //      The object structure for Btree access
155
156 typedef struct _BtDb {
157         uint page_size;         // each page size       
158         uint page_bits;         // each page size in bits       
159         uint seg_bits;          // segment size in pages in bits
160         uid page_no;            // current page number  
161         uid cursor_page;        // current cursor page number   
162         int  err;
163         uint mode;                      // read-write mode
164         uint mapped_io;         // use memory mapping
165         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
166         BtPage temp2;           // temporary frame buffer (memory mapped/file IO)
167         BtPage parent;          // current page's parent node (memory mapped/file IO)
168         BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
169         BtPage cursor;          // cached frame for start/next (never mapped)
170         BtPage frame;           // spare frame for the page split (never mapped)
171         BtPage zero;            // zeroes frame buffer (never mapped)
172         BtPage page;            // temporary page (memory mapped/file IO)
173 #ifdef unix
174         int idx;
175 #else
176         HANDLE idx;
177 #endif
178         unsigned char *mem;     // frame, cursor, page memory buffer
179         int nodecnt;            // highest page cache segment in use
180         int nodemax;            // highest page cache segment allocated
181         int hashmask;           // number of pages in segments - 1
182         int hashsize;           // size of hash table
183         int posted;                     // last loadpage found posted key
184         int found;                      // last insert/delete found key
185         BtHash *lrufirst;       // lru list head
186         BtHash *lrulast;        // lru list tail
187         ushort *cache;          // hash table for cached segments
188         BtHash nodes[1];        // segment cache follows
189 } BtDb;
190
191 typedef enum {
192 BTERR_ok = 0,
193 BTERR_struct,
194 BTERR_ovflw,
195 BTERR_lock,
196 BTERR_map,
197 BTERR_wrt,
198 BTERR_hash
199 } BTERR;
200
201 // B-Tree functions
202 extern void bt_close (BtDb *bt);
203 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
204 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
205 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
206 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
207 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
208 extern uint bt_nextkey   (BtDb *bt, uint slot);
209
210 //      Internal functions
211
212 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence);
213
214 //  Helper functions to return slot values
215
216 extern BtKey bt_key (BtDb *bt, uint slot);
217 extern uid bt_uid (BtDb *bt, uint slot);
218 extern uint bt_tod (BtDb *bt, uint slot);
219
220 //  BTree page number constants
221 #define ALLOC_page              0
222 #define ROOT_page               1
223 #define LEAF_page               2
224
225 //      Number of levels to create in a new BTree
226
227 #define MIN_lvl                 2
228
229 //  The page is allocated from low and hi ends.
230 //  The key offsets and row-id's are allocated
231 //  from the bottom, while the text of the key
232 //  is allocated from the top.  When the two
233 //  areas meet, the page is split into two.
234
235 //  A key consists of a length byte, two bytes of
236 //  index number (0 - 65534), and up to 253 bytes
237 //  of key value.  Duplicate keys are discarded.
238 //  Associated with each key is a 48 bit row-id.
239
240 //  The b-tree root is always located at page 1.
241 //      The first leaf page of level zero is always
242 //      located on page 2.
243
244 //      The b-tree pages are linked with right
245 //      pointers to facilitate enumerators,
246 //      and provide for concurrency.
247
248 //      When to root page fills, it is split in two and
249 //      the tree height is raised by a new root at page
250 //      one with two keys.
251
252 //      Deleted keys are marked with a dead bit until
253 //      page cleanup
254
255 //  Groups of pages from the btree are optionally
256 //  cached with memory mapping. A hash table is used to keep
257 //  track of the cached pages.  This behaviour is controlled
258 //  by the number of cache blocks parameter and pages per block
259 //      given to bt_open.
260
261 //  To achieve maximum concurrency one page is locked at a time
262 //  as the tree is traversed to find leaf key in question. The right
263 //  page numbers are used in cases where the page is being split,
264 //      or consolidated.
265
266 //  Page 0 is dedicated to lock for new page extensions,
267 //      and chains empty pages together for reuse.
268
269 //      Parent locks are obtained to prevent resplitting or deleting a node
270 //      before its fence is posted into its upper level.
271
272 //      Empty nodes are chained together through the ALLOC page and reused.
273
274 //      A special open mode of BT_fl is provided to safely access files on
275 //      WIN32 networks. WIN32 network operations should not use memory mapping.
276 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
277 //      to prevent local caching of network file contents.
278
279 //      Access macros to address slot and key values from the page.
280 //      Page slots use 1 based indexing.
281
282 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
283 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
284
285 void bt_putid(unsigned char *dest, uid id)
286 {
287 int i = BtId;
288
289         while( i-- )
290                 dest[i] = (unsigned char)id, id >>= 8;
291 }
292
293 uid bt_getid(unsigned char *src)
294 {
295 uid id = 0;
296 int i;
297
298         for( i = 0; i < BtId; i++ )
299                 id <<= 8, id |= *src++; 
300
301         return id;
302 }
303
304 // place write, read, or parent lock on requested page_no.
305
306 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
307 {
308 off64_t off = page_no << bt->page_bits;
309 #ifdef unix
310 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
311 struct flock lock[1];
312 #else
313 uint flags = 0, len;
314 OVERLAPPED ovl[1];
315 #endif
316
317         if( mode == BtLockRead || mode == BtLockWrite )
318                 off +=  1 * sizeof(*bt->page);  // use second segment
319
320         if( mode == BtLockParent )
321                 off +=  2 * sizeof(*bt->page);  // use third segment
322
323 #ifdef unix
324         memset (lock, 0, sizeof(lock));
325
326         lock->l_start = off;
327         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
328         lock->l_len = sizeof(*bt->page);
329         lock->l_whence = 0;
330
331         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
332                 return bt->err = BTERR_lock;
333
334         return 0;
335 #else
336         memset (ovl, 0, sizeof(ovl));
337         ovl->OffsetHigh = (uint)(off >> 32);
338         ovl->Offset = (uint)off;
339         len = sizeof(*bt->page);
340
341         //      use large offsets to
342         //      simulate advisory locking
343
344         ovl->OffsetHigh |= 0x80000000;
345
346         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
347                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
348
349         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
350                 return bt->err = 0;
351
352         return bt->err = BTERR_lock;
353 #endif 
354 }
355
356 // remove write, read, or parent lock on requested page_no.
357
358 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
359 {
360 off64_t off = page_no << bt->page_bits;
361 #ifdef unix
362 struct flock lock[1];
363 #else
364 OVERLAPPED ovl[1];
365 uint len;
366 #endif
367
368         if( mode == BtLockRead || mode == BtLockWrite )
369                 off +=  1 * sizeof(*bt->page);  // use second segment
370
371         if( mode == BtLockParent )
372                 off +=  2 * sizeof(*bt->page);  // use third segment
373
374 #ifdef unix
375         memset (lock, 0, sizeof(lock));
376
377         lock->l_start = off;
378         lock->l_type = F_UNLCK;
379         lock->l_len = sizeof(*bt->page);
380         lock->l_whence = 0;
381
382         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
383                 return bt->err = BTERR_lock;
384 #else
385         memset (ovl, 0, sizeof(ovl));
386         ovl->OffsetHigh = (uint)(off >> 32);
387         ovl->Offset = (uint)off;
388         len = sizeof(*bt->page);
389
390         //      use large offsets to
391         //      simulate advisory locking
392
393         ovl->OffsetHigh |= 0x80000000;
394
395         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
396                 return GetLastError(), bt->err = BTERR_lock;
397 #endif
398
399         return bt->err = 0;
400 }
401
402 //      close and release memory
403
404 void bt_close (BtDb *bt)
405 {
406 BtHash *hash;
407 #ifdef unix
408         // release mapped pages
409
410         if( hash = bt->lrufirst )
411                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
412                 while(hash = hash->lrunext);
413
414         if ( bt->mem )
415                 free (bt->mem);
416         close (bt->idx);
417         free (bt->cache);
418         free (bt);
419 #else
420         if( hash = bt->lrufirst )
421           do
422           {
423                 FlushViewOfFile(hash->page, 0);
424                 UnmapViewOfFile(hash->page);
425                 CloseHandle(hash->hmap);
426           } while(hash = hash->lrunext);
427
428         if ( bt->mem)
429                 VirtualFree (bt->mem, 0, MEM_RELEASE);
430         FlushFileBuffers(bt->idx);
431         CloseHandle(bt->idx);
432         GlobalFree (bt->cache);
433         GlobalFree (bt);
434 #endif
435 }
436
437 //  open/create new btree
438 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
439 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
440
441 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
442 {
443 uint lvl, attr, cacheblk, last;
444 BtLock lockmode = BtLockWrite;
445 BtPage alloc;
446 off64_t size;
447 uint amt[1];
448 BtDb* bt;
449
450 #ifndef unix
451 SYSTEM_INFO sysinfo[1];
452 #endif
453
454 #ifdef unix
455         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
456         memset (bt, 0, sizeof(BtDb));
457
458         switch (mode & 0x7fff)
459         {
460         case BT_fl:
461         case BT_rw:
462                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
463                 break;
464
465         case BT_ro:
466         default:
467                 bt->idx = open ((char*)name, O_RDONLY);
468                 lockmode = BtLockRead;
469                 break;
470         }
471         if( bt->idx == -1 )
472                 return free(bt), NULL;
473         
474         if( nodemax )
475                 cacheblk = 4096;        // page size for unix
476         else
477                 cacheblk = 0;
478
479 #else
480         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
481         attr = FILE_ATTRIBUTE_NORMAL;
482         switch (mode & 0x7fff)
483         {
484         case BT_fl:
485                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
486
487         case BT_rw:
488                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
489                 break;
490
491         case BT_ro:
492         default:
493                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
494                 lockmode = BtLockRead;
495                 break;
496         }
497         if( bt->idx == INVALID_HANDLE_VALUE )
498                 return GlobalFree(bt), NULL;
499
500         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
501         GetSystemInfo(sysinfo);
502
503         if( nodemax )
504                 cacheblk = sysinfo->dwAllocationGranularity;
505         else
506                 cacheblk = 0;
507 #endif
508
509         // determine sanity of page size
510
511         if( bits > BT_maxbits )
512                 bits = BT_maxbits;
513         else if( bits < BT_minbits )
514                 bits = BT_minbits;
515
516         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
517                 return bt_close (bt), NULL;
518
519 #ifdef unix
520         *amt = 0;
521
522         // read minimum page size to get root info
523
524         if( size = lseek (bt->idx, 0L, 2) ) {
525                 alloc = malloc (BT_minpage);
526                 pread(bt->idx, alloc, BT_minpage, 0);
527                 bits = alloc->bits;
528                 free (alloc);
529         } else if( mode == BT_ro )
530                 return bt_close (bt), NULL;
531 #else
532         size = GetFileSize(bt->idx, amt);
533
534         if( size || *amt ) {
535                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
536                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
537                         return bt_close (bt), NULL;
538                 bits = alloc->bits;
539                 VirtualFree (alloc, 0, MEM_RELEASE);
540         } else if( mode == BT_ro )
541                 return bt_close (bt), NULL;
542 #endif
543
544         bt->page_size = 1 << bits;
545         bt->page_bits = bits;
546
547         bt->nodemax = nodemax;
548         bt->mode = mode;
549
550         // setup cache mapping
551
552         if( cacheblk ) {
553                 if( cacheblk < bt->page_size )
554                         cacheblk = bt->page_size;
555
556                 bt->hashsize = nodemax / 8;
557                 bt->hashmask = (cacheblk >> bits) - 1;
558                 bt->mapped_io = 1;
559         }
560
561         //      requested number of pages per memmap segment
562
563         if( cacheblk )
564           if( (1 << pgblk) > bt->hashmask )
565                 bt->hashmask = (1 << pgblk) - 1;
566
567         bt->seg_bits = 0;
568
569         while( (1 << bt->seg_bits) <= bt->hashmask )
570                 bt->seg_bits++;
571
572 #ifdef unix
573         bt->mem = malloc (8 *bt->page_size);
574         bt->cache = calloc (bt->hashsize, sizeof(ushort));
575 #else
576         bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
577         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
578 #endif
579         bt->frame = (BtPage)bt->mem;
580         bt->cursor = (BtPage)(bt->mem + bt->page_size);
581         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
582         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
583         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
584         bt->temp2 = (BtPage)(bt->mem + 5 * bt->page_size);
585         bt->parent = (BtPage)(bt->mem + 6 * bt->page_size);
586         bt->zero = (BtPage)(bt->mem + 7 * bt->page_size);
587
588         if( size || *amt ) {
589                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
590                         return bt_close (bt), NULL;
591
592                 return bt;
593         }
594
595         // initializes an empty b-tree with root page and page of leaves
596
597         memset (bt->alloc, 0, bt->page_size);
598         bt_putid(bt->alloc->right, MIN_lvl+1);
599         bt->alloc->bits = bt->page_bits;
600
601 #ifdef unix
602         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
603                 return bt_close (bt), NULL;
604 #else
605         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
606                 return bt_close (bt), NULL;
607
608         if( *amt < bt->page_size )
609                 return bt_close (bt), NULL;
610 #endif
611
612         memset (bt->frame, 0, bt->page_size);
613         bt->frame->bits = bt->page_bits;
614         bt->frame->posted = 1;
615
616         for( lvl=MIN_lvl; lvl--; ) {
617                 slotptr(bt->frame, 1)->off = offsetof(struct BtPage_, fence);
618                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
619                 bt->frame->fence[0] = 2;
620                 bt->frame->fence[1] = 0xff;
621                 bt->frame->fence[2] = 0xff;
622                 bt->frame->min = bt->page_size;
623                 bt->frame->lvl = lvl;
624                 bt->frame->cnt = 1;
625                 bt->frame->act = 1;
626 #ifdef unix
627                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
628                         return bt_close (bt), NULL;
629 #else
630                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
631                         return bt_close (bt), NULL;
632
633                 if( *amt < bt->page_size )
634                         return bt_close (bt), NULL;
635 #endif
636         }
637
638         // create empty page area by writing last page of first
639         // cache area (other pages are zeroed by O/S)
640
641         if( bt->mapped_io && bt->hashmask ) {
642                 memset(bt->frame, 0, bt->page_size);
643                 last = bt->hashmask;
644
645                 while( last < MIN_lvl + 1 )
646                         last += bt->hashmask + 1;
647 #ifdef unix
648                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
649 #else
650                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
651                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
652                         return bt_close (bt), NULL;
653                 if( *amt < bt->page_size )
654                         return bt_close (bt), NULL;
655 #endif
656         }
657
658         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
659                 return bt_close (bt), NULL;
660
661         return bt;
662 }
663
664 //  compare two keys, returning > 0, = 0, or < 0
665 //  as the comparison value
666
667 int keycmp (BtKey key1, unsigned char *key2, uint len2)
668 {
669 uint len1 = key1->len;
670 int ans;
671
672         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
673                 return ans;
674
675         if( len1 > len2 )
676                 return 1;
677         if( len1 < len2 )
678                 return -1;
679
680         return 0;
681 }
682
683 //  Update current page of btree by writing file contents
684 //      or flushing mapped area to disk.
685
686 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
687 {
688 off64_t off = page_no << bt->page_bits;
689
690 #ifdef unix
691     if ( !bt->mapped_io )
692          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
693                  return bt->err = BTERR_wrt;
694 #else
695 uint amt[1];
696         if ( !bt->mapped_io )
697         {
698                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
699                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
700                         return GetLastError(), bt->err = BTERR_wrt;
701
702                 if( *amt < bt->page_size )
703                         return GetLastError(), bt->err = BTERR_wrt;
704         } 
705         else if ( bt->mode == BT_fl ) {
706                         FlushViewOfFile(page, bt->page_size);
707                         FlushFileBuffers(bt->idx);
708         }
709 #endif
710         return 0;
711 }
712
713 // find page in cache 
714
715 BtHash *bt_findhash(BtDb *bt, uid page_no)
716 {
717 BtHash *hash;
718 uint idx;
719
720         // compute cache block first page and hash idx 
721
722         page_no &= ~bt->hashmask;
723         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
724
725         if( bt->cache[idx] ) 
726                 hash = bt->nodes + bt->cache[idx];
727         else
728                 return NULL;
729
730         do if( hash->page_no == page_no )
731                  break;
732         while(hash = hash->hashnext );
733
734         return hash;
735 }
736
737 // add page cache entry to hash index
738
739 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
740 {
741 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
742 BtHash *hash;
743
744         if( bt->cache[idx] ) {
745                 node->hashnext = hash = bt->nodes + bt->cache[idx];
746                 hash->hashprev = node;
747         }
748
749         node->hashprev = NULL;
750         bt->cache[idx] = (ushort)(node - bt->nodes);
751 }
752
753 // remove cache entry from hash table
754
755 void bt_unlinkhash(BtDb *bt, BtHash *node)
756 {
757 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
758 BtHash *hash;
759
760         // unlink node
761         if( hash = node->hashprev )
762                 hash->hashnext = node->hashnext;
763         else if( hash = node->hashnext )
764                 bt->cache[idx] = (ushort)(hash - bt->nodes);
765         else
766                 bt->cache[idx] = 0;
767
768         if( hash = node->hashnext )
769                 hash->hashprev = node->hashprev;
770 }
771
772 // add cache page to lru chain and map pages
773
774 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
775 {
776 int flag;
777 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
778 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
779 BtHash *node;
780
781         memset(hash, 0, sizeof(BtHash));
782         hash->page_no = (page_no & ~bt->hashmask);
783         bt_linkhash(bt, hash, page_no);
784
785         if( node = hash->lrunext = bt->lrufirst )
786                 node->lruprev = hash;
787         else
788                 bt->lrulast = hash;
789
790         bt->lrufirst = hash;
791
792 #ifdef unix
793         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
794         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
795         if( hash->page == MAP_FAILED )
796                 return bt->err = BTERR_map, (BtPage)NULL;
797
798 #else
799         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
800         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
801         if( !hash->hmap )
802                 return bt->err = BTERR_map, NULL;
803
804         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
805         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
806         if( !hash->page )
807                 return bt->err = BTERR_map, NULL;
808 #endif
809
810         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
811 }
812
813 //      find or place requested page in page-cache
814 //      return memory address where page is located.
815
816 BtPage bt_hashpage(BtDb *bt, uid page_no)
817 {
818 BtHash *hash, *node, *next;
819 BtPage page;
820
821         // find page in cache and move to top of lru list  
822
823         if( hash = bt_findhash(bt, page_no) ) {
824                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
825                 // swap node in lru list
826                 if( node = hash->lruprev ) {
827                         if( next = node->lrunext = hash->lrunext )
828                                 next->lruprev = node;
829                         else
830                                 bt->lrulast = node;
831
832                         if( next = hash->lrunext = bt->lrufirst )
833                                 next->lruprev = hash;
834                         else
835                                 return bt->err = BTERR_hash, (BtPage)NULL;
836
837                         hash->lruprev = NULL;
838                         bt->lrufirst = hash;
839                 }
840                 return page;
841         }
842
843         // map pages and add to cache entry
844
845         if( bt->nodecnt < bt->nodemax ) {
846                 hash = bt->nodes + ++bt->nodecnt;
847                 return bt_linklru(bt, hash, page_no);
848         }
849
850         // hash table is already full, replace last lru entry from the cache
851
852         if( hash = bt->lrulast ) {
853                 // unlink from lru list
854                 if( node = bt->lrulast = hash->lruprev )
855                         node->lrunext = NULL;
856                 else
857                         return bt->err = BTERR_hash, (BtPage)NULL;
858
859 #ifdef unix
860                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
861 #else
862                 FlushViewOfFile(hash->page, 0);
863                 UnmapViewOfFile(hash->page);
864                 CloseHandle(hash->hmap);
865 #endif
866                 // unlink from hash table
867
868                 bt_unlinkhash(bt, hash);
869
870                 // map and add to cache
871
872                 return bt_linklru(bt, hash, page_no);
873         }
874
875         return bt->err = BTERR_hash, (BtPage)NULL;
876 }
877
878 //  map a btree page onto current page
879
880 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
881 {
882 off64_t off = page_no << bt->page_bits;
883 #ifndef unix
884 int amt[1];
885 #endif
886         
887         if( bt->mapped_io ) {
888                 bt->err = 0;
889                 *page = bt_hashpage(bt, page_no);
890                 return bt->err;
891         }
892 #ifdef unix
893         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
894                 return bt->err = BTERR_map;
895 #else
896         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
897
898         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
899                 return bt->err = BTERR_map;
900
901         if( *amt <  bt->page_size )
902                 return bt->err = BTERR_map;
903 #endif
904         return 0;
905 }
906
907 //      allocate a new page and write page into it
908
909 uid bt_newpage(BtDb *bt, BtPage page)
910 {
911 uid new_page;
912 char *pmap;
913 int reuse;
914
915         // lock page zero
916
917         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
918                 return 0;
919
920         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
921                 return 0;
922
923         // use empty chain first
924         // else allocate empty page
925
926         if( new_page = bt_getid(bt->alloc[1].right) ) {
927                 if( bt_mappage (bt, &bt->temp, new_page) )
928                         return 0;       // don't unlock on error
929                 memcpy(bt->alloc[1].right, bt->temp->right, BtId);
930                 reuse = 1;
931         } else {
932                 new_page = bt_getid(bt->alloc->right);
933                 bt_putid(bt->alloc->right, new_page+1);
934                 reuse = 0;
935         }
936
937         if( bt_update(bt, bt->alloc, ALLOC_page) )
938                 return 0;       // don't unlock on error
939
940         if( !bt->mapped_io ) {
941                 if( bt_update(bt, page, new_page) )
942                         return 0;       //don't unlock on error
943
944                 // unlock page zero 
945
946                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
947                         return 0;
948
949                 return new_page;
950         }
951
952 #ifdef unix
953         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
954                 return bt->err = BTERR_wrt, 0;
955
956         // if writing first page of hash block, zero last page in the block
957
958         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
959         {
960                 // use temp buffer to write zeros
961                 memset(bt->zero, 0, bt->page_size);
962                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
963                         return bt->err = BTERR_wrt, 0;
964         }
965 #else
966         //      bring new page into page-cache and copy page.
967         //      this will extend the file into the new pages.
968
969         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
970                 return 0;
971
972         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
973 #endif
974
975         // unlock page zero 
976
977         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
978                 return 0;
979
980         return new_page;
981 }
982
983 //  find slot in page for given key at a given level
984 //      return 0 if beyond fence value
985
986 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
987 {
988 uint diff, higher = set->page->cnt, low = 1, slot;
989
990         //      make stopper key an infinite fence value
991
992         if( bt_getid (set->page->right) )
993                 higher++;
994
995         //      low is the lowest candidate, higher is already
996         //      tested as .ge. the given key, loop ends when they meet
997
998         while( diff = higher - low ) {
999                 slot = low + ( diff >> 1 );
1000                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1001                         low = slot + 1;
1002                 else
1003                         higher = slot;
1004         }
1005
1006         if( higher <= set->page->cnt )
1007                 return higher;
1008
1009         //      if leaf page, compare against fence value
1010
1011         //      return zero if key is on right link page
1012         //      or return slot beyond last key
1013
1014         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1015                 return 0;
1016
1017         return higher;
1018 }
1019
1020 //  find and load page at given level for given key
1021 //      leave page rd or wr locked as requested
1022
1023 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1024 {
1025 uid page_no = ROOT_page, prevpage = 0;
1026 uint drill = 0xff, slot;
1027 uint mode, prevmode;
1028
1029   //  start at root of btree and drill down
1030
1031   do {
1032         // determine lock mode of drill level
1033         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1034
1035         set->page_no = page_no;
1036
1037         // obtain access lock using lock chaining
1038
1039         if( page_no > ROOT_page )
1040           if( bt_lockpage(bt, page_no, BtLockAccess) )
1041                 return 0;                                                                       
1042
1043         if( prevpage )
1044           if( bt_unlockpage(bt, prevpage, prevmode) )
1045                 return 0;
1046
1047         // obtain read lock using lock chaining
1048
1049         if( bt_lockpage(bt, page_no, mode) )
1050                 return 0;                                                                       
1051
1052         if( page_no > ROOT_page )
1053           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1054                 return 0;                                                                       
1055
1056         //      map/obtain page contents
1057
1058         if( bt_mappage (bt, &set->page, page_no) )
1059                 return 0;
1060
1061         // re-read and re-lock root after determining actual level of root
1062
1063         if( set->page->lvl != drill) {
1064                 if ( page_no != ROOT_page )
1065                         return bt->err = BTERR_struct, 0;
1066                         
1067                 drill = set->page->lvl;
1068
1069                 if( lock == BtLockWrite && drill == lvl )
1070                   if( bt_unlockpage(bt, page_no, mode) )
1071                         return 0;
1072                   else
1073                         continue;
1074         }
1075
1076         prevpage = page_no;
1077         prevmode = mode;
1078
1079         //      if page is being deleted and we should continue right
1080
1081         if( set->page->kill && set->page->goright ) {
1082                 page_no = bt_getid (set->page->right);
1083                 continue;
1084         }
1085
1086         //      otherwise, wait for deleted node to clear
1087
1088         if( set->page->kill ) {
1089                 if( bt_unlockpage(bt, set->page_no, mode) )
1090                         return bt->err;
1091                 page_no = ROOT_page;
1092                 prevpage = 0;
1093                 drill = 0xff;
1094 #ifdef unix
1095                 sched_yield();
1096 #else
1097                 SwitchToThread();
1098 #endif
1099                 continue;
1100         }
1101
1102         //  find key on page at this level
1103         //  and descend to requested level
1104
1105         if( slot = bt_findslot (set, key, len) ) {
1106           if( drill == lvl )
1107                 return slot;
1108
1109           if( slot > set->page->cnt )
1110                 return bt->err = BTERR_struct, 0;
1111
1112         //  if drilling down, find next active key
1113
1114           while( slotptr(set->page, slot)->dead )
1115                 if( slot++ < set->page->cnt )
1116                         continue;
1117                 else
1118                         return bt->err = BTERR_struct, 0;
1119
1120           page_no = bt_getid(slotptr(set->page, slot)->id);
1121           drill--;
1122           continue;
1123         }
1124
1125         //  or slide right into next page
1126         //  (slide left from deleted page)
1127
1128         page_no = bt_getid(set->page->right);
1129
1130   } while( page_no );
1131
1132   // return error on end of right chain
1133
1134   bt->err = BTERR_struct;
1135   return 0;     // return error
1136 }
1137
1138 // drill down fixing fence values for left sibling tree
1139
1140 //  call with set write locked mapped to bt->temp
1141 //      return with set unlocked & unpinned.
1142
1143 BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
1144 {
1145 unsigned char oldfence[256];
1146 BtPageSet next[1];
1147 uid right;
1148 int chk;
1149
1150   next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
1151   memcpy (oldfence, set->page->fence, 256);
1152   next->page = bt->temp2;
1153   bt->temp2 = bt->temp;
1154   bt->temp = next->page;
1155
1156   while( !set->page->kill && set->page->lvl ) {
1157         if( bt_lockpage (bt, next->page_no, BtLockParent) )
1158                 return bt->err;
1159         if( bt_lockpage (bt, next->page_no, BtLockAccess) )
1160                 return bt->err;
1161         if( bt_lockpage (bt, next->page_no, BtLockWrite) )
1162                 return bt->err;
1163         if( bt_unlockpage (bt, next->page_no, BtLockAccess) )
1164                 return bt->err;
1165
1166         if( bt_mappage (bt, &next->page, next->page_no) )
1167                 return bt->err;
1168
1169         chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
1170
1171         if( chk < 0 ) {
1172           right = bt_getid (next->page->right);
1173           if( bt_unlockpage (bt, next->page_no, BtLockWrite) )
1174                 return bt->err;
1175           if( bt_unlockpage (bt, next->page_no, BtLockParent) )
1176                 return bt->err;
1177           next->page_no = right;
1178           continue;
1179         }
1180
1181         if( chk > 0 )
1182           return bt->err = BTERR_struct;
1183
1184         if( bt_fixfences (bt, next, newfence) )
1185           return bt->err;
1186
1187         break;
1188   }
1189
1190   set->page = bt->temp;
1191
1192   if( bt_mappage (bt, &set->page, set->page_no) )
1193         return bt->err;
1194
1195   memcpy (set->page->fence, newfence, 256);
1196
1197   if( bt_update(bt, set->page, set->page_no) )
1198         return bt->err;
1199   if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1200         return bt->err;
1201   if( bt_unlockpage (bt, set->page_no, BtLockParent) )
1202         return bt->err;
1203   return 0;
1204 }
1205
1206
1207 //      return page to free list
1208 //      page must be delete & write locked
1209
1210 BTERR bt_freepage (BtDb *bt, BtPageSet *set)
1211 {
1212         //      lock & map allocation page
1213
1214         if( bt_lockpage (bt, ALLOC_page, BtLockWrite) )
1215                 return bt->err;
1216
1217         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
1218                 return bt->err;
1219
1220         //      store chain in second right
1221         bt_putid(set->page->right, bt_getid(bt->alloc[1].right));
1222         bt_putid(bt->alloc[1].right, set->page_no);
1223         set->page->free = 1;
1224
1225         if( bt_update(bt, bt->alloc, ALLOC_page) )
1226                 return bt->err;
1227         if( bt_update(bt, set->page, set->page_no) )
1228                 return bt->err;
1229
1230         // unlock page zero 
1231
1232         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1233                 return bt->err;
1234
1235         //  remove write lock on deleted node
1236
1237         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1238                 return bt->err;
1239
1240         return bt_unlockpage (bt, set->page_no, BtLockDelete);
1241 }
1242
1243 //      remove the root level by promoting its only child
1244
1245 BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
1246 {
1247 uid next = 0;
1248
1249   do {
1250         if( next ) {
1251           if( bt_lockpage (bt, next, BtLockDelete) )
1252                 return bt->err;
1253           if( bt_lockpage (bt, next, BtLockWrite) )
1254                 return bt->err;
1255
1256           if( bt_mappage (bt, &child->page, next) )
1257                 return bt->err;
1258
1259           child->page_no = next;
1260         }
1261
1262         memcpy (root->page, child->page, bt->page_size);
1263         next = bt_getid (slotptr(child->page, child->page->cnt)->id);
1264
1265         if( bt_freepage (bt, child) )
1266                 return bt->err;
1267   } while( root->page->lvl > 1 && root->page->cnt == 1 );
1268
1269   if( bt_update (bt, root->page, ROOT_page) )
1270         return bt->err;
1271
1272   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
1273 }
1274
1275 //  pull right page over ourselves in simple merge
1276
1277 BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
1278 {
1279         //  install ourselves as child page
1280         //      and delete ourselves from parent
1281
1282         bt_putid (slotptr(parent->page, idx)->id, set->page_no);
1283         slotptr(parent->page, slot)->dead = 1;
1284         parent->page->act--;
1285
1286         //      collapse any empty slots
1287
1288         while( idx = parent->page->cnt - 1 )
1289           if( slotptr(parent->page, idx)->dead ) {
1290                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1291                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1292           } else
1293                   break;
1294
1295         memcpy (set->page, right->page, bt->page_size);
1296
1297         if( bt_unlockpage (bt, right->page_no, BtLockParent) )
1298                 return bt->err;
1299
1300         if( bt_freepage (bt, right) )
1301                 return bt->err;
1302
1303         //      do we need to remove a btree level?
1304         //      (leave the first page of leaves alone)
1305
1306         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1307           if( set->page->lvl )
1308                 return bt_removeroot (bt, parent, set);
1309
1310         if( bt_update (bt, parent->page, parent->page_no) )
1311                 return bt->err;
1312
1313         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1314                 return bt->err;
1315
1316         if( bt_update (bt, set->page, set->page_no) )
1317                 return bt->err;
1318
1319         if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1320                 return bt->err;
1321
1322         if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
1323                 return bt->err;
1324
1325         return 0;
1326 }
1327
1328 //      remove both child and parent from the btree
1329 //      from the fence position in the parent
1330
1331 BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet  *right, BtPageSet *rparent, uint lvl)
1332 {
1333 unsigned char pagefence[256];
1334 uint idx;
1335
1336         //  pull right sibling over ourselves and unlock
1337
1338         memcpy (child->page, right->page, bt->page_size);
1339
1340         if( bt_update(bt, child->page, child->page_no) )
1341                 return bt->err;
1342
1343         if( bt_unlockpage (bt, child->page_no, BtLockWrite) )
1344                 return bt->err;
1345
1346         //  install ourselves into right link of old right page
1347
1348         bt_putid (right->page->right, child->page_no);
1349         right->page->goright = 1;               // tell bt_loadpage to go right to us
1350         right->page->kill = 1;
1351
1352         if( bt_update(bt, right->page, right->page_no) )
1353                 return bt->err;
1354
1355         if( bt_unlockpage (bt, right->page_no, BtLockWrite) )
1356                 return bt->err;
1357
1358         //      remove our slot from our parent
1359         //      signal to move right
1360
1361         parent->page->goright = 1;              // tell bt_findslot to go right to rparent
1362         parent->page->kill = 1;
1363         parent->page->act--;
1364
1365         //      redirect right page pointer in right parent to us
1366
1367         for( idx = 0; idx++ < rparent->page->cnt; )
1368           if( !slotptr(rparent->page, idx)->dead )
1369                 break;
1370
1371         if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
1372                 return bt->err = BTERR_struct;
1373
1374         bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
1375
1376         if( bt_update (bt, rparent->page, rparent->page_no) )
1377                 return bt->err;
1378
1379         if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
1380                 return bt->err;
1381
1382         //      save parent page fence value
1383
1384         memcpy (pagefence, parent->page->fence, 256);
1385
1386         if( bt_update (bt, parent->page, parent->page_no) )
1387                 return bt->err;
1388         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1389                 return bt->err;
1390
1391         return bt_removepage (bt, parent->page_no, lvl, pagefence);
1392 }
1393
1394 //      remove page from btree
1395 //      call with page unlocked
1396 //      returns with page on free list
1397
1398 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
1399 {
1400 BtPageSet parent[1], rparent[1], sibling[1], set[1];
1401 unsigned char newfence[256];
1402 uint slot, idx;
1403 BtKey ptr;
1404
1405   parent->page = bt->parent;
1406   set->page_no = page_no;
1407   set->page = bt->page;
1408
1409         //      load and lock our parent
1410
1411   while( 1 ) {
1412         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1413                 return bt->err;
1414
1415         //      wait until we are posted in our parent
1416
1417         if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
1418                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1419                         return bt->err;
1420 #ifdef unix
1421                 sched_yield();
1422 #else
1423                 SwitchToThread();
1424 #endif
1425                 continue;
1426         }
1427
1428         //      can we do a simple merge entirely
1429         //      between siblings on the parent page?
1430
1431         if( slot < parent->page->cnt ) {
1432                 // find our right neighbor
1433                 //      right must exist because the stopper prevents
1434                 //      the rightmost page from deleting
1435
1436                 for( idx = slot; idx++ < parent->page->cnt; )
1437                   if( !slotptr(parent->page, idx)->dead )
1438                         break;
1439
1440                 sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
1441
1442                 if( bt_lockpage (bt, set->page_no, BtLockDelete) )
1443                         return bt->err;
1444
1445                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1446                         return bt->err;
1447
1448                 if( bt_mappage (bt, &set->page, set->page_no) )
1449                         return bt->err;
1450
1451                 //      merge right if sibling shows up in
1452                 //  our parent and is not being killed
1453
1454                 if( sibling->page_no == bt_getid (set->page->right) ) {
1455                   if( bt_lockpage (bt, sibling->page_no, BtLockParent) )
1456                         return bt->err;
1457
1458                   if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
1459                         return bt->err;
1460
1461                   if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1462                         return bt->err;
1463
1464                   sibling->page = bt->temp;
1465
1466                   if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1467                         return bt->err;
1468
1469                   if( !sibling->page->kill )
1470                         return bt_mergeright(bt, set, parent, sibling, slot, idx);
1471
1472                   //  try again later
1473
1474                   if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1475                         return bt->err;
1476                 }
1477
1478                 if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
1479                         return bt->err;
1480
1481                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1482                         return bt->err;
1483
1484                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1485                         return bt->err;
1486 #ifdef linux
1487                 sched_yield ();
1488 #else
1489                 SwitchToThread();
1490 #endif
1491                 continue;
1492         }
1493
1494         //  find our left neighbor in our parent page
1495
1496         for( idx = slot; --idx; )
1497           if( !slotptr(parent->page, idx)->dead )
1498                 break;
1499
1500         //      if no left neighbor, delete ourselves and our parent
1501
1502         if( !idx ) {
1503                 if( bt_lockpage (bt, set->page_no, BtLockAccess) )
1504                         return bt->err;
1505
1506                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1507                         return bt->err;
1508
1509                 if( bt_unlockpage (bt, set->page_no, BtLockAccess) )
1510                         return bt->err;
1511
1512                 if( bt_mappage (bt, &set->page, set->page_no) )
1513                         return bt->err;
1514
1515                 rparent->page_no = bt_getid (parent->page->right);
1516                 rparent->page = bt->temp;
1517
1518                 if( bt_lockpage (bt, rparent->page_no, BtLockAccess) )
1519                         return bt->err;
1520
1521                 if( bt_lockpage (bt, rparent->page_no, BtLockWrite) )
1522                         return bt->err;
1523
1524                 if( bt_unlockpage (bt, rparent->page_no, BtLockAccess) )
1525                         return bt->err;
1526
1527                 if( bt_mappage (bt, &rparent->page, rparent->page_no) )
1528                         return bt->err;
1529
1530                 if( !rparent->page->kill ) {
1531                   sibling->page_no = bt_getid (set->page->right);
1532
1533                   if( bt_lockpage (bt, sibling->page_no, BtLockAccess) )
1534                         return bt->err;
1535
1536                   if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1537                         return bt->err;
1538
1539                   if( bt_unlockpage (bt, sibling->page_no, BtLockAccess) )
1540                         return bt->err;
1541
1542                   sibling->page = bt->temp2;
1543
1544                   if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1545                         return bt->err;
1546
1547                   if( !sibling->page->kill )
1548                         return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
1549
1550                   //  try again later
1551
1552                   if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1553                         return bt->err;
1554                 }
1555
1556                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1557                         return bt->err;
1558
1559                 if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
1560                         return bt->err;
1561
1562                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1563                         return bt->err;
1564 #ifdef linux
1565                 sched_yield();
1566 #else
1567                 SwitchToThread();
1568 #endif
1569                 continue;
1570         }
1571
1572         // redirect parent to our left sibling
1573         // lock and map our left sibling's page
1574
1575         sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
1576         sibling->page = bt->temp;
1577
1578         //      wait our turn on fence key maintenance
1579
1580         if( bt_lockpage(bt, sibling->page_no, BtLockParent) )
1581                 return bt->err;
1582
1583         if( bt_lockpage(bt, sibling->page_no, BtLockAccess) )
1584                 return bt->err;
1585
1586         if( bt_lockpage(bt, sibling->page_no, BtLockWrite) )
1587                 return bt->err;
1588
1589         if( bt_unlockpage(bt, sibling->page_no, BtLockAccess) )
1590                 return bt->err;
1591
1592         if( bt_mappage (bt, &sibling->page, sibling->page_no) )
1593                 return bt->err;
1594
1595         //  wait until sibling is in our parent
1596
1597         if( bt_getid (sibling->page->right) != set->page_no ) {
1598                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1599                         return bt->err;
1600                 if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
1601                         return bt->err;
1602                 if( bt_unlockpage (bt, sibling->page_no, BtLockParent) )
1603                         return bt->err;
1604 #ifdef linux
1605                 sched_yield();
1606 #else
1607                 SwitchToThread();
1608 #endif
1609                 continue;
1610         }
1611
1612         //      map page being killed
1613
1614         if( bt_lockpage (bt, set->page_no, BtLockDelete) )
1615                 return bt->err;
1616
1617         if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1618                 return bt->err;
1619
1620         if( bt_mappage (bt, &set->page, set->page_no) )
1621                 return bt->err;
1622
1623         //      delete our left sibling from parent
1624
1625         slotptr(parent->page,idx)->dead = 1;
1626         parent->page->dirty = 1;
1627         parent->page->act--;
1628
1629         //      redirect our parent slot to our left sibling
1630
1631         bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
1632         memcpy (sibling->page->right, set->page->right, BtId);
1633
1634         if( bt_update (bt, sibling->page, sibling->page_no) )
1635                 return bt->err;
1636
1637         //      collapse dead slots from parent
1638
1639         while( idx = parent->page->cnt - 1 )
1640           if( slotptr(parent->page, idx)->dead ) {
1641                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1642                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1643           } else
1644                   break;
1645
1646         //  update parent page
1647
1648         if( bt_update (bt, parent->page, parent->page_no) )
1649                 return bt->err;
1650
1651         //  free our original page
1652
1653         if( bt_freepage (bt, set) )
1654                 return bt->err;
1655
1656         //      go down the left node's fence keys to the leaf level
1657         //      and update the fence keys in each page
1658
1659         memcpy (newfence, parent->page->fence, 256);
1660
1661         if( bt_fixfences (bt, sibling, newfence) )
1662                 return bt->err;
1663
1664         //      promote sibling as new root?
1665
1666         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1667          if( sibling->page->lvl ) {
1668           if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
1669                 return bt->err;
1670
1671           if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
1672                 return bt->err;
1673
1674           if( bt_mappage (bt, &sibling->page, set->page_no) )
1675                 return bt->err;
1676
1677           return bt_removeroot (bt, parent, sibling);
1678          }
1679
1680         return bt_unlockpage (bt, parent->page_no, BtLockWrite);
1681   }
1682 }
1683
1684 //  find and delete key on page by marking delete flag bit
1685 //  when page becomes empty, delete it
1686
1687 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1688 {
1689 unsigned char pagefence[256];
1690 uint slot, found, idx;
1691 BtPageSet set[1];
1692 BtKey ptr;
1693
1694         set->page = bt->page;
1695
1696         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
1697                 ptr = keyptr(set->page, slot);
1698         else
1699                 return bt->err;
1700
1701         // if key is found delete it, otherwise ignore request
1702
1703         if( found = slot <= set->page->cnt )
1704           if( found = !keycmp (ptr, key, len) )
1705                 if( found = slotptr(set->page, slot)->dead == 0 ) {
1706                   slotptr(set->page,slot)->dead = 1;
1707                   set->page->dirty = 1;
1708                   set->page->act--;
1709
1710                   //    collapse empty slots
1711
1712                   while( idx = set->page->cnt - 1 )
1713                         if( slotptr(set->page, idx)->dead ) {
1714                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1715                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1716                         } else
1717                                 break;
1718                 }
1719
1720         if( set->page->act ) {
1721                 if( bt_update(bt, set->page, set->page_no) )
1722                         return bt->err;
1723                 bt->found = found;
1724                 return bt_unlockpage (bt, set->page_no, BtLockWrite);
1725         }
1726
1727         // delete page when empty
1728
1729         memcpy (pagefence, set->page->fence, 256);
1730         set->page->kill = 1;
1731
1732         if( bt_update(bt, set->page, set->page_no) )
1733                 return bt->err;
1734
1735         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1736                 return bt->err;
1737
1738         if( bt_removepage (bt, set->page_no, 0, pagefence) )
1739                 return bt->err;
1740
1741         bt->found = found;
1742         return 0;
1743 }
1744
1745 //      find key in leaf level and return row-id
1746
1747 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1748 {
1749 BtPageSet set[1];
1750 uint  slot;
1751 uid id = 0;
1752 BtKey ptr;
1753
1754         set->page = bt->page;
1755
1756         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
1757                 ptr = keyptr(set->page, slot);
1758         else
1759                 return 0;
1760
1761         // if key exists, return row-id
1762         //      otherwise return 0
1763
1764         if( slot <= set->page->cnt )
1765           if( !keycmp (ptr, key, len) )
1766                 id = bt_getid(slotptr(set->page,slot)->id);
1767
1768         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
1769                 return 0;
1770
1771         return id;
1772 }
1773
1774 //      check page for space available,
1775 //      clean if necessary and return
1776 //      0 - page needs splitting
1777 //      >0 - new slot value
1778  
1779 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1780 {
1781 uint nxt = bt->page_size, off;
1782 uint cnt = 0, idx = 0;
1783 uint max = page->cnt;
1784 uint newslot = max;
1785 BtKey key;
1786
1787         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1788                 return slot;
1789
1790         //      skip cleanup if nothing to reclaim
1791
1792         if( !page->dirty )
1793                 return 0;
1794
1795         memcpy (bt->frame, page, bt->page_size);
1796
1797         // skip page info and set rest of page to zero
1798
1799         memset (page+1, 0, bt->page_size - sizeof(*page));
1800         page->dirty = 0;
1801         page->act = 0;
1802
1803         while( cnt++ < max ) {
1804                 if( cnt == slot )
1805                         newslot = idx + 1;
1806                 if( slotptr(bt->frame,cnt)->dead )
1807                         continue;
1808
1809                 off = slotptr(bt->frame,cnt)->off;
1810
1811                 // copy key
1812
1813                 if( off >= sizeof(*page) ) {
1814                         key = keyptr(bt->frame, cnt);
1815                         off = nxt -= key->len + 1;
1816                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1817                 }
1818
1819                 // copy slot
1820
1821                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1822                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1823                 slotptr(page, idx)->off = off;
1824                 page->act++;
1825         }
1826         page->min = nxt;
1827         page->cnt = idx;
1828
1829         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1830                 return newslot;
1831
1832         return 0;
1833 }
1834
1835 // split the root and raise the height of the btree
1836
1837 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
1838 {
1839 unsigned char leftkey[256];
1840 uint nxt = bt->page_size;
1841 uid new_page;
1842
1843         //  Obtain an empty page to use, and copy the current
1844         //  root contents into it, e.g. lower keys
1845
1846         memcpy (leftkey, root->page->fence, 256);
1847         root->page->posted = 1;
1848
1849         if( !(new_page = bt_newpage(bt, root->page)) )
1850                 return bt->err;
1851
1852         // preserve the page info at the bottom
1853         // of higher keys and set rest to zero
1854
1855         memset(root->page+1, 0, bt->page_size - sizeof(*root->page));
1856         memset(root->page->fence, 0, 256);
1857         root->page->fence[0] = 2;
1858         root->page->fence[1] = 0xff;
1859         root->page->fence[2] = 0xff;
1860
1861         // insert new page fence key on newroot page
1862
1863         nxt -= *leftkey + 1;
1864         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1865         bt_putid(slotptr(root->page, 1)->id, new_page);
1866         slotptr(root->page, 1)->off = nxt;
1867         
1868         // insert stopper key on newroot page
1869         // and increase the root height
1870
1871         bt_putid(slotptr(root->page, 2)->id, page_no2);
1872         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
1873
1874         bt_putid(root->page->right, 0);
1875         root->page->min = nxt;          // reset lowest used offset and key count
1876         root->page->cnt = 2;
1877         root->page->act = 2;
1878         root->page->lvl++;
1879
1880         // update and release root
1881
1882         if( bt_update(bt, root->page, root->page_no) )
1883                 return bt->err;
1884
1885         return bt_unlockpage(bt, root->page_no, BtLockWrite);
1886 }
1887
1888 //  split already locked full node
1889 //      return unlocked.
1890
1891 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1892 {
1893 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
1894 unsigned char fencekey[256];
1895 uint lvl = set->page->lvl;
1896 uid right;
1897 BtKey key;
1898
1899         //  split higher half of keys to bt->frame
1900
1901         memset (bt->frame, 0, bt->page_size);
1902         max = set->page->cnt;
1903         cnt = max / 2;
1904         idx = 0;
1905
1906         while( cnt++ < max ) {
1907                 if( !lvl || cnt < max ) {
1908                         key = keyptr(set->page, cnt);
1909                         off = nxt -= key->len + 1;
1910                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1911                 } else
1912                         off = offsetof(struct BtPage_, fence);
1913
1914                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
1915                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
1916                 slotptr(bt->frame, idx)->off = off;
1917                 bt->frame->act++;
1918         }
1919
1920         if( set->page_no == ROOT_page )
1921                 bt->frame->posted = 1;
1922
1923         memcpy (bt->frame->fence, set->page->fence, 256);
1924         bt->frame->bits = bt->page_bits;
1925         bt->frame->min = nxt;
1926         bt->frame->cnt = idx;
1927         bt->frame->lvl = lvl;
1928
1929         // link right node
1930
1931         if( set->page_no > ROOT_page )
1932                 memcpy (bt->frame->right, set->page->right, BtId);
1933
1934         //      get new free page and write higher keys to it.
1935
1936         if( !(right = bt_newpage(bt, bt->frame)) )
1937                 return bt->err;
1938
1939         //      update lower keys to continue in old page
1940
1941         memcpy (bt->frame, set->page, bt->page_size);
1942         memset (set->page+1, 0, bt->page_size - sizeof(*set->page));
1943         nxt = bt->page_size;
1944         set->page->posted = 0;
1945         set->page->dirty = 0;
1946         set->page->act = 0;
1947         cnt = 0;
1948         idx = 0;
1949
1950         //  assemble page of smaller keys
1951
1952         while( cnt++ < max / 2 ) {
1953                 key = keyptr(bt->frame, cnt);
1954
1955                 if( !lvl || cnt < max / 2 ) {
1956                         off = nxt -= key->len + 1;
1957                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
1958                 } else 
1959                         off = offsetof(struct BtPage_, fence);
1960
1961                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1962                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1963                 slotptr(set->page, idx)->off = off;
1964                 set->page->act++;
1965         }
1966
1967         // install fence key for smaller key page
1968
1969         memset(set->page->fence, 0, 256);
1970         memcpy(set->page->fence, key, key->len + 1);
1971
1972         bt_putid(set->page->right, right);
1973         set->page->min = nxt;
1974         set->page->cnt = idx;
1975
1976         // if current page is the root page, split it
1977
1978         if( set->page_no == ROOT_page )
1979                 return bt_splitroot (bt, set, right);
1980
1981         if( bt_update (bt, set->page, set->page_no) )
1982                 return bt->err; 
1983
1984         if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
1985                 return bt->err;
1986
1987         // insert new fences in their parent pages
1988
1989         while( 1 ) {
1990                 if( bt_lockpage (bt, set->page_no, BtLockParent) )
1991                         return bt->err;
1992
1993                 if( bt_lockpage (bt, set->page_no, BtLockWrite) )
1994                         return bt->err;
1995
1996                 if( bt_mappage (bt, &set->page, set->page_no) )
1997                         return bt->err;
1998
1999                 memcpy (fencekey, set->page->fence, 256);
2000                 right = bt_getid (set->page->right);
2001
2002                 if( set->page->posted ) {
2003                   if( bt_unlockpage (bt, set->page_no, BtLockParent) )
2004                         return bt->err;
2005                         
2006                   return bt_unlockpage (bt, set->page_no, BtLockWrite);
2007                 }
2008
2009                 set->page->posted = 1;
2010
2011                 if( bt_update (bt, set->page, set->page_no) )
2012                         return bt->err; 
2013
2014                 if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
2015                         return bt->err;
2016
2017                 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) )
2018                         return bt->err;
2019
2020                 if( bt_unlockpage (bt, set->page_no, BtLockParent) )
2021                         return bt->err;
2022
2023                 if( !(set->page_no = right) )
2024                         break;
2025         }
2026
2027         return 0;
2028 }
2029
2030 //  Insert new key into the btree at requested level.
2031
2032 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
2033 {
2034 BtPageSet set[1];
2035 uint slot, idx;
2036 BtKey ptr;
2037
2038   set->page = bt->page;
2039
2040   while( 1 ) {
2041         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2042                 ptr = keyptr(set->page, slot);
2043         else
2044         {
2045                 if ( !bt->err )
2046                         bt->err = BTERR_ovflw;
2047                 return bt->err;
2048         }
2049
2050         // if key already exists, update id and return
2051
2052         if( slot <= set->page->cnt )
2053           if( !keycmp (ptr, key, len) ) {
2054                 if( slotptr(set->page, slot)->dead )
2055                         set->page->act++;
2056
2057                 slotptr(set->page, slot)->dead = 0;
2058                 slotptr(set->page, slot)->tod = tod;
2059                 bt_putid(slotptr(set->page,slot)->id, id);
2060
2061                 if ( bt_update(bt, set->page, set->page_no) )
2062                         return bt->err;
2063
2064                 return bt_unlockpage(bt, set->page_no, BtLockWrite);
2065           }
2066
2067         // check if page has enough space
2068
2069         if( slot = bt_cleanpage (bt, set->page, len, slot) )
2070                 break;
2071
2072         if( bt_splitpage (bt, set) )
2073                 return bt->err;
2074   }
2075
2076   // calculate next available slot and copy key into page
2077
2078   set->page->min -= len + 1; // reset lowest used offset
2079   ((unsigned char *)set->page)[set->page->min] = len;
2080   memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
2081
2082   for( idx = slot; idx <= set->page->cnt; idx++ )
2083         if( slotptr(set->page, idx)->dead )
2084                 break;
2085
2086   // now insert key into array before slot
2087
2088   if( idx > set->page->cnt )
2089         set->page->cnt++;
2090
2091   set->page->act++;
2092
2093   while( idx > slot )
2094         *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2095
2096   bt_putid(slotptr(set->page,slot)->id, id);
2097   slotptr(set->page, slot)->off = set->page->min;
2098   slotptr(set->page, slot)->tod = tod;
2099   slotptr(set->page, slot)->dead = 0;
2100
2101   if( bt_update(bt, set->page, set->page_no) )
2102         return bt->err;
2103
2104   return bt_unlockpage(bt, set->page_no, BtLockWrite);
2105 }
2106
2107 //  cache page of keys into cursor and return starting slot for given key
2108
2109 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2110 {
2111 BtPageSet set[1];
2112 uint slot;
2113
2114         set->page = bt->page;
2115
2116         // cache page for retrieval
2117         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2118                 memcpy (bt->cursor, set->page, bt->page_size);
2119
2120         bt->cursor_page = set->page_no;
2121
2122         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
2123                 return 0;
2124
2125         return slot;
2126 }
2127
2128 //  return next slot for cursor page
2129 //  or slide cursor right into next page
2130
2131 uint bt_nextkey (BtDb *bt, uint slot)
2132 {
2133 BtPageSet set[1];
2134 uid right;
2135
2136   do {
2137         right = bt_getid(bt->cursor->right);
2138         while( slot++ < bt->cursor->cnt )
2139           if( slotptr(bt->cursor,slot)->dead )
2140                 continue;
2141           else if( right || (slot < bt->cursor->cnt)) // skip infinite stopper
2142                 return slot;
2143           else
2144                 break;
2145
2146         if( !right )
2147                 break;
2148
2149         bt->cursor_page = right;
2150         set->page = bt->page;
2151
2152     if( bt_lockpage(bt, right, BtLockRead) )
2153                 return 0;
2154
2155         if( bt_mappage (bt, &set->page, right) )
2156                 break;
2157
2158         memcpy (bt->cursor, set->page, bt->page_size);
2159
2160         if( bt_unlockpage(bt, right, BtLockRead) )
2161                 return 0;
2162
2163         slot = 0;
2164   } while( 1 );
2165
2166   return bt->err = 0;
2167 }
2168
2169 BtKey bt_key(BtDb *bt, uint slot)
2170 {
2171         return keyptr(bt->cursor, slot);
2172 }
2173
2174 uid bt_uid(BtDb *bt, uint slot)
2175 {
2176         return bt_getid(slotptr(bt->cursor,slot)->id);
2177 }
2178
2179 uint bt_tod(BtDb *bt, uint slot)
2180 {
2181         return slotptr(bt->cursor,slot)->tod;
2182 }
2183
2184
2185 #ifdef STANDALONE
2186 //  standalone program to index file of keys
2187 //  then list them onto std-out
2188
2189 int main (int argc, char **argv)
2190 {
2191 uint slot, line = 0, off = 0, found = 0;
2192 int dead, ch, cnt = 0, bits = 12;
2193 unsigned char key[256];
2194 clock_t done, start;
2195 uint pgblk = 0;
2196 time_t tod[1];
2197 uint scan = 0;
2198 uint len = 0;
2199 uint map = 0;
2200 BtKey ptr;
2201 BtDb *bt;
2202 FILE *in;
2203
2204         if( argc < 4 ) {
2205                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2206                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2207                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2208                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2209                 exit(0);
2210         }
2211
2212         start = clock();
2213         time(tod);
2214
2215         if( argc > 4 )
2216                 bits = atoi(argv[4]);
2217
2218         if( argc > 5 )
2219                 map = atoi(argv[5]);
2220
2221         if( map > 65536 )
2222                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2223
2224         if( map && map < 8 )
2225                 fprintf (stderr, "Buffer_pool too small\n");
2226
2227         if( argc > 6 )
2228                 pgblk = atoi(argv[6]);
2229
2230         if( bits + pgblk > 30 )
2231                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2232
2233         if( argc > 7 )
2234                 off = atoi(argv[7]);
2235
2236         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
2237
2238         if( !bt ) {
2239                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2240                 exit (1);
2241         }
2242
2243         switch(argv[3][0]| 0x20)
2244         {
2245         case 'w':
2246                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2247                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2248                   while( ch = getc(in), ch != EOF )
2249                         if( ch == '\n' )
2250                         {
2251                           if( off )
2252                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2253
2254                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2255                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2256                           len = 0;
2257                         }
2258                         else if( len < 245 )
2259                                 key[len++] = ch;
2260                 fprintf(stderr, "finished adding keys, %d \n", line);
2261                 break;
2262
2263         case 'd':
2264                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2265                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2266                   while( ch = getc(in), ch != EOF )
2267                         if( ch == '\n' )
2268                         {
2269                           if( off )
2270                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2271                           line++;
2272                           if( bt_deletekey (bt, key, len) )
2273                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2274                           len = 0;
2275                         }
2276                         else if( len < 245 )
2277                                 key[len++] = ch;
2278                 fprintf(stderr, "finished deleting keys, %d \n", line);
2279                 break;
2280
2281         case 'f':
2282                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2283                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2284                   while( ch = getc(in), ch != EOF )
2285                         if( ch == '\n' )
2286                         {
2287                           if( off )
2288                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2289                           line++;
2290                           if( bt_findkey (bt, key, len) )
2291                                 found++;
2292                           else if( bt->err )
2293                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2294                           len = 0;
2295                         }
2296                         else if( len < 245 )
2297                                 key[len++] = ch;
2298                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2299                 break;
2300
2301         case 's':
2302                 scan++;
2303                 break;
2304
2305         }
2306
2307         done = clock();
2308         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2309
2310         dead = cnt = 0;
2311         len = key[0] = 0;
2312
2313         fprintf(stderr, "started reading\n");
2314
2315         if( slot = bt_startkey (bt, key, len) )
2316           slot--;
2317         else
2318           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2319
2320         while( slot = bt_nextkey (bt, slot) )
2321           if( cnt++, scan ) {
2322                         ptr = bt_key(bt, slot);
2323                         fwrite (ptr->key, ptr->len, 1, stdout);
2324                         fputc ('\n', stdout);
2325           }
2326
2327         fprintf(stderr, " Total keys read %d\n", cnt);
2328         return 0;
2329 }
2330
2331 #endif  //STANDALONE