]> pd.if.org Git - btree/blob - threadskv7.c
fix latch protocol in bt_atomicload
[btree] / threadskv7.c
1 // btree version threadskv7 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and atomic non-consistent key insert
9
10 // 17 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 //      maximum number of keys to insert atomically in one call
88
89 #define MAX_atomic              256
90
91 #define BT_maxkey       255             // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
93
94 /*
95 There are five lock types for each node in three independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 */
102
103 typedef enum{
104         BtLockAccess,
105         BtLockDelete,
106         BtLockRead,
107         BtLockWrite,
108         BtLockParent
109 } BtLock;
110
111 //      definition for phase-fair reader/writer lock implementation
112
113 typedef struct {
114         ushort rin[1];
115         ushort rout[1];
116         ushort ticket[1];
117         ushort serving[1];
118 } RWLock;
119
120 #define PHID 0x1
121 #define PRES 0x2
122 #define MASK 0x3
123 #define RINC 0x4
124
125 //      definition for spin latch implementation
126
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
130
131 volatile typedef struct {
132         uint exclusive:1;
133         uint pending:1;
134         uint share:30;
135 } BtSpinLatch;
136
137 #define XCL 1
138 #define PEND 2
139 #define BOTH 3
140 #define SHARE 4
141
142 //      The key structure occupies space at the upper end of
143 //      each page.  It's a length byte followed by the key
144 //      bytes.
145
146 typedef struct {
147         unsigned char len;              // this can be changed to a ushort or uint
148         unsigned char key[0];
149 } BtKey;
150
151 //      the value structure also occupies space at the upper
152 //      end of the page. Each key is immediately followed by a value.
153
154 typedef struct {
155         unsigned char len;              // this can be changed to a ushort or uint
156         unsigned char value[0];
157 } BtVal;
158
159 //  hash table entries
160
161 typedef struct {
162         uint slot;                              // Latch table entry at head of chain
163         BtSpinLatch latch[1];
164 } BtHashEntry;
165
166 //      latch manager table structure
167
168 typedef struct {
169         uid page_no;                    // latch set page number
170         RWLock readwr[1];               // read/write page lock
171         RWLock access[1];               // Access Intent/Page delete
172         RWLock parent[1];               // Posting of fence key in parent
173         uint slot;                              // entry slot in latch table
174         uint next;                              // next entry in hash table chain
175         uint prev;                              // prev entry in hash table chain
176         volatile ushort pin;    // number of outstanding threads
177         ushort dirty:1;                 // page in cache is dirty
178 } BtLatchSet;
179
180 //      lock manager table structure
181
182 typedef struct {
183         RWLock readwr[1];               // read/write key lock
184         uint next;
185         uint prev;
186         uint pin;                               // count of readers waiting
187         uint hashidx;                   // hash idx for entry
188         unsigned char key[BT_keyarray];
189 } BtLockSet;
190
191 //      Define the length of the page record numbers
192
193 #define BtId 6
194
195 //      Page key slot definition.
196
197 //      Keys are marked dead, but remain on the page until
198 //      it cleanup is called. The fence key (highest key) for
199 //      a leaf page is always present, even after cleanup.
200
201 //      Slot types
202
203 //      In addition to the Unique keys that occupy slots
204 //      there are Librarian and Duplicate key
205 //      slots occupying the key slot array.
206
207 //      The Librarian slots are dead keys that
208 //      serve as filler, available to add new Unique
209 //      or Dup slots that are inserted into the B-tree.
210
211 //      The Duplicate slots have had their key bytes extended
212 //      by 6 bytes to contain a binary duplicate key uniqueifier.
213
214 typedef enum {
215         Unique,
216         Librarian,
217         Duplicate
218 } BtSlotType;
219
220 typedef struct {
221         uint off:BT_maxbits;    // page offset for key start
222         uint type:3;                    // type of slot
223         uint dead:1;                    // set for deleted slot
224 } BtSlot;
225
226 //      The first part of an index page.
227 //      It is immediately followed
228 //      by the BtSlot array of keys.
229
230 //      note that this structure size
231 //      must be a multiple of 8 bytes
232 //      in order to place dups correctly.
233
234 typedef struct BtPage_ {
235         uint cnt;                                       // count of keys in page
236         uint act;                                       // count of active keys
237         uint min;                                       // next key offset
238         uint garbage;                           // page garbage in bytes
239         unsigned char bits:7;           // page size in bits
240         unsigned char free:1;           // page is on free chain
241         unsigned char lvl:7;            // level of page
242         unsigned char kill:1;           // page is being deleted
243         unsigned char left[BtId];       // page number to left
244         unsigned char filler[2];        // padding to multiple of 8
245         unsigned char right[BtId];      // page number to right
246 } *BtPage;
247
248 //  The loadpage interface object
249
250 typedef struct {
251         uid page_no;            // current page number
252         BtPage page;            // current page pointer
253         BtLatchSet *latch;      // current page latch set
254 } BtPageSet;
255
256 //      structure for latch manager on ALLOC_page
257
258 typedef struct {
259         struct BtPage_ alloc[1];        // next page_no in right ptr
260         unsigned long long dups[1];     // global duplicate key uniqueifier
261         unsigned char chain[BtId];      // head of free page_nos chain
262 } BtPageZero;
263
264 //      The object structure for Btree access
265
266 typedef struct {
267         uint page_size;                         // page size    
268         uint page_bits;                         // page size in bits    
269 #ifdef unix
270         int idx;
271 #else
272         HANDLE idx;
273 #endif
274         BtPageZero *pagezero;           // mapped allocation page
275         BtSpinLatch alloclatch[1];      // allocation area lite latch
276         uint latchdeployed;                     // highest number of pool entries deployed
277         uint nlatchpage;                        // number of latch & lock & pool pages
278         uint latchtotal;                        // number of page latch entries
279         uint latchhash;                         // number of latch hash table slots
280         uint latchvictim;                       // next latch entry to examine
281         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
282         BtLatchSet *latchsets;          // mapped latch set from latch pages
283         unsigned char *pagepool;        // mapped to the buffer pool pages
284         uint lockhash;                          // number of lock hash table slots
285         uint lockfree;                          // next available lock table entry
286         BtSpinLatch locklatch[1];       // lock manager free chain latch
287         BtHashEntry *hashlock;          // the lock manager hash table
288         BtLockSet *locktable;           // the lock manager key table
289 #ifndef unix
290         HANDLE halloc;                          // allocation handle
291         HANDLE hpool;                           // buffer pool handle
292 #endif
293 } BtMgr;
294
295 typedef struct {
296         BtMgr *mgr;                             // buffer manager for thread
297         BtPage cursor;                  // cached frame for start/next (never mapped)
298         BtPage frame;                   // spare frame for the page split (never mapped)
299         uid cursor_page;                                // current cursor page number   
300         unsigned char *mem;                             // frame, cursor, page memory buffer
301         unsigned char key[BT_keyarray]; // last found complete key
302         int found;                              // last delete or insert was found
303         int err;                                // last error
304         int reads, writes;              // number of reads and writes from the btree
305 } BtDb;
306
307 typedef enum {
308         BTERR_ok = 0,
309         BTERR_struct,
310         BTERR_ovflw,
311         BTERR_lock,
312         BTERR_map,
313         BTERR_read,
314         BTERR_wrt,
315         BTERR_hash
316 } BTERR;
317
318 #define CLOCK_bit 0x8000
319
320 // B-Tree functions
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey   (BtDb *bt, uint slot);
329
330 //      manager functions
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
333
334 //  Helper functions to return slot values
335 //      from the cursor page.
336
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
339
340 //  The page is allocated from low and hi ends.
341 //  The key slots are allocated from the bottom,
342 //      while the text and value of the key
343 //  are allocated from the top.  When the two
344 //  areas meet, the page is split into two.
345
346 //  A key consists of a length byte, two bytes of
347 //  index number (0 - 65535), and up to 253 bytes
348 //  of key value.
349
350 //  Associated with each key is a value byte string
351 //      containing any value desired.
352
353 //  The b-tree root is always located at page 1.
354 //      The first leaf page of level zero is always
355 //      located on page 2.
356
357 //      The b-tree pages are linked with next
358 //      pointers to facilitate enumerators,
359 //      and provide for concurrency.
360
361 //      When to root page fills, it is split in two and
362 //      the tree height is raised by a new root at page
363 //      one with two keys.
364
365 //      Deleted keys are marked with a dead bit until
366 //      page cleanup. The fence key for a leaf node is
367 //      always present
368
369 //  To achieve maximum concurrency one page is locked at a time
370 //  as the tree is traversed to find leaf key in question. The right
371 //  page numbers are used in cases where the page is being split,
372 //      or consolidated.
373
374 //  Page 0 is dedicated to lock for new page extensions,
375 //      and chains empty pages together for reuse. It also
376 //      contains the latch manager hash table.
377
378 //      The ParentModification lock on a node is obtained to serialize posting
379 //      or changing the fence key for a node.
380
381 //      Empty pages are chained together through the ALLOC page and reused.
382
383 //      Access macros to address slot and key values from the page
384 //      Page slots use 1 based indexing.
385
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
389
390 void bt_putid(unsigned char *dest, uid id)
391 {
392 int i = BtId;
393
394         while( i-- )
395                 dest[i] = (unsigned char)id, id >>= 8;
396 }
397
398 uid bt_getid(unsigned char *src)
399 {
400 uid id = 0;
401 int i;
402
403         for( i = 0; i < BtId; i++ )
404                 id <<= 8, id |= *src++; 
405
406         return id;
407 }
408
409 uid bt_newdup (BtDb *bt)
410 {
411 #ifdef unix
412         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
413 #else
414         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
415 #endif
416 }
417
418 //      Phase-Fair reader/writer lock implementation
419
420 void WriteLock (RWLock *lock)
421 {
422 ushort w, r, tix;
423
424 #ifdef unix
425         tix = __sync_fetch_and_add (lock->ticket, 1);
426 #else
427         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
428 #endif
429         // wait for our ticket to come up
430
431         while( tix != lock->serving[0] )
432 #ifdef unix
433                 sched_yield();
434 #else
435                 SwitchToThread ();
436 #endif
437
438         w = PRES | (tix & PHID);
439 #ifdef  unix
440         r = __sync_fetch_and_add (lock->rin, w);
441 #else
442         r = _InterlockedExchangeAdd16 (lock->rin, w);
443 #endif
444         while( r != *lock->rout )
445 #ifdef unix
446                 sched_yield();
447 #else
448                 SwitchToThread();
449 #endif
450 }
451
452 void WriteRelease (RWLock *lock)
453 {
454 #ifdef unix
455         __sync_fetch_and_and (lock->rin, ~MASK);
456 #else
457         _InterlockedAnd16 (lock->rin, ~MASK);
458 #endif
459         lock->serving[0]++;
460 }
461
462 void ReadLock (RWLock *lock)
463 {
464 ushort w;
465 #ifdef unix
466         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
467 #else
468         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
469 #endif
470         if( w )
471           while( w == (*lock->rin & MASK) )
472 #ifdef unix
473                 sched_yield ();
474 #else
475                 SwitchToThread ();
476 #endif
477 }
478
479 void ReadRelease (RWLock *lock)
480 {
481 #ifdef unix
482         __sync_fetch_and_add (lock->rout, RINC);
483 #else
484         _InterlockedExchangeAdd16 (lock->rout, RINC);
485 #endif
486 }
487
488 //      Spin Latch Manager
489
490 //      wait until write lock mode is clear
491 //      and add 1 to the share count
492
493 void bt_spinreadlock(BtSpinLatch *latch)
494 {
495 uint prev;
496
497   do {
498 #ifdef unix
499         prev = __sync_fetch_and_add ((uint *)latch, SHARE);
500 #else
501         prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
502 #endif
503         //  see if exclusive request is granted or pending
504
505         if( !(prev & BOTH) )
506                 return;
507 #ifdef unix
508         prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
509 #else
510         prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
511 #endif
512 #ifdef  unix
513   } while( sched_yield(), 1 );
514 #else
515   } while( SwitchToThread(), 1 );
516 #endif
517 }
518
519 //      wait for other read and write latches to relinquish
520
521 void bt_spinwritelock(BtSpinLatch *latch)
522 {
523 uint prev;
524
525   do {
526 #ifdef  unix
527         prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
528 #else
529         prev = _InterlockedOr((uint *)latch, PEND | XCL);
530 #endif
531         if( !(prev & XCL) )
532           if( !(prev & ~BOTH) )
533                 return;
534           else
535 #ifdef unix
536                 __sync_fetch_and_and ((uint *)latch, ~XCL);
537 #else
538                 _InterlockedAnd((uint *)latch, ~XCL);
539 #endif
540 #ifdef  unix
541   } while( sched_yield(), 1 );
542 #else
543   } while( SwitchToThread(), 1 );
544 #endif
545 }
546
547 //      try to obtain write lock
548
549 //      return 1 if obtained,
550 //              0 otherwise
551
552 int bt_spinwritetry(BtSpinLatch *latch)
553 {
554 uint prev;
555
556 #ifdef  unix
557         prev = __sync_fetch_and_or((uint *)latch, XCL);
558 #else
559         prev = _InterlockedOr((uint *)latch, XCL);
560 #endif
561         //      take write access if all bits are clear
562
563         if( !(prev & XCL) )
564           if( !(prev & ~BOTH) )
565                 return 1;
566           else
567 #ifdef unix
568                 __sync_fetch_and_and ((uint *)latch, ~XCL);
569 #else
570                 _InterlockedAnd((uint *)latch, ~XCL);
571 #endif
572         return 0;
573 }
574
575 //      clear write mode
576
577 void bt_spinreleasewrite(BtSpinLatch *latch)
578 {
579 #ifdef unix
580         __sync_fetch_and_and((uint *)latch, ~BOTH);
581 #else
582         _InterlockedAnd((uint *)latch, ~BOTH);
583 #endif
584 }
585
586 //      decrement reader count
587
588 void bt_spinreleaseread(BtSpinLatch *latch)
589 {
590 #ifdef unix
591         __sync_fetch_and_add((uint *)latch, -SHARE);
592 #else
593         _InterlockedExchangeAdd((uint *)latch, -SHARE);
594 #endif
595 }
596
597 //      read page from permanent location in Btree file
598
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
600 {
601 off64_t off = page_no << mgr->page_bits;
602
603 #ifdef unix
604         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
606                 return BTERR_read;
607         }
608 #else
609 OVERLAPPED ovl[1];
610 uint amt[1];
611
612         memset (ovl, 0, sizeof(OVERLAPPED));
613         ovl->Offset = off;
614         ovl->OffsetHigh = off >> 32;
615
616         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
618                 return BTERR_read;
619         }
620         if( *amt <  mgr->page_size ) {
621                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
622                 return BTERR_read;
623         }
624 #endif
625         return 0;
626 }
627
628 //      write page to permanent location in Btree file
629 //      clear the dirty bit
630
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
632 {
633 off64_t off = page_no << mgr->page_bits;
634
635 #ifdef unix
636         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
637                 return BTERR_wrt;
638 #else
639 OVERLAPPED ovl[1];
640 uint amt[1];
641
642         memset (ovl, 0, sizeof(OVERLAPPED));
643         ovl->Offset = off;
644         ovl->OffsetHigh = off >> 32;
645
646         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
647                 return BTERR_wrt;
648
649         if( *amt <  mgr->page_size )
650                 return BTERR_wrt;
651 #endif
652         return 0;
653 }
654
655 //      link latch table entry into head of latch hash table
656
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
658 {
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
661
662         if( latch->next = bt->mgr->hashtable[hashidx].slot )
663                 bt->mgr->latchsets[latch->next].prev = slot;
664
665         bt->mgr->hashtable[hashidx].slot = slot;
666         latch->page_no = page_no;
667         latch->slot = slot;
668         latch->prev = 0;
669         latch->pin = 1;
670
671         if( loadit )
672           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
673                 return bt->err;
674           else
675                 bt->reads++;
676
677         return bt->err = 0;
678 }
679
680 //      set CLOCK bit in latch
681 //      decrement pin count
682
683 void bt_unpinlatch (BtLatchSet *latch)
684 {
685 #ifdef unix
686         if( ~latch->pin & CLOCK_bit )
687                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688         __sync_fetch_and_add(&latch->pin, -1);
689 #else
690         if( ~latch->pin & CLOCK_bit )
691                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692         _InterlockedDecrement16 (&latch->pin);
693 #endif
694 }
695
696 //  return the btree cached page address
697
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
699 {
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
701
702         return page;
703 }
704
705 //      find existing latchset or inspire new one
706 //      return with latchset pinned
707
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
709 {
710 uint hashidx = page_no % bt->mgr->latchhash;
711 BtLatchSet *latch;
712 uint slot, idx;
713 uint lvl, cnt;
714 off64_t off;
715 uint amt[1];
716 BtPage page;
717
718   //  try to find our entry
719
720   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
721
722   if( slot = bt->mgr->hashtable[hashidx].slot ) do
723   {
724         latch = bt->mgr->latchsets + slot;
725         if( page_no == latch->page_no )
726                 break;
727   } while( slot = latch->next );
728
729   //  found our entry
730   //  increment clock
731
732   if( slot ) {
733         latch = bt->mgr->latchsets + slot;
734 #ifdef unix
735         __sync_fetch_and_add(&latch->pin, 1);
736 #else
737         _InterlockedIncrement16 (&latch->pin);
738 #endif
739         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
740         return latch;
741   }
742
743         //  see if there are any unused pool entries
744 #ifdef unix
745         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
746 #else
747         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
748 #endif
749
750         if( slot < bt->mgr->latchtotal ) {
751                 latch = bt->mgr->latchsets + slot;
752                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
753                         return NULL;
754                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
755                 return latch;
756         }
757
758 #ifdef unix
759         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
760 #else
761         _InterlockedDecrement (&bt->mgr->latchdeployed);
762 #endif
763   //  find and reuse previous entry on victim
764
765   while( 1 ) {
766 #ifdef unix
767         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
768 #else
769         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
770 #endif
771         // try to get write lock on hash chain
772         //      skip entry if not obtained
773         //      or has outstanding pins
774
775         slot %= bt->mgr->latchtotal;
776
777         if( !slot )
778                 continue;
779
780         latch = bt->mgr->latchsets + slot;
781         idx = latch->page_no % bt->mgr->latchhash;
782
783         //      see we are on same chain as hashidx
784
785         if( idx == hashidx )
786                 continue;
787
788         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
789                 continue;
790
791         //  skip this slot if it is pinned
792         //      or the CLOCK bit is set
793
794         if( latch->pin ) {
795           if( latch->pin & CLOCK_bit ) {
796 #ifdef unix
797                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
798 #else
799                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
800 #endif
801           }
802           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
803           continue;
804         }
805
806         //  update permanent page area in btree from buffer pool
807
808         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
809
810         if( latch->dirty )
811           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
812                 return NULL;
813           else
814                 latch->dirty = 0, bt->writes++;
815
816         //  unlink our available slot from its hash chain
817
818         if( latch->prev )
819                 bt->mgr->latchsets[latch->prev].next = latch->next;
820         else
821                 bt->mgr->hashtable[idx].slot = latch->next;
822
823         if( latch->next )
824                 bt->mgr->latchsets[latch->next].prev = latch->prev;
825
826         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
827
828         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
829                 return NULL;
830
831         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
832         return latch;
833   }
834 }
835
836 void bt_mgrclose (BtMgr *mgr)
837 {
838 BtLatchSet *latch;
839 uint num = 0;
840 BtPage page;
841 uint slot;
842
843         //      flush dirty pool pages to the btree
844
845         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847                 latch = mgr->latchsets + slot;
848
849                 if( latch->dirty ) {
850                         bt_writepage(mgr, page, latch->page_no);
851                         latch->dirty = 0, num++;
852                 }
853 //              madvise (page, mgr->page_size, MADV_DONTNEED);
854         }
855
856         fprintf(stderr, "%d buffer pool pages flushed\n", num);
857
858 #ifdef unix
859         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860         munmap (mgr->pagezero, mgr->page_size);
861 #else
862         FlushViewOfFile(mgr->pagezero, 0);
863         UnmapViewOfFile(mgr->pagezero);
864         UnmapViewOfFile(mgr->hashtable);
865         CloseHandle(mgr->halloc);
866         CloseHandle(mgr->hpool);
867 #endif
868 #ifdef unix
869         close (mgr->idx);
870         free (mgr);
871 #else
872         FlushFileBuffers(mgr->idx);
873         CloseHandle(mgr->idx);
874         GlobalFree (mgr);
875 #endif
876 }
877
878 //      close and release memory
879
880 void bt_close (BtDb *bt)
881 {
882 #ifdef unix
883         if( bt->mem )
884                 free (bt->mem);
885 #else
886         if( bt->mem)
887                 VirtualFree (bt->mem, 0, MEM_RELEASE);
888 #endif
889         free (bt);
890 }
891
892 //  open/create new btree buffer manager
893
894 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
895 //              size of page pool (e.g. 262144) and number of lock table entries.
896
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
898 {
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
903 off64_t size;
904 uint amt[1];
905 BtMgr* mgr;
906 BtKey* key;
907 BtVal *val;
908
909         // determine sanity of page size and buffer pool
910
911         if( bits > BT_maxbits )
912                 bits = BT_maxbits;
913         else if( bits < BT_minbits )
914                 bits = BT_minbits;
915
916         if( nodemax < 16 ) {
917                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
918                 return NULL;
919         }
920
921 #ifdef unix
922         mgr = calloc (1, sizeof(BtMgr));
923
924         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
925
926         if( mgr->idx == -1 ) {
927                 fprintf (stderr, "Unable to open btree file\n");
928                 return free(mgr), NULL;
929         }
930 #else
931         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932         attr = FILE_ATTRIBUTE_NORMAL;
933         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
934
935         if( mgr->idx == INVALID_HANDLE_VALUE )
936                 return GlobalFree(mgr), NULL;
937 #endif
938
939 #ifdef unix
940         pagezero = valloc (BT_maxpage);
941         *amt = 0;
942
943         // read minimum page size to get root info
944         //      to support raw disk partition files
945         //      check if bits == 0 on the disk.
946
947         if( size = lseek (mgr->idx, 0L, 2) )
948                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949                         if( pagezero->alloc->bits )
950                                 bits = pagezero->alloc->bits;
951                         else
952                                 initit = 1;
953                 else
954                         return free(mgr), free(pagezero), NULL;
955         else
956                 initit = 1;
957 #else
958         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959         size = GetFileSize(mgr->idx, amt);
960
961         if( size || *amt ) {
962                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963                         return bt_mgrclose (mgr), NULL;
964                 bits = pagezero->alloc->bits;
965         } else
966                 initit = 1;
967 #endif
968
969         mgr->page_size = 1 << bits;
970         mgr->page_bits = bits;
971
972         //  calculate number of latch hash table entries
973
974         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
976
977         //  add on the number of pages in buffer pool
978         //      along with the corresponding latch table
979
980         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
981         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982         mgr->latchtotal = nodemax;
983
984         //      add on the sizeof the lock manager hash table and the lock table
985
986         mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
987
988         mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
989
990         if( !initit )
991                 goto mgrlatch;
992
993         // initialize an empty b-tree with latch page, root page, page of leaves
994         // and page(s) of latches and page pool cache
995
996         memset (pagezero, 0, 1 << bits);
997         pagezero->alloc->bits = mgr->page_bits;
998         bt_putid(pagezero->alloc->right, MIN_lvl+1);
999
1000         //  initialize left-most LEAF page in
1001         //      alloc->left.
1002
1003         bt_putid (pagezero->alloc->left, LEAF_page);
1004
1005         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006                 fprintf (stderr, "Unable to create btree page zero\n");
1007                 return bt_mgrclose (mgr), NULL;
1008         }
1009
1010         memset (pagezero, 0, 1 << bits);
1011         pagezero->alloc->bits = mgr->page_bits;
1012
1013         for( lvl=MIN_lvl; lvl--; ) {
1014                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015                 key = keyptr(pagezero->alloc, 1);
1016                 key->len = 2;           // create stopper key
1017                 key->key[0] = 0xff;
1018                 key->key[1] = 0xff;
1019
1020                 bt_putid(value, MIN_lvl - lvl + 1);
1021                 val = valptr(pagezero->alloc, 1);
1022                 val->len = lvl ? BtId : 0;
1023                 memcpy (val->value, value, val->len);
1024
1025                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026                 pagezero->alloc->lvl = lvl;
1027                 pagezero->alloc->cnt = 1;
1028                 pagezero->alloc->act = 1;
1029
1030                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031                         fprintf (stderr, "Unable to create btree page zero\n");
1032                         return bt_mgrclose (mgr), NULL;
1033                 }
1034         }
1035
1036 mgrlatch:
1037 #ifdef unix
1038         free (pagezero);
1039 #else
1040         VirtualFree (pagezero, 0, MEM_RELEASE);
1041 #endif
1042 #ifdef unix
1043         // mlock the pagezero page
1044
1045         flag = PROT_READ | PROT_WRITE;
1046         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047         if( mgr->pagezero == MAP_FAILED ) {
1048                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049                 return bt_mgrclose (mgr), NULL;
1050         }
1051         mlock (mgr->pagezero, mgr->page_size);
1052
1053         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054         if( mgr->hashtable == MAP_FAILED ) {
1055                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058 #else
1059         flag = PAGE_READWRITE;
1060         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061         if( !mgr->halloc ) {
1062                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063                 return bt_mgrclose (mgr), NULL;
1064         }
1065
1066         flag = FILE_MAP_WRITE;
1067         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068         if( !mgr->pagezero ) {
1069                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070                 return bt_mgrclose (mgr), NULL;
1071         }
1072
1073         flag = PAGE_READWRITE;
1074         size = (uid)mgr->nlatchpage << mgr->page_bits;
1075         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1076         if( !mgr->hpool ) {
1077                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078                 return bt_mgrclose (mgr), NULL;
1079         }
1080
1081         flag = FILE_MAP_WRITE;
1082         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083         if( !mgr->hashtable ) {
1084                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085                 return bt_mgrclose (mgr), NULL;
1086         }
1087 #endif
1088
1089         size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090         mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091         size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1092
1093         mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094         mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095         mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1096
1097         mgr->lockfree = lockmax - 1;
1098         mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1099
1100         for( idx = 1; idx < lockmax; idx++ )
1101                 mgr->locktable[idx].next = idx - 1;
1102
1103         return mgr;
1104 }
1105
1106 //      open BTree access method
1107 //      based on buffer manager
1108
1109 BtDb *bt_open (BtMgr *mgr)
1110 {
1111 BtDb *bt = malloc (sizeof(*bt));
1112
1113         memset (bt, 0, sizeof(*bt));
1114         bt->mgr = mgr;
1115 #ifdef unix
1116         bt->mem = valloc (2 *mgr->page_size);
1117 #else
1118         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1119 #endif
1120         bt->frame = (BtPage)bt->mem;
1121         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1122         return bt;
1123 }
1124
1125 //  compare two keys, returning > 0, = 0, or < 0
1126 //  as the comparison value
1127
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1129 {
1130 uint len1 = key1->len;
1131 int ans;
1132
1133         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1134                 return ans;
1135
1136         if( len1 > len2 )
1137                 return 1;
1138         if( len1 < len2 )
1139                 return -1;
1140
1141         return 0;
1142 }
1143
1144 // place write, read, or parent lock on requested page_no.
1145
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1147 {
1148         switch( mode ) {
1149         case BtLockRead:
1150                 ReadLock (set->readwr);
1151                 break;
1152         case BtLockWrite:
1153                 WriteLock (set->readwr);
1154                 break;
1155         case BtLockAccess:
1156                 ReadLock (set->access);
1157                 break;
1158         case BtLockDelete:
1159                 WriteLock (set->access);
1160                 break;
1161         case BtLockParent:
1162                 WriteLock (set->parent);
1163                 break;
1164         }
1165 }
1166
1167 // remove write, read, or parent lock on requested page
1168
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1170 {
1171         switch( mode ) {
1172         case BtLockRead:
1173                 ReadRelease (set->readwr);
1174                 break;
1175         case BtLockWrite:
1176                 WriteRelease (set->readwr);
1177                 break;
1178         case BtLockAccess:
1179                 ReadRelease (set->access);
1180                 break;
1181         case BtLockDelete:
1182                 WriteRelease (set->access);
1183                 break;
1184         case BtLockParent:
1185                 WriteRelease (set->parent);
1186                 break;
1187         }
1188 }
1189
1190 //      allocate a new page
1191 //      return with page latched.
1192
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1194 {
1195 int blk;
1196
1197         //      lock allocation page
1198
1199         bt_spinwritelock(bt->mgr->alloclatch);
1200
1201         // use empty chain first
1202         // else allocate empty page
1203
1204         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206                         set->page = bt_mappage (bt, set->latch);
1207                 else
1208                         return bt->err = BTERR_struct, -1;
1209
1210                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211                 bt_spinreleasewrite(bt->mgr->alloclatch);
1212
1213                 memcpy (set->page, contents, bt->mgr->page_size);
1214                 set->latch->dirty = 1;
1215                 return 0;
1216         }
1217
1218         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1220
1221         // unlock allocation latch
1222
1223         bt_spinreleasewrite(bt->mgr->alloclatch);
1224
1225         //      don't load cache from btree page
1226
1227         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228                 set->page = bt_mappage (bt, set->latch);
1229         else
1230                 return bt->err = BTERR_struct;
1231
1232         memcpy (set->page, contents, bt->mgr->page_size);
1233         set->latch->dirty = 1;
1234         return 0;
1235 }
1236
1237 //  find slot in page for given key at a given level
1238
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1240 {
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1242 uint good = 0;
1243
1244         //        make stopper key an infinite fence value
1245
1246         if( bt_getid (set->page->right) )
1247                 higher++;
1248         else
1249                 good++;
1250
1251         //      low is the lowest candidate.
1252         //  loop ends when they meet
1253
1254         //  higher is already
1255         //      tested as .ge. the passed key.
1256
1257         while( diff = higher - low ) {
1258                 slot = low + ( diff >> 1 );
1259                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1260                         low = slot + 1;
1261                 else
1262                         higher = slot, good++;
1263         }
1264
1265         //      return zero if key is on right link page
1266
1267         return good ? higher : 0;
1268 }
1269
1270 //  find and load page at given level for given key
1271 //      leave page rd or wr locked as requested
1272
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1274 {
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1279
1280   //  start at root of btree and drill down
1281
1282   do {
1283         // determine lock mode of drill level
1284         mode = (drill == lvl) ? lock : BtLockRead; 
1285
1286         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287                 set->page_no = page_no;
1288         else
1289                 return 0;
1290
1291         // obtain access lock using lock chaining with Access mode
1292
1293         if( page_no > ROOT_page )
1294           bt_lockpage(BtLockAccess, set->latch);
1295
1296         set->page = bt_mappage (bt, set->latch);
1297
1298         //      release & unpin parent page
1299
1300         if( prevpage ) {
1301           bt_unlockpage(prevmode, prevlatch);
1302           bt_unpinlatch (prevlatch);
1303           prevpage = 0;
1304         }
1305
1306         // obtain read lock using lock chaining
1307
1308         bt_lockpage(mode, set->latch);
1309
1310         if( set->page->free )
1311                 return bt->err = BTERR_struct, 0;
1312
1313         if( page_no > ROOT_page )
1314           bt_unlockpage(BtLockAccess, set->latch);
1315
1316         // re-read and re-lock root after determining actual level of root
1317
1318         if( set->page->lvl != drill) {
1319                 if( set->page_no != ROOT_page )
1320                         return bt->err = BTERR_struct, 0;
1321                         
1322                 drill = set->page->lvl;
1323
1324                 if( lock != BtLockRead && drill == lvl ) {
1325                   bt_unlockpage(mode, set->latch);
1326                   bt_unpinlatch (set->latch);
1327                   continue;
1328                 }
1329         }
1330
1331         prevpage = set->page_no;
1332         prevlatch = set->latch;
1333         prevmode = mode;
1334
1335         //  find key on page at this level
1336         //  and descend to requested level
1337
1338         if( set->page->kill )
1339           goto slideright;
1340
1341         if( slot = bt_findslot (set, key, len) ) {
1342           if( drill == lvl )
1343                 return slot;
1344
1345           while( slotptr(set->page, slot)->dead )
1346                 if( slot++ < set->page->cnt )
1347                         continue;
1348                 else
1349                         goto slideright;
1350
1351           page_no = bt_getid(valptr(set->page, slot)->value);
1352           drill--;
1353           continue;
1354         }
1355
1356         //  or slide right into next page
1357
1358 slideright:
1359         page_no = bt_getid(set->page->right);
1360
1361   } while( page_no );
1362
1363   // return error on end of right chain
1364
1365   bt->err = BTERR_struct;
1366   return 0;     // return error
1367 }
1368
1369 //      return page to free list
1370 //      page must be delete & write locked
1371
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1373 {
1374         //      lock allocation page
1375
1376         bt_spinwritelock (bt->mgr->alloclatch);
1377
1378         //      store chain
1379         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381         set->latch->dirty = 1;
1382         set->page->free = 1;
1383
1384         // unlock released page
1385
1386         bt_unlockpage (BtLockDelete, set->latch);
1387         bt_unlockpage (BtLockWrite, set->latch);
1388         bt_unpinlatch (set->latch);
1389
1390         // unlock allocation page
1391
1392         bt_spinreleasewrite (bt->mgr->alloclatch);
1393 }
1394
1395 //      a fence key was deleted from a page
1396 //      push new fence value upwards
1397
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1399 {
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1402 BtKey* ptr;
1403 uint idx;
1404
1405         //      remove the old fence value
1406
1407         ptr = keyptr(set->page, set->page->cnt);
1408         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410         set->latch->dirty = 1;
1411
1412         //  cache new fence value
1413
1414         ptr = keyptr(set->page, set->page->cnt);
1415         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1416
1417         bt_lockpage (BtLockParent, set->latch);
1418         bt_unlockpage (BtLockWrite, set->latch);
1419
1420         //      insert new (now smaller) fence key
1421
1422         bt_putid (value, set->page_no);
1423         ptr = (BtKey*)leftkey;
1424
1425         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1426           return bt->err;
1427
1428         //      now delete old fence key
1429
1430         ptr = (BtKey*)rightkey;
1431
1432         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1433                 return bt->err;
1434
1435         bt_unlockpage (BtLockParent, set->latch);
1436         bt_unpinlatch(set->latch);
1437         return 0;
1438 }
1439
1440 //      root has a single child
1441 //      collapse a level from the tree
1442
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1444 {
1445 BtPageSet child[1];
1446 uint idx;
1447
1448   // find the child entry and promote as new root contents
1449
1450   do {
1451         for( idx = 0; idx++ < root->page->cnt; )
1452           if( !slotptr(root->page, idx)->dead )
1453                 break;
1454
1455         child->page_no = bt_getid (valptr(root->page, idx)->value);
1456
1457         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458                 child->page = bt_mappage (bt, child->latch);
1459         else
1460                 return bt->err;
1461
1462         bt_lockpage (BtLockDelete, child->latch);
1463         bt_lockpage (BtLockWrite, child->latch);
1464
1465         memcpy (root->page, child->page, bt->mgr->page_size);
1466         root->latch->dirty = 1;
1467
1468         bt_freepage (bt, child);
1469
1470   } while( root->page->lvl > 1 && root->page->act == 1 );
1471
1472   bt_unlockpage (BtLockWrite, root->latch);
1473   bt_unpinlatch (root->latch);
1474   return 0;
1475 }
1476
1477 //      maintain reverse scan pointers by
1478 //      linking left pointer of far right node
1479
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1481 {
1482 BtPageSet right2[1];
1483
1484         //      keep track of rightmost leaf page
1485
1486         if( !right_page_no ) {
1487           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1488           return 0;
1489         }
1490
1491         //      link right page to left page
1492
1493         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494                 right2->page = bt_mappage (bt, right2->latch);
1495         else
1496                 return bt->err;
1497
1498         bt_lockpage (BtLockWrite, right2->latch);
1499
1500         bt_putid(right2->page->left, left_page_no);
1501         bt_unlockpage (BtLockWrite, right2->latch);
1502         bt_unpinlatch (right2->latch);
1503         return 0;
1504 }
1505
1506 //  find and delete key on page by marking delete flag bit
1507 //  if page becomes empty, delete it from the btree
1508
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1510 {
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1515 BtKey *ptr, *tst;
1516 BtVal *val;
1517
1518         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519                 ptr = keyptr(set->page, slot);
1520         else
1521                 return bt->err;
1522
1523         // if librarian slot, advance to real slot
1524
1525         if( slotptr(set->page, slot)->type == Librarian )
1526                 ptr = keyptr(set->page, ++slot);
1527
1528         fence = slot == set->page->cnt;
1529
1530         // if key is found delete it, otherwise ignore request
1531
1532         if( found = !keycmp (ptr, key, len) )
1533           if( found = slotptr(set->page, slot)->dead == 0 ) {
1534                 val = valptr(set->page,slot);
1535                 slotptr(set->page, slot)->dead = 1;
1536                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1537                 set->page->act--;
1538
1539                 // collapse empty slots beneath the fence
1540
1541                 while( idx = set->page->cnt - 1 )
1542                   if( slotptr(set->page, idx)->dead ) {
1543                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1545                   } else
1546                         break;
1547           }
1548
1549         //      did we delete a fence key in an upper level?
1550
1551         if( found && lvl && set->page->act && fence )
1552           if( bt_fixfence (bt, set, lvl) )
1553                 return bt->err;
1554           else
1555                 return bt->found = found, 0;
1556
1557         //      do we need to collapse root?
1558
1559         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560           if( bt_collapseroot (bt, set) )
1561                 return bt->err;
1562           else
1563                 return bt->found = found, 0;
1564
1565         //      return if page is not empty
1566
1567         if( set->page->act ) {
1568                 set->latch->dirty = 1;
1569                 bt_unlockpage(BtLockWrite, set->latch);
1570                 bt_unpinlatch (set->latch);
1571                 return bt->found = found, 0;
1572         }
1573
1574         //      cache copy of fence key
1575         //      to post in parent
1576
1577         ptr = keyptr(set->page, set->page->cnt);
1578         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1579
1580         //      obtain lock on right page
1581
1582         right->page_no = bt_getid(set->page->right);
1583
1584         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585                 right->page = bt_mappage (bt, right->latch);
1586         else
1587                 return 0;
1588
1589         bt_lockpage (BtLockWrite, right->latch);
1590
1591         if( right->page->kill )
1592                 return bt->err = BTERR_struct;
1593
1594         // transfer left link
1595
1596         memcpy (right->page->left, set->page->left, BtId);
1597
1598         // pull contents of right peer into our empty page
1599
1600         memcpy (set->page, right->page, bt->mgr->page_size);
1601         set->latch->dirty = 1;
1602
1603         // update left link
1604
1605         if( !lvl )
1606           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1607                 return bt->err;
1608
1609         // cache copy of key to update
1610
1611         ptr = keyptr(right->page, right->page->cnt);
1612         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1613
1614         // mark right page deleted and point it to left page
1615         //      until we can post parent updates
1616
1617         bt_putid (right->page->right, set->page_no);
1618         right->latch->dirty = 1;
1619         right->page->kill = 1;
1620
1621         bt_lockpage (BtLockParent, right->latch);
1622         bt_unlockpage (BtLockWrite, right->latch);
1623
1624         bt_lockpage (BtLockParent, set->latch);
1625         bt_unlockpage (BtLockWrite, set->latch);
1626
1627         // redirect higher key directly to our new node contents
1628
1629         bt_putid (value, set->page_no);
1630         ptr = (BtKey*)higherfence;
1631
1632         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1633           return bt->err;
1634
1635         //      delete old lower key to our node
1636
1637         ptr = (BtKey*)lowerfence;
1638
1639         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1640           return bt->err;
1641
1642         //      obtain delete and write locks to right node
1643
1644         bt_unlockpage (BtLockParent, right->latch);
1645         bt_lockpage (BtLockDelete, right->latch);
1646         bt_lockpage (BtLockWrite, right->latch);
1647         bt_freepage (bt, right);
1648
1649         bt_unlockpage (BtLockParent, set->latch);
1650         bt_unpinlatch (set->latch);
1651         bt->found = found;
1652         return 0;
1653 }
1654
1655 BtKey *bt_foundkey (BtDb *bt)
1656 {
1657         return (BtKey*)(bt->key);
1658 }
1659
1660 //      advance to next slot
1661
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1663 {
1664 BtLatchSet *prevlatch;
1665 uid page_no;
1666
1667         if( slot < set->page->cnt )
1668                 return slot + 1;
1669
1670         prevlatch = set->latch;
1671
1672         if( page_no = bt_getid(set->page->right) )
1673           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674                 set->page = bt_mappage (bt, set->latch);
1675           else
1676                 return 0;
1677         else
1678           return bt->err = BTERR_struct, 0;
1679
1680         // obtain access lock using lock chaining with Access mode
1681
1682         bt_lockpage(BtLockAccess, set->latch);
1683
1684         bt_unlockpage(BtLockRead, prevlatch);
1685         bt_unpinlatch (prevlatch);
1686
1687         bt_lockpage(BtLockRead, set->latch);
1688         bt_unlockpage(BtLockAccess, set->latch);
1689
1690         set->page_no = page_no;
1691         return 1;
1692 }
1693
1694 //      find unique key or first duplicate key in
1695 //      leaf level and return number of value bytes
1696 //      or (-1) if not found.  Setup key for bt_foundkey
1697
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1699 {
1700 BtPageSet set[1];
1701 uint len, slot;
1702 int ret = -1;
1703 BtKey *ptr;
1704 BtVal *val;
1705
1706   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1707    do {
1708         ptr = keyptr(set->page, slot);
1709
1710         //      skip librarian slot place holder
1711
1712         if( slotptr(set->page, slot)->type == Librarian )
1713                 ptr = keyptr(set->page, ++slot);
1714
1715         //      return actual key found
1716
1717         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1718         len = ptr->len;
1719
1720         if( slotptr(set->page, slot)->type == Duplicate )
1721                 len -= BtId;
1722
1723         //      not there if we reach the stopper key
1724
1725         if( slot == set->page->cnt )
1726           if( !bt_getid (set->page->right) )
1727                 break;
1728
1729         // if key exists, return >= 0 value bytes copied
1730         //      otherwise return (-1)
1731
1732         if( slotptr(set->page, slot)->dead )
1733                 continue;
1734
1735         if( keylen == len )
1736           if( !memcmp (ptr->key, key, len) ) {
1737                 val = valptr (set->page,slot);
1738                 if( valmax > val->len )
1739                         valmax = val->len;
1740                 memcpy (value, val->value, valmax);
1741                 ret = valmax;
1742           }
1743
1744         break;
1745
1746    } while( slot = bt_findnext (bt, set, slot) );
1747
1748   bt_unlockpage (BtLockRead, set->latch);
1749   bt_unpinlatch (set->latch);
1750   return ret;
1751 }
1752
1753 //      check page for space available,
1754 //      clean if necessary and return
1755 //      0 - page needs splitting
1756 //      >0  new slot value
1757
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1759 {
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
1764 uint newslot = max;
1765 BtKey *key;
1766 BtVal *val;
1767
1768         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1769                 return slot;
1770
1771         //      skip cleanup and proceed to split
1772         //      if there's not enough garbage
1773         //      to bother with.
1774
1775         if( page->garbage < nxt / 5 )
1776                 return 0;
1777
1778         memcpy (bt->frame, page, bt->mgr->page_size);
1779
1780         // skip page info and set rest of page to zero
1781
1782         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783         set->latch->dirty = 1;
1784         page->garbage = 0;
1785         page->act = 0;
1786
1787         // clean up page first by
1788         // removing deleted keys
1789
1790         while( cnt++ < max ) {
1791                 if( cnt == slot )
1792                         newslot = idx + 2;
1793                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1794                         continue;
1795
1796                 // copy the value across
1797
1798                 val = valptr(bt->frame, cnt);
1799                 nxt -= val->len + sizeof(BtVal);
1800                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1801
1802                 // copy the key across
1803
1804                 key = keyptr(bt->frame, cnt);
1805                 nxt -= key->len + sizeof(BtKey);
1806                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1807
1808                 // make a librarian slot
1809
1810                 if( idx ) {
1811                         slotptr(page, ++idx)->off = nxt;
1812                         slotptr(page, idx)->type = Librarian;
1813                         slotptr(page, idx)->dead = 1;
1814                 }
1815
1816                 // set up the slot
1817
1818                 slotptr(page, ++idx)->off = nxt;
1819                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1820
1821                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1822                         page->act++;
1823         }
1824
1825         page->min = nxt;
1826         page->cnt = idx;
1827
1828         //      see if page has enough space now, or does it need splitting?
1829
1830         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1831                 return newslot;
1832
1833         return 0;
1834 }
1835
1836 // split the root and raise the height of the btree
1837
1838 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1839 {
1840 uint nxt = bt->mgr->page_size;
1841 unsigned char value[BtId];
1842 BtPageSet left[1];
1843 BtKey *ptr;
1844 BtVal *val;
1845
1846         //  Obtain an empty page to use, and copy the current
1847         //  root contents into it, e.g. lower keys
1848
1849         if( bt_newpage(bt, left, root->page) )
1850                 return bt->err;
1851
1852         bt_unpinlatch (left->latch);
1853
1854         // set left from higher to lower page
1855
1856         bt_putid (right->page->left, left->page_no);
1857         bt_unpinlatch (right->latch);
1858
1859         // preserve the page info at the bottom
1860         // of higher keys and set rest to zero
1861
1862         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1863
1864         // insert stopper key at top of newroot page
1865         // and increase the root height
1866
1867         nxt -= BtId + sizeof(BtVal);
1868         bt_putid (value, right->page_no);
1869         val = (BtVal *)((unsigned char *)root->page + nxt);
1870         memcpy (val->value, value, BtId);
1871         val->len = BtId;
1872
1873         nxt -= 2 + sizeof(BtKey);
1874         slotptr(root->page, 2)->off = nxt;
1875         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1876         ptr->len = 2;
1877         ptr->key[0] = 0xff;
1878         ptr->key[1] = 0xff;
1879
1880         // insert lower keys page fence key on newroot page as first key
1881
1882         nxt -= BtId + sizeof(BtVal);
1883         bt_putid (value, left->page_no);
1884         val = (BtVal *)((unsigned char *)root->page + nxt);
1885         memcpy (val->value, value, BtId);
1886         val->len = BtId;
1887
1888         nxt -= leftkey->len + sizeof(BtKey);
1889         slotptr(root->page, 1)->off = nxt;
1890         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1891         
1892         bt_putid(root->page->right, 0);
1893         root->page->min = nxt;          // reset lowest used offset and key count
1894         root->page->cnt = 2;
1895         root->page->act = 2;
1896         root->page->lvl++;
1897
1898         // release and unpin root pages
1899
1900         bt_unlockpage(BtLockWrite, root->latch);
1901         bt_unpinlatch (root->latch);
1902         return 0;
1903 }
1904
1905 //  split already locked full node
1906 //      return unlocked.
1907
1908 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1909 {
1910 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1911 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1912 unsigned char value[BtId];
1913 uint lvl = set->page->lvl;
1914 BtPageSet right[1];
1915 BtKey *key, *ptr;
1916 BtVal *val, *src;
1917 uid right2;
1918 uint prev;
1919
1920         //  split higher half of keys to bt->frame
1921
1922         memset (bt->frame, 0, bt->mgr->page_size);
1923         max = set->page->cnt;
1924         cnt = max / 2;
1925         idx = 0;
1926
1927         while( cnt++ < max ) {
1928                 if( slotptr(set->page, cnt)->dead && cnt < max )
1929                         continue;
1930                 src = valptr(set->page, cnt);
1931                 nxt -= src->len + sizeof(BtVal);
1932                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1933
1934                 key = keyptr(set->page, cnt);
1935                 nxt -= key->len + sizeof(BtKey);
1936                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1937                 memcpy (ptr, key, key->len + sizeof(BtKey));
1938
1939                 //      add librarian slot
1940
1941                 if( idx ) {
1942                         slotptr(bt->frame, ++idx)->off = nxt;
1943                         slotptr(bt->frame, idx)->type = Librarian;
1944                         slotptr(bt->frame, idx)->dead = 1;
1945                 }
1946
1947                 //  add actual slot
1948
1949                 slotptr(bt->frame, ++idx)->off = nxt;
1950                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1951
1952                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1953                         bt->frame->act++;
1954         }
1955
1956         // remember existing fence key for new page to the right
1957
1958         memcpy (rightkey, key, key->len + sizeof(BtKey));
1959
1960         bt->frame->bits = bt->mgr->page_bits;
1961         bt->frame->min = nxt;
1962         bt->frame->cnt = idx;
1963         bt->frame->lvl = lvl;
1964
1965         // link right node
1966
1967         if( set->page_no > ROOT_page ) {
1968                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1969                 bt_putid(bt->frame->left, set->page_no);
1970         }
1971
1972         //      get new free page and write higher keys to it.
1973
1974         if( bt_newpage(bt, right, bt->frame) )
1975                 return bt->err;
1976
1977         // link left node
1978
1979         if( set->page_no > ROOT_page && !lvl )
1980           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1981                 return bt->err;
1982
1983         //      update lower keys to continue in old page
1984
1985         memcpy (bt->frame, set->page, bt->mgr->page_size);
1986         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987         set->latch->dirty = 1;
1988
1989         nxt = bt->mgr->page_size;
1990         set->page->garbage = 0;
1991         set->page->act = 0;
1992         max /= 2;
1993         cnt = 0;
1994         idx = 0;
1995
1996         if( slotptr(bt->frame, max)->type == Librarian )
1997                 max--;
1998
1999         //  assemble page of smaller keys
2000
2001         while( cnt++ < max ) {
2002                 if( slotptr(bt->frame, cnt)->dead )
2003                         continue;
2004                 val = valptr(bt->frame, cnt);
2005                 nxt -= val->len + sizeof(BtVal);
2006                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2007
2008                 key = keyptr(bt->frame, cnt);
2009                 nxt -= key->len + sizeof(BtKey);
2010                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2011
2012                 //      add librarian slot
2013
2014                 if( idx ) {
2015                         slotptr(set->page, ++idx)->off = nxt;
2016                         slotptr(set->page, idx)->type = Librarian;
2017                         slotptr(set->page, idx)->dead = 1;
2018                 }
2019
2020                 //      add actual slot
2021
2022                 slotptr(set->page, ++idx)->off = nxt;
2023                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2024                 set->page->act++;
2025         }
2026
2027         // remember fence key for smaller page
2028
2029         memcpy(fencekey, key, key->len + sizeof(BtKey));
2030
2031         bt_putid(set->page->right, right->page_no);
2032         set->page->min = nxt;
2033         set->page->cnt = idx;
2034
2035         // if current page is the root page, split it
2036
2037         if( set->page_no == ROOT_page )
2038                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2039
2040         // insert new fences in their parent pages
2041
2042         bt_lockpage (BtLockParent, right->latch);
2043
2044         bt_lockpage (BtLockParent, set->latch);
2045         bt_unlockpage (BtLockWrite, set->latch);
2046
2047         // insert new fence for reformulated left block of smaller keys
2048
2049         ptr = (BtKey*)fencekey;
2050         bt_putid (value, set->page_no);
2051
2052         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2053                 return bt->err;
2054
2055         // switch fence for right block of larger keys to new right page
2056
2057         ptr = (BtKey*)rightkey;
2058         bt_putid (value, right->page_no);
2059
2060         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2061                 return bt->err;
2062
2063         bt_unlockpage (BtLockParent, set->latch);
2064         bt_unpinlatch (set->latch);
2065
2066         bt_unlockpage (BtLockParent, right->latch);
2067         bt_unpinlatch (right->latch);
2068         return 0;
2069 }
2070
2071 //      install new key and value onto page
2072 //      page must already be checked for
2073 //      adequate space
2074
2075 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2076 {
2077 uint idx, librarian;
2078 BtSlot *node;
2079 BtKey *ptr;
2080 BtVal *val;
2081
2082         //      if found slot > desired slot and previous slot
2083         //      is a librarian slot, use it
2084
2085         if( slot > 1 )
2086           if( slotptr(set->page, slot-1)->type == Librarian )
2087                 slot--;
2088
2089         // copy value onto page
2090
2091         set->page->min -= vallen + sizeof(BtVal);
2092         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2093         memcpy (val->value, value, vallen);
2094         val->len = vallen;
2095
2096         // copy key onto page
2097
2098         set->page->min -= keylen + sizeof(BtKey);
2099         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2100         memcpy (ptr->key, key, keylen);
2101         ptr->len = keylen;
2102         
2103         //      find first empty slot
2104
2105         for( idx = slot; idx < set->page->cnt; idx++ )
2106           if( slotptr(set->page, idx)->dead )
2107                 break;
2108
2109         // now insert key into array before slot
2110
2111         if( idx == set->page->cnt )
2112                 idx += 2, set->page->cnt += 2, librarian = 2;
2113         else
2114                 librarian = 1;
2115
2116         set->latch->dirty = 1;
2117         set->page->act++;
2118
2119         while( idx > slot + librarian - 1 )
2120                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2121
2122         //      add librarian slot
2123
2124         if( librarian > 1 ) {
2125                 node = slotptr(set->page, slot++);
2126                 node->off = set->page->min;
2127                 node->type = Librarian;
2128                 node->dead = 1;
2129         }
2130
2131         //      fill in new slot
2132
2133         node = slotptr(set->page, slot);
2134         node->off = set->page->min;
2135         node->type = type;
2136         node->dead = 0;
2137
2138         bt_unlockpage (BtLockWrite, set->latch);
2139         bt_unpinlatch (set->latch);
2140         return 0;
2141 }
2142
2143 //  Insert new key into the btree at given level.
2144 //      either add a new key or update/add an existing one
2145
2146 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2147 {
2148 unsigned char newkey[BT_keyarray];
2149 uint slot, idx, len;
2150 BtPageSet set[1];
2151 BtKey *ptr, *ins;
2152 uid sequence;
2153 BtVal *val;
2154 uint type;
2155
2156   // set up the key we're working on
2157
2158   ins = (BtKey*)newkey;
2159   memcpy (ins->key, key, keylen);
2160   ins->len = keylen;
2161
2162   // is this a non-unique index value?
2163
2164   if( unique )
2165         type = Unique;
2166   else {
2167         type = Duplicate;
2168         sequence = bt_newdup (bt);
2169         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2170         ins->len += BtId;
2171   }
2172
2173   while( 1 ) { // find the page and slot for the current key
2174         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2175                 ptr = keyptr(set->page, slot);
2176         else {
2177                 if( !bt->err )
2178                         bt->err = BTERR_ovflw;
2179                 return bt->err;
2180         }
2181
2182         // if librarian slot == found slot, advance to real slot
2183
2184         if( slotptr(set->page, slot)->type == Librarian )
2185           if( !keycmp (ptr, key, keylen) )
2186                 ptr = keyptr(set->page, ++slot);
2187
2188         len = ptr->len;
2189
2190         if( slotptr(set->page, slot)->type == Duplicate )
2191                 len -= BtId;
2192
2193         //  if inserting a duplicate key or unique key
2194         //      check for adequate space on the page
2195         //      and insert the new key before slot.
2196
2197         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2198           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2199                 if( bt_splitpage (bt, set) )
2200                   return bt->err;
2201                 else
2202                   continue;
2203
2204           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2205         }
2206
2207         // if key already exists, update value and return
2208
2209         val = valptr(set->page, slot);
2210
2211         if( val->len >= vallen ) {
2212                 if( slotptr(set->page, slot)->dead )
2213                         set->page->act++;
2214                 set->page->garbage += val->len - vallen;
2215                 set->latch->dirty = 1;
2216                 slotptr(set->page, slot)->dead = 0;
2217                 val->len = vallen;
2218                 memcpy (val->value, value, vallen);
2219                 bt_unlockpage(BtLockWrite, set->latch);
2220                 bt_unpinlatch (set->latch);
2221                 return 0;
2222         }
2223
2224         //      new update value doesn't fit in existing value area
2225
2226         if( !slotptr(set->page, slot)->dead )
2227                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2228         else {
2229                 slotptr(set->page, slot)->dead = 0;
2230                 set->page->act++;
2231         }
2232
2233         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2234           if( bt_splitpage (bt, set) )
2235                 return bt->err;
2236           else
2237                 continue;
2238
2239         set->page->min -= vallen + sizeof(BtVal);
2240         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2241         memcpy (val->value, value, vallen);
2242         val->len = vallen;
2243
2244         set->latch->dirty = 1;
2245         set->page->min -= keylen + sizeof(BtKey);
2246         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2247         memcpy (ptr->key, key, keylen);
2248         ptr->len = keylen;
2249         
2250         slotptr(set->page, slot)->off = set->page->min;
2251         bt_unlockpage(BtLockWrite, set->latch);
2252         bt_unpinlatch (set->latch);
2253         return 0;
2254   }
2255   return 0;
2256 }
2257
2258 //      compute hash of string
2259
2260 uint bt_hashkey (unsigned char *key, unsigned int len)
2261 {
2262 uint hash = 0;
2263
2264         while( len >= sizeof(uint) )
2265                 hash *= 11, hash += *(uint *)key, len -= sizeof(uint), key += sizeof(uint);
2266
2267         while( len )
2268                 hash *= 11, hash += *key++ * len--;
2269
2270         return hash;
2271 }
2272
2273 //      add a new lock table entry
2274
2275 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2276 {
2277 BtLockSet *lock = bt->mgr->locktable;
2278 uint slot, prev;
2279
2280         //      obtain lock manager global lock
2281
2282         bt_spinwritelock (bt->mgr->locklatch);
2283
2284         //      return NULL if table is full
2285
2286         if( !(slot = bt->mgr->lockfree) ) {
2287                 bt_spinreleasewrite (bt->mgr->locklatch);
2288                 return 0;
2289         }
2290
2291         //  maintain free chain
2292
2293         bt->mgr->lockfree = lock[slot].next;
2294         bt_spinreleasewrite (bt->mgr->locklatch);
2295         
2296         if( prev = bt->mgr->hashlock[hashidx].slot )
2297                 lock[prev].prev = slot;
2298
2299         bt->mgr->hashlock[hashidx].slot = slot;
2300         lock[slot].hashidx = hashidx;
2301         lock[slot].next = prev;
2302         lock[slot].prev = 0;
2303
2304         //      save the key being locked
2305
2306         memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2307         return slot;
2308 }
2309
2310 //      add key to the lock table
2311 //      block until available.
2312
2313 uint bt_setlock(BtDb *bt, BtKey *key)
2314 {
2315 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2316 BtLockSet *lock = NULL;
2317 BtKey *key2;
2318 uint slot;
2319
2320   //  find existing lock entry
2321   //  or recover from full table
2322
2323   while( lock == NULL ) {
2324         //      obtain lock on hash slot
2325
2326         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2327
2328         if( slot = bt->mgr->hashlock[hashidx].slot )
2329           do {
2330                 lock = bt->mgr->locktable + slot;
2331                 key2 = (BtKey *)lock->key;
2332
2333                 if( !keycmp (key, key2->key, key2->len) )
2334                         break;
2335           } while( slot = lock->next );
2336
2337         if( slot )
2338                 break;
2339
2340         if( slot = bt_newlock (bt, key, hashidx) )
2341                 break;
2342
2343         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2344 #ifdef unix
2345         sched_yield();
2346 #else
2347         SwitchToThread ();
2348 #endif
2349   }
2350
2351   lock = bt->mgr->locktable + slot;
2352   lock->pin++;
2353
2354   bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2355   WriteLock (lock->readwr);
2356   return slot;
2357 }
2358
2359 void bt_lockclr (BtDb *bt, uint slot)
2360 {
2361 BtLockSet *lock = bt->mgr->locktable + slot;
2362 uint hashidx = lock->hashidx;
2363 uint next, prev;
2364         
2365         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2366         WriteRelease (lock->readwr);
2367
2368         // if entry is no longer in use,
2369         //      return it to the free chain.
2370
2371         if( !--lock->pin ) {
2372                 if( next = lock->next )
2373                         bt->mgr->locktable[next].prev = lock->prev;
2374
2375                 if( prev = lock->prev )
2376                         bt->mgr->locktable[prev].next = lock->next;
2377                 else
2378                         bt->mgr->hashlock[lock->hashidx].slot = next;
2379
2380                 bt_spinwritelock (bt->mgr->locklatch);
2381                 lock->next = bt->mgr->lockfree;
2382                 bt->mgr->lockfree = slot;
2383                 bt_spinreleasewrite (bt->mgr->locklatch);
2384         }
2385
2386         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2387 }
2388
2389 //      atomic insert of a batch of keys.
2390 //      return -1 if BTERR is set
2391 //      otherwise return slot number
2392 //      causing the key constraint violation
2393 //      or zero on successful completion.
2394
2395 int bt_atomicinsert (BtDb *bt, BtPage source)
2396 {
2397 uint locks[MAX_atomic];
2398 BtKey *key, *key2;
2399 int result = 0;
2400 BtSlot temp[1];
2401 uint slot, idx;
2402 BtVal *val;
2403 int type;
2404
2405         // first sort the list of keys into order to
2406         //      prevent deadlocks between threads.
2407
2408         for( slot = 1; slot++ < source->cnt; ) {
2409           *temp = *slotptr(source,slot);
2410           key = keyptr (source,slot);
2411           for( idx = slot; --idx; ) {
2412             key2 = keyptr (source,idx);
2413                 if( keycmp (key, key2->key, key2->len) < 0 ) {
2414                   *slotptr(source,idx+1) = *slotptr(source,idx);
2415                   *slotptr(source,idx) = *temp;
2416                 } else
2417                   break;
2418           }
2419         }
2420
2421         // take each unique-type key and add it to the lock table
2422
2423         for( slot = 0; slot++ < source->cnt; )
2424           if( slotptr(source, slot)->type == Unique )
2425                 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2426
2427         // Lookup each unique key and determine constraint violations
2428
2429         for( slot = 0; slot++ < source->cnt; )
2430           if( slotptr(source, slot)->type == Unique ) {
2431                 key = keyptr(source, slot);
2432                 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2433                   continue;
2434                 result = slot;
2435                 break;
2436           }
2437
2438         //      add each key to the btree
2439
2440         if( !result )
2441          for( slot = 0; slot++ < source->cnt; ) {
2442           key = keyptr(source,slot);
2443           val = valptr(source,slot);
2444           type = slotptr(source,slot)->type;
2445           if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2446                 return -1;
2447          }
2448
2449         // remove each unique-type key from the lock table
2450
2451         for( slot = 0; slot++ < source->cnt; )
2452           if( slotptr(source, slot)->type == Unique )
2453                 bt_lockclr (bt, locks[slot]);
2454
2455         return result;
2456 }
2457
2458 //      set cursor to highest slot on highest page
2459
2460 uint bt_lastkey (BtDb *bt)
2461 {
2462 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2463 BtPageSet set[1];
2464 uint slot;
2465
2466         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2467                 set->page = bt_mappage (bt, set->latch);
2468         else
2469                 return 0;
2470
2471     bt_lockpage(BtLockRead, set->latch);
2472
2473         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2474         slot = set->page->cnt;
2475
2476     bt_unlockpage(BtLockRead, set->latch);
2477         bt_unpinlatch (set->latch);
2478         return slot;
2479 }
2480
2481 //      return previous slot on cursor page
2482
2483 uint bt_prevkey (BtDb *bt, uint slot)
2484 {
2485 BtPageSet set[1];
2486 uid left;
2487
2488         if( --slot )
2489                 return slot;
2490
2491         if( left = bt_getid(bt->cursor->left) )
2492                 bt->cursor_page = left;
2493         else
2494                 return 0;
2495
2496         if( set->latch = bt_pinlatch (bt, left, 1) )
2497                 set->page = bt_mappage (bt, set->latch);
2498         else
2499                 return 0;
2500
2501     bt_lockpage(BtLockRead, set->latch);
2502         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2503         bt_unlockpage(BtLockRead, set->latch);
2504         bt_unpinlatch (set->latch);
2505         return bt->cursor->cnt;
2506 }
2507
2508 //  return next slot on cursor page
2509 //  or slide cursor right into next page
2510
2511 uint bt_nextkey (BtDb *bt, uint slot)
2512 {
2513 BtPageSet set[1];
2514 uid right;
2515
2516   do {
2517         right = bt_getid(bt->cursor->right);
2518
2519         while( slot++ < bt->cursor->cnt )
2520           if( slotptr(bt->cursor,slot)->dead )
2521                 continue;
2522           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2523                 return slot;
2524           else
2525                 break;
2526
2527         if( !right )
2528                 break;
2529
2530         bt->cursor_page = right;
2531
2532         if( set->latch = bt_pinlatch (bt, right, 1) )
2533                 set->page = bt_mappage (bt, set->latch);
2534         else
2535                 return 0;
2536
2537     bt_lockpage(BtLockRead, set->latch);
2538
2539         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2540
2541         bt_unlockpage(BtLockRead, set->latch);
2542         bt_unpinlatch (set->latch);
2543         slot = 0;
2544
2545   } while( 1 );
2546
2547   return bt->err = 0;
2548 }
2549
2550 //  cache page of keys into cursor and return starting slot for given key
2551
2552 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2553 {
2554 BtPageSet set[1];
2555 uint slot;
2556
2557         // cache page for retrieval
2558
2559         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2560           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2561         else
2562           return 0;
2563
2564         bt->cursor_page = set->page_no;
2565
2566         bt_unlockpage(BtLockRead, set->latch);
2567         bt_unpinlatch (set->latch);
2568         return slot;
2569 }
2570
2571 BtKey *bt_key(BtDb *bt, uint slot)
2572 {
2573         return keyptr(bt->cursor, slot);
2574 }
2575
2576 BtVal *bt_val(BtDb *bt, uint slot)
2577 {
2578         return valptr(bt->cursor,slot);
2579 }
2580
2581 #ifdef STANDALONE
2582
2583 #ifndef unix
2584 double getCpuTime(int type)
2585 {
2586 FILETIME crtime[1];
2587 FILETIME xittime[1];
2588 FILETIME systime[1];
2589 FILETIME usrtime[1];
2590 SYSTEMTIME timeconv[1];
2591 double ans = 0;
2592
2593         memset (timeconv, 0, sizeof(SYSTEMTIME));
2594
2595         switch( type ) {
2596         case 0:
2597                 GetSystemTimeAsFileTime (xittime);
2598                 FileTimeToSystemTime (xittime, timeconv);
2599                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2600                 break;
2601         case 1:
2602                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2603                 FileTimeToSystemTime (usrtime, timeconv);
2604                 break;
2605         case 2:
2606                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2607                 FileTimeToSystemTime (systime, timeconv);
2608                 break;
2609         }
2610
2611         ans += (double)timeconv->wHour * 3600;
2612         ans += (double)timeconv->wMinute * 60;
2613         ans += (double)timeconv->wSecond;
2614         ans += (double)timeconv->wMilliseconds / 1000;
2615         return ans;
2616 }
2617 #else
2618 #include <time.h>
2619 #include <sys/resource.h>
2620
2621 double getCpuTime(int type)
2622 {
2623 struct rusage used[1];
2624 struct timeval tv[1];
2625
2626         switch( type ) {
2627         case 0:
2628                 gettimeofday(tv, NULL);
2629                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2630
2631         case 1:
2632                 getrusage(RUSAGE_SELF, used);
2633                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2634
2635         case 2:
2636                 getrusage(RUSAGE_SELF, used);
2637                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2638         }
2639
2640         return 0;
2641 }
2642 #endif
2643
2644 void bt_poolaudit (BtMgr *mgr)
2645 {
2646 BtLatchSet *latch;
2647 uint slot = 0;
2648
2649         while( slot++ < mgr->latchdeployed ) {
2650                 latch = mgr->latchsets + slot;
2651
2652                 if( *latch->readwr->rin & MASK )
2653                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2654                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2655
2656                 if( *latch->access->rin & MASK )
2657                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2658                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2659
2660                 if( *latch->parent->rin & MASK )
2661                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2662                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2663
2664                 if( latch->pin & ~CLOCK_bit ) {
2665                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2666                         latch->pin = 0;
2667                 }
2668         }
2669 }
2670
2671 uint bt_latchaudit (BtDb *bt)
2672 {
2673 ushort idx, hashidx;
2674 uid next, page_no;
2675 BtLatchSet *latch;
2676 uint cnt = 0;
2677 BtKey *ptr;
2678
2679         if( *(ushort *)(bt->mgr->alloclatch) )
2680                 fprintf(stderr, "Alloc page locked\n");
2681         *(uint *)(bt->mgr->alloclatch) = 0;
2682
2683         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2684                 latch = bt->mgr->latchsets + idx;
2685                 if( *latch->readwr->rin & MASK )
2686                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2687                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2688
2689                 if( *latch->access->rin & MASK )
2690                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2691                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2692
2693                 if( *latch->parent->rin & MASK )
2694                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2695                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2696
2697                 if( latch->pin ) {
2698                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2699                         latch->pin = 0;
2700                 }
2701         }
2702
2703         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2704           if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2705                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2706
2707           *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
2708
2709           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2710                 latch = bt->mgr->latchsets + idx;
2711                 if( latch->pin )
2712                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2713           } while( idx = latch->next );
2714         }
2715
2716         page_no = LEAF_page;
2717
2718         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2719         uid off = page_no << bt->mgr->page_bits;
2720 #ifdef unix
2721           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2722 #else
2723         DWORD amt[1];
2724
2725           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2726
2727           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2728                 return bt->err = BTERR_map;
2729
2730           if( *amt <  bt->mgr->page_size )
2731                 return bt->err = BTERR_map;
2732 #endif
2733                 if( !bt->frame->free && !bt->frame->lvl )
2734                         cnt += bt->frame->act;
2735                 page_no++;
2736         }
2737                 
2738         cnt--;  // remove stopper key
2739         fprintf(stderr, " Total keys read %d\n", cnt);
2740
2741         bt_close (bt);
2742         return 0;
2743 }
2744
2745 typedef struct {
2746         char idx;
2747         char *type;
2748         char *infile;
2749         BtMgr *mgr;
2750 } ThreadArg;
2751
2752 //  standalone program to index file of keys
2753 //  then list them onto std-out
2754
2755 #ifdef unix
2756 void *index_file (void *arg)
2757 #else
2758 uint __stdcall index_file (void *arg)
2759 #endif
2760 {
2761 int line = 0, found = 0, cnt = 0, unique;
2762 uid next, page_no = LEAF_page;  // start on first page of leaves
2763 BtPage page = calloc (4096, 1);
2764 unsigned char key[BT_maxkey];
2765 ThreadArg *args = arg;
2766 int ch, len = 0, slot;
2767 BtPageSet set[1];
2768 int atomic;
2769 BtKey *ptr;
2770 BtVal *val;
2771 BtDb *bt;
2772 FILE *in;
2773
2774         bt = bt_open (args->mgr);
2775
2776         unique = (args->type[1] | 0x20) != 'd';
2777         atomic = (args->type[1] | 0x20) == 'a';
2778
2779         switch(args->type[0] | 0x20)
2780         {
2781         case 'a':
2782                 fprintf(stderr, "started latch mgr audit\n");
2783                 cnt = bt_latchaudit (bt);
2784                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2785                 break;
2786
2787         case 'p':
2788                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2789                 if( in = fopen (args->infile, "rb") )
2790                   while( ch = getc(in), ch != EOF )
2791                         if( ch == '\n' )
2792                         {
2793                           line++;
2794
2795                           if( atomic ) {
2796                                 memset (page, 0, 4096);
2797                                 slotptr(page, 1)->off = 2048;
2798                                 slotptr(page, 1)->type = Unique;
2799                                 ptr = keyptr(page,1);
2800                                 ptr->len = 10;
2801                                 memcpy(ptr->key, key, 10);
2802                                 val = valptr(page,1);
2803                                 val->len = len - 10;
2804                                 memcpy (val->value, key + 10, len - 10);
2805                                 page->cnt = 1;
2806                                 if( slot = bt_atomicinsert (bt, page) )
2807                                   fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2808                           }
2809                           else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2810                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2811                           len = 0;
2812                         }
2813                         else if( len < BT_maxkey )
2814                                 key[len++] = ch;
2815                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2816                 break;
2817
2818         case 'w':
2819                 fprintf(stderr, "started indexing for %s\n", args->infile);
2820                 if( in = fopen (args->infile, "rb") )
2821                   while( ch = getc(in), ch != EOF )
2822                         if( ch == '\n' )
2823                         {
2824                           line++;
2825                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2826                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2827                           len = 0;
2828                         }
2829                         else if( len < BT_maxkey )
2830                                 key[len++] = ch;
2831                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2832                 break;
2833
2834         case 'd':
2835                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2836                 if( in = fopen (args->infile, "rb") )
2837                   while( ch = getc(in), ch != EOF )
2838                         if( ch == '\n' )
2839                         {
2840                           line++;
2841                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2842                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2843                           ptr = (BtKey*)(bt->key);
2844                           found++;
2845
2846                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2847                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2848                           len = 0;
2849                         }
2850                         else if( len < BT_maxkey )
2851                                 key[len++] = ch;
2852                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2853                 break;
2854
2855         case 'f':
2856                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2857                 if( in = fopen (args->infile, "rb") )
2858                   while( ch = getc(in), ch != EOF )
2859                         if( ch == '\n' )
2860                         {
2861                           line++;
2862                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2863                                 found++;
2864                           else if( bt->err )
2865                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2866                           len = 0;
2867                         }
2868                         else if( len < BT_maxkey )
2869                                 key[len++] = ch;
2870                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2871                 break;
2872
2873         case 's':
2874                 fprintf(stderr, "started scanning\n");
2875                 do {
2876                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2877                                 set->page = bt_mappage (bt, set->latch);
2878                         else
2879                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2880                         bt_lockpage (BtLockRead, set->latch);
2881                         next = bt_getid (set->page->right);
2882
2883                         for( slot = 0; slot++ < set->page->cnt; )
2884                          if( next || slot < set->page->cnt )
2885                           if( !slotptr(set->page, slot)->dead ) {
2886                                 ptr = keyptr(set->page, slot);
2887                                 len = ptr->len;
2888
2889                             if( slotptr(set->page, slot)->type == Duplicate )
2890                                         len -= BtId;
2891
2892                                 fwrite (ptr->key, len, 1, stdout);
2893                                 val = valptr(set->page, slot);
2894                                 fwrite (val->value, val->len, 1, stdout);
2895                                 fputc ('\n', stdout);
2896                                 cnt++;
2897                            }
2898
2899                         bt_unlockpage (BtLockRead, set->latch);
2900                         bt_unpinlatch (set->latch);
2901                 } while( page_no = next );
2902
2903                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2904                 break;
2905
2906         case 'r':
2907                 fprintf(stderr, "started reverse scan\n");
2908                 if( slot = bt_lastkey (bt) )
2909                    while( slot = bt_prevkey (bt, slot) ) {
2910                         if( slotptr(bt->cursor, slot)->dead )
2911                           continue;
2912
2913                         ptr = keyptr(bt->cursor, slot);
2914                         len = ptr->len;
2915
2916                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2917                                 len -= BtId;
2918
2919                         fwrite (ptr->key, len, 1, stdout);
2920                         val = valptr(bt->cursor, slot);
2921                         fwrite (val->value, val->len, 1, stdout);
2922                         fputc ('\n', stdout);
2923                         cnt++;
2924                   }
2925
2926                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2927                 break;
2928
2929         case 'c':
2930 #ifdef unix
2931                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2932 #endif
2933                 fprintf(stderr, "started counting\n");
2934                 page_no = LEAF_page;
2935
2936                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2937                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2938                                 break;
2939
2940                         if( !bt->frame->free && !bt->frame->lvl )
2941                                 cnt += bt->frame->act;
2942
2943                         bt->reads++;
2944                         page_no++;
2945                 }
2946                 
2947                 cnt--;  // remove stopper key
2948                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2949                 break;
2950         }
2951
2952         bt_close (bt);
2953 #ifdef unix
2954         return NULL;
2955 #else
2956         return 0;
2957 #endif
2958 }
2959
2960 typedef struct timeval timer;
2961
2962 int main (int argc, char **argv)
2963 {
2964 int idx, cnt, len, slot, err;
2965 int segsize, bits = 16;
2966 double start, stop;
2967 #ifdef unix
2968 pthread_t *threads;
2969 #else
2970 HANDLE *threads;
2971 #endif
2972 ThreadArg *args;
2973 uint poolsize = 0;
2974 uint locksize = 0;
2975 float elapsed;
2976 char key[1];
2977 BtMgr *mgr;
2978 BtKey *ptr;
2979 BtDb *bt;
2980
2981         if( argc < 3 ) {
2982                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2983                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2984                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2985                 fprintf (stderr, "  lock_mgr_size is the maximum number of outstanding key locks\n");
2986                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2987                 exit(0);
2988         }
2989
2990         start = getCpuTime(0);
2991
2992         if( argc > 3 )
2993                 bits = atoi(argv[3]);
2994
2995         if( argc > 4 )
2996                 poolsize = atoi(argv[4]);
2997
2998         if( !poolsize )
2999                 fprintf (stderr, "Warning: no mapped_pool\n");
3000
3001         if( argc > 5 )
3002                 locksize = atoi(argv[5]);
3003
3004         cnt = argc - 6;
3005 #ifdef unix
3006         threads = malloc (cnt * sizeof(pthread_t));
3007 #else
3008         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3009 #endif
3010         args = malloc (cnt * sizeof(ThreadArg));
3011
3012         mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3013
3014         if( !mgr ) {
3015                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3016                 exit (1);
3017         }
3018
3019         //      fire off threads
3020
3021         for( idx = 0; idx < cnt; idx++ ) {
3022                 args[idx].infile = argv[idx + 6];
3023                 args[idx].type = argv[2];
3024                 args[idx].mgr = mgr;
3025                 args[idx].idx = idx;
3026 #ifdef unix
3027                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3028                         fprintf(stderr, "Error creating thread %d\n", err);
3029 #else
3030                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3031 #endif
3032         }
3033
3034         //      wait for termination
3035
3036 #ifdef unix
3037         for( idx = 0; idx < cnt; idx++ )
3038                 pthread_join (threads[idx], NULL);
3039 #else
3040         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3041
3042         for( idx = 0; idx < cnt; idx++ )
3043                 CloseHandle(threads[idx]);
3044
3045 #endif
3046         bt_poolaudit(mgr);
3047         bt_mgrclose (mgr);
3048
3049         elapsed = getCpuTime(0) - start;
3050         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3051         elapsed = getCpuTime(1);
3052         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3053         elapsed = getCpuTime(2);
3054         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3055 }
3056
3057 #endif  //STANDALONE