]> pd.if.org Git - btree/blob - btree2t.c
5ec052d1b4a15b4b8f61740ea0f985764fc994fa
[btree] / btree2t.c
1 // btree version 2t
2 // 04 FEB 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stddef.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_ro 0x6f72    // ro
60 #define BT_rw 0x7772    // rw
61 #define BT_fl 0x6c66    // fl
62
63 #define BT_maxbits              24                                      // maximum page size in bits
64 #define BT_minbits              9                                       // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66
67 /*
68 There are five lock types for each node in three independent sets: 
69 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
70 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
71 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
72 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
73 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
74 */
75
76 typedef enum{
77         BtLockAccess,
78         BtLockDelete,
79         BtLockRead,
80         BtLockWrite,
81         BtLockParent
82 }BtLock;
83
84 //      Define the length of the page and key pointers
85
86 #define BtId 6
87
88 //      Page key slot definition.
89
90 //      If BT_maxbits is 15 or less, you can save 2 bytes
91 //      for each key stored by making the first two uints
92 //      into ushorts.  You can also save 4 bytes by removing
93 //      the tod field from the key.
94
95 //      Keys are marked dead, but remain on the page until
96 //      cleanup is called.
97
98 typedef struct {
99         uint off:BT_maxbits;            // page offset for key start
100         uint dead:1;                            // set for deleted key
101         uint tod;                                       // time-stamp for key
102         unsigned char id[BtId];         // id associated with key
103 } BtSlot;
104
105 //      The key structure occupies space at the upper end of
106 //      each page.  It's a length byte followed by the value
107 //      bytes.
108
109 typedef struct {
110         unsigned char len;
111         unsigned char key[0];
112 } *BtKey;
113
114 //      The first part of an index page.
115 //      It is immediately followed
116 //      by the BtSlot array of keys.
117
118 typedef struct BtPage_ {
119         uint cnt;                                       // count of keys in page
120         uint act;                                       // count of active keys
121         uint min;                                       // next key offset
122         unsigned char bits;                     // page size in bits
123         unsigned char lvl:6;            // level of page
124         unsigned char dirty:1;          // page is dirty
125         unsigned char posted:1;         // page fence is posted
126         unsigned char right[BtId];      // page number to right
127         unsigned char fence[256];       // page fence key
128 } *BtPage;
129
130 //  The loadpage interface object
131
132 typedef struct {
133         uid page_no;
134         BtPage page;
135 } BtPageSet;
136
137 //      The memory mapping hash table entry
138
139 typedef struct {
140         BtPage page;            // mapped page pointer
141         uid  page_no;           // mapped page number
142         void *lruprev;          // least recently used previous cache block
143         void *lrunext;          // lru next cache block
144         void *hashprev;         // previous cache block for the same hash idx
145         void *hashnext;         // next cache block for the same hash idx
146 #ifndef unix
147         HANDLE hmap;
148 #endif
149 }BtHash;
150
151 //      The object structure for Btree access
152
153 typedef struct _BtDb {
154         uint page_size;         // each page size       
155         uint page_bits;         // each page size in bits       
156         uint seg_bits;          // segment size in pages in bits
157         uid page_no;            // current page number  
158         uid cursor_page;        // current cursor page number   
159         int  err;
160         uint mode;                      // read-write mode
161         uint mapped_io;         // use memory mapping
162         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
163         BtPage parent;          // current page's parent node (memory mapped/file IO)
164         BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
165         BtPage cursor;          // cached frame for start/next (never mapped)
166         BtPage frame;           // spare frame for the page split (never mapped)
167         BtPage zero;            // zeroes frame buffer (never mapped)
168         BtPage page;            // temporary page (memory mapped/file IO)
169 #ifdef unix
170         int idx;
171 #else
172         HANDLE idx;
173 #endif
174         unsigned char *mem;     // frame, cursor, page memory buffer
175         int nodecnt;            // highest page cache segment in use
176         int nodemax;            // highest page cache segment allocated
177         int hashmask;           // number of pages in segments - 1
178         int hashsize;           // size of hash table
179         BtHash *lrufirst;       // lru list head
180         BtHash *lrulast;        // lru list tail
181         ushort *cache;          // hash table for cached segments
182         BtHash nodes[1];        // segment cache follows
183         int posted;                     // last loadpage found posted key
184         int found;                      // last insert/delete found key
185 } BtDb;
186
187 typedef enum {
188 BTERR_ok = 0,
189 BTERR_struct,
190 BTERR_ovflw,
191 BTERR_lock,
192 BTERR_map,
193 BTERR_wrt,
194 BTERR_hash
195 } BTERR;
196
197 // B-Tree functions
198 extern void bt_close (BtDb *bt);
199 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
200 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
201 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
202 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
203 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
204 extern uint bt_nextkey   (BtDb *bt, uint slot);
205
206 //      Internal functions
207
208 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence);
209
210 //  Helper functions to return slot values
211
212 extern BtKey bt_key (BtDb *bt, uint slot);
213 extern uid bt_uid (BtDb *bt, uint slot);
214 extern uint bt_tod (BtDb *bt, uint slot);
215
216 //  BTree page number constants
217 #define ALLOC_page              0
218 #define ROOT_page               1
219
220 //      Number of levels to create in a new BTree
221
222 #define MIN_lvl                 2
223
224 //  The page is allocated from low and hi ends.
225 //  The key offsets and row-id's are allocated
226 //  from the bottom, while the text of the key
227 //  is allocated from the top.  When the two
228 //  areas meet, the page is split into two.
229
230 //  A key consists of a length byte, two bytes of
231 //  index number (0 - 65534), and up to 253 bytes
232 //  of key value.  Duplicate keys are discarded.
233 //  Associated with each key is a 48 bit row-id.
234
235 //  The b-tree root is always located at page 1.
236 //      The first leaf page of level zero is always
237 //      located on page 2.
238
239 //      The b-tree pages are linked with right
240 //      pointers to facilitate enumerators,
241 //      and provide for concurrency.
242
243 //      When to root page fills, it is split in two and
244 //      the tree height is raised by a new root at page
245 //      one with two keys.
246
247 //      Deleted keys are marked with a dead bit until
248 //      page cleanup
249
250 //  Groups of pages from the btree are optionally
251 //  cached with memory mapping. A hash table is used to keep
252 //  track of the cached pages.  This behaviour is controlled
253 //  by the number of cache blocks parameter and pages per block
254 //      given to bt_open.
255
256 //  To achieve maximum concurrency one page is locked at a time
257 //  as the tree is traversed to find leaf key in question. The right
258 //  page numbers are used in cases where the page is being split,
259 //      or consolidated.
260
261 //  Page 0 is dedicated to lock for new page extensions,
262 //      and chains empty pages together for reuse.
263
264 //      Parent locks are obtained to prevent resplitting or deleting a node
265 //      before its fence is posted into its upper level.
266
267 //      Empty nodes are chained together through the ALLOC page and reused.
268
269 //      A special open mode of BT_fl is provided to safely access files on
270 //      WIN32 networks. WIN32 network operations should not use memory mapping.
271 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
272 //      to prevent local caching of network file contents.
273
274 //      Access macros to address slot and key values from the page.
275 //      Page slots use 1 based indexing.
276
277 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
278 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
279
280 void bt_putid(unsigned char *dest, uid id)
281 {
282 int i = BtId;
283
284         while( i-- )
285                 dest[i] = (unsigned char)id, id >>= 8;
286 }
287
288 uid bt_getid(unsigned char *src)
289 {
290 uid id = 0;
291 int i;
292
293         for( i = 0; i < BtId; i++ )
294                 id <<= 8, id |= *src++; 
295
296         return id;
297 }
298
299 // place write, read, or parent lock on requested page_no.
300
301 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
302 {
303 off64_t off = page_no << bt->page_bits;
304 #ifdef unix
305 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
306 struct flock lock[1];
307 #else
308 uint flags = 0, len;
309 OVERLAPPED ovl[1];
310 #endif
311
312         if( mode == BtLockRead || mode == BtLockWrite )
313                 off +=  1 * sizeof(*bt->page);  // use second segment
314
315         if( mode == BtLockParent )
316                 off +=  2 * sizeof(*bt->page);  // use third segment
317
318 #ifdef unix
319         memset (lock, 0, sizeof(lock));
320
321         lock->l_start = off;
322         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
323         lock->l_len = sizeof(*bt->page);
324         lock->l_whence = 0;
325
326         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
327                 return bt->err = BTERR_lock;
328
329         return 0;
330 #else
331         memset (ovl, 0, sizeof(ovl));
332         ovl->OffsetHigh = (uint)(off >> 32);
333         ovl->Offset = (uint)off;
334         len = sizeof(*bt->page);
335
336         //      use large offsets to
337         //      simulate advisory locking
338
339         ovl->OffsetHigh |= 0x80000000;
340
341         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
342                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
343
344         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
345                 return bt->err = 0;
346
347         return bt->err = BTERR_lock;
348 #endif 
349 }
350
351 // remove write, read, or parent lock on requested page_no.
352
353 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
354 {
355 off64_t off = page_no << bt->page_bits;
356 #ifdef unix
357 struct flock lock[1];
358 #else
359 OVERLAPPED ovl[1];
360 uint len;
361 #endif
362
363         if( mode == BtLockRead || mode == BtLockWrite )
364                 off +=  1 * sizeof(*bt->page);  // use second segment
365
366         if( mode == BtLockParent )
367                 off +=  2 * sizeof(*bt->page);  // use third segment
368
369 #ifdef unix
370         memset (lock, 0, sizeof(lock));
371
372         lock->l_start = off;
373         lock->l_type = F_UNLCK;
374         lock->l_len = sizeof(*bt->page);
375         lock->l_whence = 0;
376
377         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
378                 return bt->err = BTERR_lock;
379 #else
380         memset (ovl, 0, sizeof(ovl));
381         ovl->OffsetHigh = (uint)(off >> 32);
382         ovl->Offset = (uint)off;
383         len = sizeof(*bt->page);
384
385         //      use large offsets to
386         //      simulate advisory locking
387
388         ovl->OffsetHigh |= 0x80000000;
389
390         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
391                 return GetLastError(), bt->err = BTERR_lock;
392 #endif
393
394         return bt->err = 0;
395 }
396
397 //      close and release memory
398
399 void bt_close (BtDb *bt)
400 {
401 BtHash *hash;
402 #ifdef unix
403         // release mapped pages
404
405         if( hash = bt->lrufirst )
406                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
407                 while(hash = hash->lrunext);
408
409         if ( bt->mem )
410                 free (bt->mem);
411         close (bt->idx);
412         free (bt->cache);
413         free (bt);
414 #else
415         if( hash = bt->lrufirst )
416           do
417           {
418                 FlushViewOfFile(hash->page, 0);
419                 UnmapViewOfFile(hash->page);
420                 CloseHandle(hash->hmap);
421           } while(hash = hash->lrunext);
422
423         if ( bt->mem)
424                 VirtualFree (bt->mem, 0, MEM_RELEASE);
425         FlushFileBuffers(bt->idx);
426         CloseHandle(bt->idx);
427         GlobalFree (bt->cache);
428         GlobalFree (bt);
429 #endif
430 }
431
432 //  open/create new btree
433 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
434 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
435
436 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
437 {
438 uint lvl, attr, cacheblk, last;
439 BtLock lockmode = BtLockWrite;
440 BtPage alloc;
441 off64_t size;
442 uint amt[1];
443 BtKey key;
444 BtDb* bt;
445
446 #ifndef unix
447 SYSTEM_INFO sysinfo[1];
448 #endif
449
450 #ifdef unix
451         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
452         memset (bt, 0, sizeof(BtDb));
453
454         switch (mode & 0x7fff)
455         {
456         case BT_fl:
457         case BT_rw:
458                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
459                 break;
460
461         case BT_ro:
462         default:
463                 bt->idx = open ((char*)name, O_RDONLY);
464                 lockmode = BtLockRead;
465                 break;
466         }
467         if( bt->idx == -1 )
468                 return free(bt), NULL;
469         
470         if( nodemax )
471                 cacheblk = 4096;        // page size for unix
472         else
473                 cacheblk = 0;
474
475 #else
476         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
477         attr = FILE_ATTRIBUTE_NORMAL;
478         switch (mode & 0x7fff)
479         {
480         case BT_fl:
481                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
482
483         case BT_rw:
484                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
485                 break;
486
487         case BT_ro:
488         default:
489                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
490                 lockmode = BtLockRead;
491                 break;
492         }
493         if( bt->idx == INVALID_HANDLE_VALUE )
494                 return GlobalFree(bt), NULL;
495
496         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
497         GetSystemInfo(sysinfo);
498
499         if( nodemax )
500                 cacheblk = sysinfo->dwAllocationGranularity;
501         else
502                 cacheblk = 0;
503 #endif
504
505         // determine sanity of page size
506
507         if( bits > BT_maxbits )
508                 bits = BT_maxbits;
509         else if( bits < BT_minbits )
510                 bits = BT_minbits;
511
512         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
513                 return bt_close (bt), NULL;
514
515 #ifdef unix
516         *amt = 0;
517
518         // read minimum page size to get root info
519
520         if( size = lseek (bt->idx, 0L, 2) ) {
521                 alloc = malloc (BT_minpage);
522                 pread(bt->idx, alloc, BT_minpage, 0);
523                 bits = alloc->bits;
524                 free (alloc);
525         } else if( mode == BT_ro )
526                 return bt_close (bt), NULL;
527 #else
528         size = GetFileSize(bt->idx, amt);
529
530         if( size || *amt ) {
531                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
532                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
533                         return bt_close (bt), NULL;
534                 bits = alloc->bits;
535                 VirtualFree (alloc, 0, MEM_RELEASE);
536         } else if( mode == BT_ro )
537                 return bt_close (bt), NULL;
538 #endif
539
540         bt->page_size = 1 << bits;
541         bt->page_bits = bits;
542
543         bt->nodemax = nodemax;
544         bt->mode = mode;
545
546         // setup cache mapping
547
548         if( cacheblk ) {
549                 if( cacheblk < bt->page_size )
550                         cacheblk = bt->page_size;
551
552                 bt->hashsize = nodemax / 8;
553                 bt->hashmask = (cacheblk >> bits) - 1;
554                 bt->mapped_io = 1;
555         }
556
557         //      requested number of pages per memmap segment
558
559         if( cacheblk )
560           if( (1 << pgblk) > bt->hashmask )
561                 bt->hashmask = (1 << pgblk) - 1;
562
563         bt->seg_bits = 0;
564
565         while( (1 << bt->seg_bits) <= bt->hashmask )
566                 bt->seg_bits++;
567
568 #ifdef unix
569         bt->mem = malloc (7 *bt->page_size);
570         bt->cache = calloc (bt->hashsize, sizeof(ushort));
571 #else
572         bt->mem = VirtualAlloc(NULL, 7 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
573         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
574 #endif
575         bt->frame = (BtPage)bt->mem;
576         bt->cursor = (BtPage)(bt->mem + bt->page_size);
577         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
578         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
579         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
580         bt->parent = (BtPage)(bt->mem + 5 * bt->page_size);
581         bt->zero = (BtPage)(bt->mem + 6 * bt->page_size);
582
583         if( size || *amt ) {
584                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
585                         return bt_close (bt), NULL;
586
587                 return bt;
588         }
589
590         // initializes an empty b-tree with root page and page of leaves
591
592         memset (bt->alloc, 0, bt->page_size);
593         bt_putid(bt->alloc->right, MIN_lvl+1);
594         bt->alloc->bits = bt->page_bits;
595
596 #ifdef unix
597         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
598                 return bt_close (bt), NULL;
599 #else
600         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
601                 return bt_close (bt), NULL;
602
603         if( *amt < bt->page_size )
604                 return bt_close (bt), NULL;
605 #endif
606
607         memset (bt->frame, 0, bt->page_size);
608         bt->frame->bits = bt->page_bits;
609         bt->frame->posted = 1;
610
611         for( lvl=MIN_lvl; lvl--; ) {
612                 slotptr(bt->frame, 1)->off = bt->page_size - 3;
613                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
614                 key = keyptr(bt->frame, 1);
615                 key->len = 2;                   // create stopper key
616                 key->key[0] = 0xff;
617                 key->key[1] = 0xff;
618                 bt->frame->fence[0] = 2;
619                 bt->frame->fence[1] = 0xff;
620                 bt->frame->fence[2] = 0xff;
621                 bt->frame->min = bt->page_size - 3;
622                 bt->frame->lvl = lvl;
623                 bt->frame->cnt = 1;
624                 bt->frame->act = 1;
625 #ifdef unix
626                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
627                         return bt_close (bt), NULL;
628 #else
629                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
630                         return bt_close (bt), NULL;
631
632                 if( *amt < bt->page_size )
633                         return bt_close (bt), NULL;
634 #endif
635         }
636
637         // create empty page area by writing last page of first
638         // cache area (other pages are zeroed by O/S)
639
640         if( bt->mapped_io && bt->hashmask ) {
641                 memset(bt->frame, 0, bt->page_size);
642                 last = bt->hashmask;
643
644                 while( last < MIN_lvl + 1 )
645                         last += bt->hashmask + 1;
646 #ifdef unix
647                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
648 #else
649                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
650                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
651                         return bt_close (bt), NULL;
652                 if( *amt < bt->page_size )
653                         return bt_close (bt), NULL;
654 #endif
655         }
656
657         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
658                 return bt_close (bt), NULL;
659
660         return bt;
661 }
662
663 //  compare two keys, returning > 0, = 0, or < 0
664 //  as the comparison value
665
666 int keycmp (BtKey key1, unsigned char *key2, uint len2)
667 {
668 uint len1 = key1->len;
669 int ans;
670
671         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
672                 return ans;
673
674         if( len1 > len2 )
675                 return 1;
676         if( len1 < len2 )
677                 return -1;
678
679         return 0;
680 }
681
682 //  Update current page of btree by writing file contents
683 //      or flushing mapped area to disk.
684
685 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
686 {
687 off64_t off = page_no << bt->page_bits;
688
689 #ifdef unix
690     if ( !bt->mapped_io )
691          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
692                  return bt->err = BTERR_wrt;
693 #else
694 uint amt[1];
695         if ( !bt->mapped_io )
696         {
697                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
698                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
699                         return GetLastError(), bt->err = BTERR_wrt;
700
701                 if( *amt < bt->page_size )
702                         return GetLastError(), bt->err = BTERR_wrt;
703         } 
704         else if ( bt->mode == BT_fl ) {
705                         FlushViewOfFile(page, bt->page_size);
706                         FlushFileBuffers(bt->idx);
707         }
708 #endif
709         return 0;
710 }
711
712 // find page in cache 
713
714 BtHash *bt_findhash(BtDb *bt, uid page_no)
715 {
716 BtHash *hash;
717 uint idx;
718
719         // compute cache block first page and hash idx 
720
721         page_no &= ~bt->hashmask;
722         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
723
724         if( bt->cache[idx] ) 
725                 hash = bt->nodes + bt->cache[idx];
726         else
727                 return NULL;
728
729         do if( hash->page_no == page_no )
730                  break;
731         while(hash = hash->hashnext );
732
733         return hash;
734 }
735
736 // add page cache entry to hash index
737
738 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
739 {
740 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
741 BtHash *hash;
742
743         if( bt->cache[idx] ) {
744                 node->hashnext = hash = bt->nodes + bt->cache[idx];
745                 hash->hashprev = node;
746         }
747
748         node->hashprev = NULL;
749         bt->cache[idx] = (ushort)(node - bt->nodes);
750 }
751
752 // remove cache entry from hash table
753
754 void bt_unlinkhash(BtDb *bt, BtHash *node)
755 {
756 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
757 BtHash *hash;
758
759         // unlink node
760         if( hash = node->hashprev )
761                 hash->hashnext = node->hashnext;
762         else if( hash = node->hashnext )
763                 bt->cache[idx] = (ushort)(hash - bt->nodes);
764         else
765                 bt->cache[idx] = 0;
766
767         if( hash = node->hashnext )
768                 hash->hashprev = node->hashprev;
769 }
770
771 // add cache page to lru chain and map pages
772
773 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
774 {
775 int flag;
776 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
777 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
778 BtHash *node;
779
780         memset(hash, 0, sizeof(BtHash));
781         hash->page_no = (page_no & ~bt->hashmask);
782         bt_linkhash(bt, hash, page_no);
783
784         if( node = hash->lrunext = bt->lrufirst )
785                 node->lruprev = hash;
786         else
787                 bt->lrulast = hash;
788
789         bt->lrufirst = hash;
790
791 #ifdef unix
792         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
793         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
794         if( hash->page == MAP_FAILED )
795                 return bt->err = BTERR_map, (BtPage)NULL;
796
797 #else
798         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
799         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
800         if( !hash->hmap )
801                 return bt->err = BTERR_map, NULL;
802
803         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
804         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
805         if( !hash->page )
806                 return bt->err = BTERR_map, NULL;
807 #endif
808
809         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
810 }
811
812 //      find or place requested page in page-cache
813 //      return memory address where page is located.
814
815 BtPage bt_hashpage(BtDb *bt, uid page_no)
816 {
817 BtHash *hash, *node, *next;
818 BtPage page;
819
820         // find page in cache and move to top of lru list  
821
822         if( hash = bt_findhash(bt, page_no) ) {
823                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
824                 // swap node in lru list
825                 if( node = hash->lruprev ) {
826                         if( next = node->lrunext = hash->lrunext )
827                                 next->lruprev = node;
828                         else
829                                 bt->lrulast = node;
830
831                         if( next = hash->lrunext = bt->lrufirst )
832                                 next->lruprev = hash;
833                         else
834                                 return bt->err = BTERR_hash, (BtPage)NULL;
835
836                         hash->lruprev = NULL;
837                         bt->lrufirst = hash;
838                 }
839                 return page;
840         }
841
842         // map pages and add to cache entry
843
844         if( bt->nodecnt < bt->nodemax ) {
845                 hash = bt->nodes + ++bt->nodecnt;
846                 return bt_linklru(bt, hash, page_no);
847         }
848
849         // hash table is already full, replace last lru entry from the cache
850
851         if( hash = bt->lrulast ) {
852                 // unlink from lru list
853                 if( node = bt->lrulast = hash->lruprev )
854                         node->lrunext = NULL;
855                 else
856                         return bt->err = BTERR_hash, (BtPage)NULL;
857
858 #ifdef unix
859                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
860 #else
861                 FlushViewOfFile(hash->page, 0);
862                 UnmapViewOfFile(hash->page);
863                 CloseHandle(hash->hmap);
864 #endif
865                 // unlink from hash table
866
867                 bt_unlinkhash(bt, hash);
868
869                 // map and add to cache
870
871                 return bt_linklru(bt, hash, page_no);
872         }
873
874         return bt->err = BTERR_hash, (BtPage)NULL;
875 }
876
877 //  map a btree page onto current page
878
879 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
880 {
881 off64_t off = page_no << bt->page_bits;
882 #ifndef unix
883 int amt[1];
884 #endif
885         
886         if( bt->mapped_io ) {
887                 bt->err = 0;
888                 *page = bt_hashpage(bt, page_no);
889                 return bt->err;
890         }
891 #ifdef unix
892         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
893                 return bt->err = BTERR_map;
894 #else
895         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
896
897         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
898                 return bt->err = BTERR_map;
899
900         if( *amt <  bt->page_size )
901                 return bt->err = BTERR_map;
902 #endif
903         return 0;
904 }
905
906 //      allocate a new page and write page into it
907
908 uid bt_newpage(BtDb *bt, BtPage page)
909 {
910 uid new_page;
911 char *pmap;
912 int reuse;
913
914         // lock page zero
915
916         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
917                 return 0;
918
919         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
920                 return 0;
921
922         // use empty chain first
923         // else allocate empty page
924
925         if( new_page = bt_getid(bt->alloc[1].right) ) {
926                 if( bt_mappage (bt, &bt->temp, new_page) )
927                         return 0;       // don't unlock on error
928                 memcpy(bt->alloc[1].right, bt->temp->right, BtId);
929                 reuse = 1;
930         } else {
931                 new_page = bt_getid(bt->alloc->right);
932                 bt_putid(bt->alloc->right, new_page+1);
933                 reuse = 0;
934         }
935
936         if( bt_update(bt, bt->alloc, ALLOC_page) )
937                 return 0;       // don't unlock on error
938
939         if( !bt->mapped_io ) {
940                 if( bt_update(bt, page, new_page) )
941                         return 0;       //don't unlock on error
942
943                 // unlock page zero 
944
945                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
946                         return 0;
947
948                 return new_page;
949         }
950
951 #ifdef unix
952         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
953                 return bt->err = BTERR_wrt, 0;
954
955         // if writing first page of hash block, zero last page in the block
956
957         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
958         {
959                 // use temp buffer to write zeros
960                 memset(bt->zero, 0, bt->page_size);
961                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
962                         return bt->err = BTERR_wrt, 0;
963         }
964 #else
965         //      bring new page into page-cache and copy page.
966         //      this will extend the file into the new pages.
967
968         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
969                 return 0;
970
971         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
972 #endif
973
974         // unlock page zero 
975
976         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
977                 return 0;
978
979         return new_page;
980 }
981
982 //  find slot in page for given key at a given level
983 //      return 0 if beyond fence value
984
985 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
986 {
987 uint diff, higher = set->page->cnt, low = 1, slot;
988
989         //      is page being deleted?  if so,
990         //      tell caller to follow right link
991
992         if( !set->page->act )
993                 return 0;
994
995         //      make stopper key an infinite fence value
996
997         if( bt_getid (set->page->right) )
998                 higher++;
999
1000         //      low is the next candidate, higher is already
1001         //      tested as .ge. the given key, loop ends when they meet
1002
1003         while( diff = higher - low ) {
1004                 slot = low + ( diff >> 1 );
1005                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1006                         low = slot + 1;
1007                 else
1008                         higher = slot;
1009         }
1010
1011         if( higher <= set->page->cnt )
1012                 return higher;
1013
1014         //      if leaf page, compare against fence value
1015
1016         //      return zero if key is on right link page
1017         //      or return slot beyond last key
1018
1019         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1020                 return 0;
1021
1022         return higher;
1023 }
1024
1025 //  find and load page at given level for given key
1026 //      leave page rd or wr locked as requested
1027
1028 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1029 {
1030 uid page_no = ROOT_page, prevpage = 0;
1031 uint drill = 0xff, slot;
1032 uint mode, prevmode;
1033 int posted = 1;
1034
1035   //  start at root of btree and drill down
1036
1037   do {
1038         // determine lock mode of drill level
1039         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1040
1041         set->page_no = page_no;
1042
1043         // obtain access lock using lock chaining
1044
1045         if( page_no > ROOT_page )
1046           if( bt_lockpage(bt, page_no, BtLockAccess) )
1047                 return 0;                                                                       
1048
1049         if( prevpage )
1050           if( bt_unlockpage(bt, prevpage, prevmode) )
1051                 return 0;
1052
1053         // obtain read lock using lock chaining
1054
1055         if( bt_lockpage(bt, page_no, mode) )
1056                 return 0;                                                                       
1057
1058         if( page_no > ROOT_page )
1059           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1060                 return 0;                                                                       
1061
1062         //      map/obtain page contents
1063
1064         if( bt_mappage (bt, &set->page, page_no) )
1065                 return 0;
1066
1067         // re-read and re-lock root after determining actual level of root
1068
1069         if( set->page->lvl != drill) {
1070                 if ( page_no != ROOT_page )
1071                         return bt->err = BTERR_struct, 0;
1072                         
1073                 drill = set->page->lvl;
1074
1075                 if( lock == BtLockWrite && drill == lvl )
1076                   if( bt_unlockpage(bt, page_no, mode) )
1077                         return 0;
1078                   else
1079                         continue;
1080         }
1081
1082         prevpage = page_no;
1083         prevmode = mode;
1084
1085         //  find key on page at this level
1086         //  and descend to requested level
1087
1088         if( slot = bt_findslot (set, key, len) ) {
1089           if( drill == lvl )
1090                 return bt->posted = posted, slot;
1091
1092           if( slot > set->page->cnt )
1093                 return bt->err = BTERR_struct, 0;
1094
1095         //  if drilling down, find next active key
1096
1097           while( slotptr(set->page, slot)->dead )
1098                 if( slot++ < set->page->cnt )
1099                         continue;
1100                 else
1101                         return bt->err = BTERR_struct, 0;
1102
1103           page_no = bt_getid(slotptr(set->page, slot)->id);
1104           posted = 1;
1105           drill--;
1106           continue;
1107         }
1108
1109         //  or slide right into next page
1110         //  (slide left from deleted page)
1111
1112         page_no = bt_getid(set->page->right);
1113         posted = 0;
1114
1115   } while( page_no );
1116
1117   // return error on end of right chain
1118
1119   bt->err = BTERR_struct;
1120   return 0;     // return error
1121 }
1122
1123 // drill down fixing fence values for left sibling tree
1124 //      starting with current left page in bt->temp
1125 //  call with bt->temp write locked
1126 //      return with all pages unlocked.
1127
1128 BTERR bt_fixfences (BtDb *bt, uid page_no, unsigned char *newfence)
1129 {
1130 unsigned char oldfence[256];
1131 uid prevpage = 0;
1132 int chk;
1133
1134   memcpy (oldfence, bt->temp->fence, 256);
1135
1136   //  start at left page of btree and drill down
1137   //  to update their fence values
1138
1139   do {
1140         // obtain access lock using lock chaining
1141
1142         if( prevpage ) {
1143           if( bt_lockpage(bt, page_no, BtLockAccess) )
1144                 return 0;                                                                       
1145
1146           if( bt_unlockpage(bt, prevpage, BtLockWrite) )
1147                 return 0;
1148
1149           // obtain parent/fence key maintenance lock
1150
1151           if( bt_lockpage(bt, page_no, BtLockParent) )
1152                 return 0;                                                                       
1153
1154           // obtain write lock using lock chaining
1155
1156           if( bt_lockpage(bt, page_no, BtLockWrite) )
1157                 return 0;                                                                       
1158
1159           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1160                 return 0;                                                                       
1161
1162           //  map/obtain page contents
1163
1164           if( bt_mappage (bt, &bt->temp, page_no) )
1165                 return 0;
1166         }
1167
1168         chk = keycmp ((BtKey)bt->temp->fence, oldfence + 1, *oldfence);
1169     prevpage = page_no;
1170
1171         if( chk < 0 ) {
1172                 page_no = bt_getid (bt->temp->right);
1173                 continue;
1174         }
1175
1176         if( chk > 0 )
1177                 return bt->err = BTERR_struct;
1178
1179         memcpy (bt->temp->fence, newfence, 256);
1180
1181         if( bt_update (bt, bt->temp, page_no) )
1182                 return bt->err;
1183
1184         //  return when we reach a leaf page
1185
1186         if( !bt->temp->lvl ) {
1187                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1188                         return bt->err;
1189                 return bt_unlockpage (bt, page_no, BtLockParent);
1190         }
1191
1192         page_no = bt_getid(slotptr(bt->temp, bt->temp->cnt)->id);
1193
1194   } while( page_no );
1195
1196   // return error on end of right chain
1197
1198   bt->err = BTERR_struct;
1199   return 0;     // return error
1200 }
1201
1202 //      return page to free list
1203 //      page must be delete & write locked
1204
1205 BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
1206 {
1207         //      lock & map allocation page
1208
1209         if( bt_lockpage (bt, ALLOC_page, BtLockWrite) )
1210                 return bt->err;
1211
1212         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
1213                 return bt->err;
1214
1215         //      store chain in second right
1216         bt_putid(page->right, bt_getid(bt->alloc[1].right));
1217         bt_putid(bt->alloc[1].right, page_no);
1218
1219         if( bt_update(bt, bt->alloc, ALLOC_page) )
1220                 return bt->err;
1221         if( bt_update(bt, page, page_no) )
1222                 return bt->err;
1223
1224         // unlock page zero 
1225
1226         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1227                 return bt->err;
1228
1229         //  remove write lock on deleted node
1230
1231         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1232                 return bt->err;
1233
1234         return bt_unlockpage (bt, page_no, BtLockDelete);
1235 }
1236
1237 //      remove the root level by promoting its only child
1238
1239 BTERR bt_removeroot (BtDb *bt, BtPage root, BtPage child, uid page_no)
1240 {
1241 uid next = 0;
1242
1243   do {
1244         if( next ) {
1245           if( bt_lockpage (bt, next, BtLockDelete) )
1246                 return bt->err;
1247           if( bt_lockpage (bt, next, BtLockWrite) )
1248                 return bt->err;
1249
1250           if( bt_mappage (bt, &child, next) )
1251                 return bt->err;
1252
1253           page_no = next;
1254         }
1255
1256         memcpy (root, child, bt->page_size);
1257         next = bt_getid (slotptr(child, child->cnt)->id);
1258
1259         if( bt_freepage (bt, child, page_no) )
1260                 return bt->err;
1261   } while( root->lvl > 1 && root->cnt == 1 );
1262
1263   if( bt_update (bt, root, ROOT_page) )
1264         return bt->err;
1265
1266   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
1267 }
1268
1269 //  pull right page over ourselves in simple merge
1270
1271 BTERR bt_mergeright (BtDb *bt, uid page_no, BtPageSet *parent, uint slot)
1272 {
1273 uid right;
1274 uint idx;
1275
1276         // find our right neighbor
1277         //      right must exist because the stopper prevents
1278         //      the rightmost page from deleting
1279
1280         for( idx = slot; idx++ < parent->page->cnt; )
1281           if( !slotptr(parent->page, idx)->dead )
1282                 break;
1283
1284         right = bt_getid (slotptr (parent->page, idx)->id);
1285
1286         if( right != bt_getid (bt->page->right) )
1287                 return bt->err = BTERR_struct;
1288
1289         if( bt_lockpage (bt, right, BtLockDelete) )
1290                 return bt->err;
1291
1292         if( bt_lockpage (bt, right, BtLockWrite) )
1293                 return bt->err;
1294
1295         if( bt_mappage (bt, &bt->temp, right) )
1296                 return bt->err;
1297
1298         memcpy (bt->page, bt->temp, bt->page_size);
1299
1300         if( bt_update(bt, bt->page, page_no) )
1301                 return bt->err;
1302
1303         //  install ourselves as child page
1304         //      and delete ourselves from parent
1305
1306         bt_putid (slotptr(parent->page, idx)->id, page_no);
1307         slotptr(parent->page, slot)->dead = 1;
1308         parent->page->act--;
1309
1310         //      collapse any empty slots
1311
1312         while( idx = parent->page->cnt - 1 )
1313           if( slotptr(parent->page, idx)->dead ) {
1314                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1315                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1316           } else
1317                   break;
1318
1319         if( bt_freepage (bt, bt->temp, right) )
1320                 return bt->err;
1321
1322         //      do we need to remove a btree level?
1323         //      (leave the first page of leaves alone)
1324
1325         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1326           if( bt->page->lvl )
1327                 return bt_removeroot (bt, parent->page, bt->page, page_no);
1328
1329         if( bt_update (bt, parent->page, parent->page_no) )
1330                 return bt->err;
1331
1332         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1333                 return bt->err;
1334
1335         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1336                 return bt->err;
1337
1338         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1339                 return bt->err;
1340
1341         return 0;
1342 }
1343
1344 //      remove both child and parent from the btree
1345 //      from the fence position in the parent
1346
1347 BTERR bt_removeparent (BtDb *bt, uid page_no, BtPageSet *parent, uint lvl)
1348 {
1349 unsigned char rightfence[256], pagefence[256];
1350 uid right, ppage_no;
1351
1352         right = bt_getid (bt->page->right);
1353
1354         if( bt_lockpage (bt, right, BtLockParent) )
1355                 return bt->err;
1356
1357         if( bt_lockpage (bt, right, BtLockAccess) )
1358                 return bt->err;
1359
1360         if( bt_lockpage (bt, right, BtLockWrite) )
1361                 return bt->err;
1362
1363         if( bt_unlockpage (bt, right, BtLockAccess) )
1364                 return bt->err;
1365
1366         if( bt_mappage (bt, &bt->temp, right) )
1367                 return bt->err;
1368
1369         //      save right page fence value and
1370         //      parent fence value
1371
1372         memcpy (rightfence, bt->temp->fence, 256);
1373         memcpy (pagefence, bt->parent->fence, 256);
1374         ppage_no = parent->page_no;
1375
1376         //  pull right sibling over ourselves and unlock
1377
1378         memcpy (bt->page, bt->temp, bt->page_size);
1379
1380         if( bt_update(bt, bt->page, page_no) )
1381                 return bt->err;
1382
1383         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1384                 return bt->err;
1385
1386         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1387                 return bt->err;
1388
1389         //  install ourselves into right link from old right page
1390
1391         bt->temp->act = 0;              // tell bt_findslot to go right (left)
1392         bt_putid (bt->temp->right, page_no);
1393
1394         if( bt_update (bt, bt->temp, right) )
1395                 return bt->err;
1396
1397         if( bt_unlockpage (bt, right, BtLockWrite) )
1398                 return bt->err;
1399
1400         //      remove our slot from our parent
1401         //      clear act to signal bt_findslot to move right
1402
1403         slotptr(parent->page, parent->page->cnt)->dead = 1;
1404         parent->page->act = 0;
1405
1406         if( bt_update (bt, parent->page, ppage_no) )
1407                 return bt->err;
1408         if( bt_unlockpage (bt, ppage_no, BtLockWrite) )
1409                 return bt->err;
1410
1411         //      redirect right page pointer in its parent to our left
1412         //      and free the right page
1413
1414         if( bt_insertkey (bt, rightfence+1, *rightfence, lvl, page_no, time(NULL)) )
1415                 return bt->err;
1416
1417         if( bt_removepage (bt, ppage_no, lvl, pagefence) )
1418                 return bt->err;
1419
1420         if( bt_unlockpage (bt, right, BtLockParent) )
1421                 return bt->err;
1422
1423         //      wait for others to drain away
1424
1425         if( bt_lockpage (bt, right, BtLockDelete) )
1426                 return bt->err;
1427
1428         if( bt_lockpage (bt, right, BtLockWrite) )
1429                 return bt->err;
1430
1431         if( bt_mappage (bt, &bt->temp, right) )
1432                 return bt->err;
1433
1434         return bt_freepage (bt, bt->temp, right);
1435 }
1436
1437 //      remove page from btree
1438 //      call with page unlocked
1439 //      returns with page on free list
1440
1441 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
1442 {
1443 BtPageSet parent[1];
1444 uint slot, idx;
1445 BtKey ptr;
1446 uid left;
1447
1448         parent->page = bt->parent;
1449
1450         //      load and lock our parent
1451
1452 retry:
1453         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1454                 return bt->err;
1455
1456         //      wait until we are posted in our parent
1457
1458         if( !bt->posted ) {
1459                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1460                         return bt->err;
1461 #ifdef unix
1462                 sched_yield();
1463 #else
1464                 SwitchToThread();
1465 #endif
1466                 goto retry;
1467         }
1468
1469         //      wait for others to finish
1470         //      and obtain final WriteLock
1471
1472         if( bt_lockpage (bt, page_no, BtLockDelete) )
1473                 return bt->err;
1474
1475         if( bt_lockpage (bt, page_no, BtLockWrite) )
1476                 return bt->err;
1477
1478         if( bt_mappage (bt, &bt->page, page_no) )
1479                 return bt->err;
1480
1481         //  was page re-established?
1482
1483         if( bt->page->act ) {
1484                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1485                         return bt->err;
1486                 if( bt_unlockpage (bt, page_no, BtLockDelete) )
1487                         return bt->err;
1488
1489                 return bt_unlockpage (bt, parent->page_no, BtLockWrite);
1490         }
1491                 
1492         //      can we do a simple merge entirely
1493         //      between siblings on the parent page?
1494
1495         if( slot < parent->page->cnt )
1496                 return bt_mergeright(bt, page_no, parent, slot);
1497
1498         //  find our left neighbor in our parent page
1499
1500         for( idx = slot; --idx; )
1501           if( !slotptr(parent->page, idx)->dead )
1502                 break;
1503
1504         //      if no left neighbor, delete ourselves and our parent
1505
1506         if( !idx )
1507                 return bt_removeparent (bt, page_no, parent, lvl+1);
1508
1509         // lock and map our left neighbor's page
1510
1511         left = bt_getid (slotptr(parent->page, idx)->id);
1512
1513         //      wait our turn on fence key maintenance
1514
1515         if( bt_lockpage(bt, left, BtLockParent) )
1516                 return bt->err;
1517
1518         if( bt_lockpage(bt, left, BtLockAccess) )
1519                 return bt->err;
1520
1521         if( bt_lockpage(bt, left, BtLockWrite) )
1522                 return bt->err;
1523
1524         if( bt_unlockpage(bt, left, BtLockAccess) )
1525                 return bt->err;
1526
1527         if( bt_mappage (bt, &bt->temp, left) )
1528                 return bt->err;
1529
1530         //  wait until sibling is in our parent
1531
1532         if( bt_getid (bt->temp->right) != page_no ) {
1533                 if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1534                         return bt->err;
1535                 if( bt_unlockpage (bt, left, BtLockWrite) )
1536                         return bt->err;
1537                 if( bt_unlockpage (bt, left, BtLockParent) )
1538                         return bt->err;
1539                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1540                         return bt->err;
1541                 if( bt_unlockpage (bt, page_no, BtLockDelete) )
1542                         return bt->err;
1543 #ifdef linux
1544                 sched_yield();
1545 #else
1546                 SwitchToThread();
1547 #endif
1548                 goto retry;
1549         }
1550
1551         //      delete our left sibling from parent
1552
1553         slotptr(parent->page,idx)->dead = 1;
1554         parent->page->dirty = 1;
1555         parent->page->act--;
1556
1557         //      redirect our parent slot to our left sibling
1558
1559         bt_putid (slotptr(parent->page, slot)->id, left);
1560
1561         //      update left page with our fence key
1562
1563         memcpy (bt->temp->right, bt->page->right, BtId);
1564
1565         //      collapse dead slots from parent
1566
1567         while( idx = parent->page->cnt - 1 )
1568           if( slotptr(parent->page, idx)->dead ) {
1569                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1570                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1571           } else
1572                   break;
1573
1574         //  update & unlock parent page
1575
1576         if( bt_update (bt, parent->page, parent->page_no) )
1577                 return bt->err;
1578
1579         if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
1580                 return bt->err;
1581
1582         //      go down the left node's fence keys to the leaf level
1583         //      and update the fence keys in each page
1584
1585         if( bt_fixfences (bt, left, pagefence) )
1586                 return bt->err;
1587
1588         //  free our original page
1589
1590         if( bt_mappage (bt, &bt->temp, page_no) )
1591                 return bt->err;
1592
1593         return bt_freepage (bt, bt->temp,page_no);
1594 }
1595
1596 //  find and delete key on page by marking delete flag bit
1597 //  when page becomes empty, delete it
1598
1599 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1600 {
1601 unsigned char pagefence[256];
1602 uint slot, found, act, idx;
1603 BtPageSet set[1];
1604 BtKey ptr;
1605
1606         set->page = bt->page;
1607
1608         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
1609                 ptr = keyptr(set->page, slot);
1610         else
1611                 return bt->err;
1612
1613         // if key is found delete it, otherwise ignore request
1614
1615         if( found = slot <= set->page->cnt )
1616           if( found = !keycmp (ptr, key, len) )
1617                 if( found = slotptr(set->page, slot)->dead == 0 ) {
1618                   slotptr(set->page,slot)->dead = 1;
1619                   set->page->dirty = 1;
1620                   set->page->act--;
1621
1622                   //    collapse empty slots
1623
1624                   while( idx = set->page->cnt - 1 )
1625                         if( slotptr(set->page, idx)->dead ) {
1626                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1627                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1628                         } else
1629                                 break;
1630
1631                   if( bt_update(bt, set->page, set->page_no) )
1632                         return bt->err;
1633                 }
1634
1635         // delete page when empty
1636
1637         memcpy (pagefence, set->page->fence, 256);
1638         act = set->page->act;
1639
1640         if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
1641                 return bt->err;
1642
1643         if( !act )
1644           if( bt_removepage (bt, set->page_no, 0, pagefence) )
1645                 return bt->err;
1646
1647         bt->found = found;
1648         return 0;
1649 }
1650
1651 //      find key in leaf level and return row-id
1652
1653 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1654 {
1655 BtPageSet set[1];
1656 uint  slot;
1657 uid id = 0;
1658 BtKey ptr;
1659
1660         set->page = bt->page;
1661
1662         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
1663                 ptr = keyptr(set->page, slot);
1664         else
1665                 return 0;
1666
1667         // if key exists, return row-id
1668         //      otherwise return 0
1669
1670         if( slot <= set->page->cnt )
1671           if( !keycmp (ptr, key, len) )
1672                 id = bt_getid(slotptr(set->page,slot)->id);
1673
1674         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
1675                 return 0;
1676
1677         return id;
1678 }
1679
1680 //      check page for space available,
1681 //      clean if necessary and return
1682 //      0 - page needs splitting
1683 //      >0 - new slot value
1684  
1685 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1686 {
1687 uint nxt = bt->page_size, off;
1688 uint cnt = 0, idx = 0;
1689 uint max = page->cnt;
1690 uint newslot = max;
1691 BtKey key;
1692
1693         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1694                 return slot;
1695
1696         //      skip cleanup if nothing to reclaim
1697
1698         if( !page->dirty )
1699                 return 0;
1700
1701         memcpy (bt->frame, page, bt->page_size);
1702
1703         // skip page info and set rest of page to zero
1704
1705         memset (page+1, 0, bt->page_size - sizeof(*page));
1706         page->dirty = 0;
1707         page->act = 0;
1708
1709         while( cnt++ < max ) {
1710                 if( cnt == slot )
1711                         newslot = idx + 1;
1712                 if( slotptr(bt->frame,cnt)->dead )
1713                         continue;
1714
1715                 // copy key
1716                 if( !page->lvl || cnt < max ) {
1717                         key = keyptr(bt->frame, cnt);
1718                         off = nxt -= key->len + 1;
1719                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1720                 } else
1721                         off = offsetof(struct BtPage_, fence);
1722
1723                 // copy slot
1724                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1725                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1726                 slotptr(page, idx)->off = off;
1727                 page->act++;
1728         }
1729         page->min = nxt;
1730         page->cnt = idx;
1731
1732         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1733                 return newslot;
1734
1735         return 0;
1736 }
1737
1738 // split the root and raise the height of the btree
1739
1740 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
1741 {
1742 unsigned char leftkey[256];
1743 uint nxt = bt->page_size;
1744 uid new_page;
1745
1746         //  Obtain an empty page to use, and copy the current
1747         //  root contents into it, e.g. lower keys
1748
1749         memcpy (leftkey, root->page->fence, 256);
1750         root->page->posted = 1;
1751
1752         if( !(new_page = bt_newpage(bt, root->page)) )
1753                 return bt->err;
1754
1755         // preserve the page info at the bottom
1756         // of higher keys and set rest to zero
1757
1758         memset(root->page+1, 0, bt->page_size - sizeof(*root->page));
1759         memset(root->page->fence, 0, 256);
1760         root->page->fence[0] = 2;
1761         root->page->fence[1] = 0xff;
1762         root->page->fence[2] = 0xff;
1763
1764         // insert new page fence key on newroot page
1765
1766         nxt -= *leftkey + 1;
1767         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1768         bt_putid(slotptr(root->page, 1)->id, new_page);
1769         slotptr(root->page, 1)->off = nxt;
1770         
1771         // insert stopper key on newroot page
1772         // and increase the root height
1773
1774         bt_putid(slotptr(root->page, 2)->id, page_no2);
1775         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
1776
1777         bt_putid(root->page->right, 0);
1778         root->page->min = nxt;          // reset lowest used offset and key count
1779         root->page->cnt = 2;
1780         root->page->act = 2;
1781         root->page->lvl++;
1782
1783         // update and release root
1784
1785         if( bt_update(bt, root->page, root->page_no) )
1786                 return bt->err;
1787
1788         return bt_unlockpage(bt, root->page_no, BtLockWrite);
1789 }
1790
1791 //  split already locked full node
1792 //      return unlocked.
1793
1794 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1795 {
1796 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
1797 uid right, page_no = set->page_no;
1798 unsigned char fencekey[256];
1799 uint lvl = set->page->lvl;
1800 BtKey key;
1801
1802         //  split higher half of keys to bt->frame
1803
1804         memset (bt->frame, 0, bt->page_size);
1805         max = set->page->cnt;
1806         cnt = max / 2;
1807         idx = 0;
1808
1809         while( cnt++ < max ) {
1810                 if( !lvl || cnt < max ) {
1811                         key = keyptr(set->page, cnt);
1812                         off = nxt -= key->len + 1;
1813                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1814                 } else
1815                         off = offsetof(struct BtPage_, fence);
1816
1817                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
1818                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
1819                 slotptr(bt->frame, idx)->off = off;
1820                 bt->frame->act++;
1821         }
1822
1823         if( page_no == ROOT_page )
1824                 bt->frame->posted = 1;
1825
1826         memcpy (bt->frame->fence, set->page->fence, 256);
1827         bt->frame->bits = bt->page_bits;
1828         bt->frame->min = nxt;
1829         bt->frame->cnt = idx;
1830         bt->frame->lvl = lvl;
1831
1832         // link right node
1833
1834         if( page_no > ROOT_page )
1835                 memcpy (bt->frame->right, set->page->right, BtId);
1836
1837         //      get new free page and write higher keys to it.
1838
1839         if( !(right = bt_newpage(bt, bt->frame)) )
1840                 return bt->err;
1841
1842         //      update lower keys to continue in old page
1843
1844         memcpy (bt->frame, set->page, bt->page_size);
1845         memset (set->page+1, 0, bt->page_size - sizeof(*set->page));
1846         nxt = bt->page_size;
1847         set->page->posted = 0;
1848         set->page->dirty = 0;
1849         set->page->act = 0;
1850         cnt = 0;
1851         idx = 0;
1852
1853         //  assemble page of smaller keys
1854
1855         while( cnt++ < max / 2 ) {
1856                 key = keyptr(bt->frame, cnt);
1857
1858                 if( !lvl || cnt < max / 2 ) {
1859                         off = nxt -= key->len + 1;
1860                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
1861                 } else 
1862                         off = offsetof(struct BtPage_, fence);
1863
1864                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1865                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1866                 slotptr(set->page, idx)->off = off;
1867                 set->page->act++;
1868         }
1869
1870         // install fence key for smaller key page
1871
1872         memset(set->page->fence, 0, 256);
1873         memcpy(set->page->fence, key, key->len + 1);
1874
1875         bt_putid(set->page->right, right);
1876         set->page->min = nxt;
1877         set->page->cnt = idx;
1878
1879         // if current page is the root page, split it
1880
1881         if( page_no == ROOT_page )
1882                 return bt_splitroot (bt, set, right);
1883
1884         // insert new fences in parent page
1885
1886         while( 1 ) {
1887                 memcpy (fencekey, set->page->fence, 256);
1888                 right = bt_getid (set->page->right);
1889                 set->page->posted = 1;
1890
1891                 if( bt_update (bt, set->page, page_no) )
1892                         return bt->err; 
1893
1894                 if( bt_unlockpage (bt, page_no, BtLockWrite) )
1895                         return bt->err;
1896
1897                 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1898                         return bt->err;
1899
1900                 if( bt_unlockpage (bt, page_no, BtLockParent) )
1901                         return bt->err;
1902
1903                 if( !(page_no = right) )
1904                         break;
1905
1906                 // obtain fence modification/installation lock
1907
1908                 if( bt_lockpage (bt, page_no, BtLockParent) )
1909                         return bt->err;
1910
1911                 if( bt_lockpage (bt, page_no, BtLockAccess) )
1912                         return bt->err;
1913
1914                 if( bt_lockpage (bt, page_no, BtLockWrite) )
1915                         return bt->err;
1916
1917                 if( bt_unlockpage (bt, page_no, BtLockAccess) )
1918                         return bt->err;
1919
1920                 if( bt_mappage (bt, &set->page, page_no) )
1921                         return bt->err;
1922
1923                 if( set->page->posted ) {
1924                         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1925                                 return bt->err;
1926                         return bt_unlockpage (bt, page_no, BtLockParent);
1927                 }
1928
1929         }
1930
1931         return 0;
1932 }
1933
1934 //  Insert new key into the btree at requested level.
1935
1936 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1937 {
1938 BtPageSet set[1];
1939 uint slot, idx;
1940 BtKey ptr;
1941
1942   set->page = bt->page;
1943
1944   while( 1 ) {
1945         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1946                 ptr = keyptr(set->page, slot);
1947         else
1948         {
1949                 if ( !bt->err )
1950                         bt->err = BTERR_ovflw;
1951                 return bt->err;
1952         }
1953
1954         // if key already exists, update id and return
1955
1956         if( slot <= set->page->cnt )
1957           if( !keycmp (ptr, key, len) ) {
1958                 if( slotptr(set->page, slot)->dead )
1959                         set->page->act++;
1960
1961                 slotptr(set->page, slot)->dead = 0;
1962                 slotptr(set->page, slot)->tod = tod;
1963                 bt_putid(slotptr(set->page,slot)->id, id);
1964
1965                 if ( bt_update(bt, set->page, set->page_no) )
1966                         return bt->err;
1967
1968                 return bt_unlockpage(bt, set->page_no, BtLockWrite);
1969           }
1970
1971         // check if page has enough space
1972
1973         if( slot = bt_cleanpage (bt, set->page, len, slot) )
1974                 break;
1975
1976         if( bt_splitpage (bt, set) )
1977                 return bt->err;
1978   }
1979
1980   // calculate next available slot and copy key into page
1981
1982   set->page->min -= len + 1; // reset lowest used offset
1983   ((unsigned char *)set->page)[set->page->min] = len;
1984   memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
1985
1986   for( idx = slot; idx <= set->page->cnt; idx++ )
1987         if( slotptr(set->page, idx)->dead )
1988                 break;
1989
1990   // now insert key into array before slot
1991
1992   if( idx > set->page->cnt )
1993         set->page->cnt++;
1994
1995   set->page->act++;
1996
1997   while( idx > slot )
1998         *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
1999
2000   bt_putid(slotptr(set->page,slot)->id, id);
2001   slotptr(set->page, slot)->off = set->page->min;
2002   slotptr(set->page, slot)->tod = tod;
2003   slotptr(set->page, slot)->dead = 0;
2004
2005   if( bt_update(bt, set->page, set->page_no) )
2006         return bt->err;
2007
2008   return bt_unlockpage(bt, set->page_no, BtLockWrite);
2009 }
2010
2011 //  cache page of keys into cursor and return starting slot for given key
2012
2013 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2014 {
2015 BtPageSet set[1];
2016 uint slot;
2017
2018         set->page = bt->page;
2019
2020         // cache page for retrieval
2021         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2022                 memcpy (bt->cursor, set->page, bt->page_size);
2023         bt->cursor_page = set->page_no;
2024         if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
2025                 return 0;
2026
2027         return slot;
2028 }
2029
2030 //  return next slot for cursor page
2031 //  or slide cursor right into next page
2032
2033 uint bt_nextkey (BtDb *bt, uint slot)
2034 {
2035 BtPageSet set[1];
2036 uid right;
2037
2038   do {
2039         right = bt_getid(bt->cursor->right);
2040         while( slot++ < bt->cursor->cnt )
2041           if( slotptr(bt->cursor,slot)->dead )
2042                 continue;
2043           else if( right || (slot < bt->cursor->cnt)) // skip infinite stopper
2044                 return slot;
2045           else
2046                 break;
2047
2048         if( !right )
2049                 break;
2050
2051         bt->cursor_page = right;
2052         set->page = bt->page;
2053
2054     if( bt_lockpage(bt, right, BtLockRead) )
2055                 return 0;
2056
2057         if( bt_mappage (bt, &set->page, right) )
2058                 break;
2059
2060         memcpy (bt->cursor, set->page, bt->page_size);
2061
2062         if( bt_unlockpage(bt, right, BtLockRead) )
2063                 return 0;
2064
2065         slot = 0;
2066   } while( 1 );
2067
2068   return bt->err = 0;
2069 }
2070
2071 BtKey bt_key(BtDb *bt, uint slot)
2072 {
2073         return keyptr(bt->cursor, slot);
2074 }
2075
2076 uid bt_uid(BtDb *bt, uint slot)
2077 {
2078         return bt_getid(slotptr(bt->cursor,slot)->id);
2079 }
2080
2081 uint bt_tod(BtDb *bt, uint slot)
2082 {
2083         return slotptr(bt->cursor,slot)->tod;
2084 }
2085
2086
2087 #ifdef STANDALONE
2088 //  standalone program to index file of keys
2089 //  then list them onto std-out
2090
2091 int main (int argc, char **argv)
2092 {
2093 uint slot, line = 0, off = 0, found = 0;
2094 int dead, ch, cnt = 0, bits = 12;
2095 unsigned char key[256];
2096 clock_t done, start;
2097 uint pgblk = 0;
2098 time_t tod[1];
2099 uint scan = 0;
2100 uint len = 0;
2101 uint map = 0;
2102 BtKey ptr;
2103 BtDb *bt;
2104 FILE *in;
2105
2106         if( argc < 4 ) {
2107                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2108                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2109                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2110                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2111                 exit(0);
2112         }
2113
2114         start = clock();
2115         time(tod);
2116
2117         if( argc > 4 )
2118                 bits = atoi(argv[4]);
2119
2120         if( argc > 5 )
2121                 map = atoi(argv[5]);
2122
2123         if( map > 65536 )
2124                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2125
2126         if( map && map < 8 )
2127                 fprintf (stderr, "Buffer_pool too small\n");
2128
2129         if( argc > 6 )
2130                 pgblk = atoi(argv[6]);
2131
2132         if( bits + pgblk > 30 )
2133                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2134
2135         if( argc > 7 )
2136                 off = atoi(argv[7]);
2137
2138         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
2139
2140         if( !bt ) {
2141                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2142                 exit (1);
2143         }
2144
2145         switch(argv[3][0]| 0x20)
2146         {
2147         case 'w':
2148                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2149                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2150                   while( ch = getc(in), ch != EOF )
2151                         if( ch == '\n' )
2152                         {
2153                           if( off )
2154                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2155
2156                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2157                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2158                           len = 0;
2159                         }
2160                         else if( len < 245 )
2161                                 key[len++] = ch;
2162                 fprintf(stderr, "finished adding keys, %d \n", line);
2163                 break;
2164
2165         case 'd':
2166                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2167                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2168                   while( ch = getc(in), ch != EOF )
2169                         if( ch == '\n' )
2170                         {
2171                           if( off )
2172                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2173                           line++;
2174                           if( bt_deletekey (bt, key, len) )
2175                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2176                           len = 0;
2177                         }
2178                         else if( len < 245 )
2179                                 key[len++] = ch;
2180                 fprintf(stderr, "finished deleting keys, %d \n", line);
2181                 break;
2182
2183         case 'f':
2184                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2185                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2186                   while( ch = getc(in), ch != EOF )
2187                         if( ch == '\n' )
2188                         {
2189                           if( off )
2190                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2191                           line++;
2192                           if( bt_findkey (bt, key, len) )
2193                                 found++;
2194                           else if( bt->err )
2195                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2196                           len = 0;
2197                         }
2198                         else if( len < 245 )
2199                                 key[len++] = ch;
2200                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2201                 break;
2202
2203         case 's':
2204                 scan++;
2205                 break;
2206
2207         }
2208
2209         done = clock();
2210         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2211
2212         dead = cnt = 0;
2213         len = key[0] = 0;
2214
2215         fprintf(stderr, "started reading\n");
2216
2217         if( slot = bt_startkey (bt, key, len) )
2218           slot--;
2219         else
2220           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2221
2222         while( slot = bt_nextkey (bt, slot) )
2223           if( cnt++, scan ) {
2224                         ptr = bt_key(bt, slot);
2225                         fwrite (ptr->key, ptr->len, 1, stdout);
2226                         fputc ('\n', stdout);
2227           }
2228
2229         fprintf(stderr, " Total keys read %d\n", cnt);
2230         return 0;
2231 }
2232
2233 #endif  //STANDALONE