]> pd.if.org Git - btree/blob - btree2s.c
Make bt_splitpage more conservative wrt posting fence keys
[btree] / btree2s.c
1 // btree version 2s
2 // 18 FEB 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <fcntl.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #else
39 #define WIN32_LEAN_AND_MEAN
40 #include <windows.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <time.h>
44 #include <fcntl.h>
45 #endif
46
47 #include <memory.h>
48 #include <string.h>
49
50 typedef unsigned long long      uid;
51
52 #ifndef unix
53 typedef unsigned long long      off64_t;
54 typedef unsigned short          ushort;
55 typedef unsigned int            uint;
56 #endif
57
58 #define BT_ro 0x6f72    // ro
59 #define BT_rw 0x7772    // rw
60 #define BT_fl 0x6c66    // fl
61
62 #define BT_maxbits              24                                      // maximum page size in bits
63 #define BT_minbits              9                                       // minimum page size in bits
64 #define BT_minpage              (1 << BT_minbits)       // minimum page size
65
66 /*
67 There are five lock types for each node in three independent sets: 
68 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
69 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
70 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
71 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
72 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
73 */
74
75 typedef enum{
76         BtLockAccess,
77         BtLockDelete,
78         BtLockRead,
79         BtLockWrite,
80         BtLockParent
81 }BtLock;
82
83 //      Define the length of the page and key pointers
84
85 #define BtId 6
86
87 //      Page key slot definition.
88
89 //      If BT_maxbits is 15 or less, you can save 2 bytes
90 //      for each key stored by making the first two uints
91 //      into ushorts.  You can also save 4 bytes by removing
92 //      the tod field from the key.
93
94 //      Keys are marked dead, but remain on the page until
95 //      cleanup is called. The fence key (highest key) for
96 //      the page is always present, even if dead.
97
98 typedef struct {
99         uint off:BT_maxbits;            // page offset for key start
100         uint dead:1;                            // set for deleted key
101         uint tod;                                       // time-stamp for key
102         unsigned char id[BtId];         // id associated with key
103 } BtSlot;
104
105 //      The key structure occupies space at the upper end of
106 //      each page.  It's a length byte followed by the value
107 //      bytes.
108
109 typedef struct {
110         unsigned char len;
111         unsigned char key[0];
112 } *BtKey;
113
114 //      The first part of an index page.
115 //      It is immediately followed
116 //      by the BtSlot array of keys.
117
118 typedef struct {
119         uint cnt;                                       // count of keys in page
120         uint act;                                       // count of active keys
121         uint min;                                       // next key offset
122         unsigned char bits:7;           // page size in bits
123         unsigned char free:1;           // page is on free list
124         unsigned char lvl:6;            // level of page
125         unsigned char kill:1;           // page is being deleted
126         unsigned char dirty:1;          // page is dirty
127         unsigned char right[BtId];      // page number to right
128 } *BtPage;
129
130 //      The memory mapping hash table entry
131
132 typedef struct {
133         BtPage page;            // mapped page pointer
134         uid  page_no;           // mapped page number
135         void *lruprev;          // least recently used previous cache block
136         void *lrunext;          // lru next cache block
137         void *hashprev;         // previous cache block for the same hash idx
138         void *hashnext;         // next cache block for the same hash idx
139 #ifndef unix
140         HANDLE hmap;
141 #endif
142 }BtHash;
143
144 //      The object structure for Btree access
145
146 typedef struct _BtDb {
147         uint page_size;         // each page size       
148         uint page_bits;         // each page size in bits       
149         uint seg_bits;          // segment size in pages in bits
150         uid page_no;            // current page number  
151         uid cursor_page;        // current cursor page number   
152         int  err;
153         uint mode;                      // read-write mode
154         uint mapped_io;         // use memory mapping
155         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
156         BtPage alloc;           // frame buffer for alloc page ( page 0 )
157         BtPage cursor;          // cached frame for start/next (never mapped)
158         BtPage frame;           // spare frame for the page split (never mapped)
159         BtPage zero;            // zeroes frame buffer (never mapped)
160         BtPage page;            // current page
161 #ifdef unix
162         int idx;
163 #else
164         HANDLE idx;
165 #endif
166         unsigned char *mem;     // frame, cursor, page memory buffer
167         int nodecnt;            // highest page cache segment in use
168         int nodemax;            // highest page cache segment allocated
169         int hashmask;           // number of pages in segments - 1
170         int hashsize;           // size of hash table
171         int found;                      // last deletekey found key
172         BtHash *lrufirst;       // lru list head
173         BtHash *lrulast;        // lru list tail
174         ushort *cache;          // hash table for cached segments
175         BtHash nodes[1];        // segment cache follows
176 } BtDb;
177
178 typedef enum {
179 BTERR_ok = 0,
180 BTERR_struct,
181 BTERR_ovflw,
182 BTERR_lock,
183 BTERR_map,
184 BTERR_wrt,
185 BTERR_hash
186 } BTERR;
187
188 // B-Tree functions
189 extern void bt_close (BtDb *bt);
190 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
191 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
192 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
193 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
194 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
195 extern uint bt_nextkey   (BtDb *bt, uint slot);
196
197 //  Helper functions to return slot values
198
199 extern BtKey bt_key (BtDb *bt, uint slot);
200 extern uid bt_uid (BtDb *bt, uint slot);
201 extern uint bt_tod (BtDb *bt, uint slot);
202
203 //  BTree page number constants
204 #define ALLOC_page              0
205 #define ROOT_page               1
206 #define LEAF_page               2
207
208 //      Number of levels to create in a new BTree
209
210 #define MIN_lvl                 2
211
212 //  The page is allocated from low and hi ends.
213 //  The key offsets and row-id's are allocated
214 //  from the bottom, while the text of the key
215 //  is allocated from the top.  When the two
216 //  areas meet, the page is split into two.
217
218 //  A key consists of a length byte, two bytes of
219 //  index number (0 - 65534), and up to 253 bytes
220 //  of key value.  Duplicate keys are discarded.
221 //  Associated with each key is a 48 bit row-id.
222
223 //  The b-tree root is always located at page 1.
224 //      The first leaf page of level zero is always
225 //      located on page 2.
226
227 //      The b-tree pages are linked with right
228 //      pointers to facilitate enumerators,
229 //      and provide for concurrency.
230
231 //      When to root page fills, it is split in two and
232 //      the tree height is raised by a new root at page
233 //      one with two keys.
234
235 //      Deleted keys are marked with a dead bit until
236 //      page cleanup The fence key for a node is always
237 //      present, even after deletion and cleanup.
238
239 //  Deleted leaf pages are reclaimed  on a free list.
240 //      The upper levels of the btree are fixed on creation.
241
242 //  Groups of pages from the btree are optionally
243 //  cached with memory mapping. A hash table is used to keep
244 //  track of the cached pages.  This behaviour is controlled
245 //  by the number of cache blocks parameter and pages per block
246 //      given to bt_open.
247
248 //  To achieve maximum concurrency one page is locked at a time
249 //  as the tree is traversed to find leaf key in question. The right
250 //  page numbers are used in cases where the page is being split,
251 //      or consolidated.
252
253 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
254 //      and chains empty leaf pages together for reuse.
255
256 //      Parent locks are obtained to prevent resplitting or deleting a node
257 //      before its fence is posted into its upper level.
258
259 //      A special open mode of BT_fl is provided to safely access files on
260 //      WIN32 networks. WIN32 network operations should not use memory mapping.
261 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
262 //      to prevent local caching of network file contents.
263
264 //      Access macros to address slot and key values from the page.
265 //      Page slots use 1 based indexing.
266
267 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
268 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
269
270 void bt_putid(unsigned char *dest, uid id)
271 {
272 int i = BtId;
273
274         while( i-- )
275                 dest[i] = (unsigned char)id, id >>= 8;
276 }
277
278 uid bt_getid(unsigned char *src)
279 {
280 uid id = 0;
281 int i;
282
283         for( i = 0; i < BtId; i++ )
284                 id <<= 8, id |= *src++; 
285
286         return id;
287 }
288
289 // place write, read, or parent lock on requested page_no.
290
291 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
292 {
293 off64_t off = page_no << bt->page_bits;
294 #ifdef unix
295 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
296 struct flock lock[1];
297 #else
298 uint flags = 0, len;
299 OVERLAPPED ovl[1];
300 #endif
301
302         if( mode == BtLockRead || mode == BtLockWrite )
303                 off +=  sizeof(*bt->page);      // use second segment
304
305         if( mode == BtLockParent )
306                 off +=  2 * sizeof(*bt->page);  // use third segment
307
308 #ifdef unix
309         memset (lock, 0, sizeof(lock));
310
311         lock->l_start = off;
312         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
313         lock->l_len = sizeof(*bt->page);
314         lock->l_whence = 0;
315
316         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
317                 return bt->err = BTERR_lock;
318
319         return 0;
320 #else
321         memset (ovl, 0, sizeof(ovl));
322         ovl->OffsetHigh = (uint)(off >> 32);
323         ovl->Offset = (uint)off;
324         len = sizeof(*bt->page);
325
326         //      use large offsets to
327         //      simulate advisory locking
328
329         ovl->OffsetHigh |= 0x80000000;
330
331         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
332                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
333
334         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
335                 return bt->err = 0;
336
337         return bt->err = BTERR_lock;
338 #endif 
339 }
340
341 // remove write, read, or parent lock on requested page_no.
342
343 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
344 {
345 off64_t off = page_no << bt->page_bits;
346 #ifdef unix
347 struct flock lock[1];
348 #else
349 OVERLAPPED ovl[1];
350 uint len;
351 #endif
352
353         if( mode == BtLockRead || mode == BtLockWrite )
354                 off +=  sizeof(*bt->page);      // use second segment
355
356         if( mode == BtLockParent )
357                 off +=  2 * sizeof(*bt->page);  // use third segment
358
359 #ifdef unix
360         memset (lock, 0, sizeof(lock));
361
362         lock->l_start = off;
363         lock->l_type = F_UNLCK;
364         lock->l_len = sizeof(*bt->page);
365         lock->l_whence = 0;
366
367         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
368                 return bt->err = BTERR_lock;
369 #else
370         memset (ovl, 0, sizeof(ovl));
371         ovl->OffsetHigh = (uint)(off >> 32);
372         ovl->Offset = (uint)off;
373         len = sizeof(*bt->page);
374
375         //      use large offsets to
376         //      simulate advisory locking
377
378         ovl->OffsetHigh |= 0x80000000;
379
380         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
381                 return GetLastError(), bt->err = BTERR_lock;
382 #endif
383
384         return bt->err = 0;
385 }
386
387 //      close and release memory
388
389 void bt_close (BtDb *bt)
390 {
391 BtHash *hash;
392 #ifdef unix
393         // release mapped pages
394
395         if( hash = bt->lrufirst )
396                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
397                 while(hash = hash->lrunext);
398
399         if ( bt->mem )
400                 free (bt->mem);
401         close (bt->idx);
402         free (bt->cache);
403         free (bt);
404 #else
405         if( hash = bt->lrufirst )
406           do
407           {
408                 FlushViewOfFile(hash->page, 0);
409                 UnmapViewOfFile(hash->page);
410                 CloseHandle(hash->hmap);
411           } while(hash = hash->lrunext);
412
413         if ( bt->mem)
414                 VirtualFree (bt->mem, 0, MEM_RELEASE);
415         FlushFileBuffers(bt->idx);
416         CloseHandle(bt->idx);
417         GlobalFree (bt->cache);
418         GlobalFree (bt);
419 #endif
420 }
421
422 //  open/create new btree
423 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
424 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
425
426 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
427 {
428 uint lvl, attr, cacheblk, last;
429 BtLock lockmode = BtLockWrite;
430 BtPage alloc;
431 off64_t size;
432 uint amt[1];
433 BtKey key;
434 BtDb* bt;
435
436 #ifndef unix
437 SYSTEM_INFO sysinfo[1];
438 #endif
439
440 #ifdef unix
441         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
442         memset (bt, 0, sizeof(BtDb));
443
444         switch (mode & 0x7fff)
445         {
446         case BT_fl:
447         case BT_rw:
448                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
449                 break;
450
451         case BT_ro:
452         default:
453                 bt->idx = open ((char*)name, O_RDONLY);
454                 lockmode = BtLockRead;
455                 break;
456         }
457         if( bt->idx == -1 )
458                 return free(bt), NULL;
459         
460         if( nodemax )
461                 cacheblk = 4096;        // page size for unix
462         else
463                 cacheblk = 0;
464
465 #else
466         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
467         attr = FILE_ATTRIBUTE_NORMAL;
468         switch (mode & 0x7fff)
469         {
470         case BT_fl:
471                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
472
473         case BT_rw:
474                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
475                 break;
476
477         case BT_ro:
478         default:
479                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
480                 lockmode = BtLockRead;
481                 break;
482         }
483         if( bt->idx == INVALID_HANDLE_VALUE )
484                 return GlobalFree(bt), NULL;
485
486         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
487         GetSystemInfo(sysinfo);
488
489         if( nodemax )
490                 cacheblk = sysinfo->dwAllocationGranularity;
491         else
492                 cacheblk = 0;
493 #endif
494
495         // determine sanity of page size
496
497         if( bits > BT_maxbits )
498                 bits = BT_maxbits;
499         else if( bits < BT_minbits )
500                 bits = BT_minbits;
501
502         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
503                 return bt_close (bt), NULL;
504
505 #ifdef unix
506         *amt = 0;
507
508         // read minimum page size to get root info
509
510         if( size = lseek (bt->idx, 0L, 2) ) {
511                 alloc = malloc (BT_minpage);
512                 pread(bt->idx, alloc, BT_minpage, 0);
513                 bits = alloc->bits;
514                 free (alloc);
515         } else if( mode == BT_ro )
516                 return bt_close (bt), NULL;
517 #else
518         size = GetFileSize(bt->idx, amt);
519
520         if( size || *amt ) {
521                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
522                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
523                         return bt_close (bt), NULL;
524                 bits = alloc->bits;
525                 VirtualFree (alloc, 0, MEM_RELEASE);
526         } else if( mode == BT_ro )
527                 return bt_close (bt), NULL;
528 #endif
529
530         bt->page_size = 1 << bits;
531         bt->page_bits = bits;
532
533         bt->nodemax = nodemax;
534         bt->mode = mode;
535
536         // setup cache mapping
537
538         if( cacheblk ) {
539                 if( cacheblk < bt->page_size )
540                         cacheblk = bt->page_size;
541
542                 bt->hashsize = nodemax / 8;
543                 bt->hashmask = (cacheblk >> bits) - 1;
544                 bt->mapped_io = 1;
545         }
546
547         //      requested number of pages per memmap segment
548
549         if( cacheblk )
550           if( (1 << pgblk) > bt->hashmask )
551                 bt->hashmask = (1 << pgblk) - 1;
552
553         bt->seg_bits = 0;
554
555         while( (1 << bt->seg_bits) <= bt->hashmask )
556                 bt->seg_bits++;
557
558 #ifdef unix
559         bt->mem = malloc (6 *bt->page_size);
560         bt->cache = calloc (bt->hashsize, sizeof(ushort));
561 #else
562         bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
563         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
564 #endif
565         bt->frame = (BtPage)bt->mem;
566         bt->cursor = (BtPage)(bt->mem + bt->page_size);
567         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
568         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
569         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
570         bt->zero = (BtPage)(bt->mem + 5 * bt->page_size);
571
572         if( size || *amt ) {
573                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
574                         return bt_close (bt), NULL;
575
576                 return bt;
577         }
578
579         // initializes an empty b-tree with root page and page of leaves
580
581         memset (bt->alloc, 0, bt->page_size);
582         bt_putid(bt->alloc->right, MIN_lvl+1);
583         bt->alloc->bits = bt->page_bits;
584
585 #ifdef unix
586         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
587                 return bt_close (bt), NULL;
588 #else
589         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
590                 return bt_close (bt), NULL;
591
592         if( *amt < bt->page_size )
593                 return bt_close (bt), NULL;
594 #endif
595
596         memset (bt->frame, 0, bt->page_size);
597         bt->frame->bits = bt->page_bits;
598
599         for( lvl=MIN_lvl; lvl--; ) {
600                 slotptr(bt->frame, 1)->off = bt->page_size - 3;
601                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
602                 key = keyptr(bt->frame, 1);
603                 key->len = 2;                   // create stopper key
604                 key->key[0] = 0xff;
605                 key->key[1] = 0xff;
606                 bt->frame->min = bt->page_size - 3;
607                 bt->frame->lvl = lvl;
608                 bt->frame->cnt = 1;
609                 bt->frame->act = 1;
610 #ifdef unix
611                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
612                         return bt_close (bt), NULL;
613 #else
614                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
615                         return bt_close (bt), NULL;
616
617                 if( *amt < bt->page_size )
618                         return bt_close (bt), NULL;
619 #endif
620         }
621
622         // create empty page area by writing last page of first
623         // cache area (other pages are zeroed by O/S)
624
625         if( bt->mapped_io && bt->hashmask ) {
626                 memset(bt->frame, 0, bt->page_size);
627                 last = bt->hashmask;
628
629                 while( last < MIN_lvl + 1 )
630                         last += bt->hashmask + 1;
631 #ifdef unix
632                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
633 #else
634                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
635                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
636                         return bt_close (bt), NULL;
637                 if( *amt < bt->page_size )
638                         return bt_close (bt), NULL;
639 #endif
640         }
641
642         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
643                 return bt_close (bt), NULL;
644
645         return bt;
646 }
647
648 //  compare two keys, returning > 0, = 0, or < 0
649 //  as the comparison value
650
651 int keycmp (BtKey key1, unsigned char *key2, uint len2)
652 {
653 uint len1 = key1->len;
654 int ans;
655
656         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
657                 return ans;
658
659         if( len1 > len2 )
660                 return 1;
661         if( len1 < len2 )
662                 return -1;
663
664         return 0;
665 }
666
667 //  Update current page of btree by writing file contents
668 //      or flushing mapped area to disk.
669
670 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
671 {
672 off64_t off = page_no << bt->page_bits;
673
674 #ifdef unix
675     if ( !bt->mapped_io )
676          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
677                  return bt->err = BTERR_wrt;
678 #else
679 uint amt[1];
680         if ( !bt->mapped_io )
681         {
682                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
683                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
684                         return GetLastError(), bt->err = BTERR_wrt;
685
686                 if( *amt < bt->page_size )
687                         return GetLastError(), bt->err = BTERR_wrt;
688         } 
689         else if ( bt->mode == BT_fl ) {
690                         FlushViewOfFile(page, bt->page_size);
691                         FlushFileBuffers(bt->idx);
692         }
693 #endif
694         return 0;
695 }
696
697 // find page in cache 
698
699 BtHash *bt_findhash(BtDb *bt, uid page_no)
700 {
701 BtHash *hash;
702 uint idx;
703
704         // compute cache block first page and hash idx 
705
706         page_no &= ~bt->hashmask;
707         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
708
709         if( bt->cache[idx] ) 
710                 hash = bt->nodes + bt->cache[idx];
711         else
712                 return NULL;
713
714         do if( hash->page_no == page_no )
715                  break;
716         while(hash = hash->hashnext );
717
718         return hash;
719 }
720
721 // add page cache entry to hash index
722
723 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
724 {
725 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
726 BtHash *hash;
727
728         if( bt->cache[idx] ) {
729                 node->hashnext = hash = bt->nodes + bt->cache[idx];
730                 hash->hashprev = node;
731         }
732
733         node->hashprev = NULL;
734         bt->cache[idx] = (ushort)(node - bt->nodes);
735 }
736
737 // remove cache entry from hash table
738
739 void bt_unlinkhash(BtDb *bt, BtHash *node)
740 {
741 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
742 BtHash *hash;
743
744         // unlink node
745         if( hash = node->hashprev )
746                 hash->hashnext = node->hashnext;
747         else if( hash = node->hashnext )
748                 bt->cache[idx] = (ushort)(hash - bt->nodes);
749         else
750                 bt->cache[idx] = 0;
751
752         if( hash = node->hashnext )
753                 hash->hashprev = node->hashprev;
754 }
755
756 // add cache page to lru chain and map pages
757
758 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
759 {
760 int flag;
761 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
762 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
763 BtHash *node;
764
765         memset(hash, 0, sizeof(BtHash));
766         hash->page_no = (page_no & ~bt->hashmask);
767         bt_linkhash(bt, hash, page_no);
768
769         if( node = hash->lrunext = bt->lrufirst )
770                 node->lruprev = hash;
771         else
772                 bt->lrulast = hash;
773
774         bt->lrufirst = hash;
775
776 #ifdef unix
777         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
778         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
779         if( hash->page == MAP_FAILED )
780                 return bt->err = BTERR_map, (BtPage)NULL;
781
782 #else
783         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
784         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
785         if( !hash->hmap )
786                 return bt->err = BTERR_map, NULL;
787
788         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
789         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
790         if( !hash->page )
791                 return bt->err = BTERR_map, NULL;
792 #endif
793
794         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
795 }
796
797 //      find or place requested page in page-cache
798 //      return memory address where page is located.
799
800 BtPage bt_hashpage(BtDb *bt, uid page_no)
801 {
802 BtHash *hash, *node, *next;
803 BtPage page;
804
805         // find page in cache and move to top of lru list  
806
807         if( hash = bt_findhash(bt, page_no) ) {
808                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
809                 // swap node in lru list
810                 if( node = hash->lruprev ) {
811                         if( next = node->lrunext = hash->lrunext )
812                                 next->lruprev = node;
813                         else
814                                 bt->lrulast = node;
815
816                         if( next = hash->lrunext = bt->lrufirst )
817                                 next->lruprev = hash;
818                         else
819                                 return bt->err = BTERR_hash, (BtPage)NULL;
820
821                         hash->lruprev = NULL;
822                         bt->lrufirst = hash;
823                 }
824                 return page;
825         }
826
827         // map pages and add to cache entry
828
829         if( bt->nodecnt < bt->nodemax ) {
830                 hash = bt->nodes + ++bt->nodecnt;
831                 return bt_linklru(bt, hash, page_no);
832         }
833
834         // hash table is already full, replace last lru entry from the cache
835
836         if( hash = bt->lrulast ) {
837                 // unlink from lru list
838                 if( node = bt->lrulast = hash->lruprev )
839                         node->lrunext = NULL;
840                 else
841                         return bt->err = BTERR_hash, (BtPage)NULL;
842
843 #ifdef unix
844                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
845 #else
846                 FlushViewOfFile(hash->page, 0);
847                 UnmapViewOfFile(hash->page);
848                 CloseHandle(hash->hmap);
849 #endif
850                 // unlink from hash table
851
852                 bt_unlinkhash(bt, hash);
853
854                 // map and add to cache
855
856                 return bt_linklru(bt, hash, page_no);
857         }
858
859         return bt->err = BTERR_hash, (BtPage)NULL;
860 }
861
862 //  map a btree page onto current page
863
864 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
865 {
866 off64_t off = page_no << bt->page_bits;
867 #ifndef unix
868 int amt[1];
869 #endif
870         
871         if( bt->mapped_io ) {
872                 bt->err = 0;
873                 *page = bt_hashpage(bt, page_no);
874                 return bt->err;
875         }
876 #ifdef unix
877         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
878                 return bt->err = BTERR_map;
879 #else
880         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
881
882         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
883                 return bt->err = BTERR_map;
884
885         if( *amt <  bt->page_size )
886                 return bt->err = BTERR_map;
887 #endif
888         return 0;
889 }
890
891 //      deallocate a deleted page 
892 //      place on free chain out of allocator page
893
894 BTERR bt_freepage(BtDb *bt, uid page_no)
895 {
896         if( bt_mappage (bt, &bt->temp, page_no) )
897                 return bt->err;
898
899         //      lock allocation page
900
901         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
902                 return bt->err;
903
904         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
905                 return bt->err;
906
907         //      store chain in second right
908         bt_putid(bt->temp->right, bt_getid(bt->alloc[1].right));
909         bt_putid(bt->alloc[1].right, page_no);
910         bt->temp->free = 1;
911
912         if( bt_update(bt, bt->alloc, ALLOC_page) )
913                 return bt->err;
914         if( bt_update(bt, bt->temp, page_no) )
915                 return bt->err;
916
917         // unlock page zero 
918
919         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
920                 return bt->err;
921
922         //  remove write lock on deleted node
923
924         if( bt_unlockpage(bt, page_no, BtLockWrite) )
925                 return bt->err;
926
927         //  remove delete lock on deleted node
928
929         if( bt_unlockpage(bt, page_no, BtLockDelete) )
930                 return bt->err;
931
932         return 0;
933 }
934
935 //      allocate a new page and write page into it
936
937 uid bt_newpage(BtDb *bt, BtPage page)
938 {
939 uid new_page;
940 char *pmap;
941 int reuse;
942
943         // lock page zero
944
945         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
946                 return 0;
947
948         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
949                 return 0;
950
951         // use empty chain first
952         // else allocate empty page
953
954         if( new_page = bt_getid(bt->alloc[1].right) ) {
955                 if( bt_mappage (bt, &bt->temp, new_page) )
956                         return 0;       // don't unlock on error
957                 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
958                 reuse = 1;
959         } else {
960                 new_page = bt_getid(bt->alloc->right);
961                 bt_putid(bt->alloc->right, new_page+1);
962                 reuse = 0;
963         }
964
965         if( bt_update(bt, bt->alloc, ALLOC_page) )
966                 return 0;       // don't unlock on error
967
968         if( !bt->mapped_io ) {
969                 if( bt_update(bt, page, new_page) )
970                         return 0;       //don't unlock on error
971
972                 // unlock page zero 
973
974                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
975                         return 0;
976
977                 return new_page;
978         }
979
980 #ifdef unix
981         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
982                 return bt->err = BTERR_wrt, 0;
983
984         // if writing first page of hash block, zero last page in the block
985
986         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
987         {
988                 // use temp buffer to write zeros
989                 memset(bt->zero, 0, bt->page_size);
990                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
991                         return bt->err = BTERR_wrt, 0;
992         }
993 #else
994         //      bring new page into page-cache and copy page.
995         //      this will extend the file into the new pages.
996
997         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
998                 return 0;
999
1000         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
1001 #endif
1002
1003         // unlock page zero 
1004
1005         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1006                 return 0;
1007
1008         return new_page;
1009 }
1010
1011 //  find slot in page for given key at a given level
1012
1013 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1014 {
1015 uint diff, higher = bt->page->cnt, low = 1, slot;
1016 uint good = 0;
1017
1018         //      make stopper key an infinite fence value
1019
1020         if( bt_getid (bt->page->right) )
1021                 higher++;
1022         else
1023                 good++;
1024
1025         //      low is the lowest candidate, higher is already
1026         //      tested as .ge. the given key, loop ends when they meet
1027
1028         while( diff = higher - low ) {
1029                 slot = low + ( diff >> 1 );
1030                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1031                         low = slot + 1;
1032                 else
1033                         higher = slot, good++;
1034         }
1035
1036         //      return zero if key is on right link page
1037
1038         return good ? higher : 0;
1039 }
1040
1041 //  find and load page at given level for given key
1042 //      leave page rd or wr locked as requested
1043
1044 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1045 {
1046 uid page_no = ROOT_page, prevpage = 0;
1047 uint drill = 0xff, slot;
1048 uint mode, prevmode;
1049
1050   //  start at root of btree and drill down
1051
1052   do {
1053         // determine lock mode of drill level
1054         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1055
1056         bt->page_no = page_no;
1057
1058         // obtain access lock using lock chaining
1059
1060         if( page_no > ROOT_page )
1061           if( bt_lockpage(bt, bt->page_no, BtLockAccess) )
1062                 return 0;                                                                       
1063
1064         if( prevpage )
1065           if( bt_unlockpage(bt, prevpage, prevmode) )
1066                 return 0;
1067
1068         // obtain read lock using lock chaining
1069
1070         if( bt_lockpage(bt, bt->page_no, mode) )
1071                 return 0;                                                                       
1072
1073         if( page_no > ROOT_page )
1074           if( bt_unlockpage(bt, bt->page_no, BtLockAccess) )
1075                 return 0;                                                                       
1076
1077         //      map/obtain page contents
1078
1079         if( bt_mappage (bt, &bt->page, page_no) )
1080                 return 0;
1081
1082         // re-read and re-lock root after determining actual level of root
1083
1084         if( bt->page->lvl != drill) {
1085                 if ( bt->page_no != ROOT_page )
1086                         return bt->err = BTERR_struct, 0;
1087                         
1088                 drill = bt->page->lvl;
1089
1090                 if( lock != BtLockRead && drill == lvl )
1091                   if( bt_unlockpage(bt, page_no, mode) )
1092                         return 0;
1093                   else
1094                         continue;
1095         }
1096
1097         prevpage = bt->page_no;
1098         prevmode = mode;
1099
1100         //  find key on page at this level
1101         //  and descend to requested level
1102
1103         if( !bt->page->kill )
1104          if( slot = bt_findslot (bt, key, len) ) {
1105           if( drill == lvl )
1106                 return slot;
1107
1108           while( slotptr(bt->page, slot)->dead )
1109                 if( slot++ < bt->page->cnt )
1110                         continue;
1111                 else
1112                         goto slideright;
1113
1114           page_no = bt_getid(slotptr(bt->page, slot)->id);
1115           drill--;
1116           continue;
1117          }
1118
1119         //  or slide right into next page
1120
1121 slideright:
1122         page_no = bt_getid(bt->page->right);
1123
1124   } while( page_no );
1125
1126   // return error on end of right chain
1127
1128   bt->err = BTERR_struct;
1129   return 0;     // return error
1130 }
1131
1132 //      a fence key was deleted from a page
1133 //      push new fence value upwards
1134
1135 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1136 {
1137 unsigned char leftkey[256], rightkey[256];
1138 BtKey ptr;
1139
1140         // remove deleted key, the old fence value
1141
1142         ptr = keyptr(bt->page, bt->page->cnt);
1143         memcpy(rightkey, ptr, ptr->len + 1);
1144
1145         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1146         bt->page->dirty = 1;
1147
1148         ptr = keyptr(bt->page, bt->page->cnt);
1149         memcpy(leftkey, ptr, ptr->len + 1);
1150
1151         if( bt_update (bt, bt->page, page_no) )
1152                 return bt->err;
1153
1154         if( bt_lockpage (bt, page_no, BtLockParent) )
1155                 return bt->err;
1156
1157         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1158                 return bt->err;
1159
1160         //  insert new (now smaller) fence key
1161
1162         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1163                 return bt->err;
1164
1165         //  remove old (larger) fence key
1166
1167         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1168                 return bt->err;
1169
1170         return bt_unlockpage (bt, page_no, BtLockParent);
1171 }
1172
1173 //      root has a single child
1174 //      collapse a level from the btree
1175 //      call with root locked in bt->page
1176
1177 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1178 {
1179 uid child;
1180 uint idx;
1181
1182         // find the child entry
1183         //      and promote to new root
1184
1185   do {
1186         for( idx = 0; idx++ < root->cnt; )
1187           if( !slotptr(root, idx)->dead )
1188                 break;
1189
1190         child = bt_getid (slotptr(root, idx)->id);
1191
1192         if( bt_lockpage (bt, child, BtLockDelete) )
1193                 return bt->err;
1194
1195         if( bt_lockpage (bt, child, BtLockWrite) )
1196                 return bt->err;
1197
1198         if( bt_mappage (bt, &bt->temp, child) )
1199                 return bt->err;
1200
1201         memcpy (root, bt->temp, bt->page_size);
1202
1203         if( bt_update (bt, root, ROOT_page) )
1204                 return bt->err;
1205
1206         if( bt_freepage (bt, child) )
1207                 return bt->err;
1208
1209   } while( root->lvl > 1 && root->act == 1 );
1210
1211   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
1212 }
1213
1214 //  find and delete key on page by marking delete flag bit
1215 //  when page becomes empty, delete it
1216
1217 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1218 {
1219 unsigned char lowerkey[256], higherkey[256];
1220 uint slot, dirty = 0, idx, fence, found;
1221 uid page_no, right;
1222 BtKey ptr;
1223
1224         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1225                 ptr = keyptr(bt->page, slot);
1226         else
1227                 return bt->err;
1228
1229         // are we deleting a fence slot?
1230
1231         fence = slot == bt->page->cnt;
1232
1233         // if key is found delete it, otherwise ignore request
1234
1235         if( found = !keycmp (ptr, key, len) )
1236           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1237                 dirty = slotptr(bt->page,slot)->dead = 1;
1238                 bt->page->dirty = 1;
1239                 bt->page->act--;
1240
1241                 // collapse empty slots
1242
1243                 while( idx = bt->page->cnt - 1 )
1244                   if( slotptr(bt->page, idx)->dead ) {
1245                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1246                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1247                   } else
1248                         break;
1249           }
1250
1251         right = bt_getid(bt->page->right);
1252         page_no = bt->page_no;
1253
1254         //      did we delete a fence key in an upper level?
1255
1256         if( dirty && lvl && bt->page->act && fence )
1257           if( bt_fixfence (bt, page_no, lvl) )
1258                 return bt->err;
1259           else
1260                 return bt->found = found, 0;
1261
1262         //      is this a collapsed root?
1263
1264         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1265           if( bt_collapseroot (bt, bt->page) )
1266                 return bt->err;
1267           else
1268                 return bt->found = found, 0;
1269
1270         // return if page is not empty
1271
1272         if( bt->page->act ) {
1273           if( dirty && bt_update(bt, bt->page, page_no) )
1274                 return bt->err;
1275           if( bt_unlockpage(bt, page_no, BtLockWrite) )
1276                 return bt->err;
1277           return bt->found = found, 0;
1278         }
1279
1280         // cache copy of fence key
1281         //      in order to find parent
1282
1283         ptr = keyptr(bt->page, bt->page->cnt);
1284         memcpy(lowerkey, ptr, ptr->len + 1);
1285
1286         // obtain lock on right page
1287
1288         if ( bt_lockpage(bt, right, BtLockWrite) )
1289                 return bt->err;
1290
1291         if( bt_mappage (bt, &bt->temp, right) )
1292                 return bt->err;
1293
1294         if( bt->temp->kill )
1295                 return bt->err = BTERR_struct;
1296
1297         // pull contents of next page into current empty page 
1298
1299         memcpy (bt->page, bt->temp, bt->page_size);
1300
1301         //      cache copy of key to update
1302
1303         ptr = keyptr(bt->temp, bt->temp->cnt);
1304         memcpy(higherkey, ptr, ptr->len + 1);
1305
1306         //  Mark right page as deleted and point it to left page
1307         //      until we can post updates at higher level.
1308
1309         bt_putid(bt->temp->right, page_no);
1310         bt->temp->kill = 1;
1311
1312         if( bt_update(bt, bt->page, page_no) )
1313                 return bt->err;
1314
1315         if( bt_update(bt, bt->temp, right) )
1316                 return bt->err;
1317
1318         if( bt_lockpage(bt, page_no, BtLockParent) )
1319                 return bt->err;
1320
1321         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1322                 return bt->err;
1323
1324         if( bt_lockpage(bt, right, BtLockParent) )
1325                 return bt->err;
1326
1327         if( bt_unlockpage(bt, right, BtLockWrite) )
1328                 return bt->err;
1329
1330         //  redirect higher key directly to consolidated node
1331
1332         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1333                 return bt->err;
1334
1335         //  delete old lower key to consolidated node
1336
1337         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1338                 return bt->err;
1339
1340         //  obtain write & delete lock on deleted node
1341         //      add right block to free chain
1342
1343         if( bt_lockpage(bt, right, BtLockDelete) )
1344                 return bt->err;
1345
1346         if( bt_lockpage(bt, right, BtLockWrite) )
1347                 return bt->err;
1348
1349         if( bt_freepage (bt, right) )
1350                 return bt->err;
1351
1352         //      remove ParentModify locks
1353
1354         if( bt_unlockpage(bt, right, BtLockParent) )
1355                 return bt->err;
1356
1357         return bt_unlockpage(bt, page_no, BtLockParent);
1358 }
1359
1360 //      find key in leaf level and return row-id
1361
1362 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1363 {
1364 uint  slot;
1365 BtKey ptr;
1366 uid id;
1367
1368         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1369                 ptr = keyptr(bt->page, slot);
1370         else
1371                 return 0;
1372
1373         // if key exists, return row-id
1374         //      otherwise return 0
1375
1376         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1377                 id = bt_getid(slotptr(bt->page,slot)->id);
1378         else
1379                 id = 0;
1380
1381         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1382                 return 0;
1383
1384         return id;
1385 }
1386
1387 //      check page for space available,
1388 //      clean if necessary and return
1389 //      0 - page needs splitting
1390 //      >0 - go ahead with new slot
1391  
1392 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1393 {
1394 uint nxt = bt->page_size;
1395 BtPage page = bt->page;
1396 uint cnt = 0, idx = 0;
1397 uint max = page->cnt;
1398 uint newslot = slot;
1399 BtKey key;
1400 int ret;
1401
1402         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1403                 return slot;
1404
1405         //      skip cleanup if nothing to reclaim
1406
1407         if( !page->dirty )
1408                 return 0;
1409
1410         memcpy (bt->frame, page, bt->page_size);
1411
1412         // skip page info and set rest of page to zero
1413
1414         memset (page+1, 0, bt->page_size - sizeof(*page));
1415         page->act = 0;
1416
1417         while( cnt++ < max ) {
1418                 if( cnt == slot )
1419                         newslot = idx + 1;
1420                 // always leave fence key in list
1421                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1422                         continue;
1423
1424                 // copy key
1425                 key = keyptr(bt->frame, cnt);
1426                 nxt -= key->len + 1;
1427                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1428
1429                 // copy slot
1430                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1431                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1432                         page->act++;
1433                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1434                 slotptr(page, idx)->off = nxt;
1435         }
1436
1437         page->min = nxt;
1438         page->cnt = idx;
1439
1440         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1441                 return newslot;
1442
1443         return 0;
1444 }
1445
1446 // split the root and raise the height of the btree
1447
1448 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1449 {
1450 uint nxt = bt->page_size;
1451 BtPage root = bt->page;
1452 uid right;
1453
1454         //  Obtain an empty page to use, and copy the current
1455         //  root contents into it
1456
1457         if( !(right = bt_newpage(bt, root)) )
1458                 return bt->err;
1459
1460         // preserve the page info at the bottom
1461         // and set rest to zero
1462
1463         memset(root+1, 0, bt->page_size - sizeof(*root));
1464
1465         // insert first key on newroot page
1466
1467         nxt -= *leftkey + 1;
1468         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1469         bt_putid(slotptr(root, 1)->id, right);
1470         slotptr(root, 1)->off = nxt;
1471         
1472         // insert second key on newroot page
1473         // and increase the root height
1474
1475         nxt -= 3;
1476         ((unsigned char *)root)[nxt] = 2;
1477         ((unsigned char *)root)[nxt+1] = 0xff;
1478         ((unsigned char *)root)[nxt+2] = 0xff;
1479         bt_putid(slotptr(root, 2)->id, page_no2);
1480         slotptr(root, 2)->off = nxt;
1481
1482         bt_putid(root->right, 0);
1483         root->min = nxt;                // reset lowest used offset and key count
1484         root->cnt = 2;
1485         root->act = 2;
1486         root->lvl++;
1487
1488         // update and release root (bt->page)
1489
1490         if( bt_update(bt, root, bt->page_no) )
1491                 return bt->err;
1492
1493         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1494 }
1495
1496 //  split already locked full node
1497 //      return unlocked.
1498
1499 BTERR bt_splitpage (BtDb *bt)
1500 {
1501 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1502 unsigned char fencekey[256], rightkey[256];
1503 uid page_no = bt->page_no, right;
1504 BtPage page = bt->page;
1505 uint lvl = page->lvl;
1506 BtKey key;
1507
1508         //  split higher half of keys to bt->frame
1509         //      the last key (fence key) might be dead
1510
1511         memset (bt->frame, 0, bt->page_size);
1512         max = page->cnt;
1513         cnt = max / 2;
1514         idx = 0;
1515
1516         while( cnt++ < max ) {
1517                 key = keyptr(page, cnt);
1518                 nxt -= key->len + 1;
1519                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1520                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1521                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1522                         bt->frame->act++;
1523                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1524                 slotptr(bt->frame, idx)->off = nxt;
1525         }
1526
1527         //      remember fence key for new right page
1528
1529         memcpy (rightkey, key, key->len + 1);
1530
1531         bt->frame->bits = bt->page_bits;
1532         bt->frame->min = nxt;
1533         bt->frame->cnt = idx;
1534         bt->frame->lvl = lvl;
1535
1536         // link right node
1537
1538         if( page_no > ROOT_page )
1539                 memcpy (bt->frame->right, page->right, BtId);
1540
1541         //      get new free page and write frame to it.
1542
1543         if( !(right = bt_newpage(bt, bt->frame)) )
1544                 return bt->err;
1545
1546         //      update lower keys to continue in old page
1547
1548         memcpy (bt->frame, page, bt->page_size);
1549         memset (page+1, 0, bt->page_size - sizeof(*page));
1550         nxt = bt->page_size;
1551         page->dirty = 0;
1552         page->act = 0;
1553         cnt = 0;
1554         idx = 0;
1555
1556         //  assemble page of smaller keys
1557         //      (they're all active keys)
1558
1559         while( cnt++ < max / 2 ) {
1560                 key = keyptr(bt->frame, cnt);
1561                 nxt -= key->len + 1;
1562                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1563                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1564                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1565                 slotptr(page, idx)->off = nxt;
1566                 page->act++;
1567         }
1568
1569         // remember fence key for smaller page
1570
1571         memcpy (fencekey, key, key->len + 1);
1572
1573         bt_putid(page->right, right);
1574         page->min = nxt;
1575         page->cnt = idx;
1576
1577         // if current page is the root page, split it
1578
1579         if( page_no == ROOT_page )
1580                 return bt_splitroot (bt, fencekey, right);
1581
1582         //      lock right page
1583
1584         if( bt_lockpage (bt, right, BtLockParent) )
1585                 return bt->err;
1586
1587         // update left (containing) node
1588
1589         if( bt_update(bt, page, page_no) )
1590                 return bt->err;
1591
1592         if( bt_lockpage (bt, page_no, BtLockParent) )
1593                 return bt->err;
1594
1595         // insert new fence for reformulated left block
1596
1597         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1598                 return bt->err;
1599
1600         //      switch fence for right block of larger keys to new right page
1601
1602         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1603                 return bt->err;
1604
1605         if( bt_unlockpage (bt, page_no, BtLockParent) )
1606                 return bt->err;
1607
1608         return bt_unlockpage (bt, right, BtLockParent);
1609 }
1610
1611 //  Insert new key into the btree at requested level.
1612 //  Level zero pages are leaf pages and are unlocked at exit.
1613 //      Interior nodes remain locked.
1614
1615 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1616 {
1617 uint slot, idx;
1618 BtPage page;
1619 BtKey ptr;
1620
1621   while( 1 ) {
1622         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1623                 ptr = keyptr(bt->page, slot);
1624         else
1625         {
1626                 if ( !bt->err )
1627                         bt->err = BTERR_ovflw;
1628                 return bt->err;
1629         }
1630
1631         // if key already exists, update id and return
1632
1633         page = bt->page;
1634
1635         if( !keycmp (ptr, key, len) ) {
1636           if( slotptr(page, slot)->dead )
1637                 page->act++;
1638           slotptr(page, slot)->dead = 0;
1639           slotptr(page, slot)->tod = tod;
1640           bt_putid(slotptr(page,slot)->id, id);
1641           if ( bt_update(bt, bt->page, bt->page_no) )
1642                 return bt->err;
1643           return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1644         }
1645
1646         // check if page has enough space
1647
1648         if( slot = bt_cleanpage (bt, len, slot) )
1649                 break;
1650
1651         if( bt_splitpage (bt) )
1652                 return bt->err;
1653   }
1654
1655   // calculate next available slot and copy key into page
1656
1657   page->min -= len + 1; // reset lowest used offset
1658   ((unsigned char *)page)[page->min] = len;
1659   memcpy ((unsigned char *)page + page->min +1, key, len );
1660
1661   for( idx = slot; idx < page->cnt; idx++ )
1662         if( slotptr(page, idx)->dead )
1663                 break;
1664
1665   // now insert key into array before slot
1666   // preserving the fence slot
1667
1668   if( idx == page->cnt )
1669         idx++, page->cnt++;
1670
1671   page->act++;
1672
1673   while( idx > slot )
1674         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1675
1676   bt_putid(slotptr(page,slot)->id, id);
1677   slotptr(page, slot)->off = page->min;
1678   slotptr(page, slot)->tod = tod;
1679   slotptr(page, slot)->dead = 0;
1680
1681   if( bt_update(bt, bt->page, bt->page_no) )
1682           return bt->err;
1683
1684   return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1685 }
1686
1687 //  cache page of keys into cursor and return starting slot for given key
1688
1689 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1690 {
1691 uint slot;
1692
1693         // cache page for retrieval
1694
1695         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1696           memcpy (bt->cursor, bt->page, bt->page_size);
1697         else
1698           return 0;
1699
1700         bt->cursor_page = bt->page_no;
1701
1702         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1703           return 0;
1704
1705         return slot;
1706 }
1707
1708 //  return next slot for cursor page
1709 //  or slide cursor right into next page
1710
1711 uint bt_nextkey (BtDb *bt, uint slot)
1712 {
1713 off64_t right;
1714
1715   do {
1716         right = bt_getid(bt->cursor->right);
1717
1718         while( slot++ < bt->cursor->cnt )
1719           if( slotptr(bt->cursor,slot)->dead )
1720                 continue;
1721           else if( right || (slot < bt->cursor->cnt))
1722                 return slot;
1723           else
1724                 break;
1725
1726         if( !right )
1727                 break;
1728
1729         bt->cursor_page = right;
1730
1731     if( bt_lockpage(bt, right, BtLockRead) )
1732                 return 0;
1733
1734         if( bt_mappage (bt, &bt->page, right) )
1735                 return 0;
1736
1737         memcpy (bt->cursor, bt->page, bt->page_size);
1738
1739         if ( bt_unlockpage(bt, right, BtLockRead) )
1740                 return 0;
1741
1742         slot = 0;
1743
1744   } while( 1 );
1745
1746   return bt->err = 0;
1747 }
1748
1749 BtKey bt_key(BtDb *bt, uint slot)
1750 {
1751         return keyptr(bt->cursor, slot);
1752 }
1753
1754 uid bt_uid(BtDb *bt, uint slot)
1755 {
1756         return bt_getid(slotptr(bt->cursor,slot)->id);
1757 }
1758
1759 uint bt_tod(BtDb *bt, uint slot)
1760 {
1761         return slotptr(bt->cursor,slot)->tod;
1762 }
1763
1764
1765 #ifdef STANDALONE
1766 //  standalone program to index file of keys
1767 //  then list them onto std-out
1768
1769 int main (int argc, char **argv)
1770 {
1771 uint slot, line = 0, off = 0, found = 0;
1772 int dead, ch, cnt = 0, bits = 12;
1773 unsigned char key[256];
1774 clock_t done, start;
1775 uint pgblk = 0;
1776 time_t tod[1];
1777 uint scan = 0;
1778 uint len = 0;
1779 uint map = 0;
1780 BtKey ptr;
1781 BtDb *bt;
1782 FILE *in;
1783
1784         if( argc < 4 ) {
1785                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
1786                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
1787                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
1788                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
1789                 exit(0);
1790         }
1791
1792         start = clock();
1793         time(tod);
1794
1795         if( argc > 4 )
1796                 bits = atoi(argv[4]);
1797
1798         if( argc > 5 )
1799                 map = atoi(argv[5]);
1800
1801         if( map > 65536 )
1802                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
1803
1804         if( map && map < 8 )
1805                 fprintf (stderr, "Buffer_pool too small\n");
1806
1807         if( argc > 6 )
1808                 pgblk = atoi(argv[6]);
1809
1810         if( bits + pgblk > 30 )
1811                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
1812
1813         if( argc > 7 )
1814                 off = atoi(argv[7]);
1815
1816         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
1817
1818         if( !bt ) {
1819                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
1820                 exit (1);
1821         }
1822
1823         switch(argv[3][0]| 0x20)
1824         {
1825         case 'w':
1826                 fprintf(stderr, "started indexing for %s\n", argv[2]);
1827                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1828                   while( ch = getc(in), ch != EOF )
1829                         if( ch == '\n' )
1830                         {
1831                           if( off )
1832                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1833
1834                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
1835                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1836                           len = 0;
1837                         }
1838                         else if( len < 245 )
1839                                 key[len++] = ch;
1840                 fprintf(stderr, "finished adding keys, %d \n", line);
1841                 break;
1842
1843         case 'd':
1844                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
1845                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1846                   while( ch = getc(in), ch != EOF )
1847                         if( ch == '\n' )
1848                         {
1849                           if( off )
1850                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1851                           line++;
1852                           if( bt_deletekey (bt, key, len, 0) )
1853                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1854                           len = 0;
1855                         }
1856                         else if( len < 245 )
1857                                 key[len++] = ch;
1858                 fprintf(stderr, "finished deleting keys, %d \n", line);
1859                 break;
1860
1861         case 'f':
1862                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
1863                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1864                   while( ch = getc(in), ch != EOF )
1865                         if( ch == '\n' )
1866                         {
1867                           if( off )
1868                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1869                           line++;
1870                           if( bt_findkey (bt, key, len) )
1871                                 found++;
1872                           else if( bt->err )
1873                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
1874                           len = 0;
1875                         }
1876                         else if( len < 245 )
1877                                 key[len++] = ch;
1878                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
1879                 break;
1880
1881         case 's':
1882                 scan++;
1883                 break;
1884
1885         }
1886
1887         done = clock();
1888         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
1889
1890         dead = cnt = 0;
1891         len = key[0] = 0;
1892
1893         fprintf(stderr, "started reading\n");
1894
1895         if( slot = bt_startkey (bt, key, len) )
1896           slot--;
1897         else
1898           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
1899
1900         while( slot = bt_nextkey (bt, slot) )
1901           if( cnt++, scan ) {
1902                         ptr = bt_key(bt, slot);
1903                         fwrite (ptr->key, ptr->len, 1, stdout);
1904                         fputc ('\n', stdout);
1905           }
1906
1907         fprintf(stderr, " Total keys read %d\n", cnt);
1908         return 0;
1909 }
1910
1911 #endif  //STANDALONE