]> pd.if.org Git - btree/blob - threadskv5.c
b5df26d617fe1e242a86a87a593163ae94878553
[btree] / threadskv5.c
1 // btree version threadskv5 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      and bi-directional cursors
7
8 // 09 SEP 2014
9
10 // author: karl malbrain, malbrain@cal.berkeley.edu
11
12 /*
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
15
16 The orginal author is Karl Malbrain.
17
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
24 */
25
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
28
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
31
32 #ifdef linux
33 #define _GNU_SOURCE
34 #endif
35
36 #ifdef unix
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <fcntl.h>
41 #include <sys/time.h>
42 #include <sys/mman.h>
43 #include <errno.h>
44 #include <pthread.h>
45 #else
46 #define WIN32_LEAN_AND_MEAN
47 #include <windows.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <time.h>
51 #include <fcntl.h>
52 #include <process.h>
53 #include <intrin.h>
54 #endif
55
56 #include <memory.h>
57 #include <string.h>
58 #include <stddef.h>
59
60 typedef unsigned long long      uid;
61
62 #ifndef unix
63 typedef unsigned long long      off64_t;
64 typedef unsigned short          ushort;
65 typedef unsigned int            uint;
66 #endif
67
68 #define BT_latchtable   128                                     // number of latch manager slots
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 /*
79 There are five lock types for each node in three independent sets: 
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
85 */
86
87 typedef enum{
88         BtLockAccess,
89         BtLockDelete,
90         BtLockRead,
91         BtLockWrite,
92         BtLockParent
93 } BtLock;
94
95 //      definition for phase-fair reader/writer lock implementation
96
97 typedef struct {
98         ushort rin[1];
99         ushort rout[1];
100         ushort ticket[1];
101         ushort serving[1];
102 } RWLock;
103
104 #define PHID 0x1
105 #define PRES 0x2
106 #define MASK 0x3
107 #define RINC 0x4
108
109 //      definition for spin latch implementation
110
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
114
115 volatile typedef struct {
116         ushort exclusive:1;
117         ushort pending:1;
118         ushort share:14;
119 } BtSpinLatch;
120
121 #define XCL 1
122 #define PEND 2
123 #define BOTH 3
124 #define SHARE 4
125
126 //  hash table entries
127
128 typedef struct {
129         BtSpinLatch latch[1];
130         volatile ushort slot;           // Latch table entry at head of chain
131 } BtHashEntry;
132
133 //      latch manager table structure
134
135 typedef struct {
136         RWLock readwr[1];               // read/write page lock
137         RWLock access[1];               // Access Intent/Page delete
138         RWLock parent[1];               // Posting of fence key in parent
139         BtSpinLatch busy[1];            // slot is being moved between chains
140         volatile ushort next;           // next entry in hash table chain
141         volatile ushort prev;           // prev entry in hash table chain
142         volatile ushort pin;            // number of outstanding locks
143         volatile ushort hash;           // hash slot entry is under
144         volatile uid page_no;           // latch set page number
145 } BtLatchSet;
146
147 //      Define the length of the page record numbers
148
149 #define BtId 6
150
151 //      Page key slot definition.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      a leaf page is always present, even after cleanup.
156
157 //      Slot types
158
159 //      In addition to the Unique keys that occupy slots
160 //      there are Librarian and Duplicate key
161 //      slots occupying the key slot array.
162
163 //      The Librarian slots are dead keys that
164 //      serve as filler, available to add new Unique
165 //      or Dup slots that are inserted into the B-tree.
166
167 //      The Duplicate slots have had their key bytes extended
168 //      by 6 bytes to contain a binary duplicate key uniqueifier.
169
170 typedef enum {
171         Unique,
172         Librarian,
173         Duplicate
174 } BtSlotType;
175
176 typedef struct {
177         uint off:BT_maxbits;    // page offset for key start
178         uint type:3;                    // type of slot
179         uint dead:1;                    // set for deleted slot
180 } BtSlot;
181
182 //      The key structure occupies space at the upper end of
183 //      each page.  It's a length byte followed by the key
184 //      bytes.
185
186 typedef struct {
187         unsigned char len;              // this can be changed to a ushort or uint
188         unsigned char key[0];
189 } BtKey;
190
191 //      the value structure also occupies space at the upper
192 //      end of the page. Each key is immediately followed by a value.
193
194 typedef struct {
195         unsigned char len;              // this can be changed to a ushort or uint
196         unsigned char value[0];
197 } BtVal;
198
199 #define BT_maxkey       255             // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
201
202 //      The first part of an index page.
203 //      It is immediately followed
204 //      by the BtSlot array of keys.
205
206 //      note that this structure size
207 //      must be a multiple of 8 bytes
208 //      in order to place dups correctly.
209
210 typedef struct BtPage_ {
211         uint cnt;                                       // count of keys in page
212         uint act;                                       // count of active keys
213         uint min;                                       // next key offset
214         uint garbage;                           // page garbage in bytes
215         unsigned char bits:7;           // page size in bits
216         unsigned char free:1;           // page is on free chain
217         unsigned char lvl:7;            // level of page
218         unsigned char kill:1;           // page is being deleted
219         unsigned char left[BtId];       // page number to left
220         unsigned char filler[2];        // padding to multiple of 8
221         unsigned char right[BtId];      // page number to right
222 } *BtPage;
223
224 //      The memory mapping pool table buffer manager entry
225
226 typedef struct {
227         uid  basepage;                          // mapped base page number
228         char *map;                                      // mapped memory pointer
229         ushort slot;                            // slot index in this array
230         ushort pin;                                     // mapped page pin counter
231         void *hashprev;                         // previous pool entry for the same hash idx
232         void *hashnext;                         // next pool entry for the same hash idx
233 #ifndef unix
234         HANDLE hmap;                            // Windows memory mapping handle
235 #endif
236 } BtPool;
237
238 #define CLOCK_bit 0x8000                // bit in pool->pin
239
240 //  The loadpage interface object
241
242 typedef struct {
243         uid page_no;            // current page number
244         BtPage page;            // current page pointer
245         BtPool *pool;           // current page pool
246         BtLatchSet *latch;      // current page latch set
247 } BtPageSet;
248
249 //      structure for latch manager on ALLOC_page
250
251 typedef struct {
252         struct BtPage_ alloc[1];        // next page_no in right ptr
253         unsigned long long dups[1];     // global duplicate key uniqueifier
254         unsigned char chain[BtId];      // head of free page_nos chain
255         BtSpinLatch lock[1];            // allocation area lite latch
256         ushort latchdeployed;           // highest number of latch entries deployed
257         ushort nlatchpage;                      // number of latch pages at BT_latch
258         ushort latchtotal;                      // number of page latch entries
259         ushort latchhash;                       // number of latch hash table slots
260         ushort latchvictim;                     // next latch entry to examine
261         BtHashEntry table[0];           // the hash table
262 } BtLatchMgr;
263
264 //      The object structure for Btree access
265
266 typedef struct {
267         uint page_size;                         // page size    
268         uint page_bits;                         // page size in bits    
269         uint seg_bits;                          // seg size in pages in bits
270         uint mode;                                      // read-write mode
271 #ifdef unix
272         int idx;
273 #else
274         HANDLE idx;
275 #endif
276         ushort poolcnt;                         // highest page pool node in use
277         ushort poolmax;                         // highest page pool node allocated
278         ushort poolmask;                        // total number of pages in mmap segment - 1
279         ushort hashsize;                        // size of Hash Table for pool entries
280         volatile uint evicted;          // last evicted hash table slot
281         ushort *hash;                           // pool index for hash entries
282         BtSpinLatch *latch;                     // latches for hash table slots
283         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
284         BtLatchSet *latchsets;          // mapped latch set from latch pages
285         BtPool *pool;                           // memory pool page segments
286 #ifndef unix
287         HANDLE halloc;                          // allocation and latch table handle
288 #endif
289 } BtMgr;
290
291 typedef struct {
292         BtMgr *mgr;                             // buffer manager for thread
293         BtPage cursor;                  // cached frame for start/next (never mapped)
294         BtPage frame;                   // spare frame for the page split (never mapped)
295         uid cursor_page;                                // current cursor page number   
296         unsigned char *mem;                             // frame, cursor, page memory buffer
297         unsigned char key[BT_keyarray]; // last found complete key
298         int found;                              // last delete or insert was found
299         int err;                                // last error
300 } BtDb;
301
302 typedef enum {
303         BTERR_ok = 0,
304         BTERR_struct,
305         BTERR_ovflw,
306         BTERR_lock,
307         BTERR_map,
308         BTERR_wrt,
309         BTERR_hash
310 } BTERR;
311
312 // B-Tree functions
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey   (BtDb *bt, uint slot);
321
322 //      manager functions
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
325
326 //  Helper functions to return slot values
327 //      from the cursor page.
328
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
331
332 //  BTree page number constants
333 #define ALLOC_page              0       // allocation & latch manager hash table
334 #define ROOT_page               1       // root of the btree
335 #define LEAF_page               2       // first page of leaves
336 #define LATCH_page              3       // pages for latch manager
337
338 //      Number of levels to create in a new BTree
339
340 #define MIN_lvl                 2
341
342 //  The page is allocated from low and hi ends.
343 //  The key slots are allocated from the bottom,
344 //      while the text and value of the key
345 //  are allocated from the top.  When the two
346 //  areas meet, the page is split into two.
347
348 //  A key consists of a length byte, two bytes of
349 //  index number (0 - 65534), and up to 253 bytes
350 //  of key value.
351
352 //  Associated with each key is a value byte string
353 //      containing any value desired.
354
355 //  The b-tree root is always located at page 1.
356 //      The first leaf page of level zero is always
357 //      located on page 2.
358
359 //      The b-tree pages are linked with next
360 //      pointers to facilitate enumerators,
361 //      and provide for concurrency.
362
363 //      When to root page fills, it is split in two and
364 //      the tree height is raised by a new root at page
365 //      one with two keys.
366
367 //      Deleted keys are marked with a dead bit until
368 //      page cleanup. The fence key for a leaf node is
369 //      always present
370
371 //  Groups of pages called segments from the btree are optionally
372 //  cached with a memory mapped pool. A hash table is used to keep
373 //  track of the cached segments.  This behaviour is controlled
374 //  by the cache block size parameter to bt_open.
375
376 //  To achieve maximum concurrency one page is locked at a time
377 //  as the tree is traversed to find leaf key in question. The right
378 //  page numbers are used in cases where the page is being split,
379 //      or consolidated.
380
381 //  Page 0 is dedicated to lock for new page extensions,
382 //      and chains empty pages together for reuse. It also
383 //      contains the latch manager hash table.
384
385 //      The ParentModification lock on a node is obtained to serialize posting
386 //      or changing the fence key for a node.
387
388 //      Empty pages are chained together through the ALLOC page and reused.
389
390 //      Access macros to address slot and key values from the page
391 //      Page slots use 1 based indexing.
392
393 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
396
397 void bt_putid(unsigned char *dest, uid id)
398 {
399 int i = BtId;
400
401         while( i-- )
402                 dest[i] = (unsigned char)id, id >>= 8;
403 }
404
405 uid bt_getid(unsigned char *src)
406 {
407 uid id = 0;
408 int i;
409
410         for( i = 0; i < BtId; i++ )
411                 id <<= 8, id |= *src++; 
412
413         return id;
414 }
415
416 uid bt_newdup (BtDb *bt)
417 {
418 #ifdef unix
419         return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
420 #else
421         return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
422 #endif
423 }
424
425 //      Phase-Fair reader/writer lock implementation
426
427 void WriteLock (RWLock *lock)
428 {
429 ushort w, r, tix;
430
431 #ifdef unix
432         tix = __sync_fetch_and_add (lock->ticket, 1);
433 #else
434         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
435 #endif
436         // wait for our ticket to come up
437
438         while( tix != lock->serving[0] )
439 #ifdef unix
440                 sched_yield();
441 #else
442                 SwitchToThread ();
443 #endif
444
445         w = PRES | (tix & PHID);
446 #ifdef  unix
447         r = __sync_fetch_and_add (lock->rin, w);
448 #else
449         r = _InterlockedExchangeAdd16 (lock->rin, w);
450 #endif
451         while( r != *lock->rout )
452 #ifdef unix
453                 sched_yield();
454 #else
455                 SwitchToThread();
456 #endif
457 }
458
459 void WriteRelease (RWLock *lock)
460 {
461 #ifdef unix
462         __sync_fetch_and_and (lock->rin, ~MASK);
463 #else
464         _InterlockedAnd16 (lock->rin, ~MASK);
465 #endif
466         lock->serving[0]++;
467 }
468
469 void ReadLock (RWLock *lock)
470 {
471 ushort w;
472 #ifdef unix
473         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
474 #else
475         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
476 #endif
477         if( w )
478           while( w == (*lock->rin & MASK) )
479 #ifdef unix
480                 sched_yield ();
481 #else
482                 SwitchToThread ();
483 #endif
484 }
485
486 void ReadRelease (RWLock *lock)
487 {
488 #ifdef unix
489         __sync_fetch_and_add (lock->rout, RINC);
490 #else
491         _InterlockedExchangeAdd16 (lock->rout, RINC);
492 #endif
493 }
494
495 //      Spin Latch Manager
496
497 //      wait until write lock mode is clear
498 //      and add 1 to the share count
499
500 void bt_spinreadlock(BtSpinLatch *latch)
501 {
502 ushort prev;
503
504   do {
505 #ifdef unix
506         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
507 #else
508         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
509 #endif
510         //  see if exclusive request is granted or pending
511
512         if( !(prev & BOTH) )
513                 return;
514 #ifdef unix
515         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
516 #else
517         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
518 #endif
519 #ifdef  unix
520   } while( sched_yield(), 1 );
521 #else
522   } while( SwitchToThread(), 1 );
523 #endif
524 }
525
526 //      wait for other read and write latches to relinquish
527
528 void bt_spinwritelock(BtSpinLatch *latch)
529 {
530 ushort prev;
531
532   do {
533 #ifdef  unix
534         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
535 #else
536         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
537 #endif
538         if( !(prev & XCL) )
539           if( !(prev & ~BOTH) )
540                 return;
541           else
542 #ifdef unix
543                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
544 #else
545                 _InterlockedAnd16((ushort *)latch, ~XCL);
546 #endif
547 #ifdef  unix
548   } while( sched_yield(), 1 );
549 #else
550   } while( SwitchToThread(), 1 );
551 #endif
552 }
553
554 //      try to obtain write lock
555
556 //      return 1 if obtained,
557 //              0 otherwise
558
559 int bt_spinwritetry(BtSpinLatch *latch)
560 {
561 ushort prev;
562
563 #ifdef  unix
564         prev = __sync_fetch_and_or((ushort *)latch, XCL);
565 #else
566         prev = _InterlockedOr16((ushort *)latch, XCL);
567 #endif
568         //      take write access if all bits are clear
569
570         if( !(prev & XCL) )
571           if( !(prev & ~BOTH) )
572                 return 1;
573           else
574 #ifdef unix
575                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
576 #else
577                 _InterlockedAnd16((ushort *)latch, ~XCL);
578 #endif
579         return 0;
580 }
581
582 //      clear write mode
583
584 void bt_spinreleasewrite(BtSpinLatch *latch)
585 {
586 #ifdef unix
587         __sync_fetch_and_and((ushort *)latch, ~BOTH);
588 #else
589         _InterlockedAnd16((ushort *)latch, ~BOTH);
590 #endif
591 }
592
593 //      decrement reader count
594
595 void bt_spinreleaseread(BtSpinLatch *latch)
596 {
597 #ifdef unix
598         __sync_fetch_and_add((ushort *)latch, -SHARE);
599 #else
600         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
601 #endif
602 }
603
604 //      link latch table entry into latch hash table
605
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
607 {
608 BtLatchSet *set = bt->mgr->latchsets + victim;
609
610         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611                 bt->mgr->latchsets[set->next].prev = victim;
612
613         bt->mgr->latchmgr->table[hashidx].slot = victim;
614         set->page_no = page_no;
615         set->hash = hashidx;
616         set->prev = 0;
617 }
618
619 //      release latch pin
620
621 void bt_unpinlatch (BtLatchSet *set)
622 {
623 #ifdef unix
624         __sync_fetch_and_add(&set->pin, -1);
625 #else
626         _InterlockedDecrement16 (&set->pin);
627 #endif
628 }
629
630 //      find existing latchset or inspire new one
631 //      return with latchset pinned
632
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
634 {
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
637 BtLatchSet *set;
638
639         //  obtain read lock on hash table entry
640
641         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
642
643         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
644         {
645                 set = bt->mgr->latchsets + slot;
646                 if( page_no == set->page_no )
647                         break;
648         } while( slot = set->next );
649
650         if( slot ) {
651 #ifdef unix
652                 __sync_fetch_and_add(&set->pin, 1);
653 #else
654                 _InterlockedIncrement16 (&set->pin);
655 #endif
656         }
657
658     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
659
660         if( slot )
661                 return set;
662
663   //  try again, this time with write lock
664
665   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
666
667   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
668   {
669         set = bt->mgr->latchsets + slot;
670         if( page_no == set->page_no )
671                 break;
672         if( !set->pin && !avail )
673                 avail = slot;
674   } while( slot = set->next );
675
676   //  found our entry, or take over an unpinned one
677
678   if( slot || (slot = avail) ) {
679         set = bt->mgr->latchsets + slot;
680 #ifdef unix
681         __sync_fetch_and_add(&set->pin, 1);
682 #else
683         _InterlockedIncrement16 (&set->pin);
684 #endif
685         set->page_no = page_no;
686         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
687         return set;
688   }
689
690         //  see if there are any unused entries
691 #ifdef unix
692         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
693 #else
694         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
695 #endif
696
697         if( victim < bt->mgr->latchmgr->latchtotal ) {
698                 set = bt->mgr->latchsets + victim;
699 #ifdef unix
700                 __sync_fetch_and_add(&set->pin, 1);
701 #else
702                 _InterlockedIncrement16 (&set->pin);
703 #endif
704                 bt_latchlink (bt, hashidx, victim, page_no);
705                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
706                 return set;
707         }
708
709 #ifdef unix
710         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
711 #else
712         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
713 #endif
714   //  find and reuse previous lock entry
715
716   while( 1 ) {
717 #ifdef unix
718         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
719 #else
720         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
721 #endif
722         //      we don't use slot zero
723
724         if( victim %= bt->mgr->latchmgr->latchtotal )
725                 set = bt->mgr->latchsets + victim;
726         else
727                 continue;
728
729         //      take control of our slot
730         //      from other threads
731
732         if( set->pin || !bt_spinwritetry (set->busy) )
733                 continue;
734
735         idx = set->hash;
736
737         // try to get write lock on hash chain
738         //      skip entry if not obtained
739         //      or has outstanding locks
740
741         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742                 bt_spinreleasewrite (set->busy);
743                 continue;
744         }
745
746         if( set->pin ) {
747                 bt_spinreleasewrite (set->busy);
748                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
749                 continue;
750         }
751
752         //  unlink our available victim from its hash chain
753
754         if( set->prev )
755                 bt->mgr->latchsets[set->prev].next = set->next;
756         else
757                 bt->mgr->latchmgr->table[idx].slot = set->next;
758
759         if( set->next )
760                 bt->mgr->latchsets[set->next].prev = set->prev;
761
762         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
763 #ifdef unix
764         __sync_fetch_and_add(&set->pin, 1);
765 #else
766         _InterlockedIncrement16 (&set->pin);
767 #endif
768         bt_latchlink (bt, hashidx, victim, page_no);
769         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770         bt_spinreleasewrite (set->busy);
771         return set;
772   }
773 }
774
775 void bt_mgrclose (BtMgr *mgr)
776 {
777 BtPool *pool;
778 uint slot;
779
780         // release mapped pages
781         //      note that slot zero is never used
782
783         for( slot = 1; slot < mgr->poolmax; slot++ ) {
784                 pool = mgr->pool + slot;
785                 if( pool->slot )
786 #ifdef unix
787                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
788 #else
789                 {
790                         FlushViewOfFile(pool->map, 0);
791                         UnmapViewOfFile(pool->map);
792                         CloseHandle(pool->hmap);
793                 }
794 #endif
795         }
796
797 #ifdef unix
798         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799         munmap (mgr->latchmgr, 2 * mgr->page_size);
800 #else
801         FlushViewOfFile(mgr->latchmgr, 0);
802         UnmapViewOfFile(mgr->latchmgr);
803         CloseHandle(mgr->halloc);
804 #endif
805 #ifdef unix
806         close (mgr->idx);
807         free (mgr->pool);
808         free (mgr->hash);
809         free ((void *)mgr->latch);
810         free (mgr);
811 #else
812         FlushFileBuffers(mgr->idx);
813         CloseHandle(mgr->idx);
814         GlobalFree (mgr->pool);
815         GlobalFree (mgr->hash);
816         GlobalFree ((void *)mgr->latch);
817         GlobalFree (mgr);
818 #endif
819 }
820
821 //      close and release memory
822
823 void bt_close (BtDb *bt)
824 {
825 #ifdef unix
826         if( bt->mem )
827                 free (bt->mem);
828 #else
829         if( bt->mem)
830                 VirtualFree (bt->mem, 0, MEM_RELEASE);
831 #endif
832         free (bt);
833 }
834
835 //  open/create new btree buffer manager
836
837 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
838 //              size of mapped page pool (e.g. 8192)
839
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
841 {
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
847 off64_t size;
848 uint amt[1];
849 BtMgr* mgr;
850 BtKey* key;
851 BtVal *val;
852
853 #ifndef unix
854 SYSTEM_INFO sysinfo[1];
855 #endif
856
857         // determine sanity of page size and buffer pool
858
859         if( bits > BT_maxbits )
860                 bits = BT_maxbits;
861         else if( bits < BT_minbits )
862                 bits = BT_minbits;
863
864         if( !poolmax )
865                 return NULL;    // must have buffer pool
866
867 #ifdef unix
868         mgr = calloc (1, sizeof(BtMgr));
869
870         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
871
872         if( mgr->idx == -1 )
873                 return free(mgr), NULL;
874         
875         cacheblk = 4096;        // minimum mmap segment size for unix
876
877 #else
878         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879         attr = FILE_ATTRIBUTE_NORMAL;
880         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
881
882         if( mgr->idx == INVALID_HANDLE_VALUE )
883                 return GlobalFree(mgr), NULL;
884
885         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886         GetSystemInfo(sysinfo);
887         cacheblk = sysinfo->dwAllocationGranularity;
888 #endif
889
890 #ifdef unix
891         latchmgr = malloc (BT_maxpage);
892         *amt = 0;
893
894         // read minimum page size to get root info
895         //      to support raw disk partition files
896         //      check if bits == 0 on the disk.
897
898         if( size = lseek (mgr->idx, 0L, 2) ) {
899                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900                         if( latchmgr->alloc->bits )
901                                 bits = latchmgr->alloc->bits;
902                         else
903                                 initit = 1;
904                 else
905                         return free(mgr), free(latchmgr), NULL;
906         } else if( mode == BT_ro )
907                 return free(latchmgr), bt_mgrclose (mgr), NULL;
908 #else
909         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
910         size = GetFileSize(mgr->idx, amt);
911
912         if( size || *amt ) {
913                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
914                         return bt_mgrclose (mgr), NULL;
915                 bits = latchmgr->alloc->bits;
916         } else if( mode == BT_ro )
917                 return bt_mgrclose (mgr), NULL;
918         else
919                 initit = 1;
920 #endif
921
922         mgr->page_size = 1 << bits;
923         mgr->page_bits = bits;
924
925         mgr->poolmax = poolmax;
926         mgr->mode = mode;
927
928         if( cacheblk < mgr->page_size )
929                 cacheblk = mgr->page_size;
930
931         //  mask for partial memmaps
932
933         mgr->poolmask = (cacheblk >> bits) - 1;
934
935         //      see if requested size of pages per memmap is greater
936
937         if( (1 << segsize) > mgr->poolmask )
938                 mgr->poolmask = (1 << segsize) - 1;
939
940         mgr->seg_bits = 0;
941
942         while( (1 << mgr->seg_bits) <= mgr->poolmask )
943                 mgr->seg_bits++;
944
945         mgr->hashsize = hashsize;
946
947 #ifdef unix
948         mgr->pool = calloc (poolmax, sizeof(BtPool));
949         mgr->hash = calloc (hashsize, sizeof(ushort));
950         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
951 #else
952         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
953         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
954         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
955 #endif
956
957         if( !initit )
958                 goto mgrlatch;
959
960         // initialize an empty b-tree with latch page, root page, page of leaves
961         // and page(s) of latches
962
963         memset (latchmgr, 0, 1 << bits);
964         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
965         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
966         latchmgr->alloc->bits = mgr->page_bits;
967
968         latchmgr->nlatchpage = nlatchpage;
969         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
970
971         //  initialize latch manager
972
973         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
974
975         //      size of hash table = total number of latchsets
976
977         if( latchhash > latchmgr->latchtotal )
978                 latchhash = latchmgr->latchtotal;
979
980         latchmgr->latchhash = latchhash;
981
982         //  initialize left-most LEAF page in
983         //      alloc->left.
984
985         bt_putid (latchmgr->alloc->left, LEAF_page);
986
987 #ifdef unix
988         if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
989                 return bt_mgrclose (mgr), NULL;
990 #else
991         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
992                 return bt_mgrclose (mgr), NULL;
993
994         if( *amt < mgr->page_size )
995                 return bt_mgrclose (mgr), NULL;
996 #endif
997
998         memset (latchmgr, 0, 1 << bits);
999         latchmgr->alloc->bits = mgr->page_bits;
1000
1001         for( lvl=MIN_lvl; lvl--; ) {
1002                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1003                 key = keyptr(latchmgr->alloc, 1);
1004                 key->len = 2;           // create stopper key
1005                 key->key[0] = 0xff;
1006                 key->key[1] = 0xff;
1007
1008                 bt_putid(value, MIN_lvl - lvl + 1);
1009                 val = valptr(latchmgr->alloc, 1);
1010                 val->len = lvl ? BtId : 0;
1011                 memcpy (val->value, value, val->len);
1012
1013                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1014                 latchmgr->alloc->lvl = lvl;
1015                 latchmgr->alloc->cnt = 1;
1016                 latchmgr->alloc->act = 1;
1017 #ifdef unix
1018                 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1019                         return bt_mgrclose (mgr), NULL;
1020 #else
1021                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1022                         return bt_mgrclose (mgr), NULL;
1023
1024                 if( *amt < mgr->page_size )
1025                         return bt_mgrclose (mgr), NULL;
1026 #endif
1027         }
1028
1029         // clear out latch manager locks
1030         //      and rest of pages to round out segment
1031
1032         memset(latchmgr, 0, mgr->page_size);
1033         last = MIN_lvl + 1;
1034
1035         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1036 #ifdef unix
1037                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1038 #else
1039                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1040                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1041                         return bt_mgrclose (mgr), NULL;
1042                 if( *amt < mgr->page_size )
1043                         return bt_mgrclose (mgr), NULL;
1044 #endif
1045                 last++;
1046         }
1047
1048 mgrlatch:
1049 #ifdef unix
1050         // mlock the root page and the latchmgr page
1051
1052         flag = PROT_READ | PROT_WRITE;
1053         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1054         if( mgr->latchmgr == MAP_FAILED )
1055                 return bt_mgrclose (mgr), NULL;
1056         mlock (mgr->latchmgr, 2 * mgr->page_size);
1057
1058         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1059         if( mgr->latchsets == MAP_FAILED )
1060                 return bt_mgrclose (mgr), NULL;
1061         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1062 #else
1063         flag = PAGE_READWRITE;
1064         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1065         if( !mgr->halloc )
1066                 return bt_mgrclose (mgr), NULL;
1067
1068         flag = FILE_MAP_WRITE;
1069         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1070         if( !mgr->latchmgr )
1071                 return GetLastError(), bt_mgrclose (mgr), NULL;
1072
1073         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1074 #endif
1075
1076 #ifdef unix
1077         free (latchmgr);
1078 #else
1079         VirtualFree (latchmgr, 0, MEM_RELEASE);
1080 #endif
1081         return mgr;
1082 }
1083
1084 //      open BTree access method
1085 //      based on buffer manager
1086
1087 BtDb *bt_open (BtMgr *mgr)
1088 {
1089 BtDb *bt = malloc (sizeof(*bt));
1090
1091         memset (bt, 0, sizeof(*bt));
1092         bt->mgr = mgr;
1093 #ifdef unix
1094         bt->mem = malloc (2 *mgr->page_size);
1095 #else
1096         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1097 #endif
1098         bt->frame = (BtPage)bt->mem;
1099         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1100         return bt;
1101 }
1102
1103 //  compare two keys, returning > 0, = 0, or < 0
1104 //  as the comparison value
1105
1106 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1107 {
1108 uint len1 = key1->len;
1109 int ans;
1110
1111         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1112                 return ans;
1113
1114         if( len1 > len2 )
1115                 return 1;
1116         if( len1 < len2 )
1117                 return -1;
1118
1119         return 0;
1120 }
1121
1122 //      Buffer Pool mgr
1123
1124 // find segment in pool
1125 // must be called with hashslot idx locked
1126 //      return NULL if not there
1127 //      otherwise return node
1128
1129 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1130 {
1131 BtPool *pool;
1132 uint slot;
1133
1134         // compute start of hash chain in pool
1135
1136         if( slot = bt->mgr->hash[idx] ) 
1137                 pool = bt->mgr->pool + slot;
1138         else
1139                 return NULL;
1140
1141         page_no &= ~bt->mgr->poolmask;
1142
1143         while( pool->basepage != page_no )
1144           if( pool = pool->hashnext )
1145                 continue;
1146           else
1147                 return NULL;
1148
1149         return pool;
1150 }
1151
1152 // add segment to hash table
1153
1154 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1155 {
1156 BtPool *node;
1157 uint slot;
1158
1159         pool->hashprev = pool->hashnext = NULL;
1160         pool->basepage = page_no & ~bt->mgr->poolmask;
1161         pool->pin = CLOCK_bit + 1;
1162
1163         if( slot = bt->mgr->hash[idx] ) {
1164                 node = bt->mgr->pool + slot;
1165                 pool->hashnext = node;
1166                 node->hashprev = pool;
1167         }
1168
1169         bt->mgr->hash[idx] = pool->slot;
1170 }
1171
1172 //  map new buffer pool segment to virtual memory
1173
1174 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1175 {
1176 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1177 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1178 int flag;
1179
1180 #ifdef unix
1181         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1182         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1183         if( pool->map == MAP_FAILED )
1184                 return bt->err = BTERR_map;
1185
1186 #else
1187         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1188         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1189         if( !pool->hmap )
1190                 return bt->err = BTERR_map;
1191
1192         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1193         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1194         if( !pool->map )
1195                 return bt->err = BTERR_map;
1196 #endif
1197         return bt->err = 0;
1198 }
1199
1200 //      calculate page within pool
1201
1202 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1203 {
1204 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1205 BtPage page;
1206
1207         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1208 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1209         return page;
1210 }
1211
1212 //  release pool pin
1213
1214 void bt_unpinpool (BtPool *pool)
1215 {
1216 #ifdef unix
1217         __sync_fetch_and_add(&pool->pin, -1);
1218 #else
1219         _InterlockedDecrement16 (&pool->pin);
1220 #endif
1221 }
1222
1223 //      find or place requested page in segment-pool
1224 //      return pool table entry, incrementing pin
1225
1226 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1227 {
1228 uint slot, hashidx, idx, victim;
1229 BtPool *pool, *node, *next;
1230
1231         //      lock hash table chain
1232
1233         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1234         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1235
1236         //      look up in hash table
1237
1238         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1239 #ifdef unix
1240                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1241                 __sync_fetch_and_add(&pool->pin, 1);
1242 #else
1243                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1244                 _InterlockedIncrement16 (&pool->pin);
1245 #endif
1246                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1247                 return pool;
1248         }
1249
1250         // allocate a new pool node
1251         // and add to hash table
1252
1253 #ifdef unix
1254         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1255 #else
1256         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1257 #endif
1258
1259         if( ++slot < bt->mgr->poolmax ) {
1260                 pool = bt->mgr->pool + slot;
1261                 pool->slot = slot;
1262
1263                 if( bt_mapsegment(bt, pool, page_no) )
1264                         return NULL;
1265
1266                 bt_linkhash(bt, pool, page_no, hashidx);
1267                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1268                 return pool;
1269         }
1270
1271         // pool table is full
1272         //      find best pool entry to evict
1273
1274 #ifdef unix
1275         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1276 #else
1277         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1278 #endif
1279
1280         while( 1 ) {
1281 #ifdef unix
1282                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1283 #else
1284                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1285 #endif
1286                 victim %= bt->mgr->poolmax;
1287                 pool = bt->mgr->pool + victim;
1288                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1289
1290                 if( !victim )
1291                         continue;
1292
1293                 // try to get write lock
1294                 //      skip entry if not obtained
1295
1296                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1297                         continue;
1298
1299                 //      skip this entry if
1300                 //      page is pinned
1301                 //  or clock bit is set
1302
1303                 if( pool->pin ) {
1304 #ifdef unix
1305                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1306 #else
1307                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1308 #endif
1309                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1310                         continue;
1311                 }
1312
1313                 // unlink victim pool node from hash table
1314
1315                 if( node = pool->hashprev )
1316                         node->hashnext = pool->hashnext;
1317                 else if( node = pool->hashnext )
1318                         bt->mgr->hash[idx] = node->slot;
1319                 else
1320                         bt->mgr->hash[idx] = 0;
1321
1322                 if( node = pool->hashnext )
1323                         node->hashprev = pool->hashprev;
1324
1325                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1326
1327                 //      remove old file mapping
1328 #ifdef unix
1329                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1330 #else
1331 //              FlushViewOfFile(pool->map, 0);
1332                 UnmapViewOfFile(pool->map);
1333                 CloseHandle(pool->hmap);
1334 #endif
1335                 pool->map = NULL;
1336
1337                 //  create new pool mapping
1338                 //  and link into hash table
1339
1340                 if( bt_mapsegment(bt, pool, page_no) )
1341                         return NULL;
1342
1343                 bt_linkhash(bt, pool, page_no, hashidx);
1344                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1345                 return pool;
1346         }
1347 }
1348
1349 // place write, read, or parent lock on requested page_no.
1350
1351 void bt_lockpage(BtLock mode, BtLatchSet *set)
1352 {
1353         switch( mode ) {
1354         case BtLockRead:
1355                 ReadLock (set->readwr);
1356                 break;
1357         case BtLockWrite:
1358                 WriteLock (set->readwr);
1359                 break;
1360         case BtLockAccess:
1361                 ReadLock (set->access);
1362                 break;
1363         case BtLockDelete:
1364                 WriteLock (set->access);
1365                 break;
1366         case BtLockParent:
1367                 WriteLock (set->parent);
1368                 break;
1369         }
1370 }
1371
1372 // remove write, read, or parent lock on requested page
1373
1374 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1375 {
1376         switch( mode ) {
1377         case BtLockRead:
1378                 ReadRelease (set->readwr);
1379                 break;
1380         case BtLockWrite:
1381                 WriteRelease (set->readwr);
1382                 break;
1383         case BtLockAccess:
1384                 ReadRelease (set->access);
1385                 break;
1386         case BtLockDelete:
1387                 WriteRelease (set->access);
1388                 break;
1389         case BtLockParent:
1390                 WriteRelease (set->parent);
1391                 break;
1392         }
1393 }
1394
1395 //      allocate a new page
1396
1397 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1398 {
1399 int blk;
1400
1401         //      lock allocation page
1402
1403         bt_spinwritelock(bt->mgr->latchmgr->lock);
1404
1405         // use empty chain first
1406         // else allocate empty page
1407
1408         if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1409                 if( set->pool = bt_pinpool (bt, set->page_no) )
1410                         set->page = bt_page (bt, set->pool, set->page_no);
1411                 else
1412                         return bt->err = BTERR_struct;
1413
1414                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1415         } else {
1416                 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1417                 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1418 #ifdef unix
1419                 // if writing first page of pool block, set file length thru last page
1420
1421                 if( (set->page_no & bt->mgr->poolmask) == 0 )
1422                         ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1423 #endif
1424                 if( set->pool = bt_pinpool (bt, set->page_no) )
1425                         set->page = bt_page (bt, set->pool, set->page_no);
1426                 else
1427                         return bt->err;
1428         }
1429
1430         // unlock allocation latch
1431
1432         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1433         return 0;
1434 }
1435
1436 //  find slot in page for given key at a given level
1437
1438 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1439 {
1440 uint diff, higher = set->page->cnt, low = 1, slot;
1441 uint good = 0;
1442
1443         //        make stopper key an infinite fence value
1444
1445         if( bt_getid (set->page->right) )
1446                 higher++;
1447         else
1448                 good++;
1449
1450         //      low is the lowest candidate.
1451         //  loop ends when they meet
1452
1453         //  higher is already
1454         //      tested as .ge. the passed key.
1455
1456         while( diff = higher - low ) {
1457                 slot = low + ( diff >> 1 );
1458                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1459                         low = slot + 1;
1460                 else
1461                         higher = slot, good++;
1462         }
1463
1464         //      return zero if key is on right link page
1465
1466         return good ? higher : 0;
1467 }
1468
1469 //  find and load page at given level for given key
1470 //      leave page rd or wr locked as requested
1471
1472 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1473 {
1474 uid page_no = ROOT_page, prevpage = 0;
1475 uint drill = 0xff, slot;
1476 BtLatchSet *prevlatch;
1477 uint mode, prevmode;
1478 BtPool *prevpool;
1479
1480   //  start at root of btree and drill down
1481
1482   do {
1483         // determine lock mode of drill level
1484         mode = (drill == lvl) ? lock : BtLockRead; 
1485
1486         set->latch = bt_pinlatch (bt, page_no);
1487         set->page_no = page_no;
1488
1489         // pin page contents
1490
1491         if( set->pool = bt_pinpool (bt, page_no) )
1492                 set->page = bt_page (bt, set->pool, page_no);
1493         else
1494                 return 0;
1495
1496         // obtain access lock using lock chaining with Access mode
1497
1498         if( page_no > ROOT_page )
1499           bt_lockpage(BtLockAccess, set->latch);
1500
1501         //      release & unpin parent page
1502
1503         if( prevpage ) {
1504           bt_unlockpage(prevmode, prevlatch);
1505           bt_unpinlatch (prevlatch);
1506           bt_unpinpool (prevpool);
1507           prevpage = 0;
1508         }
1509
1510         // obtain read lock using lock chaining
1511
1512         bt_lockpage(mode, set->latch);
1513
1514         if( set->page->free )
1515                 return bt->err = BTERR_struct, 0;
1516
1517         if( page_no > ROOT_page )
1518           bt_unlockpage(BtLockAccess, set->latch);
1519
1520         // re-read and re-lock root after determining actual level of root
1521
1522         if( set->page->lvl != drill) {
1523                 if( set->page_no != ROOT_page )
1524                         return bt->err = BTERR_struct, 0;
1525                         
1526                 drill = set->page->lvl;
1527
1528                 if( lock != BtLockRead && drill == lvl ) {
1529                   bt_unlockpage(mode, set->latch);
1530                   bt_unpinlatch (set->latch);
1531                   bt_unpinpool (set->pool);
1532                   continue;
1533                 }
1534         }
1535
1536         prevpage = set->page_no;
1537         prevlatch = set->latch;
1538         prevpool = set->pool;
1539         prevmode = mode;
1540
1541         //  find key on page at this level
1542         //  and descend to requested level
1543
1544         if( set->page->kill )
1545           goto slideright;
1546
1547         if( slot = bt_findslot (set, key, len) ) {
1548           if( drill == lvl )
1549                 return slot;
1550
1551           while( slotptr(set->page, slot)->dead )
1552                 if( slot++ < set->page->cnt )
1553                         continue;
1554                 else
1555                         goto slideright;
1556
1557           page_no = bt_getid(valptr(set->page, slot)->value);
1558           drill--;
1559           continue;
1560         }
1561
1562         //  or slide right into next page
1563
1564 slideright:
1565         page_no = bt_getid(set->page->right);
1566
1567   } while( page_no );
1568
1569   // return error on end of right chain
1570
1571   bt->err = BTERR_struct;
1572   return 0;     // return error
1573 }
1574
1575 //      return page to free list
1576 //      page must be delete & write locked
1577
1578 void bt_freepage (BtDb *bt, BtPageSet *set)
1579 {
1580         //      lock allocation page
1581
1582         bt_spinwritelock (bt->mgr->latchmgr->lock);
1583
1584         //      store chain
1585         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1586         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1587         set->page->free = 1;
1588
1589         // unlock released page
1590
1591         bt_unlockpage (BtLockDelete, set->latch);
1592         bt_unlockpage (BtLockWrite, set->latch);
1593         bt_unpinlatch (set->latch);
1594         bt_unpinpool (set->pool);
1595
1596         // unlock allocation page
1597
1598         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1599 }
1600
1601 //      a fence key was deleted from a page
1602 //      push new fence value upwards
1603
1604 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1605 {
1606 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1607 unsigned char value[BtId];
1608 BtKey* ptr;
1609 uint idx;
1610
1611         //      remove the old fence value
1612
1613         ptr = keyptr(set->page, set->page->cnt);
1614         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1615         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1616
1617         //  cache new fence value
1618
1619         ptr = keyptr(set->page, set->page->cnt);
1620         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1621
1622         bt_lockpage (BtLockParent, set->latch);
1623         bt_unlockpage (BtLockWrite, set->latch);
1624
1625         //      insert new (now smaller) fence key
1626
1627         bt_putid (value, set->page_no);
1628         ptr = (BtKey*)leftkey;
1629
1630         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1631           return bt->err;
1632
1633         //      now delete old fence key
1634
1635         ptr = (BtKey*)rightkey;
1636
1637         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1638                 return bt->err;
1639
1640         bt_unlockpage (BtLockParent, set->latch);
1641         bt_unpinlatch(set->latch);
1642         bt_unpinpool (set->pool);
1643         return 0;
1644 }
1645
1646 //      root has a single child
1647 //      collapse a level from the tree
1648
1649 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1650 {
1651 BtPageSet child[1];
1652 uint idx;
1653
1654   // find the child entry and promote as new root contents
1655
1656   do {
1657         for( idx = 0; idx++ < root->page->cnt; )
1658           if( !slotptr(root->page, idx)->dead )
1659                 break;
1660
1661         child->page_no = bt_getid (valptr(root->page, idx)->value);
1662
1663         child->latch = bt_pinlatch (bt, child->page_no);
1664         bt_lockpage (BtLockDelete, child->latch);
1665         bt_lockpage (BtLockWrite, child->latch);
1666
1667         if( child->pool = bt_pinpool (bt, child->page_no) )
1668                 child->page = bt_page (bt, child->pool, child->page_no);
1669         else
1670                 return bt->err;
1671
1672         memcpy (root->page, child->page, bt->mgr->page_size);
1673         bt_freepage (bt, child);
1674
1675   } while( root->page->lvl > 1 && root->page->act == 1 );
1676
1677   bt_unlockpage (BtLockWrite, root->latch);
1678   bt_unpinlatch (root->latch);
1679   bt_unpinpool (root->pool);
1680   return 0;
1681 }
1682
1683 //      maintain reverse scan pointers by
1684 //      linking left pointer of far right node
1685
1686 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1687 {
1688 BtPageSet right2[1];
1689
1690         //      keep track of rightmost leaf page
1691
1692         if( !right_page_no ) {
1693           bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1694           return 0;
1695         }
1696
1697         //      link right page to left page
1698
1699         right2->latch = bt_pinlatch (bt, right_page_no);
1700         bt_lockpage (BtLockWrite, right2->latch);
1701
1702         if( right2->pool = bt_pinpool (bt, right_page_no) )
1703                 right2->page = bt_page (bt, right2->pool, right_page_no);
1704         else
1705                 return bt->err;
1706
1707         bt_putid(right2->page->left, left_page_no);
1708         bt_unlockpage (BtLockWrite, right2->latch);
1709         bt_unpinlatch (right2->latch);
1710         return 0;
1711 }
1712
1713 //  find and delete key on page by marking delete flag bit
1714 //  if page becomes empty, delete it from the btree
1715
1716 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1717 {
1718 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1719 BtPageSet set[1], right[1], right2[1];
1720 uint slot, idx, found, fence;
1721 unsigned char value[BtId];
1722 BtKey *ptr, *tst;
1723 BtVal *val;
1724
1725         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1726                 ptr = keyptr(set->page, slot);
1727         else
1728                 return bt->err;
1729
1730         // if librarian slot, advance to real slot
1731
1732         if( slotptr(set->page, slot)->type == Librarian )
1733                 ptr = keyptr(set->page, ++slot);
1734
1735         fence = slot == set->page->cnt;
1736
1737         // if key is found delete it, otherwise ignore request
1738
1739         if( found = !keycmp (ptr, key, len) )
1740           if( found = slotptr(set->page, slot)->dead == 0 ) {
1741                 val = valptr(set->page,slot);
1742                 slotptr(set->page, slot)->dead = 1;
1743                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1744                 set->page->act--;
1745
1746                 // collapse empty slots beneath the fence
1747
1748                 while( idx = set->page->cnt - 1 )
1749                   if( slotptr(set->page, idx)->dead ) {
1750                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1751                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1752                   } else
1753                         break;
1754           }
1755
1756         //      did we delete a fence key in an upper level?
1757
1758         if( found && lvl && set->page->act && fence )
1759           if( bt_fixfence (bt, set, lvl) )
1760                 return bt->err;
1761           else
1762                 return bt->found = found, 0;
1763
1764         //      do we need to collapse root?
1765
1766         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1767           if( bt_collapseroot (bt, set) )
1768                 return bt->err;
1769           else
1770                 return bt->found = found, 0;
1771
1772         //      return if page is not empty
1773
1774         if( set->page->act ) {
1775                 bt_unlockpage(BtLockWrite, set->latch);
1776                 bt_unpinlatch (set->latch);
1777                 bt_unpinpool (set->pool);
1778                 return bt->found = found, 0;
1779         }
1780
1781         //      cache copy of fence key
1782         //      to post in parent
1783
1784         ptr = keyptr(set->page, set->page->cnt);
1785         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1786
1787         //      obtain lock on right page
1788
1789         right->page_no = bt_getid(set->page->right);
1790         right->latch = bt_pinlatch (bt, right->page_no);
1791         bt_lockpage (BtLockWrite, right->latch);
1792
1793         // pin page contents
1794
1795         if( right->pool = bt_pinpool (bt, right->page_no) )
1796                 right->page = bt_page (bt, right->pool, right->page_no);
1797         else
1798                 return bt->err;
1799
1800         if( right->page->kill )
1801                 return bt->err = BTERR_struct;
1802
1803         // transfer left link
1804
1805         memcpy (right->page->left, set->page->left, BtId);
1806
1807         // pull contents of right peer into our empty page
1808
1809         memcpy (set->page, right->page, bt->mgr->page_size);
1810
1811         // update left link
1812
1813         if( !lvl )
1814           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1815                 return bt->err;
1816
1817         // cache copy of key to update
1818
1819         ptr = keyptr(right->page, right->page->cnt);
1820         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1821
1822         // mark right page deleted and point it to left page
1823         //      until we can post parent updates
1824
1825         bt_putid (right->page->right, set->page_no);
1826         right->page->kill = 1;
1827
1828         bt_lockpage (BtLockParent, right->latch);
1829         bt_unlockpage (BtLockWrite, right->latch);
1830
1831         bt_lockpage (BtLockParent, set->latch);
1832         bt_unlockpage (BtLockWrite, set->latch);
1833
1834         // redirect higher key directly to our new node contents
1835
1836         bt_putid (value, set->page_no);
1837         ptr = (BtKey*)higherfence;
1838
1839         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1840           return bt->err;
1841
1842         //      delete old lower key to our node
1843
1844         ptr = (BtKey*)lowerfence;
1845
1846         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1847           return bt->err;
1848
1849         //      obtain delete and write locks to right node
1850
1851         bt_unlockpage (BtLockParent, right->latch);
1852         bt_lockpage (BtLockDelete, right->latch);
1853         bt_lockpage (BtLockWrite, right->latch);
1854         bt_freepage (bt, right);
1855
1856         bt_unlockpage (BtLockParent, set->latch);
1857         bt_unpinlatch (set->latch);
1858         bt_unpinpool (set->pool);
1859         bt->found = found;
1860         return 0;
1861 }
1862
1863 BtKey *bt_foundkey (BtDb *bt)
1864 {
1865         return (BtKey*)(bt->key);
1866 }
1867
1868 //      advance to next slot
1869
1870 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1871 {
1872 BtLatchSet *prevlatch;
1873 BtPool *prevpool;
1874 uid page_no;
1875
1876         if( slot < set->page->cnt )
1877                 return slot + 1;
1878
1879         prevlatch = set->latch;
1880         prevpool = set->pool;
1881
1882         if( page_no = bt_getid(set->page->right) )
1883                 set->latch = bt_pinlatch (bt, page_no);
1884         else
1885                 return bt->err = BTERR_struct, 0;
1886
1887         // pin page contents
1888
1889         if( set->pool = bt_pinpool (bt, page_no) )
1890                 set->page = bt_page (bt, set->pool, page_no);
1891         else
1892                 return 0;
1893
1894         // obtain access lock using lock chaining with Access mode
1895
1896         bt_lockpage(BtLockAccess, set->latch);
1897
1898         bt_unlockpage(BtLockRead, prevlatch);
1899         bt_unpinlatch (prevlatch);
1900         bt_unpinpool (prevpool);
1901
1902         bt_lockpage(BtLockRead, set->latch);
1903         bt_unlockpage(BtLockAccess, set->latch);
1904
1905         set->page_no = page_no;
1906         return 1;
1907 }
1908
1909 //      find unique key or first duplicate key in
1910 //      leaf level and return number of value bytes
1911 //      or (-1) if not found.  Setup key for bt_foundkey
1912
1913 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1914 {
1915 BtPageSet set[1];
1916 uint len, slot;
1917 int ret = -1;
1918 BtKey *ptr;
1919 BtVal *val;
1920
1921   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1922    do {
1923         ptr = keyptr(set->page, slot);
1924
1925         //      skip librarian slot place holder
1926
1927         if( slotptr(set->page, slot)->type == Librarian )
1928                 ptr = keyptr(set->page, ++slot);
1929
1930         //      return actual key found
1931
1932         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1933         len = ptr->len;
1934
1935         if( slotptr(set->page, slot)->type == Duplicate )
1936                 len -= BtId;
1937
1938         // if key exists, return >= 0 value bytes copied
1939         //      otherwise return (-1)
1940
1941         if( slotptr(set->page, slot)->dead )
1942                 continue;
1943
1944         if( keylen == len )
1945           if( !memcmp (ptr->key, key, len) ) {
1946                 val = valptr (set->page,slot);
1947                 if( valmax > val->len )
1948                         valmax = val->len;
1949                 memcpy (value, val->value, valmax);
1950                 ret = valmax;
1951           }
1952
1953         break;
1954
1955    } while( slot = bt_findnext (bt, set, slot) );
1956
1957   bt_unlockpage (BtLockRead, set->latch);
1958   bt_unpinlatch (set->latch);
1959   bt_unpinpool (set->pool);
1960   return ret;
1961 }
1962
1963 //      check page for space available,
1964 //      clean if necessary and return
1965 //      0 - page needs splitting
1966 //      >0  new slot value
1967
1968 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1969 {
1970 uint nxt = bt->mgr->page_size;
1971 uint cnt = 0, idx = 0;
1972 uint max = page->cnt;
1973 uint newslot = max;
1974 BtKey *key;
1975 BtVal *val;
1976
1977         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1978                 return slot;
1979
1980         //      skip cleanup and proceed to split
1981         //      if there's not enough garbage
1982         //      to bother with.
1983
1984         if( page->garbage < nxt / 5 )
1985                 return 0;
1986
1987         memcpy (bt->frame, page, bt->mgr->page_size);
1988
1989         // skip page info and set rest of page to zero
1990
1991         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1992         page->garbage = 0;
1993         page->act = 0;
1994
1995         // clean up page first by
1996         // removing deleted keys
1997
1998         while( cnt++ < max ) {
1999                 if( cnt == slot )
2000                         newslot = idx + 2;
2001                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2002                         continue;
2003
2004                 // copy the value across
2005
2006                 val = valptr(bt->frame, cnt);
2007                 nxt -= val->len + sizeof(BtVal);
2008                 ((unsigned char *)page)[nxt] = val->len;
2009                 memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
2010
2011                 // copy the key across
2012
2013                 key = keyptr(bt->frame, cnt);
2014                 nxt -= key->len + sizeof(BtKey);
2015                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2016
2017                 // make a librarian slot
2018
2019                 if( idx ) {
2020                         slotptr(page, ++idx)->off = nxt;
2021                         slotptr(page, idx)->type = Librarian;
2022                         slotptr(page, idx)->dead = 1;
2023                 }
2024
2025                 // set up the slot
2026
2027                 slotptr(page, ++idx)->off = nxt;
2028                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2029
2030                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2031                         page->act++;
2032         }
2033
2034         page->min = nxt;
2035         page->cnt = idx;
2036
2037         //      see if page has enough space now, or does it need splitting?
2038
2039         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2040                 return newslot;
2041
2042         return 0;
2043 }
2044
2045 // split the root and raise the height of the btree
2046
2047 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
2048 {
2049 uint nxt = bt->mgr->page_size;
2050 unsigned char value[BtId];
2051 BtPageSet left[1];
2052 BtKey *ptr;
2053 BtVal *val;
2054
2055         //  Obtain an empty page to use, and copy the current
2056         //  root contents into it, e.g. lower keys
2057
2058         if( bt_newpage(bt, left) )
2059                 return bt->err;
2060
2061         memcpy(left->page, root->page, bt->mgr->page_size);
2062         bt_unpinpool (left->pool);
2063
2064         // set left from higher to lower page
2065
2066         bt_putid (right->page->left, left->page_no);
2067         bt_unpinpool (right->pool);
2068
2069         // preserve the page info at the bottom
2070         // of higher keys and set rest to zero
2071
2072         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2073
2074         // insert stopper key at top of newroot page
2075         // and increase the root height
2076
2077         nxt -= BtId + sizeof(BtVal);
2078         bt_putid (value, right->page_no);
2079         val = (BtVal *)((unsigned char *)root->page + nxt);
2080         memcpy (val->value, value, BtId);
2081         val->len = BtId;
2082
2083         nxt -= 2 + sizeof(BtKey);
2084         slotptr(root->page, 2)->off = nxt;
2085         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2086         ptr->len = 2;
2087         ptr->key[0] = 0xff;
2088         ptr->key[1] = 0xff;
2089
2090         // insert lower keys page fence key on newroot page as first key
2091
2092         nxt -= BtId + sizeof(BtVal);
2093         bt_putid (value, left->page_no);
2094         val = (BtVal *)((unsigned char *)root->page + nxt);
2095         memcpy (val->value, value, BtId);
2096         val->len = BtId;
2097
2098         nxt -= leftkey->len + sizeof(BtKey);
2099         slotptr(root->page, 1)->off = nxt;
2100         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
2101         
2102         bt_putid(root->page->right, 0);
2103         root->page->min = nxt;          // reset lowest used offset and key count
2104         root->page->cnt = 2;
2105         root->page->act = 2;
2106         root->page->lvl++;
2107
2108         // release and unpin root
2109
2110         bt_unlockpage(BtLockWrite, root->latch);
2111         bt_unpinlatch (root->latch);
2112         bt_unpinpool (root->pool);
2113         return 0;
2114 }
2115
2116 //  split already locked full node
2117 //      return unlocked.
2118
2119 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2120 {
2121 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2122 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2123 unsigned char value[BtId];
2124 uint lvl = set->page->lvl;
2125 BtPageSet right[1];
2126 BtKey *key, *ptr;
2127 BtVal *val, *src;
2128 uid right2;
2129 uint prev;
2130
2131         //  split higher half of keys to bt->frame
2132
2133         memset (bt->frame, 0, bt->mgr->page_size);
2134         max = set->page->cnt;
2135         cnt = max / 2;
2136         idx = 0;
2137
2138         while( cnt++ < max ) {
2139                 if( slotptr(set->page, cnt)->dead && cnt < max )
2140                         continue;
2141                 src = valptr(set->page, cnt);
2142                 nxt -= src->len + sizeof(BtVal);
2143                 val = (BtVal*)((unsigned char *)bt->frame + nxt);
2144                 memcpy (val->value, src->value, src->len);
2145                 val->len = src->len;
2146
2147                 key = keyptr(set->page, cnt);
2148                 nxt -= key->len + sizeof(BtKey);
2149                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2150                 memcpy (ptr, key, key->len + sizeof(BtKey));
2151
2152                 //      add librarian slot
2153
2154                 if( idx ) {
2155                         slotptr(bt->frame, ++idx)->off = nxt;
2156                         slotptr(bt->frame, idx)->type = Librarian;
2157                         slotptr(bt->frame, idx)->dead = 1;
2158                 }
2159
2160                 //  add actual slot
2161
2162                 slotptr(bt->frame, ++idx)->off = nxt;
2163                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2164
2165                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2166                         bt->frame->act++;
2167         }
2168
2169         // remember existing fence key for new page to the right
2170
2171         memcpy (rightkey, key, key->len + sizeof(BtKey));
2172
2173         bt->frame->bits = bt->mgr->page_bits;
2174         bt->frame->min = nxt;
2175         bt->frame->cnt = idx;
2176         bt->frame->lvl = lvl;
2177
2178         // link right node
2179
2180         if( set->page_no > ROOT_page ) {
2181                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2182                 bt_putid(bt->frame->left, set->page_no);
2183         }
2184
2185         //      get new free page and write higher keys to it.
2186
2187         if( bt_newpage(bt, right) )
2188                 return bt->err;
2189
2190         // link left node
2191
2192         if( set->page_no > ROOT_page && !lvl )
2193           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2194                 return bt->err;
2195
2196         memcpy (right->page, bt->frame, bt->mgr->page_size);
2197         bt_unpinpool (right->pool);
2198
2199         //      update lower keys to continue in old page
2200
2201         memcpy (bt->frame, set->page, bt->mgr->page_size);
2202         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2203         nxt = bt->mgr->page_size;
2204         set->page->garbage = 0;
2205         set->page->act = 0;
2206         max /= 2;
2207         cnt = 0;
2208         idx = 0;
2209
2210         if( slotptr(bt->frame, max)->type == Librarian )
2211                 max--;
2212
2213         //  assemble page of smaller keys
2214
2215         while( cnt++ < max ) {
2216                 if( slotptr(bt->frame, cnt)->dead )
2217                         continue;
2218                 val = valptr(bt->frame, cnt);
2219                 nxt -= val->len + sizeof(BtVal);
2220                 ((unsigned char *)set->page)[nxt] = val->len;
2221                 memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
2222
2223                 key = keyptr(bt->frame, cnt);
2224                 nxt -= key->len + sizeof(BtKey);
2225                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2226
2227                 //      add librarian slot
2228
2229                 if( idx ) {
2230                         slotptr(set->page, ++idx)->off = nxt;
2231                         slotptr(set->page, idx)->type = Librarian;
2232                         slotptr(set->page, idx)->dead = 1;
2233                 }
2234
2235                 //      add actual slot
2236
2237                 slotptr(set->page, ++idx)->off = nxt;
2238                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2239                 set->page->act++;
2240         }
2241
2242         // remember fence key for smaller page
2243
2244         memcpy(fencekey, key, key->len + sizeof(BtKey));
2245
2246         bt_putid(set->page->right, right->page_no);
2247         set->page->min = nxt;
2248         set->page->cnt = idx;
2249
2250         // if current page is the root page, split it
2251
2252         if( set->page_no == ROOT_page )
2253                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2254
2255         // insert new fences in their parent pages
2256
2257         right->latch = bt_pinlatch (bt, right->page_no);
2258         bt_lockpage (BtLockParent, right->latch);
2259
2260         bt_lockpage (BtLockParent, set->latch);
2261         bt_unlockpage (BtLockWrite, set->latch);
2262
2263         // insert new fence for reformulated left block of smaller keys
2264
2265         bt_putid (value, set->page_no);
2266
2267         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2268                 return bt->err;
2269
2270         // switch fence for right block of larger keys to new right page
2271
2272         bt_putid (value, right->page_no);
2273
2274         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2275                 return bt->err;
2276
2277         bt_unlockpage (BtLockParent, set->latch);
2278         bt_unpinlatch (set->latch);
2279         bt_unpinpool (set->pool);
2280
2281         bt_unlockpage (BtLockParent, right->latch);
2282         bt_unpinlatch (right->latch);
2283         return 0;
2284 }
2285
2286 //      install new key and value onto page
2287 //      page must already be checked for
2288 //      adequate space
2289
2290 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2291 {
2292 uint idx, librarian;
2293 BtSlot *node;
2294 BtKey *ptr;
2295 BtVal *val;
2296
2297         //      if found slot > desired slot and previous slot
2298         //      is a librarian slot, use it
2299
2300         if( slot > 1 )
2301           if( slotptr(set->page, slot-1)->type == Librarian )
2302                 slot--;
2303
2304         // copy value onto page
2305
2306         set->page->min -= vallen + sizeof(BtVal);
2307         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2308         memcpy (val->value, value, vallen);
2309         val->len = vallen;
2310
2311         // copy key onto page
2312
2313         set->page->min -= keylen + sizeof(BtKey);
2314         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2315         memcpy (ptr->key, key, keylen);
2316         ptr->len = keylen;
2317         
2318         //      find first empty slot
2319
2320         for( idx = slot; idx < set->page->cnt; idx++ )
2321           if( slotptr(set->page, idx)->dead )
2322                 break;
2323
2324         // now insert key into array before slot
2325
2326         if( idx == set->page->cnt )
2327                 idx += 2, set->page->cnt += 2, librarian = 2;
2328         else
2329                 librarian = 1;
2330
2331         set->page->act++;
2332
2333         while( idx > slot + librarian - 1 )
2334                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2335
2336         //      add librarian slot
2337
2338         if( librarian > 1 ) {
2339                 node = slotptr(set->page, slot++);
2340                 node->off = set->page->min;
2341                 node->type = Librarian;
2342                 node->dead = 1;
2343         }
2344
2345         //      fill in new slot
2346
2347         node = slotptr(set->page, slot);
2348         node->off = set->page->min;
2349         node->type = type;
2350         node->dead = 0;
2351
2352         bt_unlockpage (BtLockWrite, set->latch);
2353         bt_unpinlatch (set->latch);
2354         bt_unpinpool (set->pool);
2355         return 0;
2356 }
2357
2358 //  Insert new key into the btree at given level.
2359 //      either add a new key or update/add an existing one
2360
2361 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2362 {
2363 unsigned char newkey[BT_keyarray];
2364 uint slot, idx, len;
2365 BtPageSet set[1];
2366 BtKey *ptr, *ins;
2367 uid sequence;
2368 BtVal *val;
2369 uint type;
2370
2371   // set up the key we're working on
2372
2373   ins = (BtKey*)newkey;
2374   memcpy (ins->key, key, keylen);
2375   ins->len = keylen;
2376
2377   // is this a non-unique index value?
2378
2379   if( unique )
2380         type = Unique;
2381   else {
2382         type = Duplicate;
2383         sequence = bt_newdup (bt);
2384         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2385         ins->len += BtId;
2386   }
2387
2388   while( 1 ) { // find the page and slot for the current key
2389         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2390                 ptr = keyptr(set->page, slot);
2391         else {
2392                 if( !bt->err )
2393                         bt->err = BTERR_ovflw;
2394                 return bt->err;
2395         }
2396
2397         // if librarian slot == found slot, advance to real slot
2398
2399         if( slotptr(set->page, slot)->type == Librarian )
2400           if( !keycmp (ptr, key, keylen) )
2401                 ptr = keyptr(set->page, ++slot);
2402
2403         len = ptr->len;
2404
2405         if( slotptr(set->page, slot)->type == Duplicate )
2406                 len -= BtId;
2407
2408         //  if inserting a duplicate key or unique key
2409         //      check for adequate space on the page
2410         //      and insert the new key before slot.
2411
2412         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2413           if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2414                 if( bt_splitpage (bt, set) )
2415                   return bt->err;
2416                 else
2417                   continue;
2418
2419           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2420         }
2421
2422         // if key already exists, update value and return
2423
2424         val = valptr(set->page, slot);
2425
2426         if( val->len >= vallen ) {
2427                 if( slotptr(set->page, slot)->dead )
2428                         set->page->act++;
2429                 set->page->garbage += val->len - vallen;
2430                 slotptr(set->page, slot)->dead = 0;
2431                 val->len = vallen;
2432                 memcpy (val->value, value, vallen);
2433                 bt_unlockpage(BtLockWrite, set->latch);
2434                 bt_unpinlatch (set->latch);
2435                 bt_unpinpool (set->pool);
2436                 return 0;
2437         }
2438
2439         //      new update value doesn't fit in existing value area
2440
2441         if( !slotptr(set->page, slot)->dead )
2442                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2443         else {
2444                 slotptr(set->page, slot)->dead = 0;
2445                 set->page->act++;
2446         }
2447
2448         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2449           if( bt_splitpage (bt, set) )
2450                 return bt->err;
2451           else
2452                 continue;
2453
2454         set->page->min -= vallen + sizeof(BtVal);
2455         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2456         memcpy (val->value, value, vallen);
2457         val->len = vallen;
2458
2459         set->page->min -= keylen + sizeof(BtKey);
2460         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2461         memcpy (ptr->key, key, keylen);
2462         ptr->len = keylen;
2463         
2464         slotptr(set->page, slot)->off = set->page->min;
2465         bt_unlockpage(BtLockWrite, set->latch);
2466         bt_unpinlatch (set->latch);
2467         bt_unpinpool (set->pool);
2468         return 0;
2469   }
2470   return 0;
2471 }
2472
2473 //      set cursor to highest slot on highest page
2474
2475 uint bt_lastkey (BtDb *bt)
2476 {
2477 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2478 BtPageSet set[1];
2479 uint slot;
2480
2481         if( set->pool = bt_pinpool (bt, page_no) )
2482                 set->page = bt_page (bt, set->pool, page_no);
2483         else
2484                 return 0;
2485
2486         set->latch = bt_pinlatch (bt, page_no);
2487     bt_lockpage(BtLockRead, set->latch);
2488
2489         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2490         slot = set->page->cnt;
2491
2492     bt_unlockpage(BtLockRead, set->latch);
2493         bt_unpinlatch (set->latch);
2494         bt_unpinpool (set->pool);
2495
2496         return slot;
2497 }
2498
2499 //      return previous slot on cursor page
2500
2501 uint bt_prevkey (BtDb *bt, uint slot)
2502 {
2503 BtPageSet set[1];
2504 uid left;
2505
2506         if( --slot )
2507                 return slot;
2508
2509         if( left = bt_getid(bt->cursor->left) )
2510                 bt->cursor_page = left;
2511         else
2512                 return 0;
2513
2514         if( set->pool = bt_pinpool (bt, left) )
2515                 set->page = bt_page (bt, set->pool, left);
2516         else
2517                 return 0;
2518
2519         set->latch = bt_pinlatch (bt, left);
2520     bt_lockpage(BtLockRead, set->latch);
2521
2522         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2523
2524         bt_unlockpage(BtLockRead, set->latch);
2525         bt_unpinlatch (set->latch);
2526         bt_unpinpool (set->pool);
2527         return bt->cursor->cnt;
2528 }
2529
2530 //  return next slot on cursor page
2531 //  or slide cursor right into next page
2532
2533 uint bt_nextkey (BtDb *bt, uint slot)
2534 {
2535 BtPageSet set[1];
2536 uid right;
2537
2538   do {
2539         right = bt_getid(bt->cursor->right);
2540
2541         while( slot++ < bt->cursor->cnt )
2542           if( slotptr(bt->cursor,slot)->dead )
2543                 continue;
2544           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2545                 return slot;
2546           else
2547                 break;
2548
2549         if( !right )
2550                 break;
2551
2552         bt->cursor_page = right;
2553
2554         if( set->pool = bt_pinpool (bt, right) )
2555                 set->page = bt_page (bt, set->pool, right);
2556         else
2557                 return 0;
2558
2559         set->latch = bt_pinlatch (bt, right);
2560     bt_lockpage(BtLockRead, set->latch);
2561
2562         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2563
2564         bt_unlockpage(BtLockRead, set->latch);
2565         bt_unpinlatch (set->latch);
2566         bt_unpinpool (set->pool);
2567         slot = 0;
2568
2569   } while( 1 );
2570
2571   return bt->err = 0;
2572 }
2573
2574 //  cache page of keys into cursor and return starting slot for given key
2575
2576 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2577 {
2578 BtPageSet set[1];
2579 uint slot;
2580
2581         // cache page for retrieval
2582
2583         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2584           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2585         else
2586           return 0;
2587
2588         bt->cursor_page = set->page_no;
2589
2590         bt_unlockpage(BtLockRead, set->latch);
2591         bt_unpinlatch (set->latch);
2592         bt_unpinpool (set->pool);
2593         return slot;
2594 }
2595
2596 BtKey *bt_key(BtDb *bt, uint slot)
2597 {
2598         return keyptr(bt->cursor, slot);
2599 }
2600
2601 BtVal *bt_val(BtDb *bt, uint slot)
2602 {
2603         return valptr(bt->cursor,slot);
2604 }
2605
2606 #ifdef STANDALONE
2607
2608 #ifndef unix
2609 double getCpuTime(int type)
2610 {
2611 FILETIME crtime[1];
2612 FILETIME xittime[1];
2613 FILETIME systime[1];
2614 FILETIME usrtime[1];
2615 SYSTEMTIME timeconv[1];
2616 double ans = 0;
2617
2618         memset (timeconv, 0, sizeof(SYSTEMTIME));
2619
2620         switch( type ) {
2621         case 0:
2622                 GetSystemTimeAsFileTime (xittime);
2623                 FileTimeToSystemTime (xittime, timeconv);
2624                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2625                 break;
2626         case 1:
2627                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2628                 FileTimeToSystemTime (usrtime, timeconv);
2629                 break;
2630         case 2:
2631                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2632                 FileTimeToSystemTime (systime, timeconv);
2633                 break;
2634         }
2635
2636         ans += (double)timeconv->wHour * 3600;
2637         ans += (double)timeconv->wMinute * 60;
2638         ans += (double)timeconv->wSecond;
2639         ans += (double)timeconv->wMilliseconds / 1000;
2640         return ans;
2641 }
2642 #else
2643 #include <time.h>
2644 #include <sys/resource.h>
2645
2646 double getCpuTime(int type)
2647 {
2648 struct rusage used[1];
2649 struct timeval tv[1];
2650
2651         switch( type ) {
2652         case 0:
2653                 gettimeofday(tv, NULL);
2654                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2655
2656         case 1:
2657                 getrusage(RUSAGE_SELF, used);
2658                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2659
2660         case 2:
2661                 getrusage(RUSAGE_SELF, used);
2662                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2663         }
2664
2665         return 0;
2666 }
2667 #endif
2668
2669 uint bt_latchaudit (BtDb *bt)
2670 {
2671 ushort idx, hashidx;
2672 uid next, page_no;
2673 BtLatchSet *latch;
2674 uint cnt = 0;
2675 BtKey *ptr;
2676
2677 #ifdef unix
2678         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2679 #endif
2680         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2681                 fprintf(stderr, "Alloc page locked\n");
2682         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2683
2684         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2685                 latch = bt->mgr->latchsets + idx;
2686                 if( *latch->readwr->rin & MASK )
2687                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2688                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2689
2690                 if( *latch->access->rin & MASK )
2691                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2692                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2693
2694                 if( *latch->parent->rin & MASK )
2695                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2696                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2697
2698                 if( latch->pin ) {
2699                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2700                         latch->pin = 0;
2701                 }
2702         }
2703
2704         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2705           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2706                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2707
2708           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2709
2710           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2711                 latch = bt->mgr->latchsets + idx;
2712                 if( *(ushort *)latch->busy )
2713                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2714                 *(ushort *)latch->busy = 0;
2715                 if( latch->pin )
2716                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2717           } while( idx = latch->next );
2718         }
2719
2720         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2721         page_no = LEAF_page;
2722
2723         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2724         off64_t off = page_no << bt->mgr->page_bits;
2725 #ifdef unix
2726                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2727 #else
2728                 DWORD amt[1];
2729
2730                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2731
2732                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2733                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2734
2735                   if( *amt <  bt->mgr->page_size )
2736                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2737 #endif
2738                 if( !bt->frame->free ) {
2739                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2740                   ptr = keyptr(bt->frame, idx+1);
2741                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2742                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2743                  }
2744                  if( !bt->frame->lvl )
2745                         cnt += bt->frame->act;
2746                 }
2747                 if( page_no > LEAF_page )
2748                         next = page_no + 1;
2749                 page_no = next;
2750         }
2751         return cnt - 1;
2752 }
2753
2754 typedef struct {
2755         char idx;
2756         char *type;
2757         char *infile;
2758         BtMgr *mgr;
2759         int num;
2760 } ThreadArg;
2761
2762 //  standalone program to index file of keys
2763 //  then list them onto std-out
2764
2765 #ifdef unix
2766 void *index_file (void *arg)
2767 #else
2768 uint __stdcall index_file (void *arg)
2769 #endif
2770 {
2771 int line = 0, found = 0, cnt = 0, unique;
2772 uid next, page_no = LEAF_page;  // start on first page of leaves
2773 unsigned char key[BT_maxkey];
2774 ThreadArg *args = arg;
2775 int ch, len = 0, slot;
2776 BtPageSet set[1];
2777 BtKey *ptr;
2778 BtVal *val;
2779 BtDb *bt;
2780 FILE *in;
2781
2782         bt = bt_open (args->mgr);
2783
2784         unique = (args->type[1] | 0x20) != 'd';
2785
2786         switch(args->type[0] | 0x20)
2787         {
2788         case 'a':
2789                 fprintf(stderr, "started latch mgr audit\n");
2790                 cnt = bt_latchaudit (bt);
2791                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2792                 break;
2793
2794         case 'p':
2795                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2796                 if( in = fopen (args->infile, "rb") )
2797                   while( ch = getc(in), ch != EOF )
2798                         if( ch == '\n' )
2799                         {
2800                           line++;
2801
2802                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2803                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2804                           len = 0;
2805                         }
2806                         else if( len < BT_maxkey )
2807                                 key[len++] = ch;
2808                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2809                 break;
2810
2811         case 'w':
2812                 fprintf(stderr, "started indexing for %s\n", args->infile);
2813                 if( in = fopen (args->infile, "rb") )
2814                   while( ch = getc(in), ch != EOF )
2815                         if( ch == '\n' )
2816                         {
2817                           line++;
2818
2819                           if( args->num == 1 )
2820                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2821
2822                           else if( args->num )
2823                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2824
2825                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2826                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2827                           len = 0;
2828                         }
2829                         else if( len < BT_maxkey )
2830                                 key[len++] = ch;
2831                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2832                 break;
2833
2834         case 'd':
2835                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2836                 if( in = fopen (args->infile, "rb") )
2837                   while( ch = getc(in), ch != EOF )
2838                         if( ch == '\n' )
2839                         {
2840                           line++;
2841                           if( args->num == 1 )
2842                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2843
2844                           else if( args->num )
2845                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2846
2847                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2848                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2849                           ptr = (BtKey*)(bt->key);
2850                           found++;
2851
2852                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2853                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2854                           len = 0;
2855                         }
2856                         else if( len < BT_maxkey )
2857                                 key[len++] = ch;
2858                 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2859                 break;
2860
2861         case 'f':
2862                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2863                 if( in = fopen (args->infile, "rb") )
2864                   while( ch = getc(in), ch != EOF )
2865                         if( ch == '\n' )
2866                         {
2867                           line++;
2868                           if( args->num == 1 )
2869                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2870
2871                           else if( args->num )
2872                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2873
2874                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2875                                 found++;
2876                           else if( bt->err )
2877                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2878                           len = 0;
2879                         }
2880                         else if( len < BT_maxkey )
2881                                 key[len++] = ch;
2882                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2883                 break;
2884
2885         case 's':
2886                 fprintf(stderr, "started scanning\n");
2887                 do {
2888                         if( set->pool = bt_pinpool (bt, page_no) )
2889                                 set->page = bt_page (bt, set->pool, page_no);
2890                         else
2891                                 break;
2892                         set->latch = bt_pinlatch (bt, page_no);
2893                         bt_lockpage (BtLockRead, set->latch);
2894                         next = bt_getid (set->page->right);
2895
2896                         for( slot = 0; slot++ < set->page->cnt; )
2897                          if( next || slot < set->page->cnt )
2898                           if( !slotptr(set->page, slot)->dead ) {
2899                                 ptr = keyptr(set->page, slot);
2900                                 len = ptr->len;
2901
2902                             if( slotptr(set->page, slot)->type == Duplicate )
2903                                         len -= BtId;
2904
2905                                 fwrite (ptr->key, len, 1, stdout);
2906                                 val = valptr(set->page, slot);
2907                                 fwrite (val->value, val->len, 1, stdout);
2908                                 fputc ('\n', stdout);
2909                                 cnt++;
2910                            }
2911
2912                         bt_unlockpage (BtLockRead, set->latch);
2913                         bt_unpinlatch (set->latch);
2914                         bt_unpinpool (set->pool);
2915                 } while( page_no = next );
2916
2917                 fprintf(stderr, " Total keys read %d\n", cnt);
2918                 break;
2919
2920         case 'r':
2921                 fprintf(stderr, "started reverse scan\n");
2922                 if( slot = bt_lastkey (bt) )
2923                    while( slot = bt_prevkey (bt, slot) ) {
2924                         if( slotptr(bt->cursor, slot)->dead )
2925                           continue;
2926
2927                         ptr = keyptr(bt->cursor, slot);
2928                         len = ptr->len;
2929
2930                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2931                                 len -= BtId;
2932
2933                         fwrite (ptr->key, len, 1, stdout);
2934                         val = valptr(bt->cursor, slot);
2935                         fwrite (val->value, val->len, 1, stdout);
2936                         fputc ('\n', stdout);
2937                         cnt++;
2938                   }
2939
2940                 fprintf(stderr, " Total keys read %d\n", cnt);
2941                 break;
2942
2943         case 'c':
2944 #ifdef unix
2945                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2946 #endif
2947                 fprintf(stderr, "started counting\n");
2948                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2949                 page_no = LEAF_page;
2950
2951                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2952                 uid off = page_no << bt->mgr->page_bits;
2953 #ifdef unix
2954                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2955 #else
2956                 DWORD amt[1];
2957
2958                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2959
2960                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2961                         return bt->err = BTERR_map;
2962
2963                   if( *amt <  bt->mgr->page_size )
2964                         return bt->err = BTERR_map;
2965 #endif
2966                         if( !bt->frame->free && !bt->frame->lvl )
2967                                 cnt += bt->frame->act;
2968                         if( page_no > LEAF_page )
2969                                 next = page_no + 1;
2970                         page_no = next;
2971                 }
2972                 
2973                 cnt--;  // remove stopper key
2974                 fprintf(stderr, " Total keys read %d\n", cnt);
2975                 break;
2976         }
2977
2978         bt_close (bt);
2979 #ifdef unix
2980         return NULL;
2981 #else
2982         return 0;
2983 #endif
2984 }
2985
2986 typedef struct timeval timer;
2987
2988 int main (int argc, char **argv)
2989 {
2990 int idx, cnt, len, slot, err;
2991 int segsize, bits = 16;
2992 double start, stop;
2993 #ifdef unix
2994 pthread_t *threads;
2995 #else
2996 HANDLE *threads;
2997 #endif
2998 ThreadArg *args;
2999 uint poolsize = 0;
3000 float elapsed;
3001 int num = 0;
3002 char key[1];
3003 BtMgr *mgr;
3004 BtKey *ptr;
3005 BtDb *bt;
3006
3007         if( argc < 3 ) {
3008                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3009                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3010                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
3011                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3012                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3013                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3014                 exit(0);
3015         }
3016
3017         start = getCpuTime(0);
3018
3019         if( argc > 3 )
3020                 bits = atoi(argv[3]);
3021
3022         if( argc > 4 )
3023                 poolsize = atoi(argv[4]);
3024
3025         if( !poolsize )
3026                 fprintf (stderr, "Warning: no mapped_pool\n");
3027
3028         if( poolsize > 65535 )
3029                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3030
3031         if( argc > 5 )
3032                 segsize = atoi(argv[5]);
3033         else
3034                 segsize = 4;    // 16 pages per mmap segment
3035
3036         if( argc > 6 )
3037                 num = atoi(argv[6]);
3038
3039         cnt = argc - 7;
3040 #ifdef unix
3041         threads = malloc (cnt * sizeof(pthread_t));
3042 #else
3043         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3044 #endif
3045         args = malloc (cnt * sizeof(ThreadArg));
3046
3047         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3048
3049         if( !mgr ) {
3050                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3051                 exit (1);
3052         }
3053
3054         //      fire off threads
3055
3056         for( idx = 0; idx < cnt; idx++ ) {
3057                 args[idx].infile = argv[idx + 7];
3058                 args[idx].type = argv[2];
3059                 args[idx].mgr = mgr;
3060                 args[idx].num = num;
3061                 args[idx].idx = idx;
3062 #ifdef unix
3063                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3064                         fprintf(stderr, "Error creating thread %d\n", err);
3065 #else
3066                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3067 #endif
3068         }
3069
3070         //      wait for termination
3071
3072 #ifdef unix
3073         for( idx = 0; idx < cnt; idx++ )
3074                 pthread_join (threads[idx], NULL);
3075 #else
3076         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3077
3078         for( idx = 0; idx < cnt; idx++ )
3079                 CloseHandle(threads[idx]);
3080
3081 #endif
3082         elapsed = getCpuTime(0) - start;
3083         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3084         elapsed = getCpuTime(1);
3085         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3086         elapsed = getCpuTime(2);
3087         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3088
3089         bt_mgrclose (mgr);
3090 }
3091
3092 #endif  //STANDALONE