]> pd.if.org Git - btree/blob - btree2t.c
00c6f24ccc0c6ccad212e1be350dcc6f91858bfe
[btree] / btree2t.c
1 // btree version 2t
2 // 04 FEB 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stddef.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_ro 0x6f72    // ro
60 #define BT_rw 0x7772    // rw
61 #define BT_fl 0x6c66    // fl
62
63 #define BT_maxbits              24                                      // maximum page size in bits
64 #define BT_minbits              9                                       // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66
67 /*
68 There are five lock types for each node in three independent sets: 
69 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
70 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
71 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
72 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
73 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
74 */
75
76 typedef enum{
77         BtLockAccess,
78         BtLockDelete,
79         BtLockRead,
80         BtLockWrite,
81         BtLockParent
82 }BtLock;
83
84 //      Define the length of the page and key pointers
85
86 #define BtId 6
87
88 //      Page key slot definition.
89
90 //      If BT_maxbits is 15 or less, you can save 2 bytes
91 //      for each key stored by making the first two uints
92 //      into ushorts.  You can also save 4 bytes by removing
93 //      the tod field from the key.
94
95 //      Keys are marked dead, but remain on the page until
96 //      cleanup is called.
97
98 typedef struct {
99         uint off:BT_maxbits;            // page offset for key start
100         uint dead:1;                            // set for deleted key
101         uint tod;                                       // time-stamp for key
102         unsigned char id[BtId];         // id associated with key
103 } BtSlot;
104
105 //      The key structure occupies space at the upper end of
106 //      each page.  It's a length byte followed by the value
107 //      bytes.
108
109 typedef struct {
110         unsigned char len;
111         unsigned char key[0];
112 } *BtKey;
113
114 //      The first part of an index page.
115 //      It is immediately followed
116 //      by the BtSlot array of keys.
117
118 typedef struct BtPage_ {
119         uint cnt;                                       // count of keys in page
120         uint act;                                       // count of active keys
121         uint min;                                       // next key offset
122         unsigned char bits;                     // page size in bits
123         unsigned char lvl:6;            // level of page
124         unsigned char dirty:1;          // page is dirty
125         unsigned char posted:1;         // page fence is posted
126         unsigned char right[BtId];      // page number to right
127         unsigned char fence[256];       // page fence key
128 } *BtPage;
129
130 //  The loadpage interface object
131
132 typedef struct {
133         uid page_no;
134         BtPage page;
135 } BtPageSet;
136
137 //      The memory mapping hash table entry
138
139 typedef struct {
140         BtPage page;            // mapped page pointer
141         uid  page_no;           // mapped page number
142         void *lruprev;          // least recently used previous cache block
143         void *lrunext;          // lru next cache block
144         void *hashprev;         // previous cache block for the same hash idx
145         void *hashnext;         // next cache block for the same hash idx
146 #ifndef unix
147         HANDLE hmap;
148 #endif
149 } BtHash;
150
151 //      The object structure for Btree access
152
153 typedef struct _BtDb {
154         uint page_size;         // each page size       
155         uint page_bits;         // each page size in bits       
156         uint seg_bits;          // segment size in pages in bits
157         uid page_no;            // current page number  
158         uid cursor_page;        // current cursor page number   
159         int  err;
160         uint mode;                      // read-write mode
161         uint mapped_io;         // use memory mapping
162         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
163         BtPage parent;          // current page's parent node (memory mapped/file IO)
164         BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
165         BtPage cursor;          // cached frame for start/next (never mapped)
166         BtPage frame;           // spare frame for the page split (never mapped)
167         BtPage zero;            // zeroes frame buffer (never mapped)
168         BtPage page;            // temporary page (memory mapped/file IO)
169 #ifdef unix
170         int idx;
171 #else
172         HANDLE idx;
173 #endif
174         unsigned char *mem;     // frame, cursor, page memory buffer
175         int nodecnt;            // highest page cache segment in use
176         int nodemax;            // highest page cache segment allocated
177         int hashmask;           // number of pages in segments - 1
178         int hashsize;           // size of hash table
179         int posted;                     // last loadpage found posted key
180         int found;                      // last insert/delete found key
181         BtHash *lrufirst;       // lru list head
182         BtHash *lrulast;        // lru list tail
183         ushort *cache;          // hash table for cached segments
184         BtHash nodes[1];        // segment cache follows
185 } BtDb;
186
187 typedef enum {
188 BTERR_ok = 0,
189 BTERR_struct,
190 BTERR_ovflw,
191 BTERR_lock,
192 BTERR_map,
193 BTERR_wrt,
194 BTERR_hash
195 } BTERR;
196
197 // B-Tree functions
198 extern void bt_close (BtDb *bt);
199 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
200 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
201 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
202 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
203 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
204 extern uint bt_nextkey   (BtDb *bt, uint slot);
205
206 //      Internal functions
207
208 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence);
209
210 //  Helper functions to return slot values
211
212 extern BtKey bt_key (BtDb *bt, uint slot);
213 extern uid bt_uid (BtDb *bt, uint slot);
214 extern uint bt_tod (BtDb *bt, uint slot);
215
216 //  BTree page number constants
217 #define ALLOC_page              0
218 #define ROOT_page               1
219
220 //      Number of levels to create in a new BTree
221
222 #define MIN_lvl                 2
223
224 //  The page is allocated from low and hi ends.
225 //  The key offsets and row-id's are allocated
226 //  from the bottom, while the text of the key
227 //  is allocated from the top.  When the two
228 //  areas meet, the page is split into two.
229
230 //  A key consists of a length byte, two bytes of
231 //  index number (0 - 65534), and up to 253 bytes
232 //  of key value.  Duplicate keys are discarded.
233 //  Associated with each key is a 48 bit row-id.
234
235 //  The b-tree root is always located at page 1.
236 //      The first leaf page of level zero is always
237 //      located on page 2.
238
239 //      The b-tree pages are linked with right
240 //      pointers to facilitate enumerators,
241 //      and provide for concurrency.
242
243 //      When to root page fills, it is split in two and
244 //      the tree height is raised by a new root at page
245 //      one with two keys.
246
247 //      Deleted keys are marked with a dead bit until
248 //      page cleanup
249
250 //  Groups of pages from the btree are optionally
251 //  cached with memory mapping. A hash table is used to keep
252 //  track of the cached pages.  This behaviour is controlled
253 //  by the number of cache blocks parameter and pages per block
254 //      given to bt_open.
255
256 //  To achieve maximum concurrency one page is locked at a time
257 //  as the tree is traversed to find leaf key in question. The right
258 //  page numbers are used in cases where the page is being split,
259 //      or consolidated.
260
261 //  Page 0 is dedicated to lock for new page extensions,
262 //      and chains empty pages together for reuse.
263
264 //      Parent locks are obtained to prevent resplitting or deleting a node
265 //      before its fence is posted into its upper level.
266
267 //      Empty nodes are chained together through the ALLOC page and reused.
268
269 //      A special open mode of BT_fl is provided to safely access files on
270 //      WIN32 networks. WIN32 network operations should not use memory mapping.
271 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
272 //      to prevent local caching of network file contents.
273
274 //      Access macros to address slot and key values from the page.
275 //      Page slots use 1 based indexing.
276
277 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
278 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
279
280 void bt_putid(unsigned char *dest, uid id)
281 {
282 int i = BtId;
283
284         while( i-- )
285                 dest[i] = (unsigned char)id, id >>= 8;
286 }
287
288 uid bt_getid(unsigned char *src)
289 {
290 uid id = 0;
291 int i;
292
293         for( i = 0; i < BtId; i++ )
294                 id <<= 8, id |= *src++; 
295
296         return id;
297 }
298
299 // place write, read, or parent lock on requested page_no.
300
301 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
302 {
303 off64_t off = page_no << bt->page_bits;
304 #ifdef unix
305 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
306 struct flock lock[1];
307 #else
308 uint flags = 0, len;
309 OVERLAPPED ovl[1];
310 #endif
311
312         if( mode == BtLockRead || mode == BtLockWrite )
313                 off +=  1 * sizeof(*bt->page);  // use second segment
314
315         if( mode == BtLockParent )
316                 off +=  2 * sizeof(*bt->page);  // use third segment
317
318 #ifdef unix
319         memset (lock, 0, sizeof(lock));
320
321         lock->l_start = off;
322         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
323         lock->l_len = sizeof(*bt->page);
324         lock->l_whence = 0;
325
326         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
327                 return bt->err = BTERR_lock;
328
329         return 0;
330 #else
331         memset (ovl, 0, sizeof(ovl));
332         ovl->OffsetHigh = (uint)(off >> 32);
333         ovl->Offset = (uint)off;
334         len = sizeof(*bt->page);
335
336         //      use large offsets to
337         //      simulate advisory locking
338
339         ovl->OffsetHigh |= 0x80000000;
340
341         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
342                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
343
344         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
345                 return bt->err = 0;
346
347         return bt->err = BTERR_lock;
348 #endif 
349 }
350
351 // remove write, read, or parent lock on requested page_no.
352
353 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
354 {
355 off64_t off = page_no << bt->page_bits;
356 #ifdef unix
357 struct flock lock[1];
358 #else
359 OVERLAPPED ovl[1];
360 uint len;
361 #endif
362
363         if( mode == BtLockRead || mode == BtLockWrite )
364                 off +=  1 * sizeof(*bt->page);  // use second segment
365
366         if( mode == BtLockParent )
367                 off +=  2 * sizeof(*bt->page);  // use third segment
368
369 #ifdef unix
370         memset (lock, 0, sizeof(lock));
371
372         lock->l_start = off;
373         lock->l_type = F_UNLCK;
374         lock->l_len = sizeof(*bt->page);
375         lock->l_whence = 0;
376
377         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
378                 return bt->err = BTERR_lock;
379 #else
380         memset (ovl, 0, sizeof(ovl));
381         ovl->OffsetHigh = (uint)(off >> 32);
382         ovl->Offset = (uint)off;
383         len = sizeof(*bt->page);
384
385         //      use large offsets to
386         //      simulate advisory locking
387
388         ovl->OffsetHigh |= 0x80000000;
389
390         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
391                 return GetLastError(), bt->err = BTERR_lock;
392 #endif
393
394         return bt->err = 0;
395 }
396
397 //      close and release memory
398
399 void bt_close (BtDb *bt)
400 {
401 BtHash *hash;
402 #ifdef unix
403         // release mapped pages
404
405         if( hash = bt->lrufirst )
406                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
407                 while(hash = hash->lrunext);
408
409         if ( bt->mem )
410                 free (bt->mem);
411         close (bt->idx);
412         free (bt->cache);
413         free (bt);
414 #else
415         if( hash = bt->lrufirst )
416           do
417           {
418                 FlushViewOfFile(hash->page, 0);
419                 UnmapViewOfFile(hash->page);
420                 CloseHandle(hash->hmap);
421           } while(hash = hash->lrunext);
422
423         if ( bt->mem)
424                 VirtualFree (bt->mem, 0, MEM_RELEASE);
425         FlushFileBuffers(bt->idx);
426         CloseHandle(bt->idx);
427         GlobalFree (bt->cache);
428         GlobalFree (bt);
429 #endif
430 }
431
432 //  open/create new btree
433 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
434 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
435
436 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
437 {
438 uint lvl, attr, cacheblk, last;
439 BtLock lockmode = BtLockWrite;
440 BtPage alloc;
441 off64_t size;
442 uint amt[1];
443 BtDb* bt;
444
445 #ifndef unix
446 SYSTEM_INFO sysinfo[1];
447 #endif
448
449 #ifdef unix
450         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
451         memset (bt, 0, sizeof(BtDb));
452
453         switch (mode & 0x7fff)
454         {
455         case BT_fl:
456         case BT_rw:
457                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
458                 break;
459
460         case BT_ro:
461         default:
462                 bt->idx = open ((char*)name, O_RDONLY);
463                 lockmode = BtLockRead;
464                 break;
465         }
466         if( bt->idx == -1 )
467                 return free(bt), NULL;
468         
469         if( nodemax )
470                 cacheblk = 4096;        // page size for unix
471         else
472                 cacheblk = 0;
473
474 #else
475         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
476         attr = FILE_ATTRIBUTE_NORMAL;
477         switch (mode & 0x7fff)
478         {
479         case BT_fl:
480                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
481
482         case BT_rw:
483                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
484                 break;
485
486         case BT_ro:
487         default:
488                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
489                 lockmode = BtLockRead;
490                 break;
491         }
492         if( bt->idx == INVALID_HANDLE_VALUE )
493                 return GlobalFree(bt), NULL;
494
495         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
496         GetSystemInfo(sysinfo);
497
498         if( nodemax )
499                 cacheblk = sysinfo->dwAllocationGranularity;
500         else
501                 cacheblk = 0;
502 #endif
503
504         // determine sanity of page size
505
506         if( bits > BT_maxbits )
507                 bits = BT_maxbits;
508         else if( bits < BT_minbits )
509                 bits = BT_minbits;
510
511         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
512                 return bt_close (bt), NULL;
513
514 #ifdef unix
515         *amt = 0;
516
517         // read minimum page size to get root info
518
519         if( size = lseek (bt->idx, 0L, 2) ) {
520                 alloc = malloc (BT_minpage);
521                 pread(bt->idx, alloc, BT_minpage, 0);
522                 bits = alloc->bits;
523                 free (alloc);
524         } else if( mode == BT_ro )
525                 return bt_close (bt), NULL;
526 #else
527         size = GetFileSize(bt->idx, amt);
528
529         if( size || *amt ) {
530                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
531                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
532                         return bt_close (bt), NULL;
533                 bits = alloc->bits;
534                 VirtualFree (alloc, 0, MEM_RELEASE);
535         } else if( mode == BT_ro )
536                 return bt_close (bt), NULL;
537 #endif
538
539         bt->page_size = 1 << bits;
540         bt->page_bits = bits;
541
542         bt->nodemax = nodemax;
543         bt->mode = mode;
544
545         // setup cache mapping
546
547         if( cacheblk ) {
548                 if( cacheblk < bt->page_size )
549                         cacheblk = bt->page_size;
550
551                 bt->hashsize = nodemax / 8;
552                 bt->hashmask = (cacheblk >> bits) - 1;
553                 bt->mapped_io = 1;
554         }
555
556         //      requested number of pages per memmap segment
557
558         if( cacheblk )
559           if( (1 << pgblk) > bt->hashmask )
560                 bt->hashmask = (1 << pgblk) - 1;
561
562         bt->seg_bits = 0;
563
564         while( (1 << bt->seg_bits) <= bt->hashmask )
565                 bt->seg_bits++;
566
567 #ifdef unix
568         bt->mem = malloc (7 *bt->page_size);
569         bt->cache = calloc (bt->hashsize, sizeof(ushort));
570 #else
571         bt->mem = VirtualAlloc(NULL, 7 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
572         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
573 #endif
574         bt->frame = (BtPage)bt->mem;
575         bt->cursor = (BtPage)(bt->mem + bt->page_size);
576         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
577         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
578         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
579         bt->parent = (BtPage)(bt->mem + 5 * bt->page_size);
580         bt->zero = (BtPage)(bt->mem + 6 * bt->page_size);
581
582         if( size || *amt ) {
583                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
584                         return bt_close (bt), NULL;
585
586                 return bt;
587         }
588
589         // initializes an empty b-tree with root page and page of leaves
590
591         memset (bt->alloc, 0, bt->page_size);
592         bt_putid(bt->alloc->right, MIN_lvl+1);
593         bt->alloc->bits = bt->page_bits;
594
595 #ifdef unix
596         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
597                 return bt_close (bt), NULL;
598 #else
599         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
600                 return bt_close (bt), NULL;
601
602         if( *amt < bt->page_size )
603                 return bt_close (bt), NULL;
604 #endif
605
606         memset (bt->frame, 0, bt->page_size);
607         bt->frame->bits = bt->page_bits;
608         bt->frame->posted = 1;
609
610         for( lvl=MIN_lvl; lvl--; ) {
611                 slotptr(bt->frame, 1)->off = offsetof(struct BtPage_, fence);
612                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
613                 bt->frame->fence[0] = 2;
614                 bt->frame->fence[1] = 0xff;
615                 bt->frame->fence[2] = 0xff;
616                 bt->frame->min = bt->page_size;
617                 bt->frame->lvl = lvl;
618                 bt->frame->cnt = 1;
619                 bt->frame->act = 1;
620 #ifdef unix
621                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
622                         return bt_close (bt), NULL;
623 #else
624                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
625                         return bt_close (bt), NULL;
626
627                 if( *amt < bt->page_size )
628                         return bt_close (bt), NULL;
629 #endif
630         }
631
632         // create empty page area by writing last page of first
633         // cache area (other pages are zeroed by O/S)
634
635         if( bt->mapped_io && bt->hashmask ) {
636                 memset(bt->frame, 0, bt->page_size);
637                 last = bt->hashmask;
638
639                 while( last < MIN_lvl + 1 )
640                         last += bt->hashmask + 1;
641 #ifdef unix
642                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
643 #else
644                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
645                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
646                         return bt_close (bt), NULL;
647                 if( *amt < bt->page_size )
648                         return bt_close (bt), NULL;
649 #endif
650         }
651
652         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
653                 return bt_close (bt), NULL;
654
655         return bt;
656 }
657
658 //  compare two keys, returning > 0, = 0, or < 0
659 //  as the comparison value
660
661 int keycmp (BtKey key1, unsigned char *key2, uint len2)
662 {
663 uint len1 = key1->len;
664 int ans;
665
666         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
667                 return ans;
668
669         if( len1 > len2 )
670                 return 1;
671         if( len1 < len2 )
672                 return -1;
673
674         return 0;
675 }
676
677 //  Update current page of btree by writing file contents
678 //      or flushing mapped area to disk.
679
680 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
681 {
682 off64_t off = page_no << bt->page_bits;
683
684 #ifdef unix
685     if ( !bt->mapped_io )
686          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
687                  return bt->err = BTERR_wrt;
688 #else
689 uint amt[1];
690         if ( !bt->mapped_io )
691         {
692                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
693                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
694                         return GetLastError(), bt->err = BTERR_wrt;
695
696                 if( *amt < bt->page_size )
697                         return GetLastError(), bt->err = BTERR_wrt;
698         } 
699         else if ( bt->mode == BT_fl ) {
700                         FlushViewOfFile(page, bt->page_size);
701                         FlushFileBuffers(bt->idx);
702         }
703 #endif
704         return 0;
705 }
706
707 // find page in cache 
708
709 BtHash *bt_findhash(BtDb *bt, uid page_no)
710 {
711 BtHash *hash;
712 uint idx;
713
714         // compute cache block first page and hash idx 
715
716         page_no &= ~bt->hashmask;
717         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
718
719         if( bt->cache[idx] ) 
720                 hash = bt->nodes + bt->cache[idx];
721         else
722                 return NULL;
723
724         do if( hash->page_no == page_no )
725                  break;
726         while(hash = hash->hashnext );
727
728         return hash;
729 }
730
731 // add page cache entry to hash index
732
733 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
734 {
735 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
736 BtHash *hash;
737
738         if( bt->cache[idx] ) {
739                 node->hashnext = hash = bt->nodes + bt->cache[idx];
740                 hash->hashprev = node;
741         }
742
743         node->hashprev = NULL;
744         bt->cache[idx] = (ushort)(node - bt->nodes);
745 }
746
747 // remove cache entry from hash table
748
749 void bt_unlinkhash(BtDb *bt, BtHash *node)
750 {
751 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
752 BtHash *hash;
753
754         // unlink node
755         if( hash = node->hashprev )
756                 hash->hashnext = node->hashnext;
757         else if( hash = node->hashnext )
758                 bt->cache[idx] = (ushort)(hash - bt->nodes);
759         else
760                 bt->cache[idx] = 0;
761
762         if( hash = node->hashnext )
763                 hash->hashprev = node->hashprev;
764 }
765
766 // add cache page to lru chain and map pages
767
768 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
769 {
770 int flag;
771 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
772 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
773 BtHash *node;
774
775         memset(hash, 0, sizeof(BtHash));
776         hash->page_no = (page_no & ~bt->hashmask);
777         bt_linkhash(bt, hash, page_no);
778
779         if( node = hash->lrunext = bt->lrufirst )
780                 node->lruprev = hash;
781         else
782                 bt->lrulast = hash;
783
784         bt->lrufirst = hash;
785
786 #ifdef unix
787         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
788         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
789         if( hash->page == MAP_FAILED )
790                 return bt->err = BTERR_map, (BtPage)NULL;
791
792 #else
793         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
794         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
795         if( !hash->hmap )
796                 return bt->err = BTERR_map, NULL;
797
798         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
799         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
800         if( !hash->page )
801                 return bt->err = BTERR_map, NULL;
802 #endif
803
804         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
805 }
806
807 //      find or place requested page in page-cache
808 //      return memory address where page is located.
809
810 BtPage bt_hashpage(BtDb *bt, uid page_no)
811 {
812 BtHash *hash, *node, *next;
813 BtPage page;
814
815         // find page in cache and move to top of lru list  
816
817         if( hash = bt_findhash(bt, page_no) ) {
818                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
819                 // swap node in lru list
820                 if( node = hash->lruprev ) {
821                         if( next = node->lrunext = hash->lrunext )
822                                 next->lruprev = node;
823                         else
824                                 bt->lrulast = node;
825
826                         if( next = hash->lrunext = bt->lrufirst )
827                                 next->lruprev = hash;
828                         else
829                                 return bt->err = BTERR_hash, (BtPage)NULL;
830
831                         hash->lruprev = NULL;
832                         bt->lrufirst = hash;
833                 }
834                 return page;
835         }
836
837         // map pages and add to cache entry
838
839         if( bt->nodecnt < bt->nodemax ) {
840                 hash = bt->nodes + ++bt->nodecnt;
841                 return bt_linklru(bt, hash, page_no);
842         }
843
844         // hash table is already full, replace last lru entry from the cache
845
846         if( hash = bt->lrulast ) {
847                 // unlink from lru list
848                 if( node = bt->lrulast = hash->lruprev )
849                         node->lrunext = NULL;
850                 else
851                         return bt->err = BTERR_hash, (BtPage)NULL;
852
853 #ifdef unix
854                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
855 #else
856                 FlushViewOfFile(hash->page, 0);
857                 UnmapViewOfFile(hash->page);
858                 CloseHandle(hash->hmap);
859 #endif
860                 // unlink from hash table
861
862                 bt_unlinkhash(bt, hash);
863
864                 // map and add to cache
865
866                 return bt_linklru(bt, hash, page_no);
867         }
868
869         return bt->err = BTERR_hash, (BtPage)NULL;
870 }
871
872 //  map a btree page onto current page
873
874 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
875 {
876 off64_t off = page_no << bt->page_bits;
877 #ifndef unix
878 int amt[1];
879 #endif
880         
881         if( bt->mapped_io ) {
882                 bt->err = 0;
883                 *page = bt_hashpage(bt, page_no);
884                 return bt->err;
885         }
886 #ifdef unix
887         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
888                 return bt->err = BTERR_map;
889 #else
890         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
891
892         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
893                 return bt->err = BTERR_map;
894
895         if( *amt <  bt->page_size )
896                 return bt->err = BTERR_map;
897 #endif
898         return 0;
899 }
900
901 //      allocate a new page and write page into it
902
903 uid bt_newpage(BtDb *bt, BtPage page)
904 {
905 uid new_page;
906 char *pmap;
907 int reuse;
908
909         // lock page zero
910
911         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
912                 return 0;
913
914         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
915                 return 0;
916
917         // use empty chain first
918         // else allocate empty page
919
920         if( new_page = bt_getid(bt->alloc[1].right) ) {
921                 if( bt_mappage (bt, &bt->temp, new_page) )
922                         return 0;       // don't unlock on error
923                 memcpy(bt->alloc[1].right, bt->temp->right, BtId);
924                 reuse = 1;
925         } else {
926                 new_page = bt_getid(bt->alloc->right);
927                 bt_putid(bt->alloc->right, new_page+1);
928                 reuse = 0;
929         }
930
931         if( bt_update(bt, bt->alloc, ALLOC_page) )
932                 return 0;       // don't unlock on error
933
934         if( !bt->mapped_io ) {
935                 if( bt_update(bt, page, new_page) )
936                         return 0;       //don't unlock on error
937
938                 // unlock page zero 
939
940                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
941                         return 0;
942
943                 return new_page;
944         }
945
946 #ifdef unix
947         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
948                 return bt->err = BTERR_wrt, 0;
949
950         // if writing first page of hash block, zero last page in the block
951
952         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
953         {
954                 // use temp buffer to write zeros
955                 memset(bt->zero, 0, bt->page_size);
956                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
957                         return bt->err = BTERR_wrt, 0;
958         }
959 #else
960         //      bring new page into page-cache and copy page.
961         //      this will extend the file into the new pages.
962
963         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
964                 return 0;
965
966         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
967 #endif
968
969         // unlock page zero 
970
971         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
972                 return 0;
973
974         return new_page;
975 }
976
977 //  find slot in page for given key at a given level
978 //      return 0 if beyond fence value
979
980 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
981 {
982 uint diff, higher = set->page->cnt, low = 1, slot;
983
984         //      is page being deleted?  if so,
985         //      tell caller to follow right link
986
987         if( !set->page->act )
988                 return 0;
989
990         //      make stopper key an infinite fence value
991
992         if( bt_getid (set->page->right) )
993                 higher++;
994
995         //      low is the next candidate, higher is already
996         //      tested as .ge. the given key, loop ends when they meet
997
998         while( diff = higher - low ) {
999                 slot = low + ( diff >> 1 );
1000                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1001                         low = slot + 1;
1002                 else
1003                         higher = slot;
1004         }
1005
1006         if( higher <= set->page->cnt )
1007                 return higher;
1008
1009         //      if leaf page, compare against fence value
1010
1011         //      return zero if key is on right link page
1012         //      or return slot beyond last key
1013
1014         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1015                 return 0;
1016
1017         return higher;
1018 }
1019
1020 //  find and load page at given level for given key
1021 //      leave page rd or wr locked as requested
1022
1023 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1024 {
1025 uid page_no = ROOT_page, prevpage = 0;
1026 uint drill = 0xff, slot;
1027 uint mode, prevmode;
1028 int posted = 1;
1029
1030   //  start at root of btree and drill down
1031
1032   do {
1033         // determine lock mode of drill level
1034         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1035
1036         set->page_no = page_no;
1037
1038         // obtain access lock using lock chaining
1039
1040         if( page_no > ROOT_page )
1041           if( bt_lockpage(bt, page_no, BtLockAccess) )
1042                 return 0;                                                                       
1043
1044         if( prevpage )
1045           if( bt_unlockpage(bt, prevpage, prevmode) )
1046                 return 0;
1047
1048         // obtain read lock using lock chaining
1049
1050         if( bt_lockpage(bt, page_no, mode) )
1051                 return 0;                                                                       
1052
1053         if( page_no > ROOT_page )
1054           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1055                 return 0;                                                                       
1056
1057         //      map/obtain page contents
1058
1059         if( bt_mappage (bt, &set->page, page_no) )
1060                 return 0;
1061
1062         // re-read and re-lock root after determining actual level of root
1063
1064         if( set->page->lvl != drill) {
1065                 if ( page_no != ROOT_page )
1066                         return bt->err = BTERR_struct, 0;
1067                         
1068                 drill = set->page->lvl;
1069
1070                 if( lock == BtLockWrite && drill == lvl )
1071                   if( bt_unlockpage(bt, page_no, mode) )
1072                         return 0;
1073                   else
1074                         continue;
1075         }
1076
1077         prevpage = page_no;
1078         prevmode = mode;
1079
1080         //  find key on page at this level
1081         //  and descend to requested level
1082
1083         if( slot = bt_findslot (set, key, len) ) {
1084           if( drill == lvl )
1085                 return bt->posted = posted, slot;
1086
1087           if( slot > set->page->cnt )
1088                 return bt->err = BTERR_struct, 0;
1089
1090         //  if drilling down, find next active key
1091
1092           while( slotptr(set->page, slot)->dead )
1093                 if( slot++ < set->page->cnt )
1094                         continue;
1095                 else
1096                         return bt->err = BTERR_struct, 0;
1097
1098           page_no = bt_getid(slotptr(set->page, slot)->id);
1099           posted = 1;
1100           drill--;
1101           continue;
1102         }
1103
1104         //  or slide right into next page
1105         //  (slide left from deleted page)
1106
1107         page_no = bt_getid(set->page->right);
1108         posted = 0;
1109
1110   } while( page_no );
1111
1112   // return error on end of right chain
1113
1114   bt->err = BTERR_struct;
1115   return 0;     // return error
1116 }
1117
1118 // drill down fixing fence values for left sibling tree
1119 //      starting with current left page in bt->temp
1120 //  call with bt->temp write locked
1121 //      return with all pages unlocked.
1122
1123 BTERR bt_fixfences (BtDb *bt, uid page_no, unsigned char *newfence)
1124 {
1125 unsigned char oldfence[256];
1126 uid prevpage = 0;
1127 int chk;
1128
1129   memcpy (oldfence, bt->temp->fence, 256);
1130
1131   //  start at left page of btree and drill down
1132   //  to update their fence values
1133
1134   do {
1135         // obtain access lock using lock chaining
1136
1137         if( prevpage ) {
1138           if( bt_unlockpage(bt, prevpage, BtLockWrite) )
1139                 return bt->err;
1140           if( bt_unlockpage(bt, prevpage, BtLockParent) )
1141                 return bt->err;
1142
1143           // obtain parent/fence key maintenance lock
1144
1145           if( bt_lockpage(bt, page_no, BtLockParent) )
1146                 return bt->err;                                                                 
1147
1148           if( bt_lockpage(bt, page_no, BtLockAccess) )
1149                 return bt->err;                                                                 
1150
1151           if( bt_unlockpage(bt, prevpage, BtLockWrite) )
1152                 return bt->err;
1153
1154           // obtain write lock using lock chaining
1155
1156           if( bt_lockpage(bt, page_no, BtLockWrite) )
1157                 return bt->err;                                                                 
1158
1159           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1160                 return bt->err;                                                                 
1161
1162           //  map/obtain page contents
1163
1164           if( bt_mappage (bt, &bt->temp, page_no) )
1165                 return bt->err;
1166         }
1167
1168         chk = keycmp ((BtKey)bt->temp->fence, oldfence + 1, *oldfence);
1169     prevpage = page_no;
1170
1171         if( chk < 0 ) {
1172                 page_no = bt_getid (bt->temp->right);
1173                 continue;
1174         }
1175
1176         if( chk > 0 )
1177                 return bt->err = BTERR_struct;
1178
1179         memcpy (bt->temp->fence, newfence, 256);
1180
1181         if( bt_update (bt, bt->temp, page_no) )
1182                 return bt->err;
1183
1184         //  return when we reach a leaf page
1185
1186         if( !bt->temp->lvl ) {
1187                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1188                         return bt->err;
1189                 return bt_unlockpage (bt, page_no, BtLockParent);
1190         }
1191
1192         page_no = bt_getid(slotptr(bt->temp, bt->temp->cnt)->id);
1193
1194   } while( page_no );
1195
1196   // return error on end of right chain
1197
1198   return bt->err = BTERR_struct;
1199 }
1200
1201 //      return page to free list
1202 //      page must be delete & write locked
1203
1204 BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
1205 {
1206         //      lock & map allocation page
1207
1208         if( bt_lockpage (bt, ALLOC_page, BtLockWrite) )
1209                 return bt->err;
1210
1211         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
1212                 return bt->err;
1213
1214         //      store chain in second right
1215         bt_putid(page->right, bt_getid(bt->alloc[1].right));
1216         bt_putid(bt->alloc[1].right, page_no);
1217
1218         if( bt_update(bt, bt->alloc, ALLOC_page) )
1219                 return bt->err;
1220         if( bt_update(bt, page, page_no) )
1221                 return bt->err;
1222
1223         // unlock page zero 
1224
1225         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1226                 return bt->err;
1227
1228         //  remove write lock on deleted node
1229
1230         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1231                 return bt->err;
1232
1233         return bt_unlockpage (bt, page_no, BtLockDelete);
1234 }
1235
1236 //      remove the root level by promoting its only child
1237
1238 BTERR bt_removeroot (BtDb *bt, BtPage root, BtPage child, uid page_no)
1239 {
1240 uid next = 0;
1241
1242   do {
1243         if( next ) {
1244           if( bt_lockpage (bt, next, BtLockDelete) )
1245                 return bt->err;
1246           if( bt_lockpage (bt, next, BtLockWrite) )
1247                 return bt->err;
1248
1249           if( bt_mappage (bt, &child, next) )
1250                 return bt->err;
1251
1252           page_no = next;
1253         }
1254
1255         memcpy (root, child, bt->page_size);
1256         next = bt_getid (slotptr(child, child->cnt)->id);
1257
1258         if( bt_freepage (bt, child, page_no) )
1259                 return bt->err;
1260   } while( root->lvl > 1 && root->cnt == 1 );
1261
1262   if( bt_update (bt, root, ROOT_page) )
1263         return bt->err;
1264
1265   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
1266 }
1267
1268 //  pull right page over ourselves in simple merge
1269
1270 BTERR bt_mergeright (BtDb *bt, uid page_no, BtPageSet *parent, uint slot)
1271 {
1272 uid right;
1273 uint idx;
1274
1275         // find our right neighbor
1276         //      right must exist because the stopper prevents
1277         //      the rightmost page from deleting
1278
1279         for( idx = slot; idx++ < parent->page->cnt; )
1280           if( !slotptr(parent->page, idx)->dead )
1281                 break;
1282
1283         right = bt_getid (slotptr (parent->page, idx)->id);
1284
1285         if( right != bt_getid (bt->page->right) )
1286                 return bt->err = BTERR_struct;
1287
1288         if( bt_lockpage (bt, right, BtLockDelete) )
1289                 return bt->err;
1290
1291         if( bt_lockpage (bt, right, BtLockWrite) )
1292                 return bt->err;
1293
1294         if( bt_mappage (bt, &bt->temp, right) )
1295                 return bt->err;
1296
1297         memcpy (bt->page, bt->temp, bt->page_size);
1298
1299         if( bt_update(bt, bt->page, page_no) )
1300                 return bt->err;
1301
1302         //  install ourselves as child page
1303         //      and delete ourselves from parent
1304
1305         bt_putid (slotptr(parent->page, idx)->id, page_no);
1306         slotptr(parent->page, slot)->dead = 1;
1307         parent->page->act--;
1308
1309         //      collapse any empty slots
1310
1311         while( idx = parent->page->cnt - 1 )
1312           if( slotptr(parent->page, idx)->dead ) {
1313                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1314                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1315           } else
1316                   break;
1317
1318         if( bt_freepage (bt, bt->temp, right) )
1319                 return bt->err;
1320
1321         //      do we need to remove a btree level?
1322         //      (leave the first page of leaves alone)
1323
1324         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1325           if( bt->page->lvl )
1326                 return bt_removeroot (bt, parent->page, bt->page, page_no);
1327
1328         if( bt_update (bt, parent->page, parent->page_no) )
1329                 return bt->err;
1330
1331         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1332                 return bt->err;
1333
1334         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1335                 return bt->err;
1336
1337         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1338                 return bt->err;
1339
1340         return 0;
1341 }
1342
1343 //      remove both child and parent from the btree
1344 //      from the fence position in the parent
1345
1346 BTERR bt_removeparent (BtDb *bt, uid page_no, BtPageSet *parent, uint lvl)
1347 {
1348 unsigned char rightfence[256], pagefence[256];
1349 uid right, ppage_no;
1350
1351         right = bt_getid (bt->page->right);
1352
1353         if( bt_lockpage (bt, right, BtLockParent) )
1354                 return bt->err;
1355
1356         if( bt_lockpage (bt, right, BtLockAccess) )
1357                 return bt->err;
1358
1359         if( bt_lockpage (bt, right, BtLockWrite) )
1360                 return bt->err;
1361
1362         if( bt_unlockpage (bt, right, BtLockAccess) )
1363                 return bt->err;
1364
1365         if( bt_mappage (bt, &bt->temp, right) )
1366                 return bt->err;
1367
1368         //      save right page fence value and
1369         //      parent fence value
1370
1371         memcpy (rightfence, bt->temp->fence, 256);
1372         memcpy (pagefence, parent->page->fence, 256);
1373         ppage_no = parent->page_no;
1374
1375         //  pull right sibling over ourselves and unlock
1376
1377         memcpy (bt->page, bt->temp, bt->page_size);
1378
1379         if( bt_update(bt, bt->page, page_no) )
1380                 return bt->err;
1381
1382         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1383                 return bt->err;
1384
1385         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1386                 return bt->err;
1387
1388         //  install ourselves into right link from old right page
1389
1390         bt->temp->act = 0;              // tell bt_findslot to go right (left)
1391         bt_putid (bt->temp->right, page_no);
1392
1393         if( bt_update (bt, bt->temp, right) )
1394                 return bt->err;
1395
1396         if( bt_unlockpage (bt, right, BtLockWrite) )
1397                 return bt->err;
1398
1399         //      remove our slot from our parent
1400         //      clear act to signal bt_findslot to move right
1401
1402         slotptr(parent->page, parent->page->cnt)->dead = 1;
1403         parent->page->act = 0;
1404
1405         if( bt_update (bt, parent->page, ppage_no) )
1406                 return bt->err;
1407         if( bt_unlockpage (bt, ppage_no, BtLockWrite) )
1408                 return bt->err;
1409
1410         //      redirect right page pointer in its parent to our left
1411         //      and free the right page
1412
1413         if( bt_insertkey (bt, rightfence+1, *rightfence, lvl, page_no, time(NULL)) )
1414                 return bt->err;
1415
1416         if( bt_removepage (bt, ppage_no, lvl, pagefence) )
1417                 return bt->err;
1418
1419         if( bt_unlockpage (bt, right, BtLockParent) )
1420                 return bt->err;
1421
1422         //      wait for others to drain away
1423
1424         if( bt_lockpage (bt, right, BtLockDelete) )
1425                 return bt->err;
1426
1427         if( bt_lockpage (bt, right, BtLockWrite) )
1428                 return bt->err;
1429
1430         if( bt_mappage (bt, &bt->temp, right) )
1431                 return bt->err;
1432
1433         return bt_freepage (bt, bt->temp, right);
1434 }
1435
1436 //      remove page from btree
1437 //      call with page unlocked
1438 //      returns with page on free list
1439
1440 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
1441 {
1442 BtPageSet parent[1];
1443 uint slot, idx;
1444 BtKey ptr;
1445 uid left;
1446
1447         parent->page = bt->parent;
1448
1449         //      load and lock our parent
1450
1451 retry:
1452         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1453                 return bt->err;
1454
1455         //      wait until we are posted in our parent
1456
1457         if( !bt->posted ) {
1458                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1459                         return bt->err;
1460 #ifdef unix
1461                 sched_yield();
1462 #else
1463                 SwitchToThread();
1464 #endif
1465                 goto retry;
1466         }
1467
1468         //      wait for others to finish
1469         //      and obtain final WriteLock
1470
1471         if( bt_lockpage (bt, page_no, BtLockDelete) )
1472                 return bt->err;
1473
1474         if( bt_lockpage (bt, page_no, BtLockWrite) )
1475                 return bt->err;
1476
1477         if( bt_mappage (bt, &bt->page, page_no) )
1478                 return bt->err;
1479
1480         //  was page re-established?
1481
1482         if( bt->page->act ) {
1483                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1484                         return bt->err;
1485                 if( bt_unlockpage (bt, page_no, BtLockDelete) )
1486                         return bt->err;
1487
1488                 return bt_unlockpage (bt, parent->page_no, BtLockWrite);
1489         }
1490                 
1491         //      can we do a simple merge entirely
1492         //      between siblings on the parent page?
1493
1494         if( slot < parent->page->cnt )
1495                 return bt_mergeright(bt, page_no, parent, slot);
1496
1497         //  find our left neighbor in our parent page
1498
1499         for( idx = slot; --idx; )
1500           if( !slotptr(parent->page, idx)->dead )
1501                 break;
1502
1503         //      if no left neighbor, delete ourselves and our parent
1504
1505         if( !idx )
1506                 return bt_removeparent (bt, page_no, parent, lvl+1);
1507
1508         // lock and map our left neighbor's page
1509
1510         left = bt_getid (slotptr(parent->page, idx)->id);
1511
1512         //      wait our turn on fence key maintenance
1513
1514         if( bt_lockpage(bt, left, BtLockParent) )
1515                 return bt->err;
1516
1517         if( bt_lockpage(bt, left, BtLockAccess) )
1518                 return bt->err;
1519
1520         if( bt_lockpage(bt, left, BtLockWrite) )
1521                 return bt->err;
1522
1523         if( bt_unlockpage(bt, left, BtLockAccess) )
1524                 return bt->err;
1525
1526         if( bt_mappage (bt, &bt->temp, left) )
1527                 return bt->err;
1528
1529         //  wait until sibling is in our parent
1530
1531         if( bt_getid (bt->temp->right) != page_no ) {
1532                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1533                         return bt->err;
1534                 if( bt_unlockpage (bt, left, BtLockWrite) )
1535                         return bt->err;
1536                 if( bt_unlockpage (bt, left, BtLockParent) )
1537                         return bt->err;
1538                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1539                         return bt->err;
1540                 if( bt_unlockpage (bt, page_no, BtLockDelete) )
1541                         return bt->err;
1542 #ifdef linux
1543                 sched_yield();
1544 #else
1545                 SwitchToThread();
1546 #endif
1547                 goto retry;
1548         }
1549
1550         //      delete our left sibling from parent
1551
1552         slotptr(parent->page,idx)->dead = 1;
1553         parent->page->dirty = 1;
1554         parent->page->act--;
1555
1556         //      redirect our parent slot to our left sibling
1557
1558         bt_putid (slotptr(parent->page, slot)->id, left);
1559
1560         //      update left page with our fence key
1561
1562         memcpy (bt->temp->right, bt->page->right, BtId);
1563
1564         //      collapse dead slots from parent
1565
1566         while( idx = parent->page->cnt - 1 )
1567           if( slotptr(parent->page, idx)->dead ) {
1568                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1569                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1570           } else
1571                   break;
1572
1573         //  update parent page
1574
1575         if( bt_update (bt, parent->page, parent->page_no) )
1576                 return bt->err;
1577
1578         //      go down the left node's fence keys to the leaf level
1579         //      and update the fence keys in each page
1580
1581         if( bt_fixfences (bt, left, pagefence) )
1582                 return bt->err;
1583
1584         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1585                 return bt->err;
1586
1587         //  free our original page
1588
1589         if( bt_mappage (bt, &bt->temp, page_no) )
1590                 return bt->err;
1591
1592         return bt_freepage (bt, bt->temp,page_no);
1593 }
1594
1595 //  find and delete key on page by marking delete flag bit
1596 //  when page becomes empty, delete it
1597
1598 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1599 {
1600 unsigned char pagefence[256];
1601 uint slot, found, act, idx;
1602 BtPageSet set[1];
1603 BtKey ptr;
1604
1605         set->page = bt->page;
1606
1607         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
1608                 ptr = keyptr(set->page, slot);
1609         else
1610                 return bt->err;
1611
1612         // if key is found delete it, otherwise ignore request
1613
1614         if( found = slot <= set->page->cnt )
1615           if( found = !keycmp (ptr, key, len) )
1616                 if( found = slotptr(set->page, slot)->dead == 0 ) {
1617                   slotptr(set->page,slot)->dead = 1;
1618                   set->page->dirty = 1;
1619                   set->page->act--;
1620
1621                   //    collapse empty slots
1622
1623                   while( idx = set->page->cnt - 1 )
1624                         if( slotptr(set->page, idx)->dead ) {
1625                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1626                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1627                         } else
1628                                 break;
1629
1630                   if( bt_update(bt, set->page, set->page_no) )
1631                         return bt->err;
1632                 }
1633
1634         // delete page when empty
1635
1636         memcpy (pagefence, set->page->fence, 256);
1637         act = set->page->act;
1638
1639         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1640                 return bt->err;
1641
1642         if( !act )
1643           if( bt_removepage (bt, set->page_no, 0, pagefence) )
1644                 return bt->err;
1645
1646         bt->found = found;
1647         return 0;
1648 }
1649
1650 //      find key in leaf level and return row-id
1651
1652 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1653 {
1654 BtPageSet set[1];
1655 uint  slot;
1656 uid id = 0;
1657 BtKey ptr;
1658
1659         set->page = bt->page;
1660
1661         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
1662                 ptr = keyptr(set->page, slot);
1663         else
1664                 return 0;
1665
1666         // if key exists, return row-id
1667         //      otherwise return 0
1668
1669         if( slot <= set->page->cnt )
1670           if( !keycmp (ptr, key, len) )
1671                 id = bt_getid(slotptr(set->page,slot)->id);
1672
1673         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
1674                 return 0;
1675
1676         return id;
1677 }
1678
1679 //      check page for space available,
1680 //      clean if necessary and return
1681 //      0 - page needs splitting
1682 //      >0 - new slot value
1683  
1684 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1685 {
1686 uint nxt = bt->page_size, off;
1687 uint cnt = 0, idx = 0;
1688 uint max = page->cnt;
1689 uint newslot = max;
1690 BtKey key;
1691
1692         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1693                 return slot;
1694
1695         //      skip cleanup if nothing to reclaim
1696
1697         if( !page->dirty )
1698                 return 0;
1699
1700         memcpy (bt->frame, page, bt->page_size);
1701
1702         // skip page info and set rest of page to zero
1703
1704         memset (page+1, 0, bt->page_size - sizeof(*page));
1705         page->dirty = 0;
1706         page->act = 0;
1707
1708         while( cnt++ < max ) {
1709                 if( cnt == slot )
1710                         newslot = idx + 1;
1711                 if( slotptr(bt->frame,cnt)->dead )
1712                         continue;
1713
1714                 // copy key
1715                 if( !page->lvl || cnt < max ) {
1716                         key = keyptr(bt->frame, cnt);
1717                         off = nxt -= key->len + 1;
1718                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1719                 } else
1720                         off = offsetof(struct BtPage_, fence);
1721
1722                 // copy slot
1723                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1724                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1725                 slotptr(page, idx)->off = off;
1726                 page->act++;
1727         }
1728         page->min = nxt;
1729         page->cnt = idx;
1730
1731         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1732                 return newslot;
1733
1734         return 0;
1735 }
1736
1737 // split the root and raise the height of the btree
1738
1739 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
1740 {
1741 unsigned char leftkey[256];
1742 uint nxt = bt->page_size;
1743 uid new_page;
1744
1745         //  Obtain an empty page to use, and copy the current
1746         //  root contents into it, e.g. lower keys
1747
1748         memcpy (leftkey, root->page->fence, 256);
1749         root->page->posted = 1;
1750
1751         if( !(new_page = bt_newpage(bt, root->page)) )
1752                 return bt->err;
1753
1754         // preserve the page info at the bottom
1755         // of higher keys and set rest to zero
1756
1757         memset(root->page+1, 0, bt->page_size - sizeof(*root->page));
1758         memset(root->page->fence, 0, 256);
1759         root->page->fence[0] = 2;
1760         root->page->fence[1] = 0xff;
1761         root->page->fence[2] = 0xff;
1762
1763         // insert new page fence key on newroot page
1764
1765         nxt -= *leftkey + 1;
1766         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1767         bt_putid(slotptr(root->page, 1)->id, new_page);
1768         slotptr(root->page, 1)->off = nxt;
1769         
1770         // insert stopper key on newroot page
1771         // and increase the root height
1772
1773         bt_putid(slotptr(root->page, 2)->id, page_no2);
1774         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
1775
1776         bt_putid(root->page->right, 0);
1777         root->page->min = nxt;          // reset lowest used offset and key count
1778         root->page->cnt = 2;
1779         root->page->act = 2;
1780         root->page->lvl++;
1781
1782         // update and release root
1783
1784         if( bt_update(bt, root->page, root->page_no) )
1785                 return bt->err;
1786
1787         return bt_unlockpage(bt, root->page_no, BtLockWrite);
1788 }
1789
1790 //  split already locked full node
1791 //      return unlocked.
1792
1793 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1794 {
1795 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
1796 uid right, page_no = set->page_no;
1797 unsigned char fencekey[256];
1798 uint lvl = set->page->lvl;
1799 BtKey key;
1800
1801         //  split higher half of keys to bt->frame
1802
1803         memset (bt->frame, 0, bt->page_size);
1804         max = set->page->cnt;
1805         cnt = max / 2;
1806         idx = 0;
1807
1808         while( cnt++ < max ) {
1809                 if( !lvl || cnt < max ) {
1810                         key = keyptr(set->page, cnt);
1811                         off = nxt -= key->len + 1;
1812                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1813                 } else
1814                         off = offsetof(struct BtPage_, fence);
1815
1816                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
1817                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
1818                 slotptr(bt->frame, idx)->off = off;
1819                 bt->frame->act++;
1820         }
1821
1822         if( page_no == ROOT_page )
1823                 bt->frame->posted = 1;
1824
1825         memcpy (bt->frame->fence, set->page->fence, 256);
1826         bt->frame->bits = bt->page_bits;
1827         bt->frame->min = nxt;
1828         bt->frame->cnt = idx;
1829         bt->frame->lvl = lvl;
1830
1831         // link right node
1832
1833         if( page_no > ROOT_page )
1834                 memcpy (bt->frame->right, set->page->right, BtId);
1835
1836         //      get new free page and write higher keys to it.
1837
1838         if( !(right = bt_newpage(bt, bt->frame)) )
1839                 return bt->err;
1840
1841         //      update lower keys to continue in old page
1842
1843         memcpy (bt->frame, set->page, bt->page_size);
1844         memset (set->page+1, 0, bt->page_size - sizeof(*set->page));
1845         nxt = bt->page_size;
1846         set->page->posted = 0;
1847         set->page->dirty = 0;
1848         set->page->act = 0;
1849         cnt = 0;
1850         idx = 0;
1851
1852         //  assemble page of smaller keys
1853
1854         while( cnt++ < max / 2 ) {
1855                 key = keyptr(bt->frame, cnt);
1856
1857                 if( !lvl || cnt < max / 2 ) {
1858                         off = nxt -= key->len + 1;
1859                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
1860                 } else 
1861                         off = offsetof(struct BtPage_, fence);
1862
1863                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1864                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1865                 slotptr(set->page, idx)->off = off;
1866                 set->page->act++;
1867         }
1868
1869         // install fence key for smaller key page
1870
1871         memset(set->page->fence, 0, 256);
1872         memcpy(set->page->fence, key, key->len + 1);
1873
1874         bt_putid(set->page->right, right);
1875         set->page->min = nxt;
1876         set->page->cnt = idx;
1877
1878         // if current page is the root page, split it
1879
1880         if( page_no == ROOT_page )
1881                 return bt_splitroot (bt, set, right);
1882
1883         if( bt_update (bt, set->page, page_no) )
1884                 return bt->err; 
1885
1886         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1887                 return bt->err;
1888
1889         // insert new fences in their parent pages
1890
1891         while( 1 ) {
1892                 if( bt_lockpage (bt, page_no, BtLockParent) )
1893                         return bt->err;
1894
1895                 if( bt_lockpage (bt, page_no, BtLockRead) )
1896                         return bt->err;
1897
1898                 if( bt_mappage (bt, &set->page, page_no) )
1899                         return bt->err;
1900
1901                 memcpy (fencekey, set->page->fence, 256);
1902
1903                 if( set->page->posted ) {
1904                   if( bt_unlockpage (bt, page_no, BtLockParent) )
1905                         return bt->err;
1906                         
1907                   return bt_unlockpage (bt, page_no, BtLockRead);
1908                 }
1909
1910                 if( bt_unlockpage (bt, page_no, BtLockRead) )
1911                         return bt->err;
1912
1913                 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1914                         return bt->err;
1915
1916                 if( bt_lockpage (bt, page_no, BtLockWrite) )
1917                         return bt->err;
1918
1919                 if( bt_mappage (bt, &set->page, page_no) )
1920                         return bt->err;
1921
1922                 right = bt_getid (set->page->right);
1923                 set->page->posted = 1;
1924
1925                 if( bt_update (bt, set->page, page_no) )
1926                         return bt->err; 
1927
1928                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1929                         return bt->err;
1930
1931                 if( bt_unlockpage (bt, page_no, BtLockParent) )
1932                         return bt->err;
1933
1934                 if( !(page_no = right) )
1935                         break;
1936
1937                 if( bt_mappage (bt, &set->page, page_no) )
1938                         return bt->err;
1939         }
1940
1941         return 0;
1942 }
1943
1944 //  Insert new key into the btree at requested level.
1945
1946 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1947 {
1948 BtPageSet set[1];
1949 uint slot, idx;
1950 BtKey ptr;
1951
1952   set->page = bt->page;
1953
1954   while( 1 ) {
1955         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1956                 ptr = keyptr(set->page, slot);
1957         else
1958         {
1959                 if ( !bt->err )
1960                         bt->err = BTERR_ovflw;
1961                 return bt->err;
1962         }
1963
1964         // if key already exists, update id and return
1965
1966         if( slot <= set->page->cnt )
1967           if( !keycmp (ptr, key, len) ) {
1968                 if( slotptr(set->page, slot)->dead )
1969                         set->page->act++;
1970
1971                 slotptr(set->page, slot)->dead = 0;
1972                 slotptr(set->page, slot)->tod = tod;
1973                 bt_putid(slotptr(set->page,slot)->id, id);
1974
1975                 if ( bt_update(bt, set->page, set->page_no) )
1976                         return bt->err;
1977
1978                 return bt_unlockpage(bt, set->page_no, BtLockWrite);
1979           }
1980
1981         // check if page has enough space
1982
1983         if( slot = bt_cleanpage (bt, set->page, len, slot) )
1984                 break;
1985
1986         if( bt_splitpage (bt, set) )
1987                 return bt->err;
1988   }
1989
1990   // calculate next available slot and copy key into page
1991
1992   set->page->min -= len + 1; // reset lowest used offset
1993   ((unsigned char *)set->page)[set->page->min] = len;
1994   memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
1995
1996   for( idx = slot; idx <= set->page->cnt; idx++ )
1997         if( slotptr(set->page, idx)->dead )
1998                 break;
1999
2000   // now insert key into array before slot
2001
2002   if( idx > set->page->cnt )
2003         set->page->cnt++;
2004
2005   set->page->act++;
2006
2007   while( idx > slot )
2008         *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2009
2010   bt_putid(slotptr(set->page,slot)->id, id);
2011   slotptr(set->page, slot)->off = set->page->min;
2012   slotptr(set->page, slot)->tod = tod;
2013   slotptr(set->page, slot)->dead = 0;
2014
2015   if( bt_update(bt, set->page, set->page_no) )
2016         return bt->err;
2017
2018   return bt_unlockpage(bt, set->page_no, BtLockWrite);
2019 }
2020
2021 //  cache page of keys into cursor and return starting slot for given key
2022
2023 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2024 {
2025 BtPageSet set[1];
2026 uint slot;
2027
2028         set->page = bt->page;
2029
2030         // cache page for retrieval
2031         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2032                 memcpy (bt->cursor, set->page, bt->page_size);
2033         bt->cursor_page = set->page_no;
2034         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
2035                 return 0;
2036
2037         return slot;
2038 }
2039
2040 //  return next slot for cursor page
2041 //  or slide cursor right into next page
2042
2043 uint bt_nextkey (BtDb *bt, uint slot)
2044 {
2045 BtPageSet set[1];
2046 uid right;
2047
2048   do {
2049         right = bt_getid(bt->cursor->right);
2050         while( slot++ < bt->cursor->cnt )
2051           if( slotptr(bt->cursor,slot)->dead )
2052                 continue;
2053           else if( right || (slot < bt->cursor->cnt)) // skip infinite stopper
2054                 return slot;
2055           else
2056                 break;
2057
2058         if( !right )
2059                 break;
2060
2061         bt->cursor_page = right;
2062         set->page = bt->page;
2063
2064     if( bt_lockpage(bt, right, BtLockRead) )
2065                 return 0;
2066
2067         if( bt_mappage (bt, &set->page, right) )
2068                 break;
2069
2070         memcpy (bt->cursor, set->page, bt->page_size);
2071
2072         if( bt_unlockpage(bt, right, BtLockRead) )
2073                 return 0;
2074
2075         slot = 0;
2076   } while( 1 );
2077
2078   return bt->err = 0;
2079 }
2080
2081 BtKey bt_key(BtDb *bt, uint slot)
2082 {
2083         return keyptr(bt->cursor, slot);
2084 }
2085
2086 uid bt_uid(BtDb *bt, uint slot)
2087 {
2088         return bt_getid(slotptr(bt->cursor,slot)->id);
2089 }
2090
2091 uint bt_tod(BtDb *bt, uint slot)
2092 {
2093         return slotptr(bt->cursor,slot)->tod;
2094 }
2095
2096
2097 #ifdef STANDALONE
2098 //  standalone program to index file of keys
2099 //  then list them onto std-out
2100
2101 int main (int argc, char **argv)
2102 {
2103 uint slot, line = 0, off = 0, found = 0;
2104 int dead, ch, cnt = 0, bits = 12;
2105 unsigned char key[256];
2106 clock_t done, start;
2107 uint pgblk = 0;
2108 time_t tod[1];
2109 uint scan = 0;
2110 uint len = 0;
2111 uint map = 0;
2112 BtKey ptr;
2113 BtDb *bt;
2114 FILE *in;
2115
2116         if( argc < 4 ) {
2117                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2118                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2119                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2120                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2121                 exit(0);
2122         }
2123
2124         start = clock();
2125         time(tod);
2126
2127         if( argc > 4 )
2128                 bits = atoi(argv[4]);
2129
2130         if( argc > 5 )
2131                 map = atoi(argv[5]);
2132
2133         if( map > 65536 )
2134                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2135
2136         if( map && map < 8 )
2137                 fprintf (stderr, "Buffer_pool too small\n");
2138
2139         if( argc > 6 )
2140                 pgblk = atoi(argv[6]);
2141
2142         if( bits + pgblk > 30 )
2143                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2144
2145         if( argc > 7 )
2146                 off = atoi(argv[7]);
2147
2148         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
2149
2150         if( !bt ) {
2151                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2152                 exit (1);
2153         }
2154
2155         switch(argv[3][0]| 0x20)
2156         {
2157         case 'w':
2158                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2159                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2160                   while( ch = getc(in), ch != EOF )
2161                         if( ch == '\n' )
2162                         {
2163                           if( off )
2164                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2165
2166                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2167                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2168                           len = 0;
2169                         }
2170                         else if( len < 245 )
2171                                 key[len++] = ch;
2172                 fprintf(stderr, "finished adding keys, %d \n", line);
2173                 break;
2174
2175         case 'd':
2176                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2177                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2178                   while( ch = getc(in), ch != EOF )
2179                         if( ch == '\n' )
2180                         {
2181                           if( off )
2182                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2183                           line++;
2184                           if( bt_deletekey (bt, key, len) )
2185                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2186                           len = 0;
2187                         }
2188                         else if( len < 245 )
2189                                 key[len++] = ch;
2190                 fprintf(stderr, "finished deleting keys, %d \n", line);
2191                 break;
2192
2193         case 'f':
2194                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2195                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2196                   while( ch = getc(in), ch != EOF )
2197                         if( ch == '\n' )
2198                         {
2199                           if( off )
2200                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2201                           line++;
2202                           if( bt_findkey (bt, key, len) )
2203                                 found++;
2204                           else if( bt->err )
2205                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2206                           len = 0;
2207                         }
2208                         else if( len < 245 )
2209                                 key[len++] = ch;
2210                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2211                 break;
2212
2213         case 's':
2214                 scan++;
2215                 break;
2216
2217         }
2218
2219         done = clock();
2220         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2221
2222         dead = cnt = 0;
2223         len = key[0] = 0;
2224
2225         fprintf(stderr, "started reading\n");
2226
2227         if( slot = bt_startkey (bt, key, len) )
2228           slot--;
2229         else
2230           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2231
2232         while( slot = bt_nextkey (bt, slot) )
2233           if( cnt++, scan ) {
2234                         ptr = bt_key(bt, slot);
2235                         fwrite (ptr->key, ptr->len, 1, stdout);
2236                         fputc ('\n', stdout);
2237           }
2238
2239         fprintf(stderr, " Total keys read %d\n", cnt);
2240         return 0;
2241 }
2242
2243 #endif  //STANDALONE