]> pd.if.org Git - btree/blob - threadskv7.c
Introduce threadskv7 that includes atomic insert of a set of keys with values
[btree] / threadskv7.c
1 // btree version threadskv6 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and atomic key insert
9
10 // 17 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 //      maximum number of keys to insert atomically in one call
88
89 #define MAX_atomic              256
90
91 #define BT_maxkey       255             // maximum number of bytes in a key
92 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
93
94 /*
95 There are five lock types for each node in three independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 */
102
103 typedef enum{
104         BtLockAccess,
105         BtLockDelete,
106         BtLockRead,
107         BtLockWrite,
108         BtLockParent
109 } BtLock;
110
111 //      definition for phase-fair reader/writer lock implementation
112
113 typedef struct {
114         ushort rin[1];
115         ushort rout[1];
116         ushort ticket[1];
117         ushort serving[1];
118 } RWLock;
119
120 #define PHID 0x1
121 #define PRES 0x2
122 #define MASK 0x3
123 #define RINC 0x4
124
125 //      definition for spin latch implementation
126
127 // exclusive is set for write access
128 // share is count of read accessors
129 // grant write lock when share == 0
130
131 volatile typedef struct {
132         uint exclusive:1;
133         uint pending:1;
134         uint share:30;
135 } BtSpinLatch;
136
137 #define XCL 1
138 #define PEND 2
139 #define BOTH 3
140 #define SHARE 4
141
142 //      The key structure occupies space at the upper end of
143 //      each page.  It's a length byte followed by the key
144 //      bytes.
145
146 typedef struct {
147         unsigned char len;              // this can be changed to a ushort or uint
148         unsigned char key[0];
149 } BtKey;
150
151 //      the value structure also occupies space at the upper
152 //      end of the page. Each key is immediately followed by a value.
153
154 typedef struct {
155         unsigned char len;              // this can be changed to a ushort or uint
156         unsigned char value[0];
157 } BtVal;
158
159 //  hash table entries
160
161 typedef struct {
162         uint slot;                              // Latch table entry at head of chain
163         BtSpinLatch latch[1];
164 } BtHashEntry;
165
166 //      latch manager table structure
167
168 typedef struct {
169         uid page_no;                    // latch set page number
170         RWLock readwr[1];               // read/write page lock
171         RWLock access[1];               // Access Intent/Page delete
172         RWLock parent[1];               // Posting of fence key in parent
173         uint slot;                              // entry slot in latch table
174         uint next;                              // next entry in hash table chain
175         uint prev;                              // prev entry in hash table chain
176         volatile ushort pin;    // number of outstanding threads
177         ushort dirty:1;                 // page in cache is dirty
178 } BtLatchSet;
179
180 //      lock manager table structure
181
182 typedef struct {
183         RWLock readwr[1];               // read/write key lock
184         uint next;
185         uint prev;
186         uint pin;                               // count of readers waiting
187         uint hashidx;                   // hash idx for entry
188         unsigned char key[BT_keyarray];
189 } BtLockSet;
190
191 //      Define the length of the page record numbers
192
193 #define BtId 6
194
195 //      Page key slot definition.
196
197 //      Keys are marked dead, but remain on the page until
198 //      it cleanup is called. The fence key (highest key) for
199 //      a leaf page is always present, even after cleanup.
200
201 //      Slot types
202
203 //      In addition to the Unique keys that occupy slots
204 //      there are Librarian and Duplicate key
205 //      slots occupying the key slot array.
206
207 //      The Librarian slots are dead keys that
208 //      serve as filler, available to add new Unique
209 //      or Dup slots that are inserted into the B-tree.
210
211 //      The Duplicate slots have had their key bytes extended
212 //      by 6 bytes to contain a binary duplicate key uniqueifier.
213
214 typedef enum {
215         Unique,
216         Librarian,
217         Duplicate
218 } BtSlotType;
219
220 typedef struct {
221         uint off:BT_maxbits;    // page offset for key start
222         uint type:3;                    // type of slot
223         uint dead:1;                    // set for deleted slot
224 } BtSlot;
225
226 //      The first part of an index page.
227 //      It is immediately followed
228 //      by the BtSlot array of keys.
229
230 //      note that this structure size
231 //      must be a multiple of 8 bytes
232 //      in order to place dups correctly.
233
234 typedef struct BtPage_ {
235         uint cnt;                                       // count of keys in page
236         uint act;                                       // count of active keys
237         uint min;                                       // next key offset
238         uint garbage;                           // page garbage in bytes
239         unsigned char bits:7;           // page size in bits
240         unsigned char free:1;           // page is on free chain
241         unsigned char lvl:7;            // level of page
242         unsigned char kill:1;           // page is being deleted
243         unsigned char left[BtId];       // page number to left
244         unsigned char filler[2];        // padding to multiple of 8
245         unsigned char right[BtId];      // page number to right
246 } *BtPage;
247
248 //  The loadpage interface object
249
250 typedef struct {
251         uid page_no;            // current page number
252         BtPage page;            // current page pointer
253         BtLatchSet *latch;      // current page latch set
254 } BtPageSet;
255
256 //      structure for latch manager on ALLOC_page
257
258 typedef struct {
259         struct BtPage_ alloc[1];        // next page_no in right ptr
260         unsigned long long dups[1];     // global duplicate key uniqueifier
261         unsigned char chain[BtId];      // head of free page_nos chain
262 } BtPageZero;
263
264 //      The object structure for Btree access
265
266 typedef struct {
267         uint page_size;                         // page size    
268         uint page_bits;                         // page size in bits    
269 #ifdef unix
270         int idx;
271 #else
272         HANDLE idx;
273 #endif
274         BtPageZero *pagezero;           // mapped allocation page
275         BtSpinLatch alloclatch[1];      // allocation area lite latch
276         uint latchdeployed;                     // highest number of pool entries deployed
277         uint nlatchpage;                        // number of latch & lock & pool pages
278         uint latchtotal;                        // number of page latch entries
279         uint latchhash;                         // number of latch hash table slots
280         uint latchvictim;                       // next latch entry to examine
281         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
282         BtLatchSet *latchsets;          // mapped latch set from latch pages
283         unsigned char *pagepool;        // mapped to the buffer pool pages
284         uint lockhash;                          // number of lock hash table slots
285         uint lockfree;                          // next available lock table entry
286         BtSpinLatch locklatch[1];       // lock manager free chain latch
287         BtHashEntry *hashlock;          // the lock manager hash table
288         BtLockSet *locktable;           // the lock manager key table
289 #ifndef unix
290         HANDLE halloc;                          // allocation handle
291         HANDLE hpool;                           // buffer pool handle
292 #endif
293 } BtMgr;
294
295 typedef struct {
296         BtMgr *mgr;                             // buffer manager for thread
297         BtPage cursor;                  // cached frame for start/next (never mapped)
298         BtPage frame;                   // spare frame for the page split (never mapped)
299         uid cursor_page;                                // current cursor page number   
300         unsigned char *mem;                             // frame, cursor, page memory buffer
301         unsigned char key[BT_keyarray]; // last found complete key
302         int found;                              // last delete or insert was found
303         int err;                                // last error
304         int reads, writes;              // number of reads and writes from the btree
305 } BtDb;
306
307 typedef enum {
308         BTERR_ok = 0,
309         BTERR_struct,
310         BTERR_ovflw,
311         BTERR_lock,
312         BTERR_map,
313         BTERR_read,
314         BTERR_wrt,
315         BTERR_hash
316 } BTERR;
317
318 #define CLOCK_bit 0x8000
319
320 // B-Tree functions
321 extern void bt_close (BtDb *bt);
322 extern BtDb *bt_open (BtMgr *mgr);
323 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, int unique);
324 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
325 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
326 extern BtKey *bt_foundkey (BtDb *bt);
327 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
328 extern uint bt_nextkey   (BtDb *bt, uint slot);
329
330 //      manager functions
331 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint locksize);
332 void bt_mgrclose (BtMgr *mgr);
333
334 //  Helper functions to return slot values
335 //      from the cursor page.
336
337 extern BtKey *bt_key (BtDb *bt, uint slot);
338 extern BtVal *bt_val (BtDb *bt, uint slot);
339
340 //  The page is allocated from low and hi ends.
341 //  The key slots are allocated from the bottom,
342 //      while the text and value of the key
343 //  are allocated from the top.  When the two
344 //  areas meet, the page is split into two.
345
346 //  A key consists of a length byte, two bytes of
347 //  index number (0 - 65535), and up to 253 bytes
348 //  of key value.
349
350 //  Associated with each key is a value byte string
351 //      containing any value desired.
352
353 //  The b-tree root is always located at page 1.
354 //      The first leaf page of level zero is always
355 //      located on page 2.
356
357 //      The b-tree pages are linked with next
358 //      pointers to facilitate enumerators,
359 //      and provide for concurrency.
360
361 //      When to root page fills, it is split in two and
362 //      the tree height is raised by a new root at page
363 //      one with two keys.
364
365 //      Deleted keys are marked with a dead bit until
366 //      page cleanup. The fence key for a leaf node is
367 //      always present
368
369 //  To achieve maximum concurrency one page is locked at a time
370 //  as the tree is traversed to find leaf key in question. The right
371 //  page numbers are used in cases where the page is being split,
372 //      or consolidated.
373
374 //  Page 0 is dedicated to lock for new page extensions,
375 //      and chains empty pages together for reuse. It also
376 //      contains the latch manager hash table.
377
378 //      The ParentModification lock on a node is obtained to serialize posting
379 //      or changing the fence key for a node.
380
381 //      Empty pages are chained together through the ALLOC page and reused.
382
383 //      Access macros to address slot and key values from the page
384 //      Page slots use 1 based indexing.
385
386 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
387 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
388 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
389
390 void bt_putid(unsigned char *dest, uid id)
391 {
392 int i = BtId;
393
394         while( i-- )
395                 dest[i] = (unsigned char)id, id >>= 8;
396 }
397
398 uid bt_getid(unsigned char *src)
399 {
400 uid id = 0;
401 int i;
402
403         for( i = 0; i < BtId; i++ )
404                 id <<= 8, id |= *src++; 
405
406         return id;
407 }
408
409 uid bt_newdup (BtDb *bt)
410 {
411 #ifdef unix
412         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
413 #else
414         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
415 #endif
416 }
417
418 //      Phase-Fair reader/writer lock implementation
419
420 void WriteLock (RWLock *lock)
421 {
422 ushort w, r, tix;
423
424 #ifdef unix
425         tix = __sync_fetch_and_add (lock->ticket, 1);
426 #else
427         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
428 #endif
429         // wait for our ticket to come up
430
431         while( tix != lock->serving[0] )
432 #ifdef unix
433                 sched_yield();
434 #else
435                 SwitchToThread ();
436 #endif
437
438         w = PRES | (tix & PHID);
439 #ifdef  unix
440         r = __sync_fetch_and_add (lock->rin, w);
441 #else
442         r = _InterlockedExchangeAdd16 (lock->rin, w);
443 #endif
444         while( r != *lock->rout )
445 #ifdef unix
446                 sched_yield();
447 #else
448                 SwitchToThread();
449 #endif
450 }
451
452 void WriteRelease (RWLock *lock)
453 {
454 #ifdef unix
455         __sync_fetch_and_and (lock->rin, ~MASK);
456 #else
457         _InterlockedAnd16 (lock->rin, ~MASK);
458 #endif
459         lock->serving[0]++;
460 }
461
462 void ReadLock (RWLock *lock)
463 {
464 ushort w;
465 #ifdef unix
466         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
467 #else
468         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
469 #endif
470         if( w )
471           while( w == (*lock->rin & MASK) )
472 #ifdef unix
473                 sched_yield ();
474 #else
475                 SwitchToThread ();
476 #endif
477 }
478
479 void ReadRelease (RWLock *lock)
480 {
481 #ifdef unix
482         __sync_fetch_and_add (lock->rout, RINC);
483 #else
484         _InterlockedExchangeAdd16 (lock->rout, RINC);
485 #endif
486 }
487
488 //      Spin Latch Manager
489
490 //      wait until write lock mode is clear
491 //      and add 1 to the share count
492
493 void bt_spinreadlock(BtSpinLatch *latch)
494 {
495 uint prev;
496
497   do {
498 #ifdef unix
499         prev = __sync_fetch_and_add ((uint *)latch, SHARE);
500 #else
501         prev = _InterlockedExchangeAdd((uint *)latch, SHARE);
502 #endif
503         //  see if exclusive request is granted or pending
504
505         if( !(prev & BOTH) )
506                 return;
507 #ifdef unix
508         prev = __sync_fetch_and_add ((uint *)latch, -SHARE);
509 #else
510         prev = _InterlockedExchangeAdd((uint *)latch, -SHARE);
511 #endif
512 #ifdef  unix
513   } while( sched_yield(), 1 );
514 #else
515   } while( SwitchToThread(), 1 );
516 #endif
517 }
518
519 //      wait for other read and write latches to relinquish
520
521 void bt_spinwritelock(BtSpinLatch *latch)
522 {
523 uint prev;
524
525   do {
526 #ifdef  unix
527         prev = __sync_fetch_and_or((uint *)latch, PEND | XCL);
528 #else
529         prev = _InterlockedOr((uint *)latch, PEND | XCL);
530 #endif
531         if( !(prev & XCL) )
532           if( !(prev & ~BOTH) )
533                 return;
534           else
535 #ifdef unix
536                 __sync_fetch_and_and ((uint *)latch, ~XCL);
537 #else
538                 _InterlockedAnd((uint *)latch, ~XCL);
539 #endif
540 #ifdef  unix
541   } while( sched_yield(), 1 );
542 #else
543   } while( SwitchToThread(), 1 );
544 #endif
545 }
546
547 //      try to obtain write lock
548
549 //      return 1 if obtained,
550 //              0 otherwise
551
552 int bt_spinwritetry(BtSpinLatch *latch)
553 {
554 uint prev;
555
556 #ifdef  unix
557         prev = __sync_fetch_and_or((uint *)latch, XCL);
558 #else
559         prev = _InterlockedOr((uint *)latch, XCL);
560 #endif
561         //      take write access if all bits are clear
562
563         if( !(prev & XCL) )
564           if( !(prev & ~BOTH) )
565                 return 1;
566           else
567 #ifdef unix
568                 __sync_fetch_and_and ((uint *)latch, ~XCL);
569 #else
570                 _InterlockedAnd((uint *)latch, ~XCL);
571 #endif
572         return 0;
573 }
574
575 //      clear write mode
576
577 void bt_spinreleasewrite(BtSpinLatch *latch)
578 {
579 #ifdef unix
580         __sync_fetch_and_and((uint *)latch, ~BOTH);
581 #else
582         _InterlockedAnd((uint *)latch, ~BOTH);
583 #endif
584 }
585
586 //      decrement reader count
587
588 void bt_spinreleaseread(BtSpinLatch *latch)
589 {
590 #ifdef unix
591         __sync_fetch_and_add((uint *)latch, -SHARE);
592 #else
593         _InterlockedExchangeAdd((uint *)latch, -SHARE);
594 #endif
595 }
596
597 //      read page from permanent location in Btree file
598
599 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
600 {
601 off64_t off = page_no << mgr->page_bits;
602
603 #ifdef unix
604         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
605                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
606                 return BTERR_read;
607         }
608 #else
609 OVERLAPPED ovl[1];
610 uint amt[1];
611
612         memset (ovl, 0, sizeof(OVERLAPPED));
613         ovl->Offset = off;
614         ovl->OffsetHigh = off >> 32;
615
616         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
617                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
618                 return BTERR_read;
619         }
620         if( *amt <  mgr->page_size ) {
621                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
622                 return BTERR_read;
623         }
624 #endif
625         return 0;
626 }
627
628 //      write page to permanent location in Btree file
629 //      clear the dirty bit
630
631 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
632 {
633 off64_t off = page_no << mgr->page_bits;
634
635 #ifdef unix
636         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
637                 return BTERR_wrt;
638 #else
639 OVERLAPPED ovl[1];
640 uint amt[1];
641
642         memset (ovl, 0, sizeof(OVERLAPPED));
643         ovl->Offset = off;
644         ovl->OffsetHigh = off >> 32;
645
646         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
647                 return BTERR_wrt;
648
649         if( *amt <  mgr->page_size )
650                 return BTERR_wrt;
651 #endif
652         return 0;
653 }
654
655 //      link latch table entry into head of latch hash table
656
657 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
658 {
659 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
660 BtLatchSet *latch = bt->mgr->latchsets + slot;
661
662         if( latch->next = bt->mgr->hashtable[hashidx].slot )
663                 bt->mgr->latchsets[latch->next].prev = slot;
664
665         bt->mgr->hashtable[hashidx].slot = slot;
666         latch->page_no = page_no;
667         latch->slot = slot;
668         latch->prev = 0;
669         latch->pin = 1;
670
671         if( loadit )
672           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
673                 return bt->err;
674           else
675                 bt->reads++;
676
677         return bt->err = 0;
678 }
679
680 //      set CLOCK bit in latch
681 //      decrement pin count
682
683 void bt_unpinlatch (BtLatchSet *latch)
684 {
685 #ifdef unix
686         if( ~latch->pin & CLOCK_bit )
687                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
688         __sync_fetch_and_add(&latch->pin, -1);
689 #else
690         if( ~latch->pin & CLOCK_bit )
691                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
692         _InterlockedDecrement16 (&latch->pin);
693 #endif
694 }
695
696 //  return the btree cached page address
697
698 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
699 {
700 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
701
702         return page;
703 }
704
705 //      find existing latchset or inspire new one
706 //      return with latchset pinned
707
708 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
709 {
710 uint hashidx = page_no % bt->mgr->latchhash;
711 BtLatchSet *latch;
712 uint slot, idx;
713 uint lvl, cnt;
714 off64_t off;
715 uint amt[1];
716 BtPage page;
717
718   //  try to find our entry
719
720   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
721
722   if( slot = bt->mgr->hashtable[hashidx].slot ) do
723   {
724         latch = bt->mgr->latchsets + slot;
725         if( page_no == latch->page_no )
726                 break;
727   } while( slot = latch->next );
728
729   //  found our entry
730   //  increment clock
731
732   if( slot ) {
733         latch = bt->mgr->latchsets + slot;
734 #ifdef unix
735         __sync_fetch_and_add(&latch->pin, 1);
736 #else
737         _InterlockedIncrement16 (&latch->pin);
738 #endif
739         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
740         return latch;
741   }
742
743         //  see if there are any unused pool entries
744 #ifdef unix
745         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
746 #else
747         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
748 #endif
749
750         if( slot < bt->mgr->latchtotal ) {
751                 latch = bt->mgr->latchsets + slot;
752                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
753                         return NULL;
754                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
755                 return latch;
756         }
757
758 #ifdef unix
759         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
760 #else
761         _InterlockedDecrement (&bt->mgr->latchdeployed);
762 #endif
763   //  find and reuse previous entry on victim
764
765   while( 1 ) {
766 #ifdef unix
767         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
768 #else
769         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
770 #endif
771         // try to get write lock on hash chain
772         //      skip entry if not obtained
773         //      or has outstanding pins
774
775         slot %= bt->mgr->latchtotal;
776
777         if( !slot )
778                 continue;
779
780         latch = bt->mgr->latchsets + slot;
781         idx = latch->page_no % bt->mgr->latchhash;
782
783         //      see we are on same chain as hashidx
784
785         if( idx == hashidx )
786                 continue;
787
788         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
789                 continue;
790
791         //  skip this slot if it is pinned
792         //      or the CLOCK bit is set
793
794         if( latch->pin ) {
795           if( latch->pin & CLOCK_bit ) {
796 #ifdef unix
797                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
798 #else
799                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
800 #endif
801           }
802           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
803           continue;
804         }
805
806         //  update permanent page area in btree from buffer pool
807
808         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
809
810         if( latch->dirty )
811           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
812                 return NULL;
813           else
814                 latch->dirty = 0, bt->writes++;
815
816         //  unlink our available slot from its hash chain
817
818         if( latch->prev )
819                 bt->mgr->latchsets[latch->prev].next = latch->next;
820         else
821                 bt->mgr->hashtable[idx].slot = latch->next;
822
823         if( latch->next )
824                 bt->mgr->latchsets[latch->next].prev = latch->prev;
825
826         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
827
828         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
829                 return NULL;
830
831         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
832         return latch;
833   }
834 }
835
836 void bt_mgrclose (BtMgr *mgr)
837 {
838 BtLatchSet *latch;
839 uint num = 0;
840 BtPage page;
841 uint slot;
842
843         //      flush dirty pool pages to the btree
844
845         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
846                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
847                 latch = mgr->latchsets + slot;
848
849                 if( latch->dirty ) {
850                         bt_writepage(mgr, page, latch->page_no);
851                         latch->dirty = 0, num++;
852                 }
853 //              madvise (page, mgr->page_size, MADV_DONTNEED);
854         }
855
856         fprintf(stderr, "%d buffer pool pages flushed\n", num);
857
858 #ifdef unix
859         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
860         munmap (mgr->pagezero, mgr->page_size);
861 #else
862         FlushViewOfFile(mgr->pagezero, 0);
863         UnmapViewOfFile(mgr->pagezero);
864         UnmapViewOfFile(mgr->hashtable);
865         CloseHandle(mgr->halloc);
866         CloseHandle(mgr->hpool);
867 #endif
868 #ifdef unix
869         close (mgr->idx);
870         free (mgr);
871 #else
872         FlushFileBuffers(mgr->idx);
873         CloseHandle(mgr->idx);
874         GlobalFree (mgr);
875 #endif
876 }
877
878 //      close and release memory
879
880 void bt_close (BtDb *bt)
881 {
882 #ifdef unix
883         if( bt->mem )
884                 free (bt->mem);
885 #else
886         if( bt->mem)
887                 VirtualFree (bt->mem, 0, MEM_RELEASE);
888 #endif
889         free (bt);
890 }
891
892 //  open/create new btree buffer manager
893
894 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
895 //              size of page pool (e.g. 262144) and number of lock table entries.
896
897 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint lockmax)
898 {
899 uint lvl, attr, last, slot, idx;
900 unsigned char value[BtId];
901 int flag, initit = 0;
902 BtPageZero *pagezero;
903 off64_t size;
904 uint amt[1];
905 BtMgr* mgr;
906 BtKey* key;
907 BtVal *val;
908
909         // determine sanity of page size and buffer pool
910
911         if( bits > BT_maxbits )
912                 bits = BT_maxbits;
913         else if( bits < BT_minbits )
914                 bits = BT_minbits;
915
916         if( nodemax < 16 ) {
917                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
918                 return NULL;
919         }
920
921 #ifdef unix
922         mgr = calloc (1, sizeof(BtMgr));
923
924         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
925
926         if( mgr->idx == -1 ) {
927                 fprintf (stderr, "Unable to open btree file\n");
928                 return free(mgr), NULL;
929         }
930 #else
931         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
932         attr = FILE_ATTRIBUTE_NORMAL;
933         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
934
935         if( mgr->idx == INVALID_HANDLE_VALUE )
936                 return GlobalFree(mgr), NULL;
937 #endif
938
939 #ifdef unix
940         pagezero = valloc (BT_maxpage);
941         *amt = 0;
942
943         // read minimum page size to get root info
944         //      to support raw disk partition files
945         //      check if bits == 0 on the disk.
946
947         if( size = lseek (mgr->idx, 0L, 2) )
948                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
949                         if( pagezero->alloc->bits )
950                                 bits = pagezero->alloc->bits;
951                         else
952                                 initit = 1;
953                 else
954                         return free(mgr), free(pagezero), NULL;
955         else
956                 initit = 1;
957 #else
958         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
959         size = GetFileSize(mgr->idx, amt);
960
961         if( size || *amt ) {
962                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
963                         return bt_mgrclose (mgr), NULL;
964                 bits = pagezero->alloc->bits;
965         } else
966                 initit = 1;
967 #endif
968
969         mgr->page_size = 1 << bits;
970         mgr->page_bits = bits;
971
972         //  calculate number of latch hash table entries
973
974         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
975         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
976
977         //  add on the number of pages in buffer pool
978         //      along with the corresponding latch table
979
980         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
981         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
982         mgr->latchtotal = nodemax;
983
984         //      add on the sizeof the lock manager hash table and the lock table
985
986         mgr->nlatchpage += (lockmax / 16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
987
988         mgr->nlatchpage += (lockmax * sizeof(BtLockSet) + mgr->page_size - 1) / mgr->page_size;
989
990         if( !initit )
991                 goto mgrlatch;
992
993         // initialize an empty b-tree with latch page, root page, page of leaves
994         // and page(s) of latches and page pool cache
995
996         memset (pagezero, 0, 1 << bits);
997         pagezero->alloc->bits = mgr->page_bits;
998         bt_putid(pagezero->alloc->right, MIN_lvl+1);
999
1000         //  initialize left-most LEAF page in
1001         //      alloc->left.
1002
1003         bt_putid (pagezero->alloc->left, LEAF_page);
1004
1005         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1006                 fprintf (stderr, "Unable to create btree page zero\n");
1007                 return bt_mgrclose (mgr), NULL;
1008         }
1009
1010         memset (pagezero, 0, 1 << bits);
1011         pagezero->alloc->bits = mgr->page_bits;
1012
1013         for( lvl=MIN_lvl; lvl--; ) {
1014                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1015                 key = keyptr(pagezero->alloc, 1);
1016                 key->len = 2;           // create stopper key
1017                 key->key[0] = 0xff;
1018                 key->key[1] = 0xff;
1019
1020                 bt_putid(value, MIN_lvl - lvl + 1);
1021                 val = valptr(pagezero->alloc, 1);
1022                 val->len = lvl ? BtId : 0;
1023                 memcpy (val->value, value, val->len);
1024
1025                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1026                 pagezero->alloc->lvl = lvl;
1027                 pagezero->alloc->cnt = 1;
1028                 pagezero->alloc->act = 1;
1029
1030                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1031                         fprintf (stderr, "Unable to create btree page zero\n");
1032                         return bt_mgrclose (mgr), NULL;
1033                 }
1034         }
1035
1036 mgrlatch:
1037 #ifdef unix
1038         free (pagezero);
1039 #else
1040         VirtualFree (pagezero, 0, MEM_RELEASE);
1041 #endif
1042 #ifdef unix
1043         // mlock the pagezero page
1044
1045         flag = PROT_READ | PROT_WRITE;
1046         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1047         if( mgr->pagezero == MAP_FAILED ) {
1048                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1049                 return bt_mgrclose (mgr), NULL;
1050         }
1051         mlock (mgr->pagezero, mgr->page_size);
1052
1053         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1054         if( mgr->hashtable == MAP_FAILED ) {
1055                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058 #else
1059         flag = PAGE_READWRITE;
1060         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1061         if( !mgr->halloc ) {
1062                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1063                 return bt_mgrclose (mgr), NULL;
1064         }
1065
1066         flag = FILE_MAP_WRITE;
1067         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1068         if( !mgr->pagezero ) {
1069                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1070                 return bt_mgrclose (mgr), NULL;
1071         }
1072
1073         flag = PAGE_READWRITE;
1074         size = (uid)mgr->nlatchpage << mgr->page_bits;
1075         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1076         if( !mgr->hpool ) {
1077                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1078                 return bt_mgrclose (mgr), NULL;
1079         }
1080
1081         flag = FILE_MAP_WRITE;
1082         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1083         if( !mgr->hashtable ) {
1084                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1085                 return bt_mgrclose (mgr), NULL;
1086         }
1087 #endif
1088
1089         size = (mgr->latchhash * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1090         mgr->latchsets = (BtLatchSet *)((unsigned char *)mgr->hashtable + size * mgr->page_size);
1091         size = (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1092
1093         mgr->pagepool = (unsigned char *)mgr->hashtable + (size << mgr->page_bits);
1094         mgr->hashlock = (BtHashEntry *)(mgr->pagepool + ((uid)nodemax << mgr->page_bits));
1095         mgr->locktable = (BtLockSet *)((unsigned char *)mgr->hashtable + ((uid)mgr->nlatchpage << mgr->page_bits) - lockmax * sizeof(BtLockSet));
1096
1097         mgr->lockfree = lockmax - 1;
1098         mgr->lockhash = ((unsigned char *)mgr->locktable - (unsigned char *)mgr->hashlock) / sizeof(BtHashEntry);
1099
1100         for( idx = 1; idx < lockmax; idx++ )
1101                 mgr->locktable[idx].next = idx - 1;
1102
1103         return mgr;
1104 }
1105
1106 //      open BTree access method
1107 //      based on buffer manager
1108
1109 BtDb *bt_open (BtMgr *mgr)
1110 {
1111 BtDb *bt = malloc (sizeof(*bt));
1112
1113         memset (bt, 0, sizeof(*bt));
1114         bt->mgr = mgr;
1115 #ifdef unix
1116         bt->mem = valloc (2 *mgr->page_size);
1117 #else
1118         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1119 #endif
1120         bt->frame = (BtPage)bt->mem;
1121         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1122         return bt;
1123 }
1124
1125 //  compare two keys, returning > 0, = 0, or < 0
1126 //  as the comparison value
1127
1128 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1129 {
1130 uint len1 = key1->len;
1131 int ans;
1132
1133         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1134                 return ans;
1135
1136         if( len1 > len2 )
1137                 return 1;
1138         if( len1 < len2 )
1139                 return -1;
1140
1141         return 0;
1142 }
1143
1144 // place write, read, or parent lock on requested page_no.
1145
1146 void bt_lockpage(BtLock mode, BtLatchSet *set)
1147 {
1148         switch( mode ) {
1149         case BtLockRead:
1150                 ReadLock (set->readwr);
1151                 break;
1152         case BtLockWrite:
1153                 WriteLock (set->readwr);
1154                 break;
1155         case BtLockAccess:
1156                 ReadLock (set->access);
1157                 break;
1158         case BtLockDelete:
1159                 WriteLock (set->access);
1160                 break;
1161         case BtLockParent:
1162                 WriteLock (set->parent);
1163                 break;
1164         }
1165 }
1166
1167 // remove write, read, or parent lock on requested page
1168
1169 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1170 {
1171         switch( mode ) {
1172         case BtLockRead:
1173                 ReadRelease (set->readwr);
1174                 break;
1175         case BtLockWrite:
1176                 WriteRelease (set->readwr);
1177                 break;
1178         case BtLockAccess:
1179                 ReadRelease (set->access);
1180                 break;
1181         case BtLockDelete:
1182                 WriteRelease (set->access);
1183                 break;
1184         case BtLockParent:
1185                 WriteRelease (set->parent);
1186                 break;
1187         }
1188 }
1189
1190 //      allocate a new page
1191 //      return with page latched.
1192
1193 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1194 {
1195 int blk;
1196
1197         //      lock allocation page
1198
1199         bt_spinwritelock(bt->mgr->alloclatch);
1200
1201         // use empty chain first
1202         // else allocate empty page
1203
1204         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1205                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1206                         set->page = bt_mappage (bt, set->latch);
1207                 else
1208                         return bt->err = BTERR_struct, -1;
1209
1210                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1211                 bt_spinreleasewrite(bt->mgr->alloclatch);
1212
1213                 memcpy (set->page, contents, bt->mgr->page_size);
1214                 set->latch->dirty = 1;
1215                 return 0;
1216         }
1217
1218         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1219         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1220
1221         // unlock allocation latch
1222
1223         bt_spinreleasewrite(bt->mgr->alloclatch);
1224
1225         //      don't load cache from btree page
1226
1227         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1228                 set->page = bt_mappage (bt, set->latch);
1229         else
1230                 return bt->err = BTERR_struct;
1231
1232         memcpy (set->page, contents, bt->mgr->page_size);
1233         set->latch->dirty = 1;
1234         return 0;
1235 }
1236
1237 //  find slot in page for given key at a given level
1238
1239 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1240 {
1241 uint diff, higher = set->page->cnt, low = 1, slot;
1242 uint good = 0;
1243
1244         //        make stopper key an infinite fence value
1245
1246         if( bt_getid (set->page->right) )
1247                 higher++;
1248         else
1249                 good++;
1250
1251         //      low is the lowest candidate.
1252         //  loop ends when they meet
1253
1254         //  higher is already
1255         //      tested as .ge. the passed key.
1256
1257         while( diff = higher - low ) {
1258                 slot = low + ( diff >> 1 );
1259                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1260                         low = slot + 1;
1261                 else
1262                         higher = slot, good++;
1263         }
1264
1265         //      return zero if key is on right link page
1266
1267         return good ? higher : 0;
1268 }
1269
1270 //  find and load page at given level for given key
1271 //      leave page rd or wr locked as requested
1272
1273 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1274 {
1275 uid page_no = ROOT_page, prevpage = 0;
1276 uint drill = 0xff, slot;
1277 BtLatchSet *prevlatch;
1278 uint mode, prevmode;
1279
1280   //  start at root of btree and drill down
1281
1282   do {
1283         // determine lock mode of drill level
1284         mode = (drill == lvl) ? lock : BtLockRead; 
1285
1286         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1287                 set->page_no = page_no;
1288         else
1289                 return 0;
1290
1291         // obtain access lock using lock chaining with Access mode
1292
1293         if( page_no > ROOT_page )
1294           bt_lockpage(BtLockAccess, set->latch);
1295
1296         set->page = bt_mappage (bt, set->latch);
1297
1298         //      release & unpin parent page
1299
1300         if( prevpage ) {
1301           bt_unlockpage(prevmode, prevlatch);
1302           bt_unpinlatch (prevlatch);
1303           prevpage = 0;
1304         }
1305
1306         // obtain read lock using lock chaining
1307
1308         bt_lockpage(mode, set->latch);
1309
1310         if( set->page->free )
1311                 return bt->err = BTERR_struct, 0;
1312
1313         if( page_no > ROOT_page )
1314           bt_unlockpage(BtLockAccess, set->latch);
1315
1316         // re-read and re-lock root after determining actual level of root
1317
1318         if( set->page->lvl != drill) {
1319                 if( set->page_no != ROOT_page )
1320                         return bt->err = BTERR_struct, 0;
1321                         
1322                 drill = set->page->lvl;
1323
1324                 if( lock != BtLockRead && drill == lvl ) {
1325                   bt_unlockpage(mode, set->latch);
1326                   bt_unpinlatch (set->latch);
1327                   continue;
1328                 }
1329         }
1330
1331         prevpage = set->page_no;
1332         prevlatch = set->latch;
1333         prevmode = mode;
1334
1335         //  find key on page at this level
1336         //  and descend to requested level
1337
1338         if( set->page->kill )
1339           goto slideright;
1340
1341         if( slot = bt_findslot (set, key, len) ) {
1342           if( drill == lvl )
1343                 return slot;
1344
1345           while( slotptr(set->page, slot)->dead )
1346                 if( slot++ < set->page->cnt )
1347                         continue;
1348                 else
1349                         goto slideright;
1350
1351           page_no = bt_getid(valptr(set->page, slot)->value);
1352           drill--;
1353           continue;
1354         }
1355
1356         //  or slide right into next page
1357
1358 slideright:
1359         page_no = bt_getid(set->page->right);
1360
1361   } while( page_no );
1362
1363   // return error on end of right chain
1364
1365   bt->err = BTERR_struct;
1366   return 0;     // return error
1367 }
1368
1369 //      return page to free list
1370 //      page must be delete & write locked
1371
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1373 {
1374         //      lock allocation page
1375
1376         bt_spinwritelock (bt->mgr->alloclatch);
1377
1378         //      store chain
1379         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1380         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1381         set->latch->dirty = 1;
1382         set->page->free = 1;
1383
1384         // unlock released page
1385
1386         bt_unlockpage (BtLockDelete, set->latch);
1387         bt_unlockpage (BtLockWrite, set->latch);
1388         bt_unpinlatch (set->latch);
1389
1390         // unlock allocation page
1391
1392         bt_spinreleasewrite (bt->mgr->alloclatch);
1393 }
1394
1395 //      a fence key was deleted from a page
1396 //      push new fence value upwards
1397
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1399 {
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1402 BtKey* ptr;
1403 uint idx;
1404
1405         //      remove the old fence value
1406
1407         ptr = keyptr(set->page, set->page->cnt);
1408         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410         set->latch->dirty = 1;
1411
1412         //  cache new fence value
1413
1414         ptr = keyptr(set->page, set->page->cnt);
1415         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1416
1417         bt_lockpage (BtLockParent, set->latch);
1418         bt_unlockpage (BtLockWrite, set->latch);
1419
1420         //      insert new (now smaller) fence key
1421
1422         bt_putid (value, set->page_no);
1423         ptr = (BtKey*)leftkey;
1424
1425         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1426           return bt->err;
1427
1428         //      now delete old fence key
1429
1430         ptr = (BtKey*)rightkey;
1431
1432         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1433                 return bt->err;
1434
1435         bt_unlockpage (BtLockParent, set->latch);
1436         bt_unpinlatch(set->latch);
1437         return 0;
1438 }
1439
1440 //      root has a single child
1441 //      collapse a level from the tree
1442
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1444 {
1445 BtPageSet child[1];
1446 uint idx;
1447
1448   // find the child entry and promote as new root contents
1449
1450   do {
1451         for( idx = 0; idx++ < root->page->cnt; )
1452           if( !slotptr(root->page, idx)->dead )
1453                 break;
1454
1455         child->page_no = bt_getid (valptr(root->page, idx)->value);
1456
1457         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1458                 child->page = bt_mappage (bt, child->latch);
1459         else
1460                 return bt->err;
1461
1462         bt_lockpage (BtLockDelete, child->latch);
1463         bt_lockpage (BtLockWrite, child->latch);
1464
1465         memcpy (root->page, child->page, bt->mgr->page_size);
1466         root->latch->dirty = 1;
1467
1468         bt_freepage (bt, child);
1469
1470   } while( root->page->lvl > 1 && root->page->act == 1 );
1471
1472   bt_unlockpage (BtLockWrite, root->latch);
1473   bt_unpinlatch (root->latch);
1474   return 0;
1475 }
1476
1477 //      maintain reverse scan pointers by
1478 //      linking left pointer of far right node
1479
1480 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1481 {
1482 BtPageSet right2[1];
1483
1484         //      keep track of rightmost leaf page
1485
1486         if( !right_page_no ) {
1487           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1488           return 0;
1489         }
1490
1491         //      link right page to left page
1492
1493         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1494                 right2->page = bt_mappage (bt, right2->latch);
1495         else
1496                 return bt->err;
1497
1498         bt_lockpage (BtLockWrite, right2->latch);
1499
1500         bt_putid(right2->page->left, left_page_no);
1501         bt_unlockpage (BtLockWrite, right2->latch);
1502         bt_unpinlatch (right2->latch);
1503         return 0;
1504 }
1505
1506 //  find and delete key on page by marking delete flag bit
1507 //  if page becomes empty, delete it from the btree
1508
1509 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1510 {
1511 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1512 uint slot, idx, found, fence;
1513 BtPageSet set[1], right[1];
1514 unsigned char value[BtId];
1515 BtKey *ptr, *tst;
1516 BtVal *val;
1517
1518         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1519                 ptr = keyptr(set->page, slot);
1520         else
1521                 return bt->err;
1522
1523         // if librarian slot, advance to real slot
1524
1525         if( slotptr(set->page, slot)->type == Librarian )
1526                 ptr = keyptr(set->page, ++slot);
1527
1528         fence = slot == set->page->cnt;
1529
1530         // if key is found delete it, otherwise ignore request
1531
1532         if( found = !keycmp (ptr, key, len) )
1533           if( found = slotptr(set->page, slot)->dead == 0 ) {
1534                 val = valptr(set->page,slot);
1535                 slotptr(set->page, slot)->dead = 1;
1536                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1537                 set->page->act--;
1538
1539                 // collapse empty slots beneath the fence
1540
1541                 while( idx = set->page->cnt - 1 )
1542                   if( slotptr(set->page, idx)->dead ) {
1543                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1544                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1545                   } else
1546                         break;
1547           }
1548
1549         //      did we delete a fence key in an upper level?
1550
1551         if( found && lvl && set->page->act && fence )
1552           if( bt_fixfence (bt, set, lvl) )
1553                 return bt->err;
1554           else
1555                 return bt->found = found, 0;
1556
1557         //      do we need to collapse root?
1558
1559         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1560           if( bt_collapseroot (bt, set) )
1561                 return bt->err;
1562           else
1563                 return bt->found = found, 0;
1564
1565         //      return if page is not empty
1566
1567         if( set->page->act ) {
1568                 set->latch->dirty = 1;
1569                 bt_unlockpage(BtLockWrite, set->latch);
1570                 bt_unpinlatch (set->latch);
1571                 return bt->found = found, 0;
1572         }
1573
1574         //      cache copy of fence key
1575         //      to post in parent
1576
1577         ptr = keyptr(set->page, set->page->cnt);
1578         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1579
1580         //      obtain lock on right page
1581
1582         right->page_no = bt_getid(set->page->right);
1583
1584         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1585                 right->page = bt_mappage (bt, right->latch);
1586         else
1587                 return 0;
1588
1589         bt_lockpage (BtLockWrite, right->latch);
1590
1591         if( right->page->kill )
1592                 return bt->err = BTERR_struct;
1593
1594         // transfer left link
1595
1596         memcpy (right->page->left, set->page->left, BtId);
1597
1598         // pull contents of right peer into our empty page
1599
1600         memcpy (set->page, right->page, bt->mgr->page_size);
1601         set->latch->dirty = 1;
1602
1603         // update left link
1604
1605         if( !lvl )
1606           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1607                 return bt->err;
1608
1609         // cache copy of key to update
1610
1611         ptr = keyptr(right->page, right->page->cnt);
1612         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1613
1614         // mark right page deleted and point it to left page
1615         //      until we can post parent updates
1616
1617         bt_putid (right->page->right, set->page_no);
1618         right->latch->dirty = 1;
1619         right->page->kill = 1;
1620
1621         bt_lockpage (BtLockParent, right->latch);
1622         bt_unlockpage (BtLockWrite, right->latch);
1623
1624         bt_lockpage (BtLockParent, set->latch);
1625         bt_unlockpage (BtLockWrite, set->latch);
1626
1627         // redirect higher key directly to our new node contents
1628
1629         bt_putid (value, set->page_no);
1630         ptr = (BtKey*)higherfence;
1631
1632         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1633           return bt->err;
1634
1635         //      delete old lower key to our node
1636
1637         ptr = (BtKey*)lowerfence;
1638
1639         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1640           return bt->err;
1641
1642         //      obtain delete and write locks to right node
1643
1644         bt_unlockpage (BtLockParent, right->latch);
1645         bt_lockpage (BtLockDelete, right->latch);
1646         bt_lockpage (BtLockWrite, right->latch);
1647         bt_freepage (bt, right);
1648
1649         bt_unlockpage (BtLockParent, set->latch);
1650         bt_unpinlatch (set->latch);
1651         bt->found = found;
1652         return 0;
1653 }
1654
1655 BtKey *bt_foundkey (BtDb *bt)
1656 {
1657         return (BtKey*)(bt->key);
1658 }
1659
1660 //      advance to next slot
1661
1662 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1663 {
1664 BtLatchSet *prevlatch;
1665 uid page_no;
1666
1667         if( slot < set->page->cnt )
1668                 return slot + 1;
1669
1670         prevlatch = set->latch;
1671
1672         if( page_no = bt_getid(set->page->right) )
1673           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1674                 set->page = bt_mappage (bt, set->latch);
1675           else
1676                 return 0;
1677         else
1678           return bt->err = BTERR_struct, 0;
1679
1680         // obtain access lock using lock chaining with Access mode
1681
1682         bt_lockpage(BtLockAccess, set->latch);
1683
1684         bt_unlockpage(BtLockRead, prevlatch);
1685         bt_unpinlatch (prevlatch);
1686
1687         bt_lockpage(BtLockRead, set->latch);
1688         bt_unlockpage(BtLockAccess, set->latch);
1689
1690         set->page_no = page_no;
1691         return 1;
1692 }
1693
1694 //      find unique key or first duplicate key in
1695 //      leaf level and return number of value bytes
1696 //      or (-1) if not found.  Setup key for bt_foundkey
1697
1698 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1699 {
1700 BtPageSet set[1];
1701 uint len, slot;
1702 int ret = -1;
1703 BtKey *ptr;
1704 BtVal *val;
1705
1706   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1707    do {
1708         ptr = keyptr(set->page, slot);
1709
1710         //      skip librarian slot place holder
1711
1712         if( slotptr(set->page, slot)->type == Librarian )
1713                 ptr = keyptr(set->page, ++slot);
1714
1715         //      return actual key found
1716
1717         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1718         len = ptr->len;
1719
1720         if( slotptr(set->page, slot)->type == Duplicate )
1721                 len -= BtId;
1722
1723         //      not there if we reach the stopper key
1724
1725         if( slot == set->page->cnt )
1726           if( !bt_getid (set->page->right) )
1727                 break;
1728
1729         // if key exists, return >= 0 value bytes copied
1730         //      otherwise return (-1)
1731
1732         if( slotptr(set->page, slot)->dead )
1733                 continue;
1734
1735         if( keylen == len )
1736           if( !memcmp (ptr->key, key, len) ) {
1737                 val = valptr (set->page,slot);
1738                 if( valmax > val->len )
1739                         valmax = val->len;
1740                 memcpy (value, val->value, valmax);
1741                 ret = valmax;
1742           }
1743
1744         break;
1745
1746    } while( slot = bt_findnext (bt, set, slot) );
1747
1748   bt_unlockpage (BtLockRead, set->latch);
1749   bt_unpinlatch (set->latch);
1750   return ret;
1751 }
1752
1753 //      check page for space available,
1754 //      clean if necessary and return
1755 //      0 - page needs splitting
1756 //      >0  new slot value
1757
1758 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1759 {
1760 uint nxt = bt->mgr->page_size;
1761 BtPage page = set->page;
1762 uint cnt = 0, idx = 0;
1763 uint max = page->cnt;
1764 uint newslot = max;
1765 BtKey *key;
1766 BtVal *val;
1767
1768         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1769                 return slot;
1770
1771         //      skip cleanup and proceed to split
1772         //      if there's not enough garbage
1773         //      to bother with.
1774
1775         if( page->garbage < nxt / 5 )
1776                 return 0;
1777
1778         memcpy (bt->frame, page, bt->mgr->page_size);
1779
1780         // skip page info and set rest of page to zero
1781
1782         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1783         set->latch->dirty = 1;
1784         page->garbage = 0;
1785         page->act = 0;
1786
1787         // clean up page first by
1788         // removing deleted keys
1789
1790         while( cnt++ < max ) {
1791                 if( cnt == slot )
1792                         newslot = idx + 2;
1793                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1794                         continue;
1795
1796                 // copy the value across
1797
1798                 val = valptr(bt->frame, cnt);
1799                 nxt -= val->len + sizeof(BtVal);
1800                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1801
1802                 // copy the key across
1803
1804                 key = keyptr(bt->frame, cnt);
1805                 nxt -= key->len + sizeof(BtKey);
1806                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1807
1808                 // make a librarian slot
1809
1810                 if( idx ) {
1811                         slotptr(page, ++idx)->off = nxt;
1812                         slotptr(page, idx)->type = Librarian;
1813                         slotptr(page, idx)->dead = 1;
1814                 }
1815
1816                 // set up the slot
1817
1818                 slotptr(page, ++idx)->off = nxt;
1819                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1820
1821                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1822                         page->act++;
1823         }
1824
1825         page->min = nxt;
1826         page->cnt = idx;
1827
1828         //      see if page has enough space now, or does it need splitting?
1829
1830         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1831                 return newslot;
1832
1833         return 0;
1834 }
1835
1836 // split the root and raise the height of the btree
1837
1838 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1839 {
1840 uint nxt = bt->mgr->page_size;
1841 unsigned char value[BtId];
1842 BtPageSet left[1];
1843 BtKey *ptr;
1844 BtVal *val;
1845
1846         //  Obtain an empty page to use, and copy the current
1847         //  root contents into it, e.g. lower keys
1848
1849         if( bt_newpage(bt, left, root->page) )
1850                 return bt->err;
1851
1852         bt_unpinlatch (left->latch);
1853
1854         // set left from higher to lower page
1855
1856         bt_putid (right->page->left, left->page_no);
1857         bt_unpinlatch (right->latch);
1858
1859         // preserve the page info at the bottom
1860         // of higher keys and set rest to zero
1861
1862         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1863
1864         // insert stopper key at top of newroot page
1865         // and increase the root height
1866
1867         nxt -= BtId + sizeof(BtVal);
1868         bt_putid (value, right->page_no);
1869         val = (BtVal *)((unsigned char *)root->page + nxt);
1870         memcpy (val->value, value, BtId);
1871         val->len = BtId;
1872
1873         nxt -= 2 + sizeof(BtKey);
1874         slotptr(root->page, 2)->off = nxt;
1875         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1876         ptr->len = 2;
1877         ptr->key[0] = 0xff;
1878         ptr->key[1] = 0xff;
1879
1880         // insert lower keys page fence key on newroot page as first key
1881
1882         nxt -= BtId + sizeof(BtVal);
1883         bt_putid (value, left->page_no);
1884         val = (BtVal *)((unsigned char *)root->page + nxt);
1885         memcpy (val->value, value, BtId);
1886         val->len = BtId;
1887
1888         nxt -= leftkey->len + sizeof(BtKey);
1889         slotptr(root->page, 1)->off = nxt;
1890         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1891         
1892         bt_putid(root->page->right, 0);
1893         root->page->min = nxt;          // reset lowest used offset and key count
1894         root->page->cnt = 2;
1895         root->page->act = 2;
1896         root->page->lvl++;
1897
1898         // release and unpin root pages
1899
1900         bt_unlockpage(BtLockWrite, root->latch);
1901         bt_unpinlatch (root->latch);
1902         return 0;
1903 }
1904
1905 //  split already locked full node
1906 //      return unlocked.
1907
1908 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1909 {
1910 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1911 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1912 unsigned char value[BtId];
1913 uint lvl = set->page->lvl;
1914 BtPageSet right[1];
1915 BtKey *key, *ptr;
1916 BtVal *val, *src;
1917 uid right2;
1918 uint prev;
1919
1920         //  split higher half of keys to bt->frame
1921
1922         memset (bt->frame, 0, bt->mgr->page_size);
1923         max = set->page->cnt;
1924         cnt = max / 2;
1925         idx = 0;
1926
1927         while( cnt++ < max ) {
1928                 if( slotptr(set->page, cnt)->dead && cnt < max )
1929                         continue;
1930                 src = valptr(set->page, cnt);
1931                 nxt -= src->len + sizeof(BtVal);
1932                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1933
1934                 key = keyptr(set->page, cnt);
1935                 nxt -= key->len + sizeof(BtKey);
1936                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1937                 memcpy (ptr, key, key->len + sizeof(BtKey));
1938
1939                 //      add librarian slot
1940
1941                 if( idx ) {
1942                         slotptr(bt->frame, ++idx)->off = nxt;
1943                         slotptr(bt->frame, idx)->type = Librarian;
1944                         slotptr(bt->frame, idx)->dead = 1;
1945                 }
1946
1947                 //  add actual slot
1948
1949                 slotptr(bt->frame, ++idx)->off = nxt;
1950                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1951
1952                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1953                         bt->frame->act++;
1954         }
1955
1956         // remember existing fence key for new page to the right
1957
1958         memcpy (rightkey, key, key->len + sizeof(BtKey));
1959
1960         bt->frame->bits = bt->mgr->page_bits;
1961         bt->frame->min = nxt;
1962         bt->frame->cnt = idx;
1963         bt->frame->lvl = lvl;
1964
1965         // link right node
1966
1967         if( set->page_no > ROOT_page ) {
1968                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1969                 bt_putid(bt->frame->left, set->page_no);
1970         }
1971
1972         //      get new free page and write higher keys to it.
1973
1974         if( bt_newpage(bt, right, bt->frame) )
1975                 return bt->err;
1976
1977         // link left node
1978
1979         if( set->page_no > ROOT_page && !lvl )
1980           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1981                 return bt->err;
1982
1983         //      update lower keys to continue in old page
1984
1985         memcpy (bt->frame, set->page, bt->mgr->page_size);
1986         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987         set->latch->dirty = 1;
1988
1989         nxt = bt->mgr->page_size;
1990         set->page->garbage = 0;
1991         set->page->act = 0;
1992         max /= 2;
1993         cnt = 0;
1994         idx = 0;
1995
1996         if( slotptr(bt->frame, max)->type == Librarian )
1997                 max--;
1998
1999         //  assemble page of smaller keys
2000
2001         while( cnt++ < max ) {
2002                 if( slotptr(bt->frame, cnt)->dead )
2003                         continue;
2004                 val = valptr(bt->frame, cnt);
2005                 nxt -= val->len + sizeof(BtVal);
2006                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2007
2008                 key = keyptr(bt->frame, cnt);
2009                 nxt -= key->len + sizeof(BtKey);
2010                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2011
2012                 //      add librarian slot
2013
2014                 if( idx ) {
2015                         slotptr(set->page, ++idx)->off = nxt;
2016                         slotptr(set->page, idx)->type = Librarian;
2017                         slotptr(set->page, idx)->dead = 1;
2018                 }
2019
2020                 //      add actual slot
2021
2022                 slotptr(set->page, ++idx)->off = nxt;
2023                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2024                 set->page->act++;
2025         }
2026
2027         // remember fence key for smaller page
2028
2029         memcpy(fencekey, key, key->len + sizeof(BtKey));
2030
2031         bt_putid(set->page->right, right->page_no);
2032         set->page->min = nxt;
2033         set->page->cnt = idx;
2034
2035         // if current page is the root page, split it
2036
2037         if( set->page_no == ROOT_page )
2038                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2039
2040         // insert new fences in their parent pages
2041
2042         bt_lockpage (BtLockParent, right->latch);
2043
2044         bt_lockpage (BtLockParent, set->latch);
2045         bt_unlockpage (BtLockWrite, set->latch);
2046
2047         // insert new fence for reformulated left block of smaller keys
2048
2049         bt_putid (value, set->page_no);
2050
2051         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2052                 return bt->err;
2053
2054         // switch fence for right block of larger keys to new right page
2055
2056         bt_putid (value, right->page_no);
2057
2058         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2059                 return bt->err;
2060
2061         bt_unlockpage (BtLockParent, set->latch);
2062         bt_unpinlatch (set->latch);
2063
2064         bt_unlockpage (BtLockParent, right->latch);
2065         bt_unpinlatch (right->latch);
2066         return 0;
2067 }
2068
2069 //      install new key and value onto page
2070 //      page must already be checked for
2071 //      adequate space
2072
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2074 {
2075 uint idx, librarian;
2076 BtSlot *node;
2077 BtKey *ptr;
2078 BtVal *val;
2079
2080         //      if found slot > desired slot and previous slot
2081         //      is a librarian slot, use it
2082
2083         if( slot > 1 )
2084           if( slotptr(set->page, slot-1)->type == Librarian )
2085                 slot--;
2086
2087         // copy value onto page
2088
2089         set->page->min -= vallen + sizeof(BtVal);
2090         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091         memcpy (val->value, value, vallen);
2092         val->len = vallen;
2093
2094         // copy key onto page
2095
2096         set->page->min -= keylen + sizeof(BtKey);
2097         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098         memcpy (ptr->key, key, keylen);
2099         ptr->len = keylen;
2100         
2101         //      find first empty slot
2102
2103         for( idx = slot; idx < set->page->cnt; idx++ )
2104           if( slotptr(set->page, idx)->dead )
2105                 break;
2106
2107         // now insert key into array before slot
2108
2109         if( idx == set->page->cnt )
2110                 idx += 2, set->page->cnt += 2, librarian = 2;
2111         else
2112                 librarian = 1;
2113
2114         set->latch->dirty = 1;
2115         set->page->act++;
2116
2117         while( idx > slot + librarian - 1 )
2118                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2119
2120         //      add librarian slot
2121
2122         if( librarian > 1 ) {
2123                 node = slotptr(set->page, slot++);
2124                 node->off = set->page->min;
2125                 node->type = Librarian;
2126                 node->dead = 1;
2127         }
2128
2129         //      fill in new slot
2130
2131         node = slotptr(set->page, slot);
2132         node->off = set->page->min;
2133         node->type = type;
2134         node->dead = 0;
2135
2136         bt_unlockpage (BtLockWrite, set->latch);
2137         bt_unpinlatch (set->latch);
2138         return 0;
2139 }
2140
2141 //  Insert new key into the btree at given level.
2142 //      either add a new key or update/add an existing one
2143
2144 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, int unique)
2145 {
2146 unsigned char newkey[BT_keyarray];
2147 uint slot, idx, len;
2148 BtPageSet set[1];
2149 BtKey *ptr, *ins;
2150 uid sequence;
2151 BtVal *val;
2152 uint type;
2153
2154   // set up the key we're working on
2155
2156   ins = (BtKey*)newkey;
2157   memcpy (ins->key, key, keylen);
2158   ins->len = keylen;
2159
2160   // is this a non-unique index value?
2161
2162   if( unique )
2163         type = Unique;
2164   else {
2165         type = Duplicate;
2166         sequence = bt_newdup (bt);
2167         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2168         ins->len += BtId;
2169   }
2170
2171   while( 1 ) { // find the page and slot for the current key
2172         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2173                 ptr = keyptr(set->page, slot);
2174         else {
2175                 if( !bt->err )
2176                         bt->err = BTERR_ovflw;
2177                 return bt->err;
2178         }
2179
2180         // if librarian slot == found slot, advance to real slot
2181
2182         if( slotptr(set->page, slot)->type == Librarian )
2183           if( !keycmp (ptr, key, keylen) )
2184                 ptr = keyptr(set->page, ++slot);
2185
2186         len = ptr->len;
2187
2188         if( slotptr(set->page, slot)->type == Duplicate )
2189                 len -= BtId;
2190
2191         //  if inserting a duplicate key or unique key
2192         //      check for adequate space on the page
2193         //      and insert the new key before slot.
2194
2195         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2196           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2197                 if( bt_splitpage (bt, set) )
2198                   return bt->err;
2199                 else
2200                   continue;
2201
2202           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2203         }
2204
2205         // if key already exists, update value and return
2206
2207         val = valptr(set->page, slot);
2208
2209         if( val->len >= vallen ) {
2210                 if( slotptr(set->page, slot)->dead )
2211                         set->page->act++;
2212                 set->page->garbage += val->len - vallen;
2213                 set->latch->dirty = 1;
2214                 slotptr(set->page, slot)->dead = 0;
2215                 val->len = vallen;
2216                 memcpy (val->value, value, vallen);
2217                 bt_unlockpage(BtLockWrite, set->latch);
2218                 bt_unpinlatch (set->latch);
2219                 return 0;
2220         }
2221
2222         //      new update value doesn't fit in existing value area
2223
2224         if( !slotptr(set->page, slot)->dead )
2225                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2226         else {
2227                 slotptr(set->page, slot)->dead = 0;
2228                 set->page->act++;
2229         }
2230
2231         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2232           if( bt_splitpage (bt, set) )
2233                 return bt->err;
2234           else
2235                 continue;
2236
2237         set->page->min -= vallen + sizeof(BtVal);
2238         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2239         memcpy (val->value, value, vallen);
2240         val->len = vallen;
2241
2242         set->latch->dirty = 1;
2243         set->page->min -= keylen + sizeof(BtKey);
2244         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2245         memcpy (ptr->key, key, keylen);
2246         ptr->len = keylen;
2247         
2248         slotptr(set->page, slot)->off = set->page->min;
2249         bt_unlockpage(BtLockWrite, set->latch);
2250         bt_unpinlatch (set->latch);
2251         return 0;
2252   }
2253   return 0;
2254 }
2255
2256 //      compute hash of string
2257
2258 uint bt_hashkey (unsigned char *key, unsigned int len)
2259 {
2260 uint hash = 0;
2261
2262         while( len >= sizeof(uint) )
2263                 hash *= 11, hash += *(uint *)key, len -= sizeof(uint), key += sizeof(uint);
2264
2265         while( len )
2266                 hash *= 11, hash += *key++ * len--;
2267
2268         return hash;
2269 }
2270
2271 //      add a new lock table entry
2272
2273 uint bt_newlock (BtDb *bt, BtKey *key, uint hashidx)
2274 {
2275 BtLockSet *lock = bt->mgr->locktable;
2276 uint slot, prev;
2277
2278         //      obtain lock manager global lock
2279
2280         bt_spinwritelock (bt->mgr->locklatch);
2281
2282         //      return NULL if table is full
2283
2284         if( !(slot = bt->mgr->lockfree) ) {
2285                 bt_spinreleasewrite (bt->mgr->locklatch);
2286                 return 0;
2287         }
2288
2289         //  maintain free chain
2290
2291         bt->mgr->lockfree = lock[slot].next;
2292         bt_spinreleasewrite (bt->mgr->locklatch);
2293         
2294         if( prev = bt->mgr->hashlock[hashidx].slot )
2295                 lock[prev].prev = slot;
2296
2297         bt->mgr->hashlock[hashidx].slot = slot;
2298         lock[slot].hashidx = hashidx;
2299         lock[slot].next = prev;
2300         lock[slot].prev = 0;
2301
2302         //      save the key being locked
2303
2304         memcpy (lock[slot].key, key, key->len + sizeof(BtKey));
2305         return slot;
2306 }
2307
2308 //      add key to the lock table
2309 //      block until available.
2310
2311 uint bt_setlock(BtDb *bt, BtKey *key)
2312 {
2313 uint hashidx = bt_hashkey(key->key, key->len) % bt->mgr->lockhash;
2314 BtLockSet *lock = NULL;
2315 BtKey *key2;
2316 uint slot;
2317
2318   //  find existing lock entry
2319   //  or recover from full table
2320
2321   while( lock == NULL ) {
2322         //      obtain lock on hash slot
2323
2324         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2325
2326         if( slot = bt->mgr->hashlock[hashidx].slot )
2327           do {
2328                 lock = bt->mgr->locktable + slot;
2329                 key2 = (BtKey *)lock->key;
2330
2331                 if( !keycmp (key, key2->key, key2->len) )
2332                         break;
2333           } while( slot = lock->next );
2334
2335         if( slot )
2336                 break;
2337
2338         if( slot = bt_newlock (bt, key, hashidx) )
2339                 break;
2340
2341         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2342 #ifdef unix
2343         sched_yield();
2344 #else
2345         SwitchToThread ();
2346 #endif
2347   }
2348
2349   lock = bt->mgr->locktable + slot;
2350   lock->pin++;
2351
2352   bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2353   WriteLock (lock->readwr);
2354   return slot;
2355 }
2356
2357 void bt_lockclr (BtDb *bt, uint slot)
2358 {
2359 BtLockSet *lock = bt->mgr->locktable + slot;
2360 uint hashidx = lock->hashidx;
2361 uint next, prev;
2362         
2363         bt_spinwritelock (bt->mgr->hashlock[hashidx].latch);
2364         WriteRelease (lock->readwr);
2365
2366         // if entry is no longer in use,
2367         //      return it to the free chain.
2368
2369         if( !--lock->pin ) {
2370                 if( next = lock->next )
2371                         bt->mgr->locktable[next].prev = lock->prev;
2372
2373                 if( prev = lock->prev )
2374                         bt->mgr->locktable[prev].next = lock->next;
2375                 else
2376                         bt->mgr->hashlock[lock->hashidx].slot = next;
2377
2378                 bt_spinwritelock (bt->mgr->locklatch);
2379                 lock->next = bt->mgr->lockfree;
2380                 bt->mgr->lockfree = slot;
2381                 bt_spinreleasewrite (bt->mgr->locklatch);
2382         }
2383
2384         bt_spinreleasewrite (bt->mgr->hashlock[hashidx].latch);
2385 }
2386
2387 //      atomic insert of a batch of keys.
2388 //      return -1 if BTERR is set
2389 //      otherwise return slot number
2390 //      causing the key constraint violation
2391 //      or zero on successful completion.
2392
2393 int bt_atomicinsert (BtDb *bt, BtPage source)
2394 {
2395 uint locks[MAX_atomic];
2396 BtKey *key, *key2;
2397 int result = 0;
2398 BtSlot temp[1];
2399 uint slot, idx;
2400 BtVal *val;
2401 int type;
2402
2403         // first sort the list of keys into order to
2404         //      prevent deadlocks between threads.
2405
2406         for( slot = 1; slot++ < source->cnt; ) {
2407           *temp = *slotptr(source,slot);
2408           key = keyptr (source,slot);
2409           for( idx = slot; --idx; ) {
2410             key2 = keyptr (source,idx);
2411                 if( keycmp (key, key2->key, key2->len) < 0 ) {
2412                   *slotptr(source,idx+1) = *slotptr(source,idx);
2413                   *slotptr(source,idx) = *temp;
2414                 } else
2415                   break;
2416           }
2417         }
2418
2419         // take each unique-type key and add it to the lock table
2420
2421         for( slot = 0; slot++ < source->cnt; )
2422           if( slotptr(source, slot)->type == Unique )
2423                 locks[slot] = bt_setlock (bt, keyptr(source,slot));
2424
2425         // Lookup each unique key and determine constraint violations
2426
2427         for( slot = 0; slot++ < source->cnt; )
2428           if( slotptr(source, slot)->type == Unique ) {
2429                 key = keyptr(source, slot);
2430                 if( bt_findkey (bt, key->key, key->len, NULL, 0) < 0 )
2431                   continue;
2432                 result = slot;
2433                 break;
2434           }
2435
2436         //      add each key to the btree
2437
2438         if( !result )
2439          for( slot = 0; slot++ < source->cnt; ) {
2440           key = keyptr(source,slot);
2441           val = valptr(source,slot);
2442           type = slotptr(source,slot)->type;
2443           if( bt_insertkey (bt, key->key, key->len, 0, val->value, val->len, type == Unique) )
2444                 return -1;
2445          }
2446
2447         // remove each unique-type key from the lock table
2448
2449         for( slot = 0; slot++ < source->cnt; )
2450           if( slotptr(source, slot)->type == Unique )
2451                 bt_lockclr (bt, locks[slot]);
2452
2453         return result;
2454 }
2455
2456 //      set cursor to highest slot on highest page
2457
2458 uint bt_lastkey (BtDb *bt)
2459 {
2460 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2461 BtPageSet set[1];
2462 uint slot;
2463
2464         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2465                 set->page = bt_mappage (bt, set->latch);
2466         else
2467                 return 0;
2468
2469     bt_lockpage(BtLockRead, set->latch);
2470
2471         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2472         slot = set->page->cnt;
2473
2474     bt_unlockpage(BtLockRead, set->latch);
2475         bt_unpinlatch (set->latch);
2476         return slot;
2477 }
2478
2479 //      return previous slot on cursor page
2480
2481 uint bt_prevkey (BtDb *bt, uint slot)
2482 {
2483 BtPageSet set[1];
2484 uid left;
2485
2486         if( --slot )
2487                 return slot;
2488
2489         if( left = bt_getid(bt->cursor->left) )
2490                 bt->cursor_page = left;
2491         else
2492                 return 0;
2493
2494         if( set->latch = bt_pinlatch (bt, left, 1) )
2495                 set->page = bt_mappage (bt, set->latch);
2496         else
2497                 return 0;
2498
2499     bt_lockpage(BtLockRead, set->latch);
2500         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2501         bt_unlockpage(BtLockRead, set->latch);
2502         bt_unpinlatch (set->latch);
2503         return bt->cursor->cnt;
2504 }
2505
2506 //  return next slot on cursor page
2507 //  or slide cursor right into next page
2508
2509 uint bt_nextkey (BtDb *bt, uint slot)
2510 {
2511 BtPageSet set[1];
2512 uid right;
2513
2514   do {
2515         right = bt_getid(bt->cursor->right);
2516
2517         while( slot++ < bt->cursor->cnt )
2518           if( slotptr(bt->cursor,slot)->dead )
2519                 continue;
2520           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2521                 return slot;
2522           else
2523                 break;
2524
2525         if( !right )
2526                 break;
2527
2528         bt->cursor_page = right;
2529
2530         if( set->latch = bt_pinlatch (bt, right, 1) )
2531                 set->page = bt_mappage (bt, set->latch);
2532         else
2533                 return 0;
2534
2535     bt_lockpage(BtLockRead, set->latch);
2536
2537         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2538
2539         bt_unlockpage(BtLockRead, set->latch);
2540         bt_unpinlatch (set->latch);
2541         slot = 0;
2542
2543   } while( 1 );
2544
2545   return bt->err = 0;
2546 }
2547
2548 //  cache page of keys into cursor and return starting slot for given key
2549
2550 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2551 {
2552 BtPageSet set[1];
2553 uint slot;
2554
2555         // cache page for retrieval
2556
2557         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2558           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2559         else
2560           return 0;
2561
2562         bt->cursor_page = set->page_no;
2563
2564         bt_unlockpage(BtLockRead, set->latch);
2565         bt_unpinlatch (set->latch);
2566         return slot;
2567 }
2568
2569 BtKey *bt_key(BtDb *bt, uint slot)
2570 {
2571         return keyptr(bt->cursor, slot);
2572 }
2573
2574 BtVal *bt_val(BtDb *bt, uint slot)
2575 {
2576         return valptr(bt->cursor,slot);
2577 }
2578
2579 #ifdef STANDALONE
2580
2581 #ifndef unix
2582 double getCpuTime(int type)
2583 {
2584 FILETIME crtime[1];
2585 FILETIME xittime[1];
2586 FILETIME systime[1];
2587 FILETIME usrtime[1];
2588 SYSTEMTIME timeconv[1];
2589 double ans = 0;
2590
2591         memset (timeconv, 0, sizeof(SYSTEMTIME));
2592
2593         switch( type ) {
2594         case 0:
2595                 GetSystemTimeAsFileTime (xittime);
2596                 FileTimeToSystemTime (xittime, timeconv);
2597                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2598                 break;
2599         case 1:
2600                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2601                 FileTimeToSystemTime (usrtime, timeconv);
2602                 break;
2603         case 2:
2604                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2605                 FileTimeToSystemTime (systime, timeconv);
2606                 break;
2607         }
2608
2609         ans += (double)timeconv->wHour * 3600;
2610         ans += (double)timeconv->wMinute * 60;
2611         ans += (double)timeconv->wSecond;
2612         ans += (double)timeconv->wMilliseconds / 1000;
2613         return ans;
2614 }
2615 #else
2616 #include <time.h>
2617 #include <sys/resource.h>
2618
2619 double getCpuTime(int type)
2620 {
2621 struct rusage used[1];
2622 struct timeval tv[1];
2623
2624         switch( type ) {
2625         case 0:
2626                 gettimeofday(tv, NULL);
2627                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2628
2629         case 1:
2630                 getrusage(RUSAGE_SELF, used);
2631                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2632
2633         case 2:
2634                 getrusage(RUSAGE_SELF, used);
2635                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2636         }
2637
2638         return 0;
2639 }
2640 #endif
2641
2642 void bt_poolaudit (BtMgr *mgr)
2643 {
2644 BtLatchSet *latch;
2645 uint slot = 0;
2646
2647         while( slot++ < mgr->latchdeployed ) {
2648                 latch = mgr->latchsets + slot;
2649
2650                 if( *latch->readwr->rin & MASK )
2651                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2652                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2653
2654                 if( *latch->access->rin & MASK )
2655                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2656                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2657
2658                 if( *latch->parent->rin & MASK )
2659                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2660                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2661
2662                 if( latch->pin & ~CLOCK_bit ) {
2663                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2664                         latch->pin = 0;
2665                 }
2666         }
2667 }
2668
2669 uint bt_latchaudit (BtDb *bt)
2670 {
2671 ushort idx, hashidx;
2672 uid next, page_no;
2673 BtLatchSet *latch;
2674 uint cnt = 0;
2675 BtKey *ptr;
2676
2677         if( *(ushort *)(bt->mgr->alloclatch) )
2678                 fprintf(stderr, "Alloc page locked\n");
2679         *(uint *)(bt->mgr->alloclatch) = 0;
2680
2681         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2682                 latch = bt->mgr->latchsets + idx;
2683                 if( *latch->readwr->rin & MASK )
2684                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2685                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2686
2687                 if( *latch->access->rin & MASK )
2688                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2689                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2690
2691                 if( *latch->parent->rin & MASK )
2692                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2693                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2694
2695                 if( latch->pin ) {
2696                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2697                         latch->pin = 0;
2698                 }
2699         }
2700
2701         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2702           if( *(uint *)(bt->mgr->hashtable[hashidx].latch) )
2703                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2704
2705           *(uint *)(bt->mgr->hashtable[hashidx].latch) = 0;
2706
2707           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2708                 latch = bt->mgr->latchsets + idx;
2709                 if( latch->pin )
2710                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2711           } while( idx = latch->next );
2712         }
2713
2714         page_no = LEAF_page;
2715
2716         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2717         uid off = page_no << bt->mgr->page_bits;
2718 #ifdef unix
2719           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2720 #else
2721         DWORD amt[1];
2722
2723           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2724
2725           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2726                 return bt->err = BTERR_map;
2727
2728           if( *amt <  bt->mgr->page_size )
2729                 return bt->err = BTERR_map;
2730 #endif
2731                 if( !bt->frame->free && !bt->frame->lvl )
2732                         cnt += bt->frame->act;
2733                 page_no++;
2734         }
2735                 
2736         cnt--;  // remove stopper key
2737         fprintf(stderr, " Total keys read %d\n", cnt);
2738
2739         bt_close (bt);
2740         return 0;
2741 }
2742
2743 typedef struct {
2744         char idx;
2745         char *type;
2746         char *infile;
2747         BtMgr *mgr;
2748 } ThreadArg;
2749
2750 //  standalone program to index file of keys
2751 //  then list them onto std-out
2752
2753 #ifdef unix
2754 void *index_file (void *arg)
2755 #else
2756 uint __stdcall index_file (void *arg)
2757 #endif
2758 {
2759 int line = 0, found = 0, cnt = 0, unique;
2760 uid next, page_no = LEAF_page;  // start on first page of leaves
2761 BtPage page = calloc (4096, 1);
2762 unsigned char key[BT_maxkey];
2763 ThreadArg *args = arg;
2764 int ch, len = 0, slot;
2765 BtPageSet set[1];
2766 int atomic;
2767 BtKey *ptr;
2768 BtVal *val;
2769 BtDb *bt;
2770 FILE *in;
2771
2772         bt = bt_open (args->mgr);
2773
2774         unique = (args->type[1] | 0x20) != 'd';
2775         atomic = (args->type[1] | 0x20) == 'a';
2776
2777         switch(args->type[0] | 0x20)
2778         {
2779         case 'a':
2780                 fprintf(stderr, "started latch mgr audit\n");
2781                 cnt = bt_latchaudit (bt);
2782                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2783                 break;
2784
2785         case 'p':
2786                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2787                 if( in = fopen (args->infile, "rb") )
2788                   while( ch = getc(in), ch != EOF )
2789                         if( ch == '\n' )
2790                         {
2791                           line++;
2792
2793                           if( atomic ) {
2794                                 memset (page, 0, 4096);
2795                                 slotptr(page, 1)->off = 2048;
2796                                 slotptr(page, 1)->type = Unique;
2797                                 ptr = keyptr(page,1);
2798                                 ptr->len = 10;
2799                                 memcpy(ptr->key, key, 10);
2800                                 val = valptr(page,1);
2801                                 val->len = len - 10;
2802                                 memcpy (val->value, key + 10, len - 10);
2803                                 page->cnt = 1;
2804                                 if( slot = bt_atomicinsert (bt, page) )
2805                                   fprintf(stderr, "Error %d Line: %d\n", slot, line), exit(0);
2806                           }
2807                           else if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2808                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2809                           len = 0;
2810                         }
2811                         else if( len < BT_maxkey )
2812                                 key[len++] = ch;
2813                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2814                 break;
2815
2816         case 'w':
2817                 fprintf(stderr, "started indexing for %s\n", args->infile);
2818                 if( in = fopen (args->infile, "rb") )
2819                   while( ch = getc(in), ch != EOF )
2820                         if( ch == '\n' )
2821                         {
2822                           line++;
2823                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2824                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2825                           len = 0;
2826                         }
2827                         else if( len < BT_maxkey )
2828                                 key[len++] = ch;
2829                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2830                 break;
2831
2832         case 'd':
2833                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2834                 if( in = fopen (args->infile, "rb") )
2835                   while( ch = getc(in), ch != EOF )
2836                         if( ch == '\n' )
2837                         {
2838                           line++;
2839                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2840                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2841                           ptr = (BtKey*)(bt->key);
2842                           found++;
2843
2844                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2845                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2846                           len = 0;
2847                         }
2848                         else if( len < BT_maxkey )
2849                                 key[len++] = ch;
2850                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2851                 break;
2852
2853         case 'f':
2854                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2855                 if( in = fopen (args->infile, "rb") )
2856                   while( ch = getc(in), ch != EOF )
2857                         if( ch == '\n' )
2858                         {
2859                           line++;
2860                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2861                                 found++;
2862                           else if( bt->err )
2863                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2864                           len = 0;
2865                         }
2866                         else if( len < BT_maxkey )
2867                                 key[len++] = ch;
2868                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2869                 break;
2870
2871         case 's':
2872                 fprintf(stderr, "started scanning\n");
2873                 do {
2874                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2875                                 set->page = bt_mappage (bt, set->latch);
2876                         else
2877                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2878                         bt_lockpage (BtLockRead, set->latch);
2879                         next = bt_getid (set->page->right);
2880
2881                         for( slot = 0; slot++ < set->page->cnt; )
2882                          if( next || slot < set->page->cnt )
2883                           if( !slotptr(set->page, slot)->dead ) {
2884                                 ptr = keyptr(set->page, slot);
2885                                 len = ptr->len;
2886
2887                             if( slotptr(set->page, slot)->type == Duplicate )
2888                                         len -= BtId;
2889
2890                                 fwrite (ptr->key, len, 1, stdout);
2891                                 val = valptr(set->page, slot);
2892                                 fwrite (val->value, val->len, 1, stdout);
2893                                 fputc ('\n', stdout);
2894                                 cnt++;
2895                            }
2896
2897                         bt_unlockpage (BtLockRead, set->latch);
2898                         bt_unpinlatch (set->latch);
2899                 } while( page_no = next );
2900
2901                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2902                 break;
2903
2904         case 'r':
2905                 fprintf(stderr, "started reverse scan\n");
2906                 if( slot = bt_lastkey (bt) )
2907                    while( slot = bt_prevkey (bt, slot) ) {
2908                         if( slotptr(bt->cursor, slot)->dead )
2909                           continue;
2910
2911                         ptr = keyptr(bt->cursor, slot);
2912                         len = ptr->len;
2913
2914                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2915                                 len -= BtId;
2916
2917                         fwrite (ptr->key, len, 1, stdout);
2918                         val = valptr(bt->cursor, slot);
2919                         fwrite (val->value, val->len, 1, stdout);
2920                         fputc ('\n', stdout);
2921                         cnt++;
2922                   }
2923
2924                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2925                 break;
2926
2927         case 'c':
2928 #ifdef unix
2929                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2930 #endif
2931                 fprintf(stderr, "started counting\n");
2932                 page_no = LEAF_page;
2933
2934                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2935                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2936                                 break;
2937
2938                         if( !bt->frame->free && !bt->frame->lvl )
2939                                 cnt += bt->frame->act;
2940
2941                         bt->reads++;
2942                         page_no++;
2943                 }
2944                 
2945                 cnt--;  // remove stopper key
2946                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2947                 break;
2948         }
2949
2950         bt_close (bt);
2951 #ifdef unix
2952         return NULL;
2953 #else
2954         return 0;
2955 #endif
2956 }
2957
2958 typedef struct timeval timer;
2959
2960 int main (int argc, char **argv)
2961 {
2962 int idx, cnt, len, slot, err;
2963 int segsize, bits = 16;
2964 double start, stop;
2965 #ifdef unix
2966 pthread_t *threads;
2967 #else
2968 HANDLE *threads;
2969 #endif
2970 ThreadArg *args;
2971 uint poolsize = 0;
2972 uint locksize = 0;
2973 float elapsed;
2974 char key[1];
2975 BtMgr *mgr;
2976 BtKey *ptr;
2977 BtDb *bt;
2978
2979         if( argc < 3 ) {
2980                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find/Atomic [page_bits buffer_pool_size lock_mgr_size src_file1 src_file2 ... ]\n", argv[0]);
2981                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2982                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2983                 fprintf (stderr, "  lock_mgr_size is the maximum number of outstanding key locks\n");
2984                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2985                 exit(0);
2986         }
2987
2988         start = getCpuTime(0);
2989
2990         if( argc > 3 )
2991                 bits = atoi(argv[3]);
2992
2993         if( argc > 4 )
2994                 poolsize = atoi(argv[4]);
2995
2996         if( !poolsize )
2997                 fprintf (stderr, "Warning: no mapped_pool\n");
2998
2999         if( argc > 5 )
3000                 locksize = atoi(argv[5]);
3001
3002         cnt = argc - 6;
3003 #ifdef unix
3004         threads = malloc (cnt * sizeof(pthread_t));
3005 #else
3006         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3007 #endif
3008         args = malloc (cnt * sizeof(ThreadArg));
3009
3010         mgr = bt_mgr ((argv[1]), bits, poolsize, locksize);
3011
3012         if( !mgr ) {
3013                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3014                 exit (1);
3015         }
3016
3017         //      fire off threads
3018
3019         for( idx = 0; idx < cnt; idx++ ) {
3020                 args[idx].infile = argv[idx + 6];
3021                 args[idx].type = argv[2];
3022                 args[idx].mgr = mgr;
3023                 args[idx].idx = idx;
3024 #ifdef unix
3025                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3026                         fprintf(stderr, "Error creating thread %d\n", err);
3027 #else
3028                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3029 #endif
3030         }
3031
3032         //      wait for termination
3033
3034 #ifdef unix
3035         for( idx = 0; idx < cnt; idx++ )
3036                 pthread_join (threads[idx], NULL);
3037 #else
3038         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3039
3040         for( idx = 0; idx < cnt; idx++ )
3041                 CloseHandle(threads[idx]);
3042
3043 #endif
3044         bt_poolaudit(mgr);
3045         bt_mgrclose (mgr);
3046
3047         elapsed = getCpuTime(0) - start;
3048         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3049         elapsed = getCpuTime(1);
3050         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3051         elapsed = getCpuTime(2);
3052         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3053 }
3054
3055 #endif  //STANDALONE