]> pd.if.org Git - btree/blob - threadskv7.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv7.c
1 // btree version threadskv7 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and atomic non-consistent key insert
9
10 // 17 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 //      maximum number of keys to insert atomically in one call
88
89 #define MAX_atomic              256
90
91 #define BT_maxkey       255             // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
93
94 /*
95 There are five lock types for each node in three independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 */
102
103 typedef enum{
104         BtLockAccess,
105         BtLockDelete,
106         BtLockRead,
107         BtLockWrite,
108         BtLockParent
109 } BtLock;
110
111 //      definition for phase-fair reader/writer lock implementation
112
113 typedef struct {
114         ushort rin[1];
115         ushort rout[1];
116         ushort ticket[1];
117         ushort serving[1];
118 } RWLock;
119
120 #define PHID 0x1
121 #define PRES 0x2
122 #define MASK 0x3
123 #define RINC 0x4
124
125 //      definition for spin latch implementation
126
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
130
131 volatile typedef struct {
132         uint exclusive:1;
133         uint pending:1;
134         uint share:30;
135 } BtSpinLatch;
136
137 #define XCL 1
138 #define PEND 2
139 #define BOTH 3
140 #define SHARE 4
141
142 //      The key structure occupies space at the upper end of
143 //      each page.  It's a length byte followed by the key
144 //      bytes.
145
146 typedef struct {
147         unsigned char len;              // this can be changed to a ushort or uint
148         unsigned char key[0];
149 } BtKey;
150
151 //      the value structure also occupies space at the upper
152 //      end of the page. Each key is immediately followed by a value.
153
154 typedef struct {
155         unsigned char len;              // this can be changed to a ushort or uint
156         unsigned char value[0];
157 } BtVal;
158
159 //  hash table entries
160
161 typedef struct {
162         uint slot;                              // Latch table entry at head of chain
163         BtSpinLatch latch[1];
164 } BtHashEntry;
165
166 //      latch manager table structure
167
168 typedef struct {
169         uid page_no;                    // latch set page number
170         RWLock readwr[1];               // read/write page lock
171         RWLock access[1];               // Access Intent/Page delete
172         RWLock parent[1];               // Posting of fence key in parent
173         uint slot;                              // entry slot in latch table
174         uint next;                              // next entry in hash table chain
175         uint prev;                              // prev entry in hash table chain
176         volatile ushort pin;    // number of outstanding threads
177         ushort dirty:1;                 // page in cache is dirty
178 } BtLatchSet;
179
180 //      lock manager table structure
181
182 typedef struct {
183         RWLock readwr[1];               // read/write key lock
184         uint next;
185         uint prev;
186         uint pin;                               // count of readers waiting
187         uint hashidx;                   // hash idx for entry
188         unsigned char key[BT_keyarray];
189 } BtLockSet;
190
191 //      Define the length of the page record numbers
192
193 #define BtId 6
194
195 //      Page key slot definition.
196
197 //      Keys are marked dead, but remain on the page until
198 //      it cleanup is called. The fence key (highest key) for
199 //      a leaf page is always present, even after cleanup.
200
201 //      Slot types
202
203 //      In addition to the Unique keys that occupy slots
204 //      there are Librarian and Duplicate key
205 //      slots occupying the key slot array.
206
207 //      The Librarian slots are dead keys that
208 //      serve as filler, available to add new Unique
209 //      or Dup slots that are inserted into the B-tree.
210
211 //      The Duplicate slots have had their key bytes extended
212 //      by 6 bytes to contain a binary duplicate key uniqueifier.
213
214 typedef enum {
215         Unique,
216         Librarian,
217         Duplicate
218 } BtSlotType;
219
220 typedef struct {
221         uint off:BT_maxbits;    // page offset for key start
222         uint type:3;                    // type of slot
223         uint dead:1;                    // set for deleted slot
224 } BtSlot;
225
226 //      The first part of an index page.
227 //      It is immediately followed
228 //      by the BtSlot array of keys.
229
230 //      note that this structure size
231 //      must be a multiple of 8 bytes
232 //      in order to place dups correctly.
233
234 typedef struct BtPage_ {
235         uint cnt;                                       // count of keys in page
236         uint act;                                       // count of active keys
237         uint min;                                       // next key offset
238         uint garbage;                           // page garbage in bytes
239         unsigned char bits:7;           // page size in bits
240         unsigned char free:1;           // page is on free chain
241         unsigned char lvl:7;            // level of page
242         unsigned char kill:1;           // page is being deleted
243         unsigned char left[BtId];       // page number to left
244         unsigned char filler[2];        // padding to multiple of 8
245         unsigned char right[BtId];      // page number to right
246 } *BtPage;
247
248 //  The loadpage interface object
249
250 typedef struct {
251         uid page_no;            // current page number
252         BtPage page;            // current page pointer
253         BtLatchSet *latch;      // current page latch set
254 } BtPageSet;
255
256 //      structure for latch manager on ALLOC_page
257
258 typedef struct {
259         struct BtPage_ alloc[1];        // next page_no in right ptr
260         unsigned long long dups[1];     // global duplicate key uniqueifier
261         unsigned char chain[BtId];      // head of free page_nos chain
262 } BtPageZero;
263
264 //      The object structure for Btree access
265
266 typedef struct {
267         uint page_size;                         // page size    
268         uint page_bits;                         // page size in bits    
269 #ifdef unix
270         int idx;
271 #else
272         HANDLE idx;
273 #endif
274         BtPageZero *pagezero;           // mapped allocation page
275         BtSpinLatch alloclatch[1];      // allocation area lite latch
276         uint latchdeployed;                     // highest number of pool entries deployed
277         uint nlatchpage;                        // number of latch & lock & pool pages
278         uint latchtotal;                        // number of page latch entries
279         uint latchhash;                         // number of latch hash table slots
280         uint latchvictim;                       // next latch entry to examine
281         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
282         BtLatchSet *latchsets;          // mapped latch set from latch pages
283         unsigned char *pagepool;        // mapped to the buffer pool pages
284         uint lockhash;                          // number of lock hash table slots
285         uint lockfree;                          // next available lock table entry
286         BtSpinLatch locklatch[1];       // lock manager free chain latch
287         BtHashEntry *hashlock;          // the lock manager hash table
288         BtLockSet *locktable;           // the lock manager key table
289 #ifndef unix
290         HANDLE halloc;                          // allocation handle
291         HANDLE hpool;                           // buffer pool handle
292 #endif
293 } BtMgr;
294
295 typedef struct {
296         BtMgr *mgr;                             // buffer manager for thread
297         BtPage cursor;                  // cached frame for start/next (never mapped)
298         BtPage frame;                   // spare frame for the page split (never mapped)
299         uid cursor_page;                                // current cursor page number   
300         unsigned char *mem;                             // frame, cursor, page memory buffer
301         unsigned char key[BT_keyarray]; // last found complete key
302         int found;                              // last delete or insert was found
303         int err;                                // last error
304         int reads, writes;              // number of reads and writes from the btree
305 } BtDb;
306
307 typedef enum {
308         BTERR_ok = 0,
309         BTERR_struct,
310         BTERR_ovflw,
311         BTERR_lock,
312         BTERR_map,
313         BTERR_read,
314         BTERR_wrt,
315         BTERR_hash
316 } BTERR;
317
318 #define CLOCK_bit 0x8000
319
320 // B-Tree functions
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey   (BtDb *bt, uint slot);
329
330 //      manager functions
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
333
334 //  Helper functions to return slot values
335 //      from the cursor page.
336
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
339
340 //  The page is allocated from low and hi ends.
341 //  The key slots are allocated from the bottom,
342 //      while the text and value of the key
343 //  are allocated from the top.  When the two
344 //  areas meet, the page is split into two.
345
346 //  A key consists of a length byte, two bytes of
347 //  index number (0 - 65535), and up to 253 bytes
348 //  of key value.
349
350 //  Associated with each key is a value byte string
351 //      containing any value desired.
352
353 //  The b-tree root is always located at page 1.
354 //      The first leaf page of level zero is always
355 //      located on page 2.
356
357 //      The b-tree pages are linked with next
358 //      pointers to facilitate enumerators,
359 //      and provide for concurrency.
360
361 //      When to root page fills, it is split in two and
362 //      the tree height is raised by a new root at page
363 //      one with two keys.
364
365 //      Deleted keys are marked with a dead bit until
366 //      page cleanup. The fence key for a leaf node is
367 //      always present
368
369 //  To achieve maximum concurrency one page is locked at a time
370 //  as the tree is traversed to find leaf key in question. The right
371 //  page numbers are used in cases where the page is being split,
372 //      or consolidated.
373
374 //  Page 0 is dedicated to lock for new page extensions,
375 //      and chains empty pages together for reuse. It also
376 //      contains the latch manager hash table.
377
378 //      The ParentModification lock on a node is obtained to serialize posting
379 //      or changing the fence key for a node.
380
381 //      Empty pages are chained together through the ALLOC page and reused.
382
383 //      Access macros to address slot and key values from the page
384 //      Page slots use 1 based indexing.
385
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
389
390 void bt_putid(unsigned char *dest, uid id)
391 {
392 int i = BtId;
393
394         while( i-- )
395                 dest[i] = (unsigned char)id, id >>= 8;
396 }
397
398 uid bt_getid(unsigned char *src)
399 {
400 uid id = 0;
401 int i;
402
403         for( i = 0; i < BtId; i++ )
404                 id <<= 8, id |= *src++; 
405
406         return id;
407 }
408
409 uid bt_newdup (BtDb *bt)
410 {
411 #ifdef unix
412         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
413 #else
414         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
415 #endif
416 }
417
418 //      Phase-Fair reader/writer lock implementation
419
420 void WriteLock (RWLock *lock)
421 {
422 ushort w, r, tix;
423
424 #ifdef unix
425         tix = __sync_fetch_and_add (lock->ticket, 1);
426 #else
427         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
428 #endif
429         // wait for our ticket to come up
430
431         while( tix != lock->serving[0] )
432 #ifdef unix
433                 sched_yield();
434 #else
435                 SwitchToThread ();
436 #endif
437
438         w = PRES | (tix & PHID);
439 #ifdef  unix
440         r = __sync_fetch_and_add (lock->rin, w);
441 #else
442         r = _InterlockedExchangeAdd16 (lock->rin, w);
443 #endif
444         while( r != *lock->rout )
445 #ifdef unix
446                 sched_yield();
447 #else
448                 SwitchToThread();
449 #endif
450 }
451
452 void WriteRelease (RWLock *lock)
453 {
454 #ifdef unix
455         __sync_fetch_and_and (lock->rin, ~MASK);
456 #else
457         _InterlockedAnd16 (lock->rin, ~MASK);
458 #endif
459         lock->serving[0]++;
460 }
461
462 void ReadLock (RWLock *lock)
463 {
464 ushort w;
465 #ifdef unix
466         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
467 #else
468         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
469 #endif
470         if( w )
471           while( w == (*lock->rin & MASK) )
472 #ifdef unix
473                 sched_yield ();
474 #else
475                 SwitchToThread ();
476 #endif
477 }
478
479 void ReadRelease (RWLock *lock)
480 {
481 #ifdef unix
482         __sync_fetch_and_add (lock->rout, RINC);
483 #else
484         _InterlockedExchangeAdd16 (lock->rout, RINC);
485 #endif
486 }
487
488 //      Spin Latch Manager
489
490 //      wait until write lock mode is clear
491 //      and add 1 to the share count
492
493 void bt_spinreadlock(BtSpinLatch *latch)
494 {
495 uint prev;
496
497   do {
498 #ifdef unix
499         prev = __sync_fetch_and_add ((uint *)latch, SHARE);
500 #else
501         prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
502 #endif
503         //  see if exclusive request is granted or pending
504
505         if( !(prev & BOTH) )
506                 return;
507 #ifdef unix
508         prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
509 #else
510         prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
511 #endif
512 #ifdef  unix
513   } while( sched_yield(), 1 );
514 #else
515   } while( SwitchToThread(), 1 );
516 #endif
517 }
518
519 //      wait for other read and write latches to relinquish
520
521 void bt_spinwritelock(BtSpinLatch *latch)
522 {
523 uint prev;
524
525   do {
526 #ifdef  unix
527         prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
528 #else
529         prev = _InterlockedOr((uint *)latch, PEND | XCL);
530 #endif
531         if( !(prev & XCL) )
532           if( !(prev & ~BOTH) )
533                 return;
534           else
535 #ifdef unix
536                 __sync_fetch_and_and ((uint *)latch, ~XCL);
537 #else
538                 _InterlockedAnd((uint *)latch, ~XCL);
539 #endif
540 #ifdef  unix
541   } while( sched_yield(), 1 );
542 #else
543   } while( SwitchToThread(), 1 );
544 #endif
545 }
546
547 //      try to obtain write lock
548
549 //      return 1 if obtained,
550 //              0 otherwise
551
552 int bt_spinwritetry(BtSpinLatch *latch)
553 {
554 uint prev;
555
556 #ifdef  unix
557         prev = __sync_fetch_and_or((uint *)latch, XCL);
558 #else
559         prev = _InterlockedOr((uint *)latch, XCL);
560 #endif
561         //      take write access if all bits are clear
562
563         if( !(prev & XCL) )
564           if( !(prev & ~BOTH) )
565                 return 1;
566           else
567 #ifdef unix
568                 __sync_fetch_and_and ((uint *)latch, ~XCL);
569 #else
570                 _InterlockedAnd((uint *)latch, ~XCL);
571 #endif
572         return 0;
573 }
574
575 //      clear write mode
576
577 void bt_spinreleasewrite(BtSpinLatch *latch)
578 {
579 #ifdef unix
580         __sync_fetch_and_and((uint *)latch, ~BOTH);
581 #else
582         _InterlockedAnd((uint *)latch, ~BOTH);
583 #endif
584 }
585
586 //      decrement reader count
587
588 void bt_spinreleaseread(BtSpinLatch *latch)
589 {
590 #ifdef unix
591         __sync_fetch_and_add((uint *)latch, -SHARE);
592 #else
593         _InterlockedExchangeAdd((uint *)latch, -SHARE);
594 #endif
595 }
596
597 //      read page from permanent location in Btree file
598
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
600 {
601 off64_t off = page_no << mgr->page_bits;
602
603 #ifdef unix
604         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
606                 return BTERR_read;
607         }
608 #else
609 OVERLAPPED ovl[1];
610 uint amt[1];
611
612         memset (ovl, 0, sizeof(OVERLAPPED));
613         ovl->Offset = off;
614         ovl->OffsetHigh = off >> 32;
615
616         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
618                 return BTERR_read;
619         }
620         if( *amt <  mgr->page_size ) {
621                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
622                 return BTERR_read;
623         }
624 #endif
625         return 0;
626 }
627
628 //      write page to permanent location in Btree file
629 //      clear the dirty bit
630
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
632 {
633 off64_t off = page_no << mgr->page_bits;
634
635 #ifdef unix
636         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
637                 return BTERR_wrt;
638 #else
639 OVERLAPPED ovl[1];
640 uint amt[1];
641
642         memset (ovl, 0, sizeof(OVERLAPPED));
643         ovl->Offset = off;
644         ovl->OffsetHigh = off >> 32;
645
646         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
647                 return BTERR_wrt;
648
649         if( *amt <  mgr->page_size )
650                 return BTERR_wrt;
651 #endif
652         return 0;
653 }
654
655 //      link latch table entry into head of latch hash table
656
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
658 {
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
661
662         if( latch->next = bt->mgr->hashtable[hashidx].slot )
663                 bt->mgr->latchsets[latch->next].prev = slot;
664
665         bt->mgr->hashtable[hashidx].slot = slot;
666         latch->page_no = page_no;
667         latch->slot = slot;
668         latch->prev = 0;
669         latch->pin = 1;
670
671         if( loadit )
672           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
673                 return bt->err;
674           else
675                 bt->reads++;
676
677         return bt->err = 0;
678 }
679
680 //      set CLOCK bit in latch
681 //      decrement pin count
682
683 void bt_unpinlatch (BtLatchSet *latch)
684 {
685 #ifdef unix
686         if( ~latch->pin & CLOCK_bit )
687                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688         __sync_fetch_and_add(&latch->pin, -1);
689 #else
690         if( ~latch->pin & CLOCK_bit )
691                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692         _InterlockedDecrement16 (&latch->pin);
693 #endif
694 }
695
696 //  return the btree cached page address
697
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
699 {
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
701
702         return page;
703 }
704
705 //      find existing latchset or inspire new one
706 //      return with latchset pinned
707
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
709 {
710 uint hashidx = page_no % bt->mgr->latchhash;
711 BtLatchSet *latch;
712 uint slot, idx;
713 uint lvl, cnt;
714 off64_t off;
715 uint amt[1];
716 BtPage page;
717
718   //  try to find our entry
719
720   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
721
722   if( slot = bt->mgr->hashtable[hashidx].slot ) do
723   {
724         latch = bt->mgr->latchsets + slot;
725         if( page_no == latch->page_no )
726                 break;
727   } while( slot = latch->next );
728
729   //  found our entry
730   //  increment clock
731
732   if( slot ) {
733         latch = bt->mgr->latchsets + slot;
734 #ifdef unix
735         __sync_fetch_and_add(&latch->pin, 1);
736 #else
737         _InterlockedIncrement16 (&latch->pin);
738 #endif
739         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
740         return latch;
741   }
742
743         //  see if there are any unused pool entries
744 #ifdef unix
745         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
746 #else
747         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
748 #endif
749
750         if( slot < bt->mgr->latchtotal ) {
751                 latch = bt->mgr->latchsets + slot;
752                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
753                         return NULL;
754                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
755                 return latch;
756         }
757
758 #ifdef unix
759         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
760 #else
761         _InterlockedDecrement (&bt->mgr->latchdeployed);
762 #endif
763   //  find and reuse previous entry on victim
764
765   while( 1 ) {
766 #ifdef unix
767         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
768 #else
769         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
770 #endif
771         // try to get write lock on hash chain
772         //      skip entry if not obtained
773         //      or has outstanding pins
774
775         slot %= bt->mgr->latchtotal;
776
777         if( !slot )
778                 continue;
779
780         latch = bt->mgr->latchsets + slot;
781         idx = latch->page_no % bt->mgr->latchhash;
782
783         //      see we are on same chain as hashidx
784
785         if( idx == hashidx )
786                 continue;
787
788         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
789                 continue;
790
791         //  skip this slot if it is pinned
792         //      or the CLOCK bit is set
793
794         if( latch->pin ) {
795           if( latch->pin & CLOCK_bit ) {
796 #ifdef unix
797                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
798 #else
799                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
800 #endif
801           }
802           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
803           continue;
804         }
805
806         //  update permanent page area in btree from buffer pool
807
808         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
809
810         if( latch->dirty )
811           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
812                 return NULL;
813           else
814                 latch->dirty = 0, bt->writes++;
815
816         //  unlink our available slot from its hash chain
817
818         if( latch->prev )
819                 bt->mgr->latchsets[latch->prev].next = latch->next;
820         else
821                 bt->mgr->hashtable[idx].slot = latch->next;
822
823         if( latch->next )
824                 bt->mgr->latchsets[latch->next].prev = latch->prev;
825
826         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
827
828         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
829                 return NULL;
830
831         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
832         return latch;
833   }
834 }
835
836 void bt_mgrclose (BtMgr *mgr)
837 {
838 BtLatchSet *latch;
839 uint num = 0;
840 BtPage page;
841 uint slot;
842
843         //      flush dirty pool pages to the btree
844
845         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847                 latch = mgr->latchsets + slot;
848
849                 if( latch->dirty ) {
850                         bt_writepage(mgr, page, latch->page_no);
851                         latch->dirty = 0, num++;
852                 }
853 //              madvise (page, mgr->page_size, MADV_DONTNEED);
854         }
855
856         fprintf(stderr, "%d buffer pool pages flushed\n", num);
857
858 #ifdef unix
859         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860         munmap (mgr->pagezero, mgr->page_size);
861 #else
862         FlushViewOfFile(mgr->pagezero, 0);
863         UnmapViewOfFile(mgr->pagezero);
864         UnmapViewOfFile(mgr->hashtable);
865         CloseHandle(mgr->halloc);
866         CloseHandle(mgr->hpool);
867 #endif
868 #ifdef unix
869         close (mgr->idx);
870         free (mgr);
871 #else
872         FlushFileBuffers(mgr->idx);
873         CloseHandle(mgr->idx);
874         GlobalFree (mgr);
875 #endif
876 }
877
878 //      close and release memory
879
880 void bt_close (BtDb *bt)
881 {
882 #ifdef unix
883         if( bt->mem )
884                 free (bt->mem);
885 #else
886         if( bt->mem)
887                 VirtualFree (bt->mem, 0, MEM_RELEASE);
888 #endif
889         free (bt);
890 }
891
892 //  open/create new btree buffer manager
893
894 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
895 //              size of page pool (e.g. 262144) and number of lock table entries.
896
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
898 {
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
903 off64_t size;
904 uint amt[1];
905 BtMgr* mgr;
906 BtKey* key;
907 BtVal *val;
908
909         // determine sanity of page size and buffer pool
910
911         if( bits > BT_maxbits )
912                 bits = BT_maxbits;
913         else if( bits < BT_minbits )
914                 bits = BT_minbits;
915
916         if( nodemax < 16 ) {
917                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
918                 return NULL;
919         }
920
921 #ifdef unix
922         mgr = calloc (1, sizeof(BtMgr));
923
924         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
925
926         if( mgr->idx == -1 ) {
927                 fprintf (stderr, "Unable to open btree file\n");
928                 return free(mgr), NULL;
929         }
930 #else
931         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932         attr = FILE_ATTRIBUTE_NORMAL;
933         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
934
935         if( mgr->idx == INVALID_HANDLE_VALUE )
936                 return GlobalFree(mgr), NULL;
937 #endif
938
939 #ifdef unix
940         pagezero = valloc (BT_maxpage);
941         *amt = 0;
942
943         // read minimum page size to get root info
944         //      to support raw disk partition files
945         //      check if bits == 0 on the disk.
946
947         if( size = lseek (mgr->idx, 0L, 2) )
948                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949                         if( pagezero->alloc->bits )
950                                 bits = pagezero->alloc->bits;
951                         else
952                                 initit = 1;
953                 else
954                         return free(mgr), free(pagezero), NULL;
955         else
956                 initit = 1;
957 #else
958         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959         size = GetFileSize(mgr->idx, amt);
960
961         if( size || *amt ) {
962                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963                         return bt_mgrclose (mgr), NULL;
964                 bits = pagezero->alloc->bits;
965         } else
966                 initit = 1;
967 #endif
968
969         mgr->page_size = 1 << bits;
970         mgr->page_bits = bits;
971
972         //  calculate number of latch hash table entries
973
974         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
976
977         //  add on the number of pages in buffer pool
978         //      along with the corresponding latch table
979
980         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
981         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982         mgr->latchtotal = nodemax;
983
984         //      add on the sizeof the lock manager hash table and the lock table
985
986         mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
987
988         mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
989
990         if( !initit )
991                 goto mgrlatch;
992
993         // initialize an empty b-tree with latch page, root page, page of leaves
994         // and page(s) of latches and page pool cache
995
996         memset (pagezero, 0, 1 << bits);
997         pagezero->alloc->bits = mgr->page_bits;
998         bt_putid(pagezero->alloc->right, MIN_lvl+1);
999
1000         //  initialize left-most LEAF page in
1001         //      alloc->left.
1002
1003         bt_putid (pagezero->alloc->left, LEAF_page);
1004
1005         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006                 fprintf (stderr, "Unable to create btree page zero\n");
1007                 return bt_mgrclose (mgr), NULL;
1008         }
1009
1010         memset (pagezero, 0, 1 << bits);
1011         pagezero->alloc->bits = mgr->page_bits;
1012
1013         for( lvl=MIN_lvl; lvl--; ) {
1014                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015                 key = keyptr(pagezero->alloc, 1);
1016                 key->len = 2;           // create stopper key
1017                 key->key[0] = 0xff;
1018                 key->key[1] = 0xff;
1019
1020                 bt_putid(value, MIN_lvl - lvl + 1);
1021                 val = valptr(pagezero->alloc, 1);
1022                 val->len = lvl ? BtId : 0;
1023                 memcpy (val->value, value, val->len);
1024
1025                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026                 pagezero->alloc->lvl = lvl;
1027                 pagezero->alloc->cnt = 1;
1028                 pagezero->alloc->act = 1;
1029
1030                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031                         fprintf (stderr, "Unable to create btree page zero\n");
1032                         return bt_mgrclose (mgr), NULL;
1033                 }
1034         }
1035
1036 mgrlatch:
1037 #ifdef unix
1038         free (pagezero);
1039 #else
1040         VirtualFree (pagezero, 0, MEM_RELEASE);
1041 #endif
1042 #ifdef unix
1043         // mlock the pagezero page
1044
1045         flag = PROT_READ | PROT_WRITE;
1046         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047         if( mgr->pagezero == MAP_FAILED ) {
1048                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049                 return bt_mgrclose (mgr), NULL;
1050         }
1051         mlock (mgr->pagezero, mgr->page_size);
1052
1053         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054         if( mgr->hashtable == MAP_FAILED ) {
1055                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058 #else
1059         flag = PAGE_READWRITE;
1060         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061         if( !mgr->halloc ) {
1062                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063                 return bt_mgrclose (mgr), NULL;
1064         }
1065
1066         flag = FILE_MAP_WRITE;
1067         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068         if( !mgr->pagezero ) {
1069                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070                 return bt_mgrclose (mgr), NULL;
1071         }
1072
1073         flag = PAGE_READWRITE;
1074         size = (uid)mgr->nlatchpage << mgr->page_bits;
1075         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1076         if( !mgr->hpool ) {
1077                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078                 return bt_mgrclose (mgr), NULL;
1079         }
1080
1081         flag = FILE_MAP_WRITE;
1082         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083         if( !mgr->hashtable ) {
1084                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085                 return bt_mgrclose (mgr), NULL;
1086         }
1087 #endif
1088
1089         size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090         mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091         size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1092
1093         mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094         mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095         mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1096
1097         mgr->lockfree = lockmax - 1;
1098         mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1099
1100         for( idx = 1; idx < lockmax; idx++ )
1101                 mgr->locktable[idx].next = idx - 1;
1102
1103         return mgr;
1104 }
1105
1106 //      open BTree access method
1107 //      based on buffer manager
1108
1109 BtDb *bt_open (BtMgr *mgr)
1110 {
1111 BtDb *bt = malloc (sizeof(*bt));
1112
1113         memset (bt, 0, sizeof(*bt));
1114         bt->mgr = mgr;
1115 #ifdef unix
1116         bt->mem = valloc (2 *mgr->page_size);
1117 #else
1118         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1119 #endif
1120         bt->frame = (BtPage)bt->mem;
1121         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1122         return bt;
1123 }
1124
1125 //  compare two keys, returning > 0, = 0, or < 0
1126 //  as the comparison value
1127
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1129 {
1130 uint len1 = key1->len;
1131 int ans;
1132
1133         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1134                 return ans;
1135
1136         if( len1 > len2 )
1137                 return 1;
1138         if( len1 < len2 )
1139                 return -1;
1140
1141         return 0;
1142 }
1143
1144 // place write, read, or parent lock on requested page_no.
1145
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1147 {
1148         switch( mode ) {
1149         case BtLockRead:
1150                 ReadLock (set->readwr);
1151                 break;
1152         case BtLockWrite:
1153                 WriteLock (set->readwr);
1154                 break;
1155         case BtLockAccess:
1156                 ReadLock (set->access);
1157                 break;
1158         case BtLockDelete:
1159                 WriteLock (set->access);
1160                 break;
1161         case BtLockParent:
1162                 WriteLock (set->parent);
1163                 break;
1164         }
1165 }
1166
1167 // remove write, read, or parent lock on requested page
1168
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1170 {
1171         switch( mode ) {
1172         case BtLockRead:
1173                 ReadRelease (set->readwr);
1174                 break;
1175         case BtLockWrite:
1176                 WriteRelease (set->readwr);
1177                 break;
1178         case BtLockAccess:
1179                 ReadRelease (set->access);
1180                 break;
1181         case BtLockDelete:
1182                 WriteRelease (set->access);
1183                 break;
1184         case BtLockParent:
1185                 WriteRelease (set->parent);
1186                 break;
1187         }
1188 }
1189
1190 //      allocate a new page
1191 //      return with page latched.
1192
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1194 {
1195 int blk;
1196
1197         //      lock allocation page
1198
1199         bt_spinwritelock(bt->mgr->alloclatch);
1200
1201         // use empty chain first
1202         // else allocate empty page
1203
1204         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206                         set->page = bt_mappage (bt, set->latch);
1207                 else
1208                         return bt->err = BTERR_struct, -1;
1209
1210                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211                 bt_spinreleasewrite(bt->mgr->alloclatch);
1212
1213                 memcpy (set->page, contents, bt->mgr->page_size);
1214                 set->latch->dirty = 1;
1215                 return 0;
1216         }
1217
1218         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1220
1221         // unlock allocation latch
1222
1223         bt_spinreleasewrite(bt->mgr->alloclatch);
1224
1225         //      don't load cache from btree page
1226
1227         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228                 set->page = bt_mappage (bt, set->latch);
1229         else
1230                 return bt->err = BTERR_struct;
1231
1232         memcpy (set->page, contents, bt->mgr->page_size);
1233         set->latch->dirty = 1;
1234         return 0;
1235 }
1236
1237 //  find slot in page for given key at a given level
1238
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1240 {
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1242 uint good = 0;
1243
1244         //        make stopper key an infinite fence value
1245
1246         if( bt_getid (set->page->right) )
1247                 higher++;
1248         else
1249                 good++;
1250
1251         //      low is the lowest candidate.
1252         //  loop ends when they meet
1253
1254         //  higher is already
1255         //      tested as .ge. the passed key.
1256
1257         while( diff = higher - low ) {
1258                 slot = low + ( diff >> 1 );
1259                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1260                         low = slot + 1;
1261                 else
1262                         higher = slot, good++;
1263         }
1264
1265         //      return zero if key is on right link page
1266
1267         return good ? higher : 0;
1268 }
1269
1270 //  find and load page at given level for given key
1271 //      leave page rd or wr locked as requested
1272
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1274 {
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1279
1280   //  start at root of btree and drill down
1281
1282   do {
1283         // determine lock mode of drill level
1284         mode = (drill == lvl) ? lock : BtLockRead; 
1285
1286         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287                 set->page_no = page_no;
1288         else
1289                 return 0;
1290
1291         // obtain access lock using lock chaining with Access mode
1292
1293         if( page_no > ROOT_page )
1294           bt_lockpage(BtLockAccess, set->latch);
1295
1296         set->page = bt_mappage (bt, set->latch);
1297
1298         //      release & unpin parent page
1299
1300         if( prevpage ) {
1301           bt_unlockpage(prevmode, prevlatch);
1302           bt_unpinlatch (prevlatch);
1303           prevpage = 0;
1304         }
1305
1306         // obtain read lock using lock chaining
1307
1308         bt_lockpage(mode, set->latch);
1309
1310         if( set->page->free )
1311                 return bt->err = BTERR_struct, 0;
1312
1313         if( page_no > ROOT_page )
1314           bt_unlockpage(BtLockAccess, set->latch);
1315
1316         // re-read and re-lock root after determining actual level of root
1317
1318         if( set->page->lvl != drill) {
1319                 if( set->page_no != ROOT_page )
1320                         return bt->err = BTERR_struct, 0;
1321                         
1322                 drill = set->page->lvl;
1323
1324                 if( lock != BtLockRead && drill == lvl ) {
1325                   bt_unlockpage(mode, set->latch);
1326                   bt_unpinlatch (set->latch);
1327                   continue;
1328                 }
1329         }
1330
1331         prevpage = set->page_no;
1332         prevlatch = set->latch;
1333         prevmode = mode;
1334
1335         //  find key on page at this level
1336         //  and descend to requested level
1337
1338         if( set->page->kill )
1339           goto slideright;
1340
1341         if( slot = bt_findslot (set, key, len) ) {
1342           if( drill == lvl )
1343                 return slot;
1344
1345           while( slotptr(set->page, slot)->dead )
1346                 if( slot++ < set->page->cnt )
1347                         continue;
1348                 else
1349                         goto slideright;
1350
1351           page_no = bt_getid(valptr(set->page, slot)->value);
1352           drill--;
1353           continue;
1354         }
1355
1356         //  or slide right into next page
1357
1358 slideright:
1359         page_no = bt_getid(set->page->right);
1360
1361   } while( page_no );
1362
1363   // return error on end of right chain
1364
1365   bt->err = BTERR_struct;
1366   return 0;     // return error
1367 }
1368
1369 //      return page to free list
1370 //      page must be delete & write locked
1371
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1373 {
1374         //      lock allocation page
1375
1376         bt_spinwritelock (bt->mgr->alloclatch);
1377
1378         //      store chain
1379         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381         set->latch->dirty = 1;
1382         set->page->free = 1;
1383
1384         // unlock released page
1385
1386         bt_unlockpage (BtLockDelete, set->latch);
1387         bt_unlockpage (BtLockWrite, set->latch);
1388         bt_unpinlatch (set->latch);
1389
1390         // unlock allocation page
1391
1392         bt_spinreleasewrite (bt->mgr->alloclatch);
1393 }
1394
1395 //      a fence key was deleted from a page
1396 //      push new fence value upwards
1397
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1399 {
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1402 BtKey* ptr;
1403 uint idx;
1404
1405         //      remove the old fence value
1406
1407         ptr = keyptr(set->page, set->page->cnt);
1408         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410         set->latch->dirty = 1;
1411
1412         //  cache new fence value
1413
1414         ptr = keyptr(set->page, set->page->cnt);
1415         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1416
1417         bt_lockpage (BtLockParent, set->latch);
1418         bt_unlockpage (BtLockWrite, set->latch);
1419
1420         //      insert new (now smaller) fence key
1421
1422         bt_putid (value, set->page_no);
1423         ptr = (BtKey*)leftkey;
1424
1425         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1426           return bt->err;
1427
1428         //      now delete old fence key
1429
1430         ptr = (BtKey*)rightkey;
1431
1432         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1433                 return bt->err;
1434
1435         bt_unlockpage (BtLockParent, set->latch);
1436         bt_unpinlatch(set->latch);
1437         return 0;
1438 }
1439
1440 //      root has a single child
1441 //      collapse a level from the tree
1442
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1444 {
1445 BtPageSet child[1];
1446 uint idx;
1447
1448   // find the child entry and promote as new root contents
1449
1450   do {
1451         for( idx = 0; idx++ < root->page->cnt; )
1452           if( !slotptr(root->page, idx)->dead )
1453                 break;
1454
1455         child->page_no = bt_getid (valptr(root->page, idx)->value);
1456
1457         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458                 child->page = bt_mappage (bt, child->latch);
1459         else
1460                 return bt->err;
1461
1462         bt_lockpage (BtLockDelete, child->latch);
1463         bt_lockpage (BtLockWrite, child->latch);
1464
1465         memcpy (root->page, child->page, bt->mgr->page_size);
1466         root->latch->dirty = 1;
1467
1468         bt_freepage (bt, child);
1469
1470   } while( root->page->lvl > 1 && root->page->act == 1 );
1471
1472   bt_unlockpage (BtLockWrite, root->latch);
1473   bt_unpinlatch (root->latch);
1474   return 0;
1475 }
1476
1477 //      maintain reverse scan pointers by
1478 //      linking left pointer of far right node
1479
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1481 {
1482 BtPageSet right2[1];
1483
1484         //      keep track of rightmost leaf page
1485
1486         if( !right_page_no ) {
1487           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1488           return 0;
1489         }
1490
1491         //      link right page to left page
1492
1493         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494                 right2->page = bt_mappage (bt, right2->latch);
1495         else
1496                 return bt->err;
1497
1498         bt_lockpage (BtLockWrite, right2->latch);
1499
1500         bt_putid(right2->page->left, left_page_no);
1501         bt_unlockpage (BtLockWrite, right2->latch);
1502         bt_unpinlatch (right2->latch);
1503         return 0;
1504 }
1505
1506 //  find and delete key on page by marking delete flag bit
1507 //  if page becomes empty, delete it from the btree
1508
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1510 {
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1515 BtKey *ptr, *tst;
1516 BtVal *val;
1517
1518         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519                 ptr = keyptr(set->page, slot);
1520         else
1521                 return bt->err;
1522
1523         // if librarian slot, advance to real slot
1524
1525         if( slotptr(set->page, slot)->type == Librarian )
1526                 ptr = keyptr(set->page, ++slot);
1527
1528         fence = slot == set->page->cnt;
1529
1530         // if key is found delete it, otherwise ignore request
1531
1532         if( found = !keycmp (ptr, key, len) )
1533           if( found = slotptr(set->page, slot)->dead == 0 ) {
1534                 val = valptr(set->page,slot);
1535                 slotptr(set->page, slot)->dead = 1;
1536                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1537                 set->page->act--;
1538
1539                 // collapse empty slots beneath the fence
1540
1541                 while( idx = set->page->cnt - 1 )
1542                   if( slotptr(set->page, idx)->dead ) {
1543                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1545                   } else
1546                         break;
1547           }
1548
1549         //      did we delete a fence key in an upper level?
1550
1551         if( found && lvl && set->page->act && fence )
1552           if( bt_fixfence (bt, set, lvl) )
1553                 return bt->err;
1554           else
1555                 return bt->found = found, 0;
1556
1557         //      do we need to collapse root?
1558
1559         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560           if( bt_collapseroot (bt, set) )
1561                 return bt->err;
1562           else
1563                 return bt->found = found, 0;
1564
1565         //      return if page is not empty
1566
1567         if( set->page->act ) {
1568                 set->latch->dirty = 1;
1569                 bt_unlockpage(BtLockWrite, set->latch);
1570                 bt_unpinlatch (set->latch);
1571                 return bt->found = found, 0;
1572         }
1573
1574         //      cache copy of fence key
1575         //      to post in parent
1576
1577         ptr = keyptr(set->page, set->page->cnt);
1578         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1579
1580         //      obtain lock on right page
1581
1582         right->page_no = bt_getid(set->page->right);
1583
1584         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585                 right->page = bt_mappage (bt, right->latch);
1586         else
1587                 return 0;
1588
1589         bt_lockpage (BtLockWrite, right->latch);
1590
1591         if( right->page->kill )
1592                 return bt->err = BTERR_struct;
1593
1594         // transfer left link
1595
1596         memcpy (right->page->left, set->page->left, BtId);
1597
1598         // pull contents of right peer into our empty page
1599
1600         memcpy (set->page, right->page, bt->mgr->page_size);
1601         set->latch->dirty = 1;
1602
1603         // update left link
1604
1605         if( !lvl )
1606           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1607                 return bt->err;
1608
1609         // cache copy of key to update
1610
1611         ptr = keyptr(right->page, right->page->cnt);
1612         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1613
1614         // mark right page deleted and point it to left page
1615         //      until we can post parent updates
1616
1617         bt_putid (right->page->right, set->page_no);
1618         right->latch->dirty = 1;
1619         right->page->kill = 1;
1620
1621         bt_lockpage (BtLockParent, right->latch);
1622         bt_unlockpage (BtLockWrite, right->latch);
1623
1624         bt_lockpage (BtLockParent, set->latch);
1625         bt_unlockpage (BtLockWrite, set->latch);
1626
1627         // redirect higher key directly to our new node contents
1628
1629         bt_putid (value, set->page_no);
1630         ptr = (BtKey*)higherfence;
1631
1632         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1633           return bt->err;
1634
1635         //      delete old lower key to our node
1636
1637         ptr = (BtKey*)lowerfence;
1638
1639         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1640           return bt->err;
1641
1642         //      obtain delete and write locks to right node
1643
1644         bt_unlockpage (BtLockParent, right->latch);
1645         bt_lockpage (BtLockDelete, right->latch);
1646         bt_lockpage (BtLockWrite, right->latch);
1647         bt_freepage (bt, right);
1648
1649         bt_unlockpage (BtLockParent, set->latch);
1650         bt_unpinlatch (set->latch);
1651         bt->found = found;
1652         return 0;
1653 }
1654
1655 BtKey *bt_foundkey (BtDb *bt)
1656 {
1657         return (BtKey*)(bt->key);
1658 }
1659
1660 //      advance to next slot
1661
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1663 {
1664 BtLatchSet *prevlatch;
1665 uid page_no;
1666
1667         if( slot < set->page->cnt )
1668                 return slot + 1;
1669
1670         prevlatch = set->latch;
1671
1672         if( page_no = bt_getid(set->page->right) )
1673           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674                 set->page = bt_mappage (bt, set->latch);
1675           else
1676                 return 0;
1677         else
1678           return bt->err = BTERR_struct, 0;
1679
1680         // obtain access lock using lock chaining with Access mode
1681
1682         bt_lockpage(BtLockAccess, set->latch);
1683
1684         bt_unlockpage(BtLockRead, prevlatch);
1685         bt_unpinlatch (prevlatch);
1686
1687         bt_lockpage(BtLockRead, set->latch);
1688         bt_unlockpage(BtLockAccess, set->latch);
1689
1690         set->page_no = page_no;
1691         return 1;
1692 }
1693
1694 //      find unique key or first duplicate key in
1695 //      leaf level and return number of value bytes
1696 //      or (-1) if not found.  Setup key for bt_foundkey
1697
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1699 {
1700 BtPageSet set[1];
1701 uint len, slot;
1702 int ret = -1;
1703 BtKey *ptr;
1704 BtVal *val;
1705
1706   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1707    do {
1708         ptr = keyptr(set->page, slot);
1709
1710         //      skip librarian slot place holder
1711
1712         if( slotptr(set->page, slot)->type == Librarian )
1713                 ptr = keyptr(set->page, ++slot);
1714
1715         //      return actual key found
1716
1717         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1718         len = ptr->len;
1719
1720         if( slotptr(set->page, slot)->type == Duplicate )
1721                 len -= BtId;
1722
1723         //      not there if we reach the stopper key
1724
1725         if( slot == set->page->cnt )
1726           if( !bt_getid (set->page->right) )
1727                 break;
1728
1729         // if key exists, return >= 0 value bytes copied
1730         //      otherwise return (-1)
1731
1732         if( slotptr(set->page, slot)->dead )
1733                 continue;
1734
1735         if( keylen == len )
1736           if( !memcmp (ptr->key, key, len) ) {
1737                 val = valptr (set->page,slot);
1738                 if( valmax > val->len )
1739                         valmax = val->len;
1740                 memcpy (value, val->value, valmax);
1741                 ret = valmax;
1742           }
1743
1744         break;
1745
1746    } while( slot = bt_findnext (bt, set, slot) );
1747
1748   bt_unlockpage (BtLockRead, set->latch);
1749   bt_unpinlatch (set->latch);
1750   return ret;
1751 }
1752
1753 //      check page for space available,
1754 //      clean if necessary and return
1755 //      0 - page needs splitting
1756 //      >0  new slot value
1757
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1759 {
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
1764 uint newslot = max;
1765 BtKey *key;
1766 BtVal *val;
1767
1768         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1769                 return slot;
1770
1771         //      skip cleanup and proceed to split
1772         //      if there's not enough garbage
1773         //      to bother with.
1774
1775         if( page->garbage < nxt / 5 )
1776                 return 0;
1777
1778         memcpy (bt->frame, page, bt->mgr->page_size);
1779
1780         // skip page info and set rest of page to zero
1781
1782         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783         set->latch->dirty = 1;
1784         page->garbage = 0;
1785         page->act = 0;
1786
1787         // clean up page first by
1788         // removing deleted keys
1789
1790         while( cnt++ < max ) {
1791                 if( cnt == slot )
1792                         newslot = idx + 2;
1793                 if( cnt < max || page->lvl )
1794                   if( slotptr(bt->frame,cnt)->dead )
1795                         continue;
1796
1797                 // copy the value across
1798
1799                 val = valptr(bt->frame, cnt);
1800                 nxt -= val->len + sizeof(BtVal);
1801                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1802
1803                 // copy the key across
1804
1805                 key = keyptr(bt->frame, cnt);
1806                 nxt -= key->len + sizeof(BtKey);
1807                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1808
1809                 // make a librarian slot
1810
1811                 slotptr(page, ++idx)->off = nxt;
1812                 slotptr(page, idx)->type = Librarian;
1813                 slotptr(page, idx)->dead = 1;
1814
1815                 // set up the slot
1816
1817                 slotptr(page, ++idx)->off = nxt;
1818                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1819
1820                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1821                         page->act++;
1822         }
1823
1824         page->min = nxt;
1825         page->cnt = idx;
1826
1827         //      see if page has enough space now, or does it need splitting?
1828
1829         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1830                 return newslot;
1831
1832         return 0;
1833 }
1834
1835 // split the root and raise the height of the btree
1836
1837 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1838 {
1839 uint nxt = bt->mgr->page_size;
1840 unsigned char value[BtId];
1841 BtPageSet left[1];
1842 BtKey *ptr;
1843 BtVal *val;
1844
1845         //  Obtain an empty page to use, and copy the current
1846         //  root contents into it, e.g. lower keys
1847
1848         if( bt_newpage(bt, left, root->page) )
1849                 return bt->err;
1850
1851         bt_unpinlatch (left->latch);
1852
1853         // set left from higher to lower page
1854
1855         bt_putid (right->page->left, left->page_no);
1856         bt_unpinlatch (right->latch);
1857
1858         // preserve the page info at the bottom
1859         // of higher keys and set rest to zero
1860
1861         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1862
1863         // insert stopper key at top of newroot page
1864         // and increase the root height
1865
1866         nxt -= BtId + sizeof(BtVal);
1867         bt_putid (value, right->page_no);
1868         val = (BtVal *)((unsigned char *)root->page + nxt);
1869         memcpy (val->value, value, BtId);
1870         val->len = BtId;
1871
1872         nxt -= 2 + sizeof(BtKey);
1873         slotptr(root->page, 2)->off = nxt;
1874         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1875         ptr->len = 2;
1876         ptr->key[0] = 0xff;
1877         ptr->key[1] = 0xff;
1878
1879         // insert lower keys page fence key on newroot page as first key
1880
1881         nxt -= BtId + sizeof(BtVal);
1882         bt_putid (value, left->page_no);
1883         val = (BtVal *)((unsigned char *)root->page + nxt);
1884         memcpy (val->value, value, BtId);
1885         val->len = BtId;
1886
1887         nxt -= leftkey->len + sizeof(BtKey);
1888         slotptr(root->page, 1)->off = nxt;
1889         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1890         
1891         bt_putid(root->page->right, 0);
1892         root->page->min = nxt;          // reset lowest used offset and key count
1893         root->page->cnt = 2;
1894         root->page->act = 2;
1895         root->page->lvl++;
1896
1897         // release and unpin root pages
1898
1899         bt_unlockpage(BtLockWrite, root->latch);
1900         bt_unpinlatch (root->latch);
1901         return 0;
1902 }
1903
1904 //  split already locked full node
1905 //      return unlocked.
1906
1907 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1908 {
1909 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1910 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1911 unsigned char value[BtId];
1912 uint lvl = set->page->lvl;
1913 BtPageSet right[1];
1914 BtKey *key, *ptr;
1915 BtVal *val, *src;
1916 uid right2;
1917 uint prev;
1918
1919         //  split higher half of keys to bt->frame
1920
1921         memset (bt->frame, 0, bt->mgr->page_size);
1922         max = set->page->cnt;
1923         cnt = max / 2;
1924         idx = 0;
1925
1926         while( cnt++ < max ) {
1927                 if( cnt < max || set->page->lvl )
1928                   if( slotptr(set->page, cnt)->dead && cnt < max )
1929                         continue;
1930
1931                 src = valptr(set->page, cnt);
1932                 nxt -= src->len + sizeof(BtVal);
1933                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1934
1935                 key = keyptr(set->page, cnt);
1936                 nxt -= key->len + sizeof(BtKey);
1937                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1938                 memcpy (ptr, key, key->len + sizeof(BtKey));
1939
1940                 //      add librarian slot
1941
1942                 slotptr(bt->frame, ++idx)->off = nxt;
1943                 slotptr(bt->frame, idx)->type = Librarian;
1944                 slotptr(bt->frame, idx)->dead = 1;
1945
1946                 //  add actual slot
1947
1948                 slotptr(bt->frame, ++idx)->off = nxt;
1949                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1950
1951                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1952                         bt->frame->act++;
1953         }
1954
1955         // remember existing fence key for new page to the right
1956
1957         memcpy (rightkey, key, key->len + sizeof(BtKey));
1958
1959         bt->frame->bits = bt->mgr->page_bits;
1960         bt->frame->min = nxt;
1961         bt->frame->cnt = idx;
1962         bt->frame->lvl = lvl;
1963
1964         // link right node
1965
1966         if( set->page_no > ROOT_page ) {
1967                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1968                 bt_putid(bt->frame->left, set->page_no);
1969         }
1970
1971         //      get new free page and write higher keys to it.
1972
1973         if( bt_newpage(bt, right, bt->frame) )
1974                 return bt->err;
1975
1976         // link left node
1977
1978         if( set->page_no > ROOT_page && !lvl )
1979           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1980                 return bt->err;
1981
1982         //      update lower keys to continue in old page
1983
1984         memcpy (bt->frame, set->page, bt->mgr->page_size);
1985         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1986         set->latch->dirty = 1;
1987
1988         nxt = bt->mgr->page_size;
1989         set->page->garbage = 0;
1990         set->page->act = 0;
1991         max /= 2;
1992         cnt = 0;
1993         idx = 0;
1994
1995         if( slotptr(bt->frame, max)->type == Librarian )
1996                 max--;
1997
1998         //  assemble page of smaller keys
1999
2000         while( cnt++ < max ) {
2001                 if( slotptr(bt->frame, cnt)->dead )
2002                         continue;
2003                 val = valptr(bt->frame, cnt);
2004                 nxt -= val->len + sizeof(BtVal);
2005                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2006
2007                 key = keyptr(bt->frame, cnt);
2008                 nxt -= key->len + sizeof(BtKey);
2009                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2010
2011                 //      add librarian slot
2012
2013                 slotptr(set->page, ++idx)->off = nxt;
2014                 slotptr(set->page, idx)->type = Librarian;
2015                 slotptr(set->page, idx)->dead = 1;
2016
2017                 //      add actual slot
2018
2019                 slotptr(set->page, ++idx)->off = nxt;
2020                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2021                 set->page->act++;
2022         }
2023
2024         // remember fence key for smaller page
2025
2026         memcpy(fencekey, key, key->len + sizeof(BtKey));
2027
2028         bt_putid(set->page->right, right->page_no);
2029         set->page->min = nxt;
2030         set->page->cnt = idx;
2031
2032         // if current page is the root page, split it
2033
2034         if( set->page_no == ROOT_page )
2035                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2036
2037         // insert new fences in their parent pages
2038
2039         bt_lockpage (BtLockParent, right->latch);
2040
2041         bt_lockpage (BtLockParent, set->latch);
2042         bt_unlockpage (BtLockWrite, set->latch);
2043
2044         // insert new fence for reformulated left block of smaller keys
2045
2046         ptr = (BtKey*)fencekey;
2047         bt_putid (value, set->page_no);
2048
2049         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2050                 return bt->err;
2051
2052         // switch fence for right block of larger keys to new right page
2053
2054         ptr = (BtKey*)rightkey;
2055         bt_putid (value, right->page_no);
2056
2057         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2058                 return bt->err;
2059
2060         bt_unlockpage (BtLockParent, set->latch);
2061         bt_unpinlatch (set->latch);
2062
2063         bt_unlockpage (BtLockParent, right->latch);
2064         bt_unpinlatch (right->latch);
2065         return 0;
2066 }
2067
2068 //      install new key and value onto page
2069 //      page must already be checked for
2070 //      adequate space
2071
2072 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2073 {
2074 uint idx, librarian;
2075 BtSlot *node;
2076 BtKey *ptr;
2077 BtVal *val;
2078
2079         //      if found slot > desired slot and previous slot
2080         //      is a librarian slot, use it
2081
2082         if( slot > 1 )
2083           if( slotptr(set->page, slot-1)->type == Librarian )
2084                 slot--;
2085
2086         // copy value onto page
2087
2088         set->page->min -= vallen + sizeof(BtVal);
2089         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2090         memcpy (val->value, value, vallen);
2091         val->len = vallen;
2092
2093         // copy key onto page
2094
2095         set->page->min -= keylen + sizeof(BtKey);
2096         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2097         memcpy (ptr->key, key, keylen);
2098         ptr->len = keylen;
2099         
2100         //      find first empty slot
2101
2102         for( idx = slot; idx < set->page->cnt; idx++ )
2103           if( slotptr(set->page, idx)->dead )
2104                 break;
2105
2106         // now insert key into array before slot
2107
2108         if( idx == set->page->cnt )
2109                 idx += 2, set->page->cnt += 2, librarian = 2;
2110         else
2111                 librarian = 1;
2112
2113         set->latch->dirty = 1;
2114         set->page->act++;
2115
2116         while( idx > slot + librarian - 1 )
2117                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2118
2119         //      add librarian slot
2120
2121         if( librarian > 1 ) {
2122                 node = slotptr(set->page, slot++);
2123                 node->off = set->page->min;
2124                 node->type = Librarian;
2125                 node->dead = 1;
2126         }
2127
2128         //      fill in new slot
2129
2130         node = slotptr(set->page, slot);
2131         node->off = set->page->min;
2132         node->type = type;
2133         node->dead = 0;
2134
2135         bt_unlockpage (BtLockWrite, set->latch);
2136         bt_unpinlatch (set->latch);
2137         return 0;
2138 }
2139
2140 //  Insert new key into the btree at given level.
2141 //      either add a new key or update/add an existing one
2142
2143 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2144 {
2145 unsigned char newkey[BT_keyarray];
2146 uint slot, idx, len;
2147 BtPageSet set[1];
2148 BtKey *ptr, *ins;
2149 uid sequence;
2150 BtVal *val;
2151 uint type;
2152
2153   // set up the key we're working on
2154
2155   ins = (BtKey*)newkey;
2156   memcpy (ins->key, key, keylen);
2157   ins->len = keylen;
2158
2159   // is this a non-unique index value?
2160
2161   if( unique )
2162         type = Unique;
2163   else {
2164         type = Duplicate;
2165         sequence = bt_newdup (bt);
2166         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2167         ins->len += BtId;
2168   }
2169
2170   while( 1 ) { // find the page and slot for the current key
2171         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2172                 ptr = keyptr(set->page, slot);
2173         else {
2174                 if( !bt->err )
2175                         bt->err = BTERR_ovflw;
2176                 return bt->err;
2177         }
2178
2179         // if librarian slot == found slot, advance to real slot
2180
2181         if( slotptr(set->page, slot)->type == Librarian )
2182           if( !keycmp (ptr, key, keylen) )
2183                 ptr = keyptr(set->page, ++slot);
2184
2185         len = ptr->len;
2186
2187         if( slotptr(set->page, slot)->type == Duplicate )
2188                 len -= BtId;
2189
2190         //  if inserting a duplicate key or unique key
2191         //      check for adequate space on the page
2192         //      and insert the new key before slot.
2193
2194         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2195           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2196                 if( bt_splitpage (bt, set) )
2197                   return bt->err;
2198                 else
2199                   continue;
2200
2201           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2202         }
2203
2204         // if key already exists, update value and return
2205
2206         val = valptr(set->page, slot);
2207
2208         if( val->len >= vallen ) {
2209                 if( slotptr(set->page, slot)->dead )
2210                         set->page->act++;
2211                 set->page->garbage += val->len - vallen;
2212                 set->latch->dirty = 1;
2213                 slotptr(set->page, slot)->dead = 0;
2214                 val->len = vallen;
2215                 memcpy (val->value, value, vallen);
2216                 bt_unlockpage(BtLockWrite, set->latch);
2217                 bt_unpinlatch (set->latch);
2218                 return 0;
2219         }
2220
2221         //      new update value doesn't fit in existing value area
2222
2223         if( !slotptr(set->page, slot)->dead )
2224                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2225         else {
2226                 slotptr(set->page, slot)->dead = 0;
2227                 set->page->act++;
2228         }
2229
2230         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2231           if( bt_splitpage (bt, set) )
2232                 return bt->err;
2233           else
2234                 continue;
2235
2236         set->page->min -= vallen + sizeof(BtVal);
2237         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2238         memcpy (val->value, value, vallen);
2239         val->len = vallen;
2240
2241         set->latch->dirty = 1;
2242         set->page->min -= keylen + sizeof(BtKey);
2243         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2244         memcpy (ptr->key, key, keylen);
2245         ptr->len = keylen;
2246         
2247         slotptr(set->page, slot)->off = set->page->min;
2248         bt_unlockpage(BtLockWrite, set->latch);
2249         bt_unpinlatch (set->latch);
2250         return 0;
2251   }
2252   return 0;
2253 }
2254
2255 //      compute hash of string
2256
2257 uint bt_hashkey (unsigned char *key, unsigned int len)
2258 {
2259 uint hash = 0;
2260
2261         while( len >= sizeof(uint) )
2262                 hash *= 11, hash += *(uint *)key, len -= sizeof(uint), key += sizeof(uint);
2263
2264         while( len )
2265                 hash *= 11, hash += *key++ * len--;
2266
2267         return hash;
2268 }
2269
2270 //      add a new lock table entry
2271
2272 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2273 {
2274 BtLockSet *lock = bt->mgr->locktable;
2275 uint slot, prev;
2276
2277         //      obtain lock manager global lock
2278
2279         bt_spinwritelock (bt->mgr->locklatch);
2280
2281         //      return NULL if table is full
2282
2283         if( !(slot = bt->mgr->lockfree) ) {
2284                 bt_spinreleasewrite (bt->mgr->locklatch);
2285                 return 0;
2286         }
2287
2288         //  maintain free chain
2289
2290         bt->mgr->lockfree = lock[slot].next;
2291         bt_spinreleasewrite (bt->mgr->locklatch);
2292         
2293         if( prev = bt->mgr->hashlock[hashidx].slot )
2294                 lock[prev].prev = slot;
2295
2296         bt->mgr->hashlock[hashidx].slot = slot;
2297         lock[slot].hashidx = hashidx;
2298         lock[slot].next = prev;
2299         lock[slot].prev = 0;
2300
2301         //      save the key being locked
2302
2303         memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2304         return slot;
2305 }
2306
2307 //      add key to the lock table
2308 //      block until available.
2309
2310 uint bt_setlock(BtDb *bt, BtKey *key)
2311 {
2312 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2313 BtLockSet *lock = NULL;
2314 BtKey *key2;
2315 uint slot;
2316
2317   //  find existing lock entry
2318   //  or recover from full table
2319
2320   while( lock == NULL ) {
2321         //      obtain lock on hash slot
2322
2323         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2324
2325         if( slot = bt->mgr->hashlock[hashidx].slot )
2326           do {
2327                 lock = bt->mgr->locktable + slot;
2328                 key2 = (BtKey *)lock->key;
2329
2330                 if( !keycmp (key, key2->key, key2->len) )
2331                         break;
2332           } while( slot = lock->next );
2333
2334         if( slot )
2335                 break;
2336
2337         if( slot = bt_newlock (bt, key, hashidx) )
2338                 break;
2339
2340         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2341 #ifdef unix
2342         sched_yield();
2343 #else
2344         SwitchToThread ();
2345 #endif
2346   }
2347
2348   lock = bt->mgr->locktable + slot;
2349   lock->pin++;
2350
2351   bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2352   WriteLock (lock->readwr);
2353   return slot;
2354 }
2355
2356 void bt_lockclr (BtDb *bt, uint slot)
2357 {
2358 BtLockSet *lock = bt->mgr->locktable + slot;
2359 uint hashidx = lock->hashidx;
2360 uint next, prev;
2361         
2362         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2363         WriteRelease (lock->readwr);
2364
2365         // if entry is no longer in use,
2366         //      return it to the free chain.
2367
2368         if( !--lock->pin ) {
2369                 if( next = lock->next )
2370                         bt->mgr->locktable[next].prev = lock->prev;
2371
2372                 if( prev = lock->prev )
2373                         bt->mgr->locktable[prev].next = lock->next;
2374                 else
2375                         bt->mgr->hashlock[lock->hashidx].slot = next;
2376
2377                 bt_spinwritelock (bt->mgr->locklatch);
2378                 lock->next = bt->mgr->lockfree;
2379                 bt->mgr->lockfree = slot;
2380                 bt_spinreleasewrite (bt->mgr->locklatch);
2381         }
2382
2383         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2384 }
2385
2386 //      atomic insert of a batch of keys.
2387 //      return -1 if BTERR is set
2388 //      otherwise return slot number
2389 //      causing the key constraint violation
2390 //      or zero on successful completion.
2391
2392 int bt_atomicinsert (BtDb *bt, BtPage source)
2393 {
2394 uint locks[MAX_atomic];
2395 BtKey *key, *key2;
2396 int result = 0;
2397 BtSlot temp[1];
2398 uint slot, idx;
2399 BtVal *val;
2400 int type;
2401
2402         // first sort the list of keys into order to
2403         //      prevent deadlocks between threads.
2404
2405         for( slot = 1; slot++ < source->cnt; ) {
2406           *temp = *slotptr(source,slot);
2407           key = keyptr (source,slot);
2408           for( idx = slot; --idx; ) {
2409             key2 = keyptr (source,idx);
2410                 if( keycmp (key, key2->key, key2->len) < 0 ) {
2411                   *slotptr(source,idx+1) = *slotptr(source,idx);
2412                   *slotptr(source,idx) = *temp;
2413                 } else
2414                   break;
2415           }
2416         }
2417
2418         // take each unique-type key and add it to the lock table
2419
2420         for( slot = 0; slot++ < source->cnt; )
2421           if( slotptr(source, slot)->type == Unique )
2422                 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2423
2424         // Lookup each unique key and determine constraint violations
2425
2426         for( slot = 0; slot++ < source->cnt; )
2427           if( slotptr(source, slot)->type == Unique ) {
2428                 key = keyptr(source, slot);
2429                 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2430                   continue;
2431                 result = slot;
2432                 break;
2433           }
2434
2435         //      add each key to the btree
2436
2437         if( !result )
2438          for( slot = 0; slot++ < source->cnt; ) {
2439           key = keyptr(source,slot);
2440           val = valptr(source,slot);
2441           type = slotptr(source,slot)->type;
2442           if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2443                 return -1;
2444          }
2445
2446         // remove each unique-type key from the lock table
2447
2448         for( slot = 0; slot++ < source->cnt; )
2449           if( slotptr(source, slot)->type == Unique )
2450                 bt_lockclr (bt, locks[slot]);
2451
2452         return result;
2453 }
2454
2455 //      set cursor to highest slot on highest page
2456
2457 uint bt_lastkey (BtDb *bt)
2458 {
2459 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2460 BtPageSet set[1];
2461 uint slot;
2462
2463         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2464                 set->page = bt_mappage (bt, set->latch);
2465         else
2466                 return 0;
2467
2468     bt_lockpage(BtLockRead, set->latch);
2469
2470         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2471         slot = set->page->cnt;
2472
2473     bt_unlockpage(BtLockRead, set->latch);
2474         bt_unpinlatch (set->latch);
2475         return slot;
2476 }
2477
2478 //      return previous slot on cursor page
2479
2480 uint bt_prevkey (BtDb *bt, uint slot)
2481 {
2482 BtPageSet set[1];
2483 uid left;
2484
2485         if( --slot )
2486                 return slot;
2487
2488         if( left = bt_getid(bt->cursor->left) )
2489                 bt->cursor_page = left;
2490         else
2491                 return 0;
2492
2493         if( set->latch = bt_pinlatch (bt, left, 1) )
2494                 set->page = bt_mappage (bt, set->latch);
2495         else
2496                 return 0;
2497
2498     bt_lockpage(BtLockRead, set->latch);
2499         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2500         bt_unlockpage(BtLockRead, set->latch);
2501         bt_unpinlatch (set->latch);
2502         return bt->cursor->cnt;
2503 }
2504
2505 //  return next slot on cursor page
2506 //  or slide cursor right into next page
2507
2508 uint bt_nextkey (BtDb *bt, uint slot)
2509 {
2510 BtPageSet set[1];
2511 uid right;
2512
2513   do {
2514         right = bt_getid(bt->cursor->right);
2515
2516         while( slot++ < bt->cursor->cnt )
2517           if( slotptr(bt->cursor,slot)->dead )
2518                 continue;
2519           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2520                 return slot;
2521           else
2522                 break;
2523
2524         if( !right )
2525                 break;
2526
2527         bt->cursor_page = right;
2528
2529         if( set->latch = bt_pinlatch (bt, right, 1) )
2530                 set->page = bt_mappage (bt, set->latch);
2531         else
2532                 return 0;
2533
2534     bt_lockpage(BtLockRead, set->latch);
2535
2536         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2537
2538         bt_unlockpage(BtLockRead, set->latch);
2539         bt_unpinlatch (set->latch);
2540         slot = 0;
2541
2542   } while( 1 );
2543
2544   return bt->err = 0;
2545 }
2546
2547 //  cache page of keys into cursor and return starting slot for given key
2548
2549 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2550 {
2551 BtPageSet set[1];
2552 uint slot;
2553
2554         // cache page for retrieval
2555
2556         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2557           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2558         else
2559           return 0;
2560
2561         bt->cursor_page = set->page_no;
2562
2563         bt_unlockpage(BtLockRead, set->latch);
2564         bt_unpinlatch (set->latch);
2565         return slot;
2566 }
2567
2568 BtKey *bt_key(BtDb *bt, uint slot)
2569 {
2570         return keyptr(bt->cursor, slot);
2571 }
2572
2573 BtVal *bt_val(BtDb *bt, uint slot)
2574 {
2575         return valptr(bt->cursor,slot);
2576 }
2577
2578 #ifdef STANDALONE
2579
2580 #ifndef unix
2581 double getCpuTime(int type)
2582 {
2583 FILETIME crtime[1];
2584 FILETIME xittime[1];
2585 FILETIME systime[1];
2586 FILETIME usrtime[1];
2587 SYSTEMTIME timeconv[1];
2588 double ans = 0;
2589
2590         memset (timeconv, 0, sizeof(SYSTEMTIME));
2591
2592         switch( type ) {
2593         case 0:
2594                 GetSystemTimeAsFileTime (xittime);
2595                 FileTimeToSystemTime (xittime, timeconv);
2596                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2597                 break;
2598         case 1:
2599                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2600                 FileTimeToSystemTime (usrtime, timeconv);
2601                 break;
2602         case 2:
2603                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2604                 FileTimeToSystemTime (systime, timeconv);
2605                 break;
2606         }
2607
2608         ans += (double)timeconv->wHour * 3600;
2609         ans += (double)timeconv->wMinute * 60;
2610         ans += (double)timeconv->wSecond;
2611         ans += (double)timeconv->wMilliseconds / 1000;
2612         return ans;
2613 }
2614 #else
2615 #include <time.h>
2616 #include <sys/resource.h>
2617
2618 double getCpuTime(int type)
2619 {
2620 struct rusage used[1];
2621 struct timeval tv[1];
2622
2623         switch( type ) {
2624         case 0:
2625                 gettimeofday(tv, NULL);
2626                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2627
2628         case 1:
2629                 getrusage(RUSAGE_SELF, used);
2630                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2631
2632         case 2:
2633                 getrusage(RUSAGE_SELF, used);
2634                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2635         }
2636
2637         return 0;
2638 }
2639 #endif
2640
2641 void bt_poolaudit (BtMgr *mgr)
2642 {
2643 BtLatchSet *latch;
2644 uint slot = 0;
2645
2646         while( slot++ < mgr->latchdeployed ) {
2647                 latch = mgr->latchsets + slot;
2648
2649                 if( *latch->readwr->rin & MASK )
2650                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2651                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2652
2653                 if( *latch->access->rin & MASK )
2654                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2655                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2656
2657                 if( *latch->parent->rin & MASK )
2658                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2659                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2660
2661                 if( latch->pin & ~CLOCK_bit ) {
2662                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2663                         latch->pin = 0;
2664                 }
2665         }
2666 }
2667
2668 uint bt_latchaudit (BtDb *bt)
2669 {
2670 ushort idx, hashidx;
2671 uid next, page_no;
2672 BtLatchSet *latch;
2673 uint cnt = 0;
2674 BtKey *ptr;
2675
2676         if( *(ushort *)(bt->mgr->alloclatch) )
2677                 fprintf(stderr, "Alloc page locked\n");
2678         *(uint *)(bt->mgr->alloclatch) = 0;
2679
2680         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2681                 latch = bt->mgr->latchsets + idx;
2682                 if( *latch->readwr->rin & MASK )
2683                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2684                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2685
2686                 if( *latch->access->rin & MASK )
2687                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2688                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2689
2690                 if( *latch->parent->rin & MASK )
2691                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2692                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2693
2694                 if( latch->pin ) {
2695                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2696                         latch->pin = 0;
2697                 }
2698         }
2699
2700         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2701           if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2702                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2703
2704           *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
2705
2706           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2707                 latch = bt->mgr->latchsets + idx;
2708                 if( latch->pin )
2709                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2710           } while( idx = latch->next );
2711         }
2712
2713         page_no = LEAF_page;
2714
2715         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2716         uid off = page_no << bt->mgr->page_bits;
2717 #ifdef unix
2718           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2719 #else
2720         DWORD amt[1];
2721
2722           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2723
2724           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2725                 return bt->err = BTERR_map;
2726
2727           if( *amt <  bt->mgr->page_size )
2728                 return bt->err = BTERR_map;
2729 #endif
2730                 if( !bt->frame->free && !bt->frame->lvl )
2731                         cnt += bt->frame->act;
2732                 page_no++;
2733         }
2734                 
2735         cnt--;  // remove stopper key
2736         fprintf(stderr, " Total keys read %d\n", cnt);
2737
2738         bt_close (bt);
2739         return 0;
2740 }
2741
2742 typedef struct {
2743         char idx;
2744         char *type;
2745         char *infile;
2746         BtMgr *mgr;
2747 } ThreadArg;
2748
2749 //  standalone program to index file of keys
2750 //  then list them onto std-out
2751
2752 #ifdef unix
2753 void *index_file (void *arg)
2754 #else
2755 uint __stdcall index_file (void *arg)
2756 #endif
2757 {
2758 int line = 0, found = 0, cnt = 0, unique;
2759 uid next, page_no = LEAF_page;  // start on first page of leaves
2760 BtPage page = calloc (4096, 1);
2761 unsigned char key[BT_maxkey];
2762 ThreadArg *args = arg;
2763 int ch, len = 0, slot;
2764 BtPageSet set[1];
2765 int atomic;
2766 BtKey *ptr;
2767 BtVal *val;
2768 BtDb *bt;
2769 FILE *in;
2770
2771         bt = bt_open (args->mgr);
2772
2773         unique = (args->type[1] | 0x20) != 'd';
2774         atomic = (args->type[1] | 0x20) == 'a';
2775
2776         switch(args->type[0] | 0x20)
2777         {
2778         case 'a':
2779                 fprintf(stderr, "started latch mgr audit\n");
2780                 cnt = bt_latchaudit (bt);
2781                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2782                 break;
2783
2784         case 'p':
2785                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2786                 if( in = fopen (args->infile, "rb") )
2787                   while( ch = getc(in), ch != EOF )
2788                         if( ch == '\n' )
2789                         {
2790                           line++;
2791
2792                           if( atomic ) {
2793                                 memset (page, 0, 4096);
2794                                 slotptr(page, 1)->off = 2048;
2795                                 slotptr(page, 1)->type = Unique;
2796                                 ptr = keyptr(page,1);
2797                                 ptr->len = 10;
2798                                 memcpy(ptr->key, key, 10);
2799                                 val = valptr(page,1);
2800                                 val->len = len - 10;
2801                                 memcpy (val->value, key + 10, len - 10);
2802                                 page->cnt = 1;
2803                                 if( slot = bt_atomicinsert (bt, page) )
2804                                   fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2805                           }
2806                           else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2807                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2808                           len = 0;
2809                         }
2810                         else if( len < BT_maxkey )
2811                                 key[len++] = ch;
2812                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2813                 break;
2814
2815         case 'w':
2816                 fprintf(stderr, "started indexing for %s\n", args->infile);
2817                 if( in = fopen (args->infile, "rb") )
2818                   while( ch = getc(in), ch != EOF )
2819                         if( ch == '\n' )
2820                         {
2821                           line++;
2822                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2823                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2824                           len = 0;
2825                         }
2826                         else if( len < BT_maxkey )
2827                                 key[len++] = ch;
2828                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2829                 break;
2830
2831         case 'd':
2832                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2833                 if( in = fopen (args->infile, "rb") )
2834                   while( ch = getc(in), ch != EOF )
2835                         if( ch == '\n' )
2836                         {
2837                           line++;
2838                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2839                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2840                           ptr = (BtKey*)(bt->key);
2841                           found++;
2842
2843                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2844                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2845                           len = 0;
2846                         }
2847                         else if( len < BT_maxkey )
2848                                 key[len++] = ch;
2849                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2850                 break;
2851
2852         case 'f':
2853                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2854                 if( in = fopen (args->infile, "rb") )
2855                   while( ch = getc(in), ch != EOF )
2856                         if( ch == '\n' )
2857                         {
2858                           line++;
2859                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2860                                 found++;
2861                           else if( bt->err )
2862                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2863                           len = 0;
2864                         }
2865                         else if( len < BT_maxkey )
2866                                 key[len++] = ch;
2867                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2868                 break;
2869
2870         case 's':
2871                 fprintf(stderr, "started scanning\n");
2872                 do {
2873                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2874                                 set->page = bt_mappage (bt, set->latch);
2875                         else
2876                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2877                         bt_lockpage (BtLockRead, set->latch);
2878                         next = bt_getid (set->page->right);
2879
2880                         for( slot = 0; slot++ < set->page->cnt; )
2881                          if( next || slot < set->page->cnt )
2882                           if( !slotptr(set->page, slot)->dead ) {
2883                                 ptr = keyptr(set->page, slot);
2884                                 len = ptr->len;
2885
2886                             if( slotptr(set->page, slot)->type == Duplicate )
2887                                         len -= BtId;
2888
2889                                 fwrite (ptr->key, len, 1, stdout);
2890                                 val = valptr(set->page, slot);
2891                                 fwrite (val->value, val->len, 1, stdout);
2892                                 fputc ('\n', stdout);
2893                                 cnt++;
2894                            }
2895
2896                         bt_unlockpage (BtLockRead, set->latch);
2897                         bt_unpinlatch (set->latch);
2898                 } while( page_no = next );
2899
2900                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2901                 break;
2902
2903         case 'r':
2904                 fprintf(stderr, "started reverse scan\n");
2905                 if( slot = bt_lastkey (bt) )
2906                    while( slot = bt_prevkey (bt, slot) ) {
2907                         if( slotptr(bt->cursor, slot)->dead )
2908                           continue;
2909
2910                         ptr = keyptr(bt->cursor, slot);
2911                         len = ptr->len;
2912
2913                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2914                                 len -= BtId;
2915
2916                         fwrite (ptr->key, len, 1, stdout);
2917                         val = valptr(bt->cursor, slot);
2918                         fwrite (val->value, val->len, 1, stdout);
2919                         fputc ('\n', stdout);
2920                         cnt++;
2921                   }
2922
2923                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2924                 break;
2925
2926         case 'c':
2927 #ifdef unix
2928                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2929 #endif
2930                 fprintf(stderr, "started counting\n");
2931                 page_no = LEAF_page;
2932
2933                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2934                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2935                                 break;
2936
2937                         if( !bt->frame->free && !bt->frame->lvl )
2938                                 cnt += bt->frame->act;
2939
2940                         bt->reads++;
2941                         page_no++;
2942                 }
2943                 
2944                 cnt--;  // remove stopper key
2945                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2946                 break;
2947         }
2948
2949         bt_close (bt);
2950 #ifdef unix
2951         return NULL;
2952 #else
2953         return 0;
2954 #endif
2955 }
2956
2957 typedef struct timeval timer;
2958
2959 int main (int argc, char **argv)
2960 {
2961 int idx, cnt, len, slot, err;
2962 int segsize, bits = 16;
2963 double start, stop;
2964 #ifdef unix
2965 pthread_t *threads;
2966 #else
2967 HANDLE *threads;
2968 #endif
2969 ThreadArg *args;
2970 uint poolsize = 0;
2971 uint locksize = 0;
2972 float elapsed;
2973 char key[1];
2974 BtMgr *mgr;
2975 BtKey *ptr;
2976 BtDb *bt;
2977
2978         if( argc < 3 ) {
2979                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2980                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2981                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2982                 fprintf (stderr, "  lock_mgr_size is the maximum number of outstanding key locks\n");
2983                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2984                 exit(0);
2985         }
2986
2987         start = getCpuTime(0);
2988
2989         if( argc > 3 )
2990                 bits = atoi(argv[3]);
2991
2992         if( argc > 4 )
2993                 poolsize = atoi(argv[4]);
2994
2995         if( !poolsize )
2996                 fprintf (stderr, "Warning: no mapped_pool\n");
2997
2998         if( argc > 5 )
2999                 locksize = atoi(argv[5]);
3000
3001         cnt = argc - 6;
3002 #ifdef unix
3003         threads = malloc (cnt * sizeof(pthread_t));
3004 #else
3005         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3006 #endif
3007         args = malloc (cnt * sizeof(ThreadArg));
3008
3009         mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3010
3011         if( !mgr ) {
3012                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3013                 exit (1);
3014         }
3015
3016         //      fire off threads
3017
3018         for( idx = 0; idx < cnt; idx++ ) {
3019                 args[idx].infile = argv[idx + 6];
3020                 args[idx].type = argv[2];
3021                 args[idx].mgr = mgr;
3022                 args[idx].idx = idx;
3023 #ifdef unix
3024                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3025                         fprintf(stderr, "Error creating thread %d\n", err);
3026 #else
3027                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3028 #endif
3029         }
3030
3031         //      wait for termination
3032
3033 #ifdef unix
3034         for( idx = 0; idx < cnt; idx++ )
3035                 pthread_join (threads[idx], NULL);
3036 #else
3037         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3038
3039         for( idx = 0; idx < cnt; idx++ )
3040                 CloseHandle(threads[idx]);
3041
3042 #endif
3043         bt_poolaudit(mgr);
3044         bt_mgrclose (mgr);
3045
3046         elapsed = getCpuTime(0) - start;
3047         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3048         elapsed = getCpuTime(1);
3049         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3050         elapsed = getCpuTime(2);
3051         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3052 }
3053
3054 #endif  //STANDALONE