]> pd.if.org Git - btree/blob - btree2q.c
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
[btree] / btree2q.c
1 // btree version 2p
2 // 26 APR 2013
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <fcntl.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #else
39 #define WIN32_LEAN_AND_MEAN
40 #include <windows.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <time.h>
44 #include <fcntl.h>
45 #endif
46
47 #include <memory.h>
48 #include <string.h>
49
50 typedef unsigned long long      uid;
51
52 #ifndef unix
53 typedef unsigned long long      off64_t;
54 typedef unsigned short          ushort;
55 typedef unsigned int            uint;
56 #endif
57
58 #define BT_ro 0x6f72    // ro
59 #define BT_rw 0x7772    // rw
60 #define BT_fl 0x6c66    // fl
61
62 #define BT_maxbits              24                                      // maximum page size in bits
63 #define BT_minbits              9                                       // minimum page size in bits
64 #define BT_minpage              (1 << BT_minbits)       // minimum page size
65
66 /*
67 There are five lock types for each node in three independent sets: 
68 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
69 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
70 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
71 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
72 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
73 */
74
75 typedef enum{
76         BtLockAccess,
77         BtLockDelete,
78         BtLockRead,
79         BtLockWrite,
80         BtLockParent
81 }BtLock;
82
83 //      Define the length of the page and key pointers
84
85 #define BtId 6
86
87 //      Page key slot definition.
88
89 //      If BT_maxbits is 15 or less, you can save 2 bytes
90 //      for each key stored by making the first two uints
91 //      into ushorts.  You can also save 4 bytes by removing
92 //      the tod field from the key.
93
94 //      Keys are marked dead, but remain on the page until
95 //      cleanup is called. The fence key (highest key) for
96 //      the page is always present, even if dead.
97
98 typedef struct {
99         uint off:BT_maxbits;            // page offset for key start
100         uint dead:1;                            // set for deleted key
101         uint tod;                                       // time-stamp for key
102         unsigned char id[BtId];         // id associated with key
103 } BtSlot;
104
105 //      The key structure occupies space at the upper end of
106 //      each page.  It's a length byte followed by the value
107 //      bytes.
108
109 typedef struct {
110         unsigned char len;
111         unsigned char key[0];
112 } *BtKey;
113
114 //      The first part of an index page.
115 //      It is immediately followed
116 //      by the BtSlot array of keys.
117
118 typedef struct {
119         uint cnt;                                       // count of keys in page
120         uint act;                                       // count of active keys
121         uint min;                                       // next key offset
122         unsigned char bits;                     // page size in bits
123         unsigned char lvl:7;            // level of page
124         unsigned char kill:1;           // page is being deleted
125         unsigned char right[BtId];      // page number to right
126 } *BtPage;
127
128 //      The memory mapping hash table entry
129
130 typedef struct {
131         BtPage page;            // mapped page pointer
132         uid  page_no;           // mapped page number
133         void *lruprev;          // least recently used previous cache block
134         void *lrunext;          // lru next cache block
135         void *hashprev;         // previous cache block for the same hash idx
136         void *hashnext;         // next cache block for the same hash idx
137 #ifndef unix
138         HANDLE hmap;
139 #endif
140 }BtHash;
141
142 //      The object structure for Btree access
143
144 typedef struct _BtDb {
145         uint page_size;         // each page size       
146         uint page_bits;         // each page size in bits       
147         uint seg_bits;          // segment size in pages in bits
148         uid page_no;            // current page number  
149         uid cursor_page;        // current cursor page number   
150         int  err;
151         uint mode;                      // read-write mode
152         uint mapped_io;         // use memory mapping
153         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
154         BtPage alloc;           // frame buffer for alloc page ( page 0 )
155         BtPage cursor;          // cached frame for start/next (never mapped)
156         BtPage frame;           // spare frame for the page split (never mapped)
157         BtPage zero;            // zeroes frame buffer (never mapped)
158         BtPage page;            // current page
159 #ifdef unix
160         int idx;
161 #else
162         HANDLE idx;
163 #endif
164         unsigned char *mem;     // frame, cursor, page memory buffer
165         int nodecnt;            // highest page cache segment in use
166         int nodemax;            // highest page cache segment allocated
167         int hashmask;           // number of pages in segments - 1
168         int hashsize;           // size of hash table
169         BtHash *lrufirst;       // lru list head
170         BtHash *lrulast;        // lru list tail
171         ushort *cache;          // hash table for cached segments
172         BtHash nodes[1];        // segment cache follows
173 } BtDb;
174
175 typedef enum {
176 BTERR_ok = 0,
177 BTERR_struct,
178 BTERR_ovflw,
179 BTERR_lock,
180 BTERR_map,
181 BTERR_wrt,
182 BTERR_hash
183 } BTERR;
184
185 // B-Tree functions
186 extern void bt_close (BtDb *bt);
187 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk);
188 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
189 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
190 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
191 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
192 extern uint bt_nextkey   (BtDb *bt, uint slot);
193
194 //  Helper functions to return slot values
195
196 extern BtKey bt_key (BtDb *bt, uint slot);
197 extern uid bt_uid (BtDb *bt, uint slot);
198 extern uint bt_tod (BtDb *bt, uint slot);
199
200 //  BTree page number constants
201 #define ALLOC_page              0
202 #define ROOT_page               1
203
204 //      Number of levels to create in a new BTree
205
206 #define MIN_lvl                 2
207
208 //  The page is allocated from low and hi ends.
209 //  The key offsets and row-id's are allocated
210 //  from the bottom, while the text of the key
211 //  is allocated from the top.  When the two
212 //  areas meet, the page is split into two.
213
214 //  A key consists of a length byte, two bytes of
215 //  index number (0 - 65534), and up to 253 bytes
216 //  of key value.  Duplicate keys are discarded.
217 //  Associated with each key is a 48 bit row-id.
218
219 //  The b-tree root is always located at page 1.
220 //      The first leaf page of level zero is always
221 //      located on page 2.
222
223 //      The b-tree pages are linked with right
224 //      pointers to facilitate enumerators,
225 //      and provide for concurrency.
226
227 //      When to root page fills, it is split in two and
228 //      the tree height is raised by a new root at page
229 //      one with two keys.
230
231 //      Deleted keys are marked with a dead bit until
232 //      page cleanup The fence key for a node is always
233 //      present, even after deletion and cleanup.
234
235 //  Groups of pages from the btree are optionally
236 //  cached with memory mapping. A hash table is used to keep
237 //  track of the cached pages.  This behaviour is controlled
238 //  by the number of cache blocks parameter and pages per block
239 //      given to bt_open.
240
241 //  To achieve maximum concurrency one page is locked at a time
242 //  as the tree is traversed to find leaf key in question. The right
243 //  page numbers are used in cases where the page is being split,
244 //      or consolidated.
245
246 //  Page 0 is dedicated to lock for new page extensions,
247 //      and chains empty pages together for reuse.
248
249 //      Parent locks are obtained to prevent resplitting or deleting a node
250 //      before its fence is posted into its upper level.
251
252 //      Empty nodes are chained together through the ALLOC page and reused.
253
254 //      A special open mode of BT_fl is provided to safely access files on
255 //      WIN32 networks. WIN32 network operations should not use memory mapping.
256 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
257 //      to prevent local caching of network file contents.
258
259 //      Access macros to address slot and key values from the page.
260 //      Page slots use 1 based indexing.
261
262 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
263 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
264
265 void bt_putid(unsigned char *dest, uid id)
266 {
267 int i = BtId;
268
269         while( i-- )
270                 dest[i] = (unsigned char)id, id >>= 8;
271 }
272
273 uid bt_getid(unsigned char *src)
274 {
275 uid id = 0;
276 int i;
277
278         for( i = 0; i < BtId; i++ )
279                 id <<= 8, id |= *src++; 
280
281         return id;
282 }
283
284 // place write, read, or parent lock on requested page_no.
285
286 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
287 {
288 off64_t off = page_no << bt->page_bits;
289 #ifdef unix
290 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
291 struct flock lock[1];
292 #else
293 uint flags = 0, len;
294 OVERLAPPED ovl[1];
295 #endif
296
297         if( mode == BtLockRead || mode == BtLockWrite )
298                 off +=  sizeof(*bt->page);      // use second segment
299
300         if( mode == BtLockParent )
301                 off +=  2 * sizeof(*bt->page);  // use third segment
302
303 #ifdef unix
304         memset (lock, 0, sizeof(lock));
305
306         lock->l_start = off;
307         lock->l_type = (mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent) ? F_WRLCK : F_RDLCK;
308         lock->l_len = sizeof(*bt->page);
309         lock->l_whence = 0;
310
311         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
312                 return bt->err = BTERR_lock;
313
314         return 0;
315 #else
316         memset (ovl, 0, sizeof(ovl));
317         ovl->OffsetHigh = (uint)(off >> 32);
318         ovl->Offset = (uint)off;
319         len = sizeof(*bt->page);
320
321         //      use large offsets to
322         //      simulate advisory locking
323
324         ovl->OffsetHigh |= 0x80000000;
325
326         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
327                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
328
329         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
330                 return bt->err = 0;
331
332         return bt->err = BTERR_lock;
333 #endif 
334 }
335
336 // remove write, read, or parent lock on requested page_no.
337
338 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
339 {
340 off64_t off = page_no << bt->page_bits;
341 #ifdef unix
342 struct flock lock[1];
343 #else
344 OVERLAPPED ovl[1];
345 uint len;
346 #endif
347
348         if( mode == BtLockRead || mode == BtLockWrite )
349                 off +=  sizeof(*bt->page);      // use second segment
350
351         if( mode == BtLockParent )
352                 off +=  2 * sizeof(*bt->page);  // use third segment
353
354 #ifdef unix
355         memset (lock, 0, sizeof(lock));
356
357         lock->l_start = off;
358         lock->l_type = F_UNLCK;
359         lock->l_len = sizeof(*bt->page);
360         lock->l_whence = 0;
361
362         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
363                 return bt->err = BTERR_lock;
364 #else
365         memset (ovl, 0, sizeof(ovl));
366         ovl->OffsetHigh = (uint)(off >> 32);
367         ovl->Offset = (uint)off;
368         len = sizeof(*bt->page);
369
370         //      use large offsets to
371         //      simulate advisory locking
372
373         ovl->OffsetHigh |= 0x80000000;
374
375         if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
376                 return GetLastError(), bt->err = BTERR_lock;
377 #endif
378
379         return bt->err = 0;
380 }
381
382 //      close and release memory
383
384 void bt_close (BtDb *bt)
385 {
386 BtHash *hash;
387 #ifdef unix
388         // release mapped pages
389
390         if( hash = bt->lrufirst )
391                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
392                 while(hash = hash->lrunext);
393
394         if ( bt->mem )
395                 free (bt->mem);
396         close (bt->idx);
397         free (bt->cache);
398         free (bt);
399 #else
400         if( hash = bt->lrufirst )
401           do
402           {
403                 FlushViewOfFile(hash->page, 0);
404                 UnmapViewOfFile(hash->page);
405                 CloseHandle(hash->hmap);
406           } while(hash = hash->lrunext);
407
408         if ( bt->mem)
409                 VirtualFree (bt->mem, 0, MEM_RELEASE);
410         FlushFileBuffers(bt->idx);
411         CloseHandle(bt->idx);
412         GlobalFree (bt->cache);
413         GlobalFree (bt);
414 #endif
415 }
416
417 //  open/create new btree
418 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
419 //              size of mapped page cache (e.g. 8192) or zero for no mapping.
420
421 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint pgblk)
422 {
423 uint lvl, attr, cacheblk, last;
424 BtLock lockmode = BtLockWrite;
425 BtPage alloc;
426 off64_t size;
427 uint amt[1];
428 BtKey key;
429 BtDb* bt;
430
431 #ifndef unix
432 SYSTEM_INFO sysinfo[1];
433 #endif
434
435 #ifdef unix
436         bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
437         memset (bt, 0, sizeof(BtDb));
438
439         switch (mode & 0x7fff)
440         {
441         case BT_fl:
442         case BT_rw:
443                 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
444                 break;
445
446         case BT_ro:
447         default:
448                 bt->idx = open ((char*)name, O_RDONLY);
449                 lockmode = BtLockRead;
450                 break;
451         }
452         if( bt->idx == -1 )
453                 return free(bt), NULL;
454         
455         if( nodemax )
456                 cacheblk = 4096;        // page size for unix
457         else
458                 cacheblk = 0;
459
460 #else
461         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
462         attr = FILE_ATTRIBUTE_NORMAL;
463         switch (mode & 0x7fff)
464         {
465         case BT_fl:
466                 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
467
468         case BT_rw:
469                 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
470                 break;
471
472         case BT_ro:
473         default:
474                 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
475                 lockmode = BtLockRead;
476                 break;
477         }
478         if( bt->idx == INVALID_HANDLE_VALUE )
479                 return GlobalFree(bt), NULL;
480
481         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
482         GetSystemInfo(sysinfo);
483
484         if( nodemax )
485                 cacheblk = sysinfo->dwAllocationGranularity;
486         else
487                 cacheblk = 0;
488 #endif
489
490         // determine sanity of page size
491
492         if( bits > BT_maxbits )
493                 bits = BT_maxbits;
494         else if( bits < BT_minbits )
495                 bits = BT_minbits;
496
497         if ( bt_lockpage(bt, ALLOC_page, lockmode) )
498                 return bt_close (bt), NULL;
499
500 #ifdef unix
501         *amt = 0;
502
503         // read minimum page size to get root info
504
505         if( size = lseek (bt->idx, 0L, 2) ) {
506                 alloc = malloc (BT_minpage);
507                 pread(bt->idx, alloc, BT_minpage, 0);
508                 bits = alloc->bits;
509                 free (alloc);
510         } else if( mode == BT_ro )
511                 return bt_close (bt), NULL;
512 #else
513         size = GetFileSize(bt->idx, amt);
514
515         if( size || *amt ) {
516                 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
517                 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
518                         return bt_close (bt), NULL;
519                 bits = alloc->bits;
520                 VirtualFree (alloc, 0, MEM_RELEASE);
521         } else if( mode == BT_ro )
522                 return bt_close (bt), NULL;
523 #endif
524
525         bt->page_size = 1 << bits;
526         bt->page_bits = bits;
527
528         bt->nodemax = nodemax;
529         bt->mode = mode;
530
531         // setup cache mapping
532
533         if( cacheblk ) {
534                 if( cacheblk < bt->page_size )
535                         cacheblk = bt->page_size;
536
537                 bt->hashsize = nodemax / 8;
538                 bt->hashmask = (cacheblk >> bits) - 1;
539                 bt->mapped_io = 1;
540         }
541
542         //      requested number of pages per memmap segment
543
544         if( cacheblk )
545           if( (1 << pgblk) > bt->hashmask )
546                 bt->hashmask = (1 << pgblk) - 1;
547
548         bt->seg_bits = 0;
549
550         while( (1 << bt->seg_bits) <= bt->hashmask )
551                 bt->seg_bits++;
552
553 #ifdef unix
554         bt->mem = malloc (6 *bt->page_size);
555         bt->cache = calloc (bt->hashsize, sizeof(ushort));
556 #else
557         bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
558         bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
559 #endif
560         bt->frame = (BtPage)bt->mem;
561         bt->cursor = (BtPage)(bt->mem + bt->page_size);
562         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
563         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
564         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
565         bt->zero = (BtPage)(bt->mem + 5 * bt->page_size);
566
567         if( size || *amt ) {
568                 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
569                         return bt_close (bt), NULL;
570
571                 return bt;
572         }
573
574         // initializes an empty b-tree with root page and page of leaves
575
576         memset (bt->alloc, 0, bt->page_size);
577         bt_putid(bt->alloc->right, MIN_lvl+1);
578         bt->alloc->bits = bt->page_bits;
579
580 #ifdef unix
581         if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
582                 return bt_close (bt), NULL;
583 #else
584         if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
585                 return bt_close (bt), NULL;
586
587         if( *amt < bt->page_size )
588                 return bt_close (bt), NULL;
589 #endif
590
591         memset (bt->frame, 0, bt->page_size);
592         bt->frame->bits = bt->page_bits;
593
594         for( lvl=MIN_lvl; lvl--; ) {
595                 slotptr(bt->frame, 1)->off = bt->page_size - 3;
596                 bt_putid(slotptr(bt->frame, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);               // next(lower) page number
597                 key = keyptr(bt->frame, 1);
598                 key->len = 2;                   // create stopper key
599                 key->key[0] = 0xff;
600                 key->key[1] = 0xff;
601                 bt->frame->min = bt->page_size - 3;
602                 bt->frame->lvl = lvl;
603                 bt->frame->cnt = 1;
604                 bt->frame->act = 1;
605 #ifdef unix
606                 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
607                         return bt_close (bt), NULL;
608 #else
609                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
610                         return bt_close (bt), NULL;
611
612                 if( *amt < bt->page_size )
613                         return bt_close (bt), NULL;
614 #endif
615         }
616
617         // create empty page area by writing last page of first
618         // cache area (other pages are zeroed by O/S)
619
620         if( bt->mapped_io && bt->hashmask ) {
621                 memset(bt->frame, 0, bt->page_size);
622                 last = bt->hashmask;
623
624                 while( last < MIN_lvl + 1 )
625                         last += bt->hashmask + 1;
626 #ifdef unix
627                 pwrite(bt->idx, bt->frame, bt->page_size, last << bt->page_bits);
628 #else
629                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
630                 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
631                         return bt_close (bt), NULL;
632                 if( *amt < bt->page_size )
633                         return bt_close (bt), NULL;
634 #endif
635         }
636
637         if( bt_unlockpage(bt, ALLOC_page, lockmode) )
638                 return bt_close (bt), NULL;
639
640         return bt;
641 }
642
643 //  compare two keys, returning > 0, = 0, or < 0
644 //  as the comparison value
645
646 int keycmp (BtKey key1, unsigned char *key2, uint len2)
647 {
648 uint len1 = key1->len;
649 int ans;
650
651         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
652                 return ans;
653
654         if( len1 > len2 )
655                 return 1;
656         if( len1 < len2 )
657                 return -1;
658
659         return 0;
660 }
661
662 //  Update current page of btree by writing file contents
663 //      or flushing mapped area to disk.
664
665 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
666 {
667 off64_t off = page_no << bt->page_bits;
668
669 #ifdef unix
670     if ( !bt->mapped_io )
671          if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
672                  return bt->err = BTERR_wrt;
673 #else
674 uint amt[1];
675         if ( !bt->mapped_io )
676         {
677                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
678                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
679                         return GetLastError(), bt->err = BTERR_wrt;
680
681                 if( *amt < bt->page_size )
682                         return GetLastError(), bt->err = BTERR_wrt;
683         } 
684         else if ( bt->mode == BT_fl ) {
685                         FlushViewOfFile(page, bt->page_size);
686                         FlushFileBuffers(bt->idx);
687         }
688 #endif
689         return 0;
690 }
691
692 // find page in cache 
693
694 BtHash *bt_findhash(BtDb *bt, uid page_no)
695 {
696 BtHash *hash;
697 uint idx;
698
699         // compute cache block first page and hash idx 
700
701         page_no &= ~bt->hashmask;
702         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
703
704         if( bt->cache[idx] ) 
705                 hash = bt->nodes + bt->cache[idx];
706         else
707                 return NULL;
708
709         do if( hash->page_no == page_no )
710                  break;
711         while(hash = hash->hashnext );
712
713         return hash;
714 }
715
716 // add page cache entry to hash index
717
718 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
719 {
720 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
721 BtHash *hash;
722
723         if( bt->cache[idx] ) {
724                 node->hashnext = hash = bt->nodes + bt->cache[idx];
725                 hash->hashprev = node;
726         }
727
728         node->hashprev = NULL;
729         bt->cache[idx] = (ushort)(node - bt->nodes);
730 }
731
732 // remove cache entry from hash table
733
734 void bt_unlinkhash(BtDb *bt, BtHash *node)
735 {
736 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
737 BtHash *hash;
738
739         // unlink node
740         if( hash = node->hashprev )
741                 hash->hashnext = node->hashnext;
742         else if( hash = node->hashnext )
743                 bt->cache[idx] = (ushort)(hash - bt->nodes);
744         else
745                 bt->cache[idx] = 0;
746
747         if( hash = node->hashnext )
748                 hash->hashprev = node->hashprev;
749 }
750
751 // add cache page to lru chain and map pages
752
753 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
754 {
755 int flag;
756 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
757 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
758 BtHash *node;
759
760         memset(hash, 0, sizeof(BtHash));
761         hash->page_no = (page_no & ~bt->hashmask);
762         bt_linkhash(bt, hash, page_no);
763
764         if( node = hash->lrunext = bt->lrufirst )
765                 node->lruprev = hash;
766         else
767                 bt->lrulast = hash;
768
769         bt->lrufirst = hash;
770
771 #ifdef unix
772         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
773         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
774         if( hash->page == MAP_FAILED )
775                 return bt->err = BTERR_map, (BtPage)NULL;
776
777 #else
778         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
779         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
780         if( !hash->hmap )
781                 return bt->err = BTERR_map, NULL;
782
783         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
784         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
785         if( !hash->page )
786                 return bt->err = BTERR_map, NULL;
787 #endif
788
789         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
790 }
791
792 //      find or place requested page in page-cache
793 //      return memory address where page is located.
794
795 BtPage bt_hashpage(BtDb *bt, uid page_no)
796 {
797 BtHash *hash, *node, *next;
798 BtPage page;
799
800         // find page in cache and move to top of lru list  
801
802         if( hash = bt_findhash(bt, page_no) ) {
803                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
804                 // swap node in lru list
805                 if( node = hash->lruprev ) {
806                         if( next = node->lrunext = hash->lrunext )
807                                 next->lruprev = node;
808                         else
809                                 bt->lrulast = node;
810
811                         if( next = hash->lrunext = bt->lrufirst )
812                                 next->lruprev = hash;
813                         else
814                                 return bt->err = BTERR_hash, (BtPage)NULL;
815
816                         hash->lruprev = NULL;
817                         bt->lrufirst = hash;
818                 }
819                 return page;
820         }
821
822         // map pages and add to cache entry
823
824         if( bt->nodecnt < bt->nodemax ) {
825                 hash = bt->nodes + ++bt->nodecnt;
826                 return bt_linklru(bt, hash, page_no);
827         }
828
829         // hash table is already full, replace last lru entry from the cache
830
831         if( hash = bt->lrulast ) {
832                 // unlink from lru list
833                 if( node = bt->lrulast = hash->lruprev )
834                         node->lrunext = NULL;
835                 else
836                         return bt->err = BTERR_hash, (BtPage)NULL;
837
838 #ifdef unix
839                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
840 #else
841                 FlushViewOfFile(hash->page, 0);
842                 UnmapViewOfFile(hash->page);
843                 CloseHandle(hash->hmap);
844 #endif
845                 // unlink from hash table
846
847                 bt_unlinkhash(bt, hash);
848
849                 // map and add to cache
850
851                 return bt_linklru(bt, hash, page_no);
852         }
853
854         return bt->err = BTERR_hash, (BtPage)NULL;
855 }
856
857 //  map a btree page onto current page
858
859 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
860 {
861 off64_t off = page_no << bt->page_bits;
862 #ifndef unix
863 int amt[1];
864 #endif
865         
866         if( bt->mapped_io ) {
867                 bt->err = 0;
868                 *page = bt_hashpage(bt, page_no);
869                 return bt->err;
870         }
871 #ifdef unix
872         if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
873                 return bt->err = BTERR_map;
874 #else
875         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
876
877         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
878                 return bt->err = BTERR_map;
879
880         if( *amt <  bt->page_size )
881                 return bt->err = BTERR_map;
882 #endif
883         return 0;
884 }
885
886 //      deallocate a deleted page 
887 //      place on free chain out of allocator page
888
889 BTERR bt_freepage(BtDb *bt, uid page_no)
890 {
891         //  obtain delete lock on deleted node
892
893         if( bt_lockpage(bt, page_no, BtLockDelete) )
894                 return bt->err;
895
896         //  obtain write lock on deleted node
897
898         if( bt_lockpage(bt, page_no, BtLockWrite) )
899                 return bt->err;
900
901         if( bt_mappage (bt, &bt->temp, page_no) )
902                 return bt->err;
903
904         //      lock allocation page
905
906         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
907                 return bt->err;
908
909         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
910                 return bt->err;
911
912         //      store chain in second right
913         bt_putid(bt->temp->right, bt_getid(bt->alloc[1].right));
914         bt_putid(bt->alloc[1].right, page_no);
915
916         if( bt_update(bt, bt->alloc, ALLOC_page) )
917                 return bt->err;
918         if( bt_update(bt, bt->temp, page_no) )
919                 return bt->err;
920
921         // unlock page zero 
922
923         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
924                 return bt->err;
925
926         //  remove write lock on deleted node
927
928         if( bt_unlockpage(bt, page_no, BtLockWrite) )
929                 return bt->err;
930
931         //  remove delete lock on deleted node
932
933         if( bt_unlockpage(bt, page_no, BtLockDelete) )
934                 return bt->err;
935
936         return 0;
937 }
938
939 //      allocate a new page and write page into it
940
941 uid bt_newpage(BtDb *bt, BtPage page)
942 {
943 uid new_page;
944 char *pmap;
945 int reuse;
946
947         // lock page zero
948
949         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
950                 return 0;
951
952         if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
953                 return 0;
954
955         // use empty chain first
956         // else allocate empty page
957
958         if( new_page = bt_getid(bt->alloc[1].right) ) {
959                 if( bt_mappage (bt, &bt->temp, new_page) )
960                         return 0;       // don't unlock on error
961                 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
962                 reuse = 1;
963         } else {
964                 new_page = bt_getid(bt->alloc->right);
965                 bt_putid(bt->alloc->right, new_page+1);
966                 reuse = 0;
967         }
968
969         if( bt_update(bt, bt->alloc, ALLOC_page) )
970                 return 0;       // don't unlock on error
971
972         if( !bt->mapped_io ) {
973                 if( bt_update(bt, page, new_page) )
974                         return 0;       //don't unlock on error
975
976                 // unlock page zero 
977
978                 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
979                         return 0;
980
981                 return new_page;
982         }
983
984 #ifdef unix
985         if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
986                 return bt->err = BTERR_wrt, 0;
987
988         // if writing first page of hash block, zero last page in the block
989
990         if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
991         {
992                 // use temp buffer to write zeros
993                 memset(bt->zero, 0, bt->page_size);
994                 if ( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
995                         return bt->err = BTERR_wrt, 0;
996         }
997 #else
998         //      bring new page into page-cache and copy page.
999         //      this will extend the file into the new pages.
1000
1001         if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
1002                 return 0;
1003
1004         memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
1005 #endif
1006
1007         // unlock page zero 
1008
1009         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1010                 return 0;
1011
1012         return new_page;
1013 }
1014
1015 //  find slot in page for given key at a given level
1016
1017 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1018 {
1019 uint diff, higher = bt->page->cnt, low = 1, slot;
1020 uint good = 0;
1021
1022         //      make stopper key an infinite fence value
1023
1024         if( bt_getid (bt->page->right) )
1025                 higher++;
1026         else
1027                 good++;
1028
1029         //      low is the next candidate, higher is already
1030         //      tested as .ge. the given key, loop ends when they meet
1031
1032         while( diff = higher - low ) {
1033                 slot = low + ( diff >> 1 );
1034                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1035                         low = slot + 1;
1036                 else
1037                         higher = slot, good++;
1038         }
1039
1040         //      return zero if key is on right link page
1041
1042         return good ? higher : 0;
1043 }
1044
1045 //  find and load page at given level for given key
1046 //      leave page rd or wr locked as requested
1047
1048 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1049 {
1050 uid page_no = ROOT_page, prevpage = 0;
1051 uint drill = 0xff, slot;
1052 uint mode, prevmode;
1053
1054   //  start at root of btree and drill down
1055
1056   do {
1057         // determine lock mode of drill level
1058         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1059
1060         bt->page_no = page_no;
1061
1062         // obtain access lock using lock chaining
1063
1064         if( page_no > ROOT_page )
1065           if( bt_lockpage(bt, bt->page_no, BtLockAccess) )
1066                 return 0;                                                                       
1067
1068         if( prevpage )
1069           if( bt_unlockpage(bt, prevpage, prevmode) )
1070                 return 0;
1071
1072         // obtain read lock using lock chaining
1073
1074         if( bt_lockpage(bt, bt->page_no, mode) )
1075                 return 0;                                                                       
1076
1077         if( page_no > ROOT_page )
1078           if( bt_unlockpage(bt, bt->page_no, BtLockAccess) )
1079                 return 0;                                                                       
1080
1081         //      map/obtain page contents
1082
1083         if( bt_mappage (bt, &bt->page, page_no) )
1084                 return 0;
1085
1086         // re-read and re-lock root after determining actual level of root
1087
1088         if( bt->page->lvl != drill) {
1089                 if ( bt->page_no != ROOT_page )
1090                         return bt->err = BTERR_struct, 0;
1091                         
1092                 drill = bt->page->lvl;
1093
1094                 if( lock == BtLockWrite && drill == lvl )
1095                   if( bt_unlockpage(bt, page_no, mode) )
1096                         return 0;
1097                   else
1098                         continue;
1099         }
1100
1101         //  find key on page at this level
1102         //  and descend to requested level
1103
1104         if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) {
1105           if( drill == lvl )
1106                 return slot;
1107
1108           while( slotptr(bt->page, slot)->dead )
1109                 if( slot++ < bt->page->cnt )
1110                         continue;
1111                 else {
1112                         page_no = bt_getid(bt->page->right);
1113                         goto slideright;
1114                 }
1115
1116           page_no = bt_getid(slotptr(bt->page, slot)->id);
1117           drill--;
1118         }
1119
1120         //  or slide right into next page
1121         //  (slide left from deleted page)
1122
1123         else
1124                 page_no = bt_getid(bt->page->right);
1125
1126         //  continue down / right using overlapping locks
1127         //  to protect pages being killed or split.
1128
1129 slideright:
1130         prevpage = bt->page_no;
1131         prevmode = mode;
1132   } while( page_no );
1133
1134   // return error on end of right chain
1135
1136   bt->err = BTERR_struct;
1137   return 0;     // return error
1138 }
1139
1140 //  find and delete key on page by marking delete flag bit
1141 //  when page becomes empty, delete it
1142
1143 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1144 {
1145 unsigned char lowerkey[256], higherkey[256];
1146 uint slot, tod, dirty = 0;
1147 uid page_no, right;
1148 BtKey ptr;
1149
1150         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1151                 ptr = keyptr(bt->page, slot);
1152         else
1153                 return bt->err;
1154
1155         // if key is found delete it, otherwise ignore request
1156
1157         if( !keycmp (ptr, key, len) )
1158                 if( slotptr(bt->page, slot)->dead == 0 )
1159                         dirty = slotptr(bt->page,slot)->dead = 1, bt->page->act--;
1160
1161         // return if page is not empty, or it has no right sibling
1162
1163         right = bt_getid(bt->page->right);
1164         page_no = bt->page_no;
1165
1166         if( !right || bt->page->act )
1167           if ( dirty && bt_update(bt, bt->page, page_no) )
1168                 return bt->err;
1169           else
1170                 return bt_unlockpage(bt, page_no, BtLockWrite);
1171
1172         // obtain Parent lock over write lock
1173
1174         if( bt_lockpage(bt, page_no, BtLockParent) )
1175                 return bt->err;
1176
1177         // cache copy of key to delete
1178
1179         ptr = keyptr(bt->page, bt->page->cnt);
1180         memcpy(lowerkey, ptr, ptr->len + 1);
1181
1182         // lock and map right page
1183
1184         if ( bt_lockpage(bt, right, BtLockWrite) )
1185                 return bt->err;
1186
1187         if( bt_mappage (bt, &bt->temp, right) )
1188                 return bt->err;
1189
1190         // pull contents of next page into current empty page 
1191         memcpy (bt->page, bt->temp, bt->page_size);
1192
1193         //      cache copy of key to update
1194         ptr = keyptr(bt->temp, bt->temp->cnt);
1195         memcpy(higherkey, ptr, ptr->len + 1);
1196
1197         //  Mark right page as deleted and point it to left page
1198         //      until we can post updates at higher level.
1199
1200         bt_putid(bt->temp->right, page_no);
1201         bt->temp->kill = 1;
1202         bt->temp->cnt = 0;
1203
1204         if ( bt_update(bt, bt->page, page_no) )
1205                 return bt->err;
1206
1207         if ( bt_update(bt, bt->temp, right) )
1208                 return bt->err;
1209
1210         if( bt_unlockpage(bt, right, BtLockWrite) )
1211                 return bt->err;
1212         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1213                 return bt->err;
1214
1215         //  delete old lower key to consolidated node
1216
1217         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1218                 return bt->err;
1219
1220         //  redirect higher key directly to consolidated node
1221
1222         tod = (uint)time(NULL);
1223
1224         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) )
1225                 return bt->err;
1226
1227         //      obtain write lock and
1228         //      add right block to free chain
1229
1230         if( bt_freepage (bt, right) )
1231                 return bt->err;
1232
1233         //      remove ParentModify lock
1234
1235         if( bt_unlockpage(bt, page_no, BtLockParent) )
1236                 return bt->err;
1237         
1238         return 0;
1239 }
1240
1241 //      find key in leaf level and return row-id
1242
1243 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1244 {
1245 uint  slot;
1246 BtKey ptr;
1247 uid id;
1248
1249         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1250                 ptr = keyptr(bt->page, slot);
1251         else
1252                 return 0;
1253
1254         // if key exists, return row-id
1255         //      otherwise return 0
1256
1257         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1258                 id = bt_getid(slotptr(bt->page,slot)->id);
1259         else
1260                 id = 0;
1261
1262         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1263                 return 0;
1264
1265         return id;
1266 }
1267
1268 void bt_cleanpage(BtDb *bt)
1269 {
1270 uint nxt = bt->page_size;
1271 BtPage page = bt->page;
1272 uint cnt = 0, idx = 0;
1273 uint max = page->cnt;
1274 BtKey key;
1275
1276         memcpy (bt->frame, page, bt->page_size);
1277
1278         // skip page info and set rest of page to zero
1279         memset (page+1, 0, bt->page_size - sizeof(*page));
1280         page->act = 0;
1281
1282         // try cleaning up page first
1283
1284         while( cnt++ < max ) {
1285                 // always leave fence key in list
1286                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1287                         continue;
1288
1289                 // copy key
1290                 key = keyptr(bt->frame, cnt);
1291                 nxt -= key->len + 1;
1292                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1293
1294                 // copy slot
1295                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1296                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1297                         page->act++;
1298                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1299                 slotptr(page, idx)->off = nxt;
1300         }
1301         page->min = nxt;
1302         page->cnt = idx;
1303 }
1304
1305 // split the root and raise the height of the btree
1306
1307 BTERR bt_splitroot(BtDb *bt,  unsigned char *newkey, unsigned char *oldkey, uid page_no2)
1308 {
1309 uint nxt = bt->page_size;
1310 BtPage root = bt->page;
1311 uid new_page;
1312
1313         //  Obtain an empty page to use, and copy the current
1314         //  root contents into it
1315
1316         if( !(new_page = bt_newpage(bt, root)) )
1317                 return bt->err;
1318
1319         // preserve the page info at the bottom
1320         // and set rest to zero
1321
1322         memset(root+1, 0, bt->page_size - sizeof(*root));
1323
1324         // insert first key on newroot page
1325
1326         nxt -= *newkey + 1;
1327         memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1);
1328         bt_putid(slotptr(root, 1)->id, new_page);
1329         slotptr(root, 1)->off = nxt;
1330         
1331         // insert second key on newroot page
1332         // and increase the root height
1333
1334         nxt -= *oldkey + 1;
1335         memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1);
1336         bt_putid(slotptr(root, 2)->id, page_no2);
1337         slotptr(root, 2)->off = nxt;
1338
1339         bt_putid(root->right, 0);
1340         root->min = nxt;                // reset lowest used offset and key count
1341         root->cnt = 2;
1342         root->act = 2;
1343         root->lvl++;
1344
1345         // update and release root (bt->page)
1346
1347         if( bt_update(bt, root, bt->page_no) )
1348                 return bt->err;
1349
1350         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1351 }
1352
1353 //  split already locked full node
1354 //      return unlocked.
1355
1356 BTERR bt_splitpage (BtDb *bt, uint len)
1357 {
1358 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1359 unsigned char oldkey[256], lowerkey[256];
1360 uid page_no = bt->page_no, right;
1361 BtPage page = bt->page;
1362 uint lvl = page->lvl;
1363 uid new_page;
1364 BtKey key;
1365 uint tod;
1366
1367         // perform cleanup
1368
1369         bt_cleanpage(bt);
1370
1371         // return if enough space now
1372
1373         if( page->min >= (page->cnt + 1) * sizeof(BtSlot) + sizeof(*page) + len + 1)
1374         {
1375                 if ( bt_update(bt, page, page_no) )
1376                         return bt->err;
1377
1378                 return bt_unlockpage(bt, page_no, BtLockWrite);
1379         }
1380
1381         //  split higher half of keys to bt->frame
1382         //      the last key (fence key) might be dead
1383
1384         tod = (uint)time(NULL);
1385
1386         memset (bt->frame, 0, bt->page_size);
1387         max = (int)page->cnt;
1388         cnt = max / 2;
1389         idx = 0;
1390
1391         while( cnt++ < max ) {
1392                 key = keyptr(page, cnt);
1393                 nxt -= key->len + 1;
1394                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1395                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1396                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1397                         bt->frame->act++;
1398                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1399                 slotptr(bt->frame, idx)->off = nxt;
1400         }
1401
1402         // remember existing fence key for new page to the right
1403
1404         memcpy (oldkey, key, key->len + 1);
1405
1406         bt->frame->bits = bt->page_bits;
1407         bt->frame->min = nxt;
1408         bt->frame->cnt = idx;
1409         bt->frame->lvl = lvl;
1410
1411         // link right node
1412
1413         if( page_no > ROOT_page ) {
1414                 right = bt_getid (page->right);
1415                 bt_putid(bt->frame->right, right);
1416         }
1417
1418         //      get new free page and write frame to it.
1419
1420         if( !(new_page = bt_newpage(bt, bt->frame)) )
1421                 return bt->err;
1422
1423         //      update lower keys to continue in old page
1424
1425         memcpy (bt->frame, page, bt->page_size);
1426         memset (page+1, 0, bt->page_size - sizeof(*page));
1427         nxt = bt->page_size;
1428         page->act = 0;
1429         cnt = 0;
1430         idx = 0;
1431
1432         //  assemble page of smaller keys
1433         //      (they're all active keys)
1434
1435         while( cnt++ < max / 2 ) {
1436                 key = keyptr(bt->frame, cnt);
1437                 nxt -= key->len + 1;
1438                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1439                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1440                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1441                 slotptr(page, idx)->off = nxt;
1442                 page->act++;
1443         }
1444
1445         // remember fence key for old page
1446
1447         memcpy(lowerkey, key, key->len + 1);
1448         bt_putid(page->right, new_page);
1449         page->min = nxt;
1450         page->cnt = idx;
1451
1452         // if current page is the root page, split it
1453
1454         if( page_no == ROOT_page )
1455                 return bt_splitroot (bt, lowerkey, oldkey, new_page);
1456
1457         // update left (containing) node
1458
1459         if( bt_update(bt, page, page_no) )
1460                 return bt->err;
1461
1462         // obtain Parent/Write locks
1463         // for left and right node pages
1464
1465         if( bt_lockpage (bt, new_page, BtLockParent) )
1466                 return bt->err;
1467
1468         if( bt_lockpage (bt, page_no, BtLockParent) )
1469                 return bt->err;
1470
1471         //  release wr lock on left page
1472
1473         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1474                 return bt->err;
1475
1476         // insert new fence for reformulated left block
1477
1478         if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
1479                 return bt->err;
1480
1481         // fix old fence for newly allocated right block page
1482
1483         if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) )
1484                 return bt->err;
1485
1486         // release Parent & Write locks
1487
1488         if( bt_unlockpage (bt, new_page, BtLockParent) )
1489                 return bt->err;
1490
1491         if( bt_unlockpage (bt, page_no, BtLockParent) )
1492                 return bt->err;
1493
1494         return 0;
1495 }
1496
1497 //  Insert new key into the btree at requested level.
1498 //  Level zero pages are leaf pages and are unlocked at exit.
1499 //      Interior nodes remain locked.
1500
1501 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1502 {
1503 uint slot, idx;
1504 BtPage page;
1505 BtKey ptr;
1506
1507   while( 1 ) {
1508         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1509                 ptr = keyptr(bt->page, slot);
1510         else
1511         {
1512                 if ( !bt->err )
1513                         bt->err = BTERR_ovflw;
1514                 return bt->err;
1515         }
1516
1517         // if key already exists, update id and return
1518
1519         page = bt->page;
1520
1521         if( !keycmp (ptr, key, len) ) {
1522                 slotptr(page, slot)->dead = 0;
1523                 slotptr(page, slot)->tod = tod;
1524                 bt_putid(slotptr(page,slot)->id, id);
1525                 if ( bt_update(bt, bt->page, bt->page_no) )
1526                         return bt->err;
1527                 return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1528         }
1529
1530         // check if page has enough space
1531
1532         if( page->min >= (page->cnt + 1) * sizeof(BtSlot) + sizeof(*page) + len + 1)
1533                 break;
1534
1535         if( bt_splitpage (bt, len) )
1536                 return bt->err;
1537   }
1538
1539   // calculate next available slot and copy key into page
1540
1541   page->min -= len + 1; // reset lowest used offset
1542   ((unsigned char *)page)[page->min] = len;
1543   memcpy ((unsigned char *)page + page->min +1, key, len );
1544
1545   for( idx = slot; idx < page->cnt; idx++ )
1546         if( slotptr(page, idx)->dead )
1547                 break;
1548
1549   // now insert key into array before slot
1550   // preserving the fence slot
1551
1552   if( idx == page->cnt )
1553         idx++, page->cnt++;
1554
1555   page->act++;
1556
1557   while( idx > slot )
1558         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1559
1560   bt_putid(slotptr(page,slot)->id, id);
1561   slotptr(page, slot)->off = page->min;
1562   slotptr(page, slot)->tod = tod;
1563   slotptr(page, slot)->dead = 0;
1564
1565   if ( bt_update(bt, bt->page, bt->page_no) )
1566           return bt->err;
1567
1568   return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1569 }
1570
1571 //  cache page of keys into cursor and return starting slot for given key
1572
1573 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1574 {
1575 uint slot;
1576
1577         // cache page for retrieval
1578         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1579                 memcpy (bt->cursor, bt->page, bt->page_size);
1580         bt->cursor_page = bt->page_no;
1581         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1582                 return 0;
1583
1584         return slot;
1585 }
1586
1587 //  return next slot for cursor page
1588 //  or slide cursor right into next page
1589
1590 uint bt_nextkey (BtDb *bt, uint slot)
1591 {
1592 off64_t right;
1593
1594   do {
1595         right = bt_getid(bt->cursor->right);
1596         while( slot++ < bt->cursor->cnt )
1597           if( slotptr(bt->cursor,slot)->dead )
1598                 continue;
1599           else if( right || (slot < bt->cursor->cnt))
1600                 return slot;
1601           else
1602                 break;
1603
1604         if( !right )
1605                 break;
1606
1607         bt->cursor_page = right;
1608
1609     if( bt_lockpage(bt, right,BtLockRead) )
1610                 return 0;
1611
1612         if( bt_mappage (bt, &bt->page, right) )
1613                 break;
1614
1615         memcpy (bt->cursor, bt->page, bt->page_size);
1616         if ( bt_unlockpage(bt, right, BtLockRead) )
1617                 return 0;
1618
1619         slot = 0;
1620   } while( 1 );
1621
1622   return bt->err = 0;
1623 }
1624
1625 BtKey bt_key(BtDb *bt, uint slot)
1626 {
1627         return keyptr(bt->cursor, slot);
1628 }
1629
1630 uid bt_uid(BtDb *bt, uint slot)
1631 {
1632         return bt_getid(slotptr(bt->cursor,slot)->id);
1633 }
1634
1635 uint bt_tod(BtDb *bt, uint slot)
1636 {
1637         return slotptr(bt->cursor,slot)->tod;
1638 }
1639
1640
1641 #ifdef STANDALONE
1642 //  standalone program to index file of keys
1643 //  then list them onto std-out
1644
1645 int main (int argc, char **argv)
1646 {
1647 uint slot, line = 0, off = 0, found = 0;
1648 int dead, ch, cnt = 0, bits = 12;
1649 unsigned char key[256];
1650 clock_t done, start;
1651 uint pgblk = 0;
1652 time_t tod[1];
1653 uint scan = 0;
1654 uint len = 0;
1655 uint map = 0;
1656 BtKey ptr;
1657 BtDb *bt;
1658 FILE *in;
1659
1660         if( argc < 4 ) {
1661                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
1662                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
1663                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
1664                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
1665                 exit(0);
1666         }
1667
1668         start = clock();
1669         time(tod);
1670
1671         if( argc > 4 )
1672                 bits = atoi(argv[4]);
1673
1674         if( argc > 5 )
1675                 map = atoi(argv[5]);
1676
1677         if( map > 65536 )
1678                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
1679
1680         if( map && map < 8 )
1681                 fprintf (stderr, "Buffer_pool too small\n");
1682
1683         if( argc > 6 )
1684                 pgblk = atoi(argv[6]);
1685
1686         if( bits + pgblk > 30 )
1687                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
1688
1689         if( argc > 7 )
1690                 off = atoi(argv[7]);
1691
1692         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk);
1693
1694         if( !bt ) {
1695                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
1696                 exit (1);
1697         }
1698
1699         switch(argv[3][0]| 0x20)
1700         {
1701         case 'w':
1702                 fprintf(stderr, "started indexing for %s\n", argv[2]);
1703                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1704                   while( ch = getc(in), ch != EOF )
1705                         if( ch == '\n' )
1706                         {
1707                           if( off )
1708                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1709
1710                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
1711                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1712                           len = 0;
1713                         }
1714                         else if( len < 245 )
1715                                 key[len++] = ch;
1716                 fprintf(stderr, "finished adding keys, %d \n", line);
1717                 break;
1718
1719         case 'd':
1720                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
1721                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1722                   while( ch = getc(in), ch != EOF )
1723                         if( ch == '\n' )
1724                         {
1725                           if( off )
1726                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1727                           line++;
1728                           if( bt_deletekey (bt, key, len, 0) )
1729                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1730                           len = 0;
1731                         }
1732                         else if( len < 245 )
1733                                 key[len++] = ch;
1734                 fprintf(stderr, "finished deleting keys, %d \n", line);
1735                 break;
1736
1737         case 'f':
1738                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
1739                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
1740                   while( ch = getc(in), ch != EOF )
1741                         if( ch == '\n' )
1742                         {
1743                           if( off )
1744                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
1745                           line++;
1746                           if( bt_findkey (bt, key, len) )
1747                                 found++;
1748                           else if( bt->err )
1749                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
1750                           len = 0;
1751                         }
1752                         else if( len < 245 )
1753                                 key[len++] = ch;
1754                 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
1755                 break;
1756
1757         case 's':
1758                 scan++;
1759                 break;
1760
1761         }
1762
1763         done = clock();
1764         fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
1765
1766         dead = cnt = 0;
1767         len = key[0] = 0;
1768
1769         fprintf(stderr, "started reading\n");
1770
1771         if( slot = bt_startkey (bt, key, len) )
1772           slot--;
1773         else
1774           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
1775
1776         while( slot = bt_nextkey (bt, slot) )
1777           if( cnt++, scan ) {
1778                         ptr = bt_key(bt, slot);
1779                         fwrite (ptr->key, ptr->len, 1, stdout);
1780                         fputc ('\n', stdout);
1781           }
1782
1783         fprintf(stderr, " Total keys read %d\n", cnt);
1784 }
1785
1786 #endif  //STANDALONE