]> pd.if.org Git - btree/blob - threadskv8.c
3bae89e89b861f8d99806ef2937a5fada4805f80
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      traditional buffer pool manager
7 //      and ACID batched key updates
8
9 // 21 SEP 2014
10
11 // author: karl malbrain, malbrain@cal.berkeley.edu
12
13 /*
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
16
17 The orginal author is Karl Malbrain.
18
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
25 */
26
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
29
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
32
33 #ifdef linux
34 #define _GNU_SOURCE
35 #endif
36
37 #ifdef unix
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <fcntl.h>
42 #include <sys/time.h>
43 #include <sys/mman.h>
44 #include <errno.h>
45 #include <pthread.h>
46 #else
47 #define WIN32_LEAN_AND_MEAN
48 #include <windows.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <time.h>
52 #include <fcntl.h>
53 #include <process.h>
54 #include <intrin.h>
55 #endif
56
57 #include <memory.h>
58 #include <string.h>
59 #include <stddef.h>
60
61 typedef unsigned long long      uid;
62
63 #ifndef unix
64 typedef unsigned long long      off64_t;
65 typedef unsigned short          ushort;
66 typedef unsigned int            uint;
67 #endif
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 //  BTree page number constants
78 #define ALLOC_page              0       // allocation page
79 #define ROOT_page               1       // root of the btree
80 #define LEAF_page               2       // first page of leaves
81
82 //      Number of levels to create in a new BTree
83
84 #define MIN_lvl                 2
85
86 /*
87 There are six lock types for each node in four independent sets: 
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
93 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
94 */
95
96 typedef enum{
97         BtLockAccess = 1,
98         BtLockDelete = 2,
99         BtLockRead   = 4,
100         BtLockWrite  = 8,
101         BtLockParent = 16,
102         BtLockAtomic = 32
103 } BtLock;
104
105 //      definition for phase-fair reader/writer lock implementation
106
107 typedef struct {
108         ushort rin[1];
109         ushort rout[1];
110         ushort ticket[1];
111         ushort serving[1];
112 } RWLock;
113
114 #define PHID 0x1
115 #define PRES 0x2
116 #define MASK 0x3
117 #define RINC 0x4
118
119 //      definition for spin latch implementation
120
121 // exclusive is set for write access
122 // share is count of read accessors
123 // grant write lock when share == 0
124
125 volatile typedef struct {
126         ushort exclusive:1;
127         ushort pending:1;
128         ushort share:14;
129 } BtSpinLatch;
130
131 #define XCL 1
132 #define PEND 2
133 #define BOTH 3
134 #define SHARE 4
135
136 //  hash table entries
137
138 typedef struct {
139         volatile uint slot;             // Latch table entry at head of chain
140         BtSpinLatch latch[1];
141 } BtHashEntry;
142
143 //      latch manager table structure
144
145 typedef struct {
146         uid page_no;                    // latch set page number
147         RWLock readwr[1];               // read/write page lock
148         RWLock access[1];               // Access Intent/Page delete
149         RWLock parent[1];               // Posting of fence key in parent
150         RWLock atomic[1];               // Atomic update in progress
151         uint split;                             // right split page atomic insert
152         uint entry;                             // entry slot in latch table
153         uint next;                              // next entry in hash table chain
154         uint prev;                              // prev entry in hash table chain
155         volatile ushort pin;    // number of outstanding threads
156         ushort dirty:1;                 // page in cache is dirty
157 #ifdef unix
158         pthread_t atomictid;    // pid holding atomic lock
159 #else
160         uint atomictid;                 // pid holding atomic lock
161 #endif
162 } BtLatchSet;
163
164 //      Define the length of the page record numbers
165
166 #define BtId 6
167
168 //      Page key slot definition.
169
170 //      Keys are marked dead, but remain on the page until
171 //      it cleanup is called. The fence key (highest key) for
172 //      a leaf page is always present, even after cleanup.
173
174 //      Slot types
175
176 //      In addition to the Unique keys that occupy slots
177 //      there are Librarian and Duplicate key
178 //      slots occupying the key slot array.
179
180 //      The Librarian slots are dead keys that
181 //      serve as filler, available to add new Unique
182 //      or Dup slots that are inserted into the B-tree.
183
184 //      The Duplicate slots have had their key bytes extended
185 //      by 6 bytes to contain a binary duplicate key uniqueifier.
186
187 typedef enum {
188         Unique,
189         Librarian,
190         Duplicate,
191         Delete
192 } BtSlotType;
193
194 typedef struct {
195         uint off:BT_maxbits;    // page offset for key start
196         uint type:3;                    // type of slot
197         uint dead:1;                    // set for deleted slot
198 } BtSlot;
199
200 //      The key structure occupies space at the upper end of
201 //      each page.  It's a length byte followed by the key
202 //      bytes.
203
204 typedef struct {
205         unsigned char len;              // this can be changed to a ushort or uint
206         unsigned char key[0];
207 } BtKey;
208
209 //      the value structure also occupies space at the upper
210 //      end of the page. Each key is immediately followed by a value.
211
212 typedef struct {
213         unsigned char len;              // this can be changed to a ushort or uint
214         unsigned char value[0];
215 } BtVal;
216
217 #define BT_maxkey       255             // maximum number of bytes in a key
218 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
219
220 //      The first part of an index page.
221 //      It is immediately followed
222 //      by the BtSlot array of keys.
223
224 //      note that this structure size
225 //      must be a multiple of 8 bytes
226 //      in order to place dups correctly.
227
228 typedef struct BtPage_ {
229         uint cnt;                                       // count of keys in page
230         uint act;                                       // count of active keys
231         uint min;                                       // next key offset
232         uint garbage;                           // page garbage in bytes
233         unsigned char bits:7;           // page size in bits
234         unsigned char free:1;           // page is on free chain
235         unsigned char lvl:7;            // level of page
236         unsigned char kill:1;           // page is being deleted
237         unsigned char right[BtId];      // page number to right
238 } *BtPage;
239
240 //  The loadpage interface object
241
242 typedef struct {
243         BtPage page;            // current page pointer
244         BtLatchSet *latch;      // current page latch set
245 } BtPageSet;
246
247 //      structure for latch manager on ALLOC_page
248
249 typedef struct {
250         struct BtPage_ alloc[1];        // next page_no in right ptr
251         unsigned long long dups[1];     // global duplicate key uniqueifier
252         unsigned char chain[BtId];      // head of free page_nos chain
253 } BtPageZero;
254
255 //      The object structure for Btree access
256
257 typedef struct {
258         uint page_size;                         // page size    
259         uint page_bits;                         // page size in bits    
260 #ifdef unix
261         int idx;
262 #else
263         HANDLE idx;
264 #endif
265         BtPageZero *pagezero;           // mapped allocation page
266         BtSpinLatch lock[1];            // allocation area lite latch
267         uint latchdeployed;                     // highest number of latch entries deployed
268         uint nlatchpage;                        // number of latch pages at BT_latch
269         uint latchtotal;                        // number of page latch entries
270         uint latchhash;                         // number of latch hash table slots
271         uint latchvictim;                       // next latch entry to examine
272         BtHashEntry *hashtable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         unsigned char *pagepool;        // mapped to the buffer pool pages
275 #ifndef unix
276         HANDLE halloc;                          // allocation handle
277         HANDLE hpool;                           // buffer pool handle
278 #endif
279 } BtMgr;
280
281 typedef struct {
282         BtMgr *mgr;                             // buffer manager for thread
283         BtPage cursor;                  // cached frame for start/next (never mapped)
284         BtPage frame;                   // spare frame for the page split (never mapped)
285         uid cursor_page;                                // current cursor page number   
286         unsigned char *mem;                             // frame, cursor, page memory buffer
287         unsigned char key[BT_keyarray]; // last found complete key
288         int found;                              // last delete or insert was found
289         int err;                                // last error
290         int reads, writes;              // number of reads and writes from the btree
291 } BtDb;
292
293 typedef enum {
294         BTERR_ok = 0,
295         BTERR_struct,
296         BTERR_ovflw,
297         BTERR_lock,
298         BTERR_map,
299         BTERR_read,
300         BTERR_wrt,
301         BTERR_atomic
302 } BTERR;
303
304 #define CLOCK_bit 0x8000
305
306 // B-Tree functions
307 extern void bt_close (BtDb *bt);
308 extern BtDb *bt_open (BtMgr *mgr);
309 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
310 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
311 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
312 extern BtKey *bt_foundkey (BtDb *bt);
313 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
314 extern uint bt_nextkey   (BtDb *bt, uint slot);
315
316 //      manager functions
317 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
318 void bt_mgrclose (BtMgr *mgr);
319
320 //  Helper functions to return slot values
321 //      from the cursor page.
322
323 extern BtKey *bt_key (BtDb *bt, uint slot);
324 extern BtVal *bt_val (BtDb *bt, uint slot);
325
326 //  The page is allocated from low and hi ends.
327 //  The key slots are allocated from the bottom,
328 //      while the text and value of the key
329 //  are allocated from the top.  When the two
330 //  areas meet, the page is split into two.
331
332 //  A key consists of a length byte, two bytes of
333 //  index number (0 - 65535), and up to 253 bytes
334 //  of key value.
335
336 //  Associated with each key is a value byte string
337 //      containing any value desired.
338
339 //  The b-tree root is always located at page 1.
340 //      The first leaf page of level zero is always
341 //      located on page 2.
342
343 //      The b-tree pages are linked with next
344 //      pointers to facilitate enumerators,
345 //      and provide for concurrency.
346
347 //      When to root page fills, it is split in two and
348 //      the tree height is raised by a new root at page
349 //      one with two keys.
350
351 //      Deleted keys are marked with a dead bit until
352 //      page cleanup. The fence key for a leaf node is
353 //      always present
354
355 //  To achieve maximum concurrency one page is locked at a time
356 //  as the tree is traversed to find leaf key in question. The right
357 //  page numbers are used in cases where the page is being split,
358 //      or consolidated.
359
360 //  Page 0 is dedicated to lock for new page extensions,
361 //      and chains empty pages together for reuse. It also
362 //      contains the latch manager hash table.
363
364 //      The ParentModification lock on a node is obtained to serialize posting
365 //      or changing the fence key for a node.
366
367 //      Empty pages are chained together through the ALLOC page and reused.
368
369 //      Access macros to address slot and key values from the page
370 //      Page slots use 1 based indexing.
371
372 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
373 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
374 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
375
376 void bt_putid(unsigned char *dest, uid id)
377 {
378 int i = BtId;
379
380         while( i-- )
381                 dest[i] = (unsigned char)id, id >>= 8;
382 }
383
384 uid bt_getid(unsigned char *src)
385 {
386 uid id = 0;
387 int i;
388
389         for( i = 0; i < BtId; i++ )
390                 id <<= 8, id |= *src++; 
391
392         return id;
393 }
394
395 uid bt_newdup (BtDb *bt)
396 {
397 #ifdef unix
398         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
399 #else
400         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
401 #endif
402 }
403
404 //      Phase-Fair reader/writer lock implementation
405
406 void WriteLock (RWLock *lock)
407 {
408 ushort w, r, tix;
409
410 #ifdef unix
411         tix = __sync_fetch_and_add (lock->ticket, 1);
412 #else
413         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
414 #endif
415         // wait for our ticket to come up
416
417         while( tix != lock->serving[0] )
418 #ifdef unix
419                 sched_yield();
420 #else
421                 SwitchToThread ();
422 #endif
423
424         w = PRES | (tix & PHID);
425 #ifdef  unix
426         r = __sync_fetch_and_add (lock->rin, w);
427 #else
428         r = _InterlockedExchangeAdd16 (lock->rin, w);
429 #endif
430         while( r != *lock->rout )
431 #ifdef unix
432                 sched_yield();
433 #else
434                 SwitchToThread();
435 #endif
436 }
437
438 void WriteRelease (RWLock *lock)
439 {
440 #ifdef unix
441         __sync_fetch_and_and (lock->rin, ~MASK);
442 #else
443         _InterlockedAnd16 (lock->rin, ~MASK);
444 #endif
445         lock->serving[0]++;
446 }
447
448 void ReadLock (RWLock *lock)
449 {
450 ushort w;
451 #ifdef unix
452         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
453 #else
454         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
455 #endif
456         if( w )
457           while( w == (*lock->rin & MASK) )
458 #ifdef unix
459                 sched_yield ();
460 #else
461                 SwitchToThread ();
462 #endif
463 }
464
465 void ReadRelease (RWLock *lock)
466 {
467 #ifdef unix
468         __sync_fetch_and_add (lock->rout, RINC);
469 #else
470         _InterlockedExchangeAdd16 (lock->rout, RINC);
471 #endif
472 }
473
474 //      Spin Latch Manager
475
476 //      wait until write lock mode is clear
477 //      and add 1 to the share count
478
479 void bt_spinreadlock(BtSpinLatch *latch)
480 {
481 ushort prev;
482
483   do {
484 #ifdef unix
485         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
486 #else
487         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
488 #endif
489         //  see if exclusive request is granted or pending
490
491         if( !(prev & BOTH) )
492                 return;
493 #ifdef unix
494         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
495 #else
496         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
497 #endif
498 #ifdef  unix
499   } while( sched_yield(), 1 );
500 #else
501   } while( SwitchToThread(), 1 );
502 #endif
503 }
504
505 //      wait for other read and write latches to relinquish
506
507 void bt_spinwritelock(BtSpinLatch *latch)
508 {
509 ushort prev;
510
511   do {
512 #ifdef  unix
513         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
514 #else
515         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
516 #endif
517         if( !(prev & XCL) )
518           if( !(prev & ~BOTH) )
519                 return;
520           else
521 #ifdef unix
522                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
523 #else
524                 _InterlockedAnd16((ushort *)latch, ~XCL);
525 #endif
526 #ifdef  unix
527   } while( sched_yield(), 1 );
528 #else
529   } while( SwitchToThread(), 1 );
530 #endif
531 }
532
533 //      try to obtain write lock
534
535 //      return 1 if obtained,
536 //              0 otherwise
537
538 int bt_spinwritetry(BtSpinLatch *latch)
539 {
540 ushort prev;
541
542 #ifdef  unix
543         prev = __sync_fetch_and_or((ushort *)latch, XCL);
544 #else
545         prev = _InterlockedOr16((ushort *)latch, XCL);
546 #endif
547         //      take write access if all bits are clear
548
549         if( !(prev & XCL) )
550           if( !(prev & ~BOTH) )
551                 return 1;
552           else
553 #ifdef unix
554                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 #else
556                 _InterlockedAnd16((ushort *)latch, ~XCL);
557 #endif
558         return 0;
559 }
560
561 //      clear write mode
562
563 void bt_spinreleasewrite(BtSpinLatch *latch)
564 {
565 #ifdef unix
566         __sync_fetch_and_and((ushort *)latch, ~BOTH);
567 #else
568         _InterlockedAnd16((ushort *)latch, ~BOTH);
569 #endif
570 }
571
572 //      decrement reader count
573
574 void bt_spinreleaseread(BtSpinLatch *latch)
575 {
576 #ifdef unix
577         __sync_fetch_and_add((ushort *)latch, -SHARE);
578 #else
579         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
580 #endif
581 }
582
583 //      read page from permanent location in Btree file
584
585 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
586 {
587 off64_t off = page_no << mgr->page_bits;
588
589 #ifdef unix
590         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
591                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
592                 return BTERR_read;
593         }
594 #else
595 OVERLAPPED ovl[1];
596 uint amt[1];
597
598         memset (ovl, 0, sizeof(OVERLAPPED));
599         ovl->Offset = off;
600         ovl->OffsetHigh = off >> 32;
601
602         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
603                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
604                 return BTERR_read;
605         }
606         if( *amt <  mgr->page_size ) {
607                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
608                 return BTERR_read;
609         }
610 #endif
611         return 0;
612 }
613
614 //      write page to permanent location in Btree file
615 //      clear the dirty bit
616
617 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
618 {
619 off64_t off = page_no << mgr->page_bits;
620
621 #ifdef unix
622         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
623                 return BTERR_wrt;
624 #else
625 OVERLAPPED ovl[1];
626 uint amt[1];
627
628         memset (ovl, 0, sizeof(OVERLAPPED));
629         ovl->Offset = off;
630         ovl->OffsetHigh = off >> 32;
631
632         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
633                 return BTERR_wrt;
634
635         if( *amt <  mgr->page_size )
636                 return BTERR_wrt;
637 #endif
638         return 0;
639 }
640
641 //      link latch table entry into head of latch hash table
642
643 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
644 {
645 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
646 BtLatchSet *latch = bt->mgr->latchsets + slot;
647
648         if( latch->next = bt->mgr->hashtable[hashidx].slot )
649                 bt->mgr->latchsets[latch->next].prev = slot;
650
651         bt->mgr->hashtable[hashidx].slot = slot;
652         memset (&latch->atomictid, 0, sizeof(latch->atomictid));
653         latch->page_no = page_no;
654         latch->entry = slot;
655         latch->split = 0;
656         latch->prev = 0;
657         latch->pin = 1;
658
659         if( loadit )
660           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
661                 return bt->err;
662           else
663                 bt->reads++;
664
665         return bt->err = 0;
666 }
667
668 //      set CLOCK bit in latch
669 //      decrement pin count
670
671 void bt_unpinlatch (BtLatchSet *latch)
672 {
673 #ifdef unix
674         if( ~latch->pin & CLOCK_bit )
675                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
676         __sync_fetch_and_add(&latch->pin, -1);
677 #else
678         if( ~latch->pin & CLOCK_bit )
679                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
680         _InterlockedDecrement16 (&latch->pin);
681 #endif
682 }
683
684 //  return the btree cached page address
685
686 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
687 {
688 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
689
690         return page;
691 }
692
693 //      find existing latchset or inspire new one
694 //      return with latchset pinned
695
696 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
697 {
698 uint hashidx = page_no % bt->mgr->latchhash;
699 BtLatchSet *latch;
700 uint slot, idx;
701 uint lvl, cnt;
702 off64_t off;
703 uint amt[1];
704 BtPage page;
705
706   //  try to find our entry
707
708   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
709
710   if( slot = bt->mgr->hashtable[hashidx].slot ) do
711   {
712         latch = bt->mgr->latchsets + slot;
713         if( page_no == latch->page_no )
714                 break;
715   } while( slot = latch->next );
716
717   //  found our entry
718   //  increment clock
719
720   if( slot ) {
721         latch = bt->mgr->latchsets + slot;
722 #ifdef unix
723         __sync_fetch_and_add(&latch->pin, 1);
724 #else
725         _InterlockedIncrement16 (&latch->pin);
726 #endif
727         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
728         return latch;
729   }
730
731         //  see if there are any unused pool entries
732 #ifdef unix
733         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
734 #else
735         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
736 #endif
737
738         if( slot < bt->mgr->latchtotal ) {
739                 latch = bt->mgr->latchsets + slot;
740                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
741                         return NULL;
742                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
743                 return latch;
744         }
745
746 #ifdef unix
747         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
748 #else
749         _InterlockedDecrement (&bt->mgr->latchdeployed);
750 #endif
751   //  find and reuse previous entry on victim
752
753   while( 1 ) {
754 #ifdef unix
755         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
756 #else
757         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
758 #endif
759         // try to get write lock on hash chain
760         //      skip entry if not obtained
761         //      or has outstanding pins
762
763         slot %= bt->mgr->latchtotal;
764
765         if( !slot )
766                 continue;
767
768         latch = bt->mgr->latchsets + slot;
769         idx = latch->page_no % bt->mgr->latchhash;
770
771         //      see we are on same chain as hashidx
772
773         if( idx == hashidx )
774                 continue;
775
776         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
777                 continue;
778
779         //  skip this slot if it is pinned
780         //      or the CLOCK bit is set
781
782         if( latch->pin ) {
783           if( latch->pin & CLOCK_bit ) {
784 #ifdef unix
785                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
786 #else
787                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
788 #endif
789           }
790           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
791           continue;
792         }
793
794         //  update permanent page area in btree from buffer pool
795
796         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
797
798         if( latch->dirty )
799           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
800                 return NULL;
801           else
802                 latch->dirty = 0, bt->writes++;
803
804         //  unlink our available slot from its hash chain
805
806         if( latch->prev )
807                 bt->mgr->latchsets[latch->prev].next = latch->next;
808         else
809                 bt->mgr->hashtable[idx].slot = latch->next;
810
811         if( latch->next )
812                 bt->mgr->latchsets[latch->next].prev = latch->prev;
813
814         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
815
816         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
817                 return NULL;
818
819         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
820         return latch;
821   }
822 }
823
824 void bt_mgrclose (BtMgr *mgr)
825 {
826 BtLatchSet *latch;
827 uint num = 0;
828 BtPage page;
829 uint slot;
830
831         //      flush dirty pool pages to the btree
832
833         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
834                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
835                 latch = mgr->latchsets + slot;
836
837                 if( latch->dirty ) {
838                         bt_writepage(mgr, page, latch->page_no);
839                         latch->dirty = 0, num++;
840                 }
841 //              madvise (page, mgr->page_size, MADV_DONTNEED);
842         }
843
844         fprintf(stderr, "%d buffer pool pages flushed\n", num);
845
846 #ifdef unix
847         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
848         munmap (mgr->pagezero, mgr->page_size);
849 #else
850         FlushViewOfFile(mgr->pagezero, 0);
851         UnmapViewOfFile(mgr->pagezero);
852         UnmapViewOfFile(mgr->hashtable);
853         CloseHandle(mgr->halloc);
854         CloseHandle(mgr->hpool);
855 #endif
856 #ifdef unix
857         close (mgr->idx);
858         free (mgr);
859 #else
860         FlushFileBuffers(mgr->idx);
861         CloseHandle(mgr->idx);
862         GlobalFree (mgr);
863 #endif
864 }
865
866 //      close and release memory
867
868 void bt_close (BtDb *bt)
869 {
870 #ifdef unix
871         if( bt->mem )
872                 free (bt->mem);
873 #else
874         if( bt->mem)
875                 VirtualFree (bt->mem, 0, MEM_RELEASE);
876 #endif
877         free (bt);
878 }
879
880 //  open/create new btree buffer manager
881
882 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
883 //              size of page pool (e.g. 262144)
884
885 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
886 {
887 uint lvl, attr, last, slot, idx;
888 uint nlatchpage, latchhash;
889 unsigned char value[BtId];
890 int flag, initit = 0;
891 BtPageZero *pagezero;
892 off64_t size;
893 uint amt[1];
894 BtMgr* mgr;
895 BtKey* key;
896 BtVal *val;
897
898         // determine sanity of page size and buffer pool
899
900         if( bits > BT_maxbits )
901                 bits = BT_maxbits;
902         else if( bits < BT_minbits )
903                 bits = BT_minbits;
904
905         if( nodemax < 16 ) {
906                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
907                 return NULL;
908         }
909
910 #ifdef unix
911         mgr = calloc (1, sizeof(BtMgr));
912
913         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
914
915         if( mgr->idx == -1 ) {
916                 fprintf (stderr, "Unable to open btree file\n");
917                 return free(mgr), NULL;
918         }
919 #else
920         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
921         attr = FILE_ATTRIBUTE_NORMAL;
922         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
923
924         if( mgr->idx == INVALID_HANDLE_VALUE )
925                 return GlobalFree(mgr), NULL;
926 #endif
927
928 #ifdef unix
929         pagezero = valloc (BT_maxpage);
930         *amt = 0;
931
932         // read minimum page size to get root info
933         //      to support raw disk partition files
934         //      check if bits == 0 on the disk.
935
936         if( size = lseek (mgr->idx, 0L, 2) )
937                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
938                         if( pagezero->alloc->bits )
939                                 bits = pagezero->alloc->bits;
940                         else
941                                 initit = 1;
942                 else
943                         return free(mgr), free(pagezero), NULL;
944         else
945                 initit = 1;
946 #else
947         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
948         size = GetFileSize(mgr->idx, amt);
949
950         if( size || *amt ) {
951                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
952                         return bt_mgrclose (mgr), NULL;
953                 bits = pagezero->alloc->bits;
954         } else
955                 initit = 1;
956 #endif
957
958         mgr->page_size = 1 << bits;
959         mgr->page_bits = bits;
960
961         //  calculate number of latch hash table entries
962
963         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
964         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
965
966         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
967         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
968         mgr->latchtotal = nodemax;
969
970         if( !initit )
971                 goto mgrlatch;
972
973         // initialize an empty b-tree with latch page, root page, page of leaves
974         // and page(s) of latches and page pool cache
975
976         memset (pagezero, 0, 1 << bits);
977         pagezero->alloc->bits = mgr->page_bits;
978         bt_putid(pagezero->alloc->right, MIN_lvl+1);
979
980         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
981                 fprintf (stderr, "Unable to create btree page zero\n");
982                 return bt_mgrclose (mgr), NULL;
983         }
984
985         memset (pagezero, 0, 1 << bits);
986         pagezero->alloc->bits = mgr->page_bits;
987
988         for( lvl=MIN_lvl; lvl--; ) {
989                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
990                 key = keyptr(pagezero->alloc, 1);
991                 key->len = 2;           // create stopper key
992                 key->key[0] = 0xff;
993                 key->key[1] = 0xff;
994
995                 bt_putid(value, MIN_lvl - lvl + 1);
996                 val = valptr(pagezero->alloc, 1);
997                 val->len = lvl ? BtId : 0;
998                 memcpy (val->value, value, val->len);
999
1000                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1001                 pagezero->alloc->lvl = lvl;
1002                 pagezero->alloc->cnt = 1;
1003                 pagezero->alloc->act = 1;
1004
1005                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1006                         fprintf (stderr, "Unable to create btree page zero\n");
1007                         return bt_mgrclose (mgr), NULL;
1008                 }
1009         }
1010
1011 mgrlatch:
1012 #ifdef unix
1013         free (pagezero);
1014 #else
1015         VirtualFree (pagezero, 0, MEM_RELEASE);
1016 #endif
1017 #ifdef unix
1018         // mlock the pagezero page
1019
1020         flag = PROT_READ | PROT_WRITE;
1021         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1022         if( mgr->pagezero == MAP_FAILED ) {
1023                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1024                 return bt_mgrclose (mgr), NULL;
1025         }
1026         mlock (mgr->pagezero, mgr->page_size);
1027
1028         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1029         if( mgr->hashtable == MAP_FAILED ) {
1030                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1031                 return bt_mgrclose (mgr), NULL;
1032         }
1033 #else
1034         flag = PAGE_READWRITE;
1035         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1036         if( !mgr->halloc ) {
1037                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1038                 return bt_mgrclose (mgr), NULL;
1039         }
1040
1041         flag = FILE_MAP_WRITE;
1042         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1043         if( !mgr->pagezero ) {
1044                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1045                 return bt_mgrclose (mgr), NULL;
1046         }
1047
1048         flag = PAGE_READWRITE;
1049         size = (uid)mgr->nlatchpage << mgr->page_bits;
1050         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1051         if( !mgr->hpool ) {
1052                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1053                 return bt_mgrclose (mgr), NULL;
1054         }
1055
1056         flag = FILE_MAP_WRITE;
1057         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1058         if( !mgr->hashtable ) {
1059                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1060                 return bt_mgrclose (mgr), NULL;
1061         }
1062 #endif
1063
1064         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1065         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1066
1067         return mgr;
1068 }
1069
1070 //      open BTree access method
1071 //      based on buffer manager
1072
1073 BtDb *bt_open (BtMgr *mgr)
1074 {
1075 BtDb *bt = malloc (sizeof(*bt));
1076
1077         memset (bt, 0, sizeof(*bt));
1078         bt->mgr = mgr;
1079 #ifdef unix
1080         bt->mem = valloc (2 *mgr->page_size);
1081 #else
1082         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1083 #endif
1084         bt->frame = (BtPage)bt->mem;
1085         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1086         return bt;
1087 }
1088
1089 //  compare two keys, return > 0, = 0, or < 0
1090 //  =0: keys are same
1091 //  -1: key2 > key1
1092 //  +1: key2 < key1
1093 //  as the comparison value
1094
1095 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1096 {
1097 uint len1 = key1->len;
1098 int ans;
1099
1100         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1101                 return ans;
1102
1103         if( len1 > len2 )
1104                 return 1;
1105         if( len1 < len2 )
1106                 return -1;
1107
1108         return 0;
1109 }
1110
1111 // place write, read, or parent lock on requested page_no.
1112
1113 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1114 {
1115         switch( mode ) {
1116         case BtLockRead:
1117                 ReadLock (latch->readwr);
1118                 break;
1119         case BtLockWrite:
1120                 WriteLock (latch->readwr);
1121                 break;
1122         case BtLockAccess:
1123                 ReadLock (latch->access);
1124                 break;
1125         case BtLockDelete:
1126                 WriteLock (latch->access);
1127                 break;
1128         case BtLockParent:
1129                 WriteLock (latch->parent);
1130                 break;
1131         case BtLockAtomic:
1132                 WriteLock (latch->atomic);
1133                 break;
1134         case BtLockAtomic | BtLockRead:
1135                 WriteLock (latch->atomic);
1136                 ReadLock (latch->readwr);
1137                 break;
1138         }
1139 }
1140
1141 // remove write, read, or parent lock on requested page
1142
1143 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1144 {
1145         switch( mode ) {
1146         case BtLockRead:
1147                 ReadRelease (latch->readwr);
1148                 break;
1149         case BtLockWrite:
1150                 WriteRelease (latch->readwr);
1151                 break;
1152         case BtLockAccess:
1153                 ReadRelease (latch->access);
1154                 break;
1155         case BtLockDelete:
1156                 WriteRelease (latch->access);
1157                 break;
1158         case BtLockParent:
1159                 WriteRelease (latch->parent);
1160                 break;
1161         case BtLockAtomic:
1162                 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1163                 WriteRelease (latch->atomic);
1164                 break;
1165         case BtLockAtomic | BtLockRead:
1166                 ReadRelease (latch->readwr);
1167                 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1168                 WriteRelease (latch->atomic);
1169                 break;
1170         }
1171 }
1172
1173 //      allocate a new page
1174 //      return with page latched, but unlocked.
1175
1176 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1177 {
1178 uid page_no;
1179 int blk;
1180
1181         //      lock allocation page
1182
1183         bt_spinwritelock(bt->mgr->lock);
1184
1185         // use empty chain first
1186         // else allocate empty page
1187
1188         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1189                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1190                         set->page = bt_mappage (bt, set->latch);
1191                 else
1192                         return bt->err = BTERR_struct, -1;
1193
1194                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1195                 bt_spinreleasewrite(bt->mgr->lock);
1196
1197                 memcpy (set->page, contents, bt->mgr->page_size);
1198                 set->latch->dirty = 1;
1199                 return 0;
1200         }
1201
1202         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1203         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1204
1205         // unlock allocation latch
1206
1207         bt_spinreleasewrite(bt->mgr->lock);
1208
1209         //      don't load cache from btree page
1210
1211         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1212                 set->page = bt_mappage (bt, set->latch);
1213         else
1214                 return bt->err = BTERR_struct;
1215
1216         memcpy (set->page, contents, bt->mgr->page_size);
1217         set->latch->dirty = 1;
1218         return 0;
1219 }
1220
1221 //  find slot in page for given key at a given level
1222
1223 int bt_findslot (BtPage page, unsigned char *key, uint len)
1224 {
1225 uint diff, higher = page->cnt, low = 1, slot;
1226 uint good = 0;
1227
1228         //        make stopper key an infinite fence value
1229
1230         if( bt_getid (page->right) )
1231                 higher++;
1232         else
1233                 good++;
1234
1235         //      low is the lowest candidate.
1236         //  loop ends when they meet
1237
1238         //  higher is already
1239         //      tested as .ge. the passed key.
1240
1241         while( diff = higher - low ) {
1242                 slot = low + ( diff >> 1 );
1243                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1244                         low = slot + 1;
1245                 else
1246                         higher = slot, good++;
1247         }
1248
1249         //      return zero if key is on right link page
1250
1251         return good ? higher : 0;
1252 }
1253
1254 //  find and load page at given level for given key
1255 //      leave page rd or wr locked as requested
1256
1257 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1258 {
1259 uid page_no = ROOT_page, prevpage = 0;
1260 uint drill = 0xff, slot;
1261 BtLatchSet *prevlatch;
1262 uint mode, prevmode;
1263
1264   //  start at root of btree and drill down
1265
1266   do {
1267         // determine lock mode of drill level
1268         mode = (drill == lvl) ? lock : BtLockRead; 
1269
1270         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1271           return 0;
1272
1273         // obtain access lock using lock chaining with Access mode
1274
1275         if( page_no > ROOT_page )
1276           bt_lockpage(BtLockAccess, set->latch);
1277
1278         set->page = bt_mappage (bt, set->latch);
1279
1280         //      release & unpin parent page
1281
1282         if( prevpage ) {
1283           bt_unlockpage(prevmode, prevlatch);
1284           bt_unpinlatch (prevlatch);
1285           prevpage = 0;
1286         }
1287
1288         //      skip Atomic lock on leaf page if already held
1289
1290         if( !drill )
1291          if( mode & BtLockAtomic )
1292           if( pthread_equal (set->latch->atomictid, pthread_self()) )
1293                 mode &= ~BtLockAtomic;
1294
1295         // obtain mode lock using lock chaining through AccessLock
1296
1297         bt_lockpage(mode, set->latch);
1298
1299         if( mode & BtLockAtomic )
1300                 set->latch->atomictid = pthread_self();
1301
1302         if( set->page->free )
1303                 return bt->err = BTERR_struct, 0;
1304
1305         if( page_no > ROOT_page )
1306           bt_unlockpage(BtLockAccess, set->latch);
1307
1308         // re-read and re-lock root after determining actual level of root
1309
1310         if( set->page->lvl != drill) {
1311                 if( set->latch->page_no != ROOT_page )
1312                         return bt->err = BTERR_struct, 0;
1313                         
1314                 drill = set->page->lvl;
1315
1316                 if( lock != BtLockRead && drill == lvl ) {
1317                   bt_unlockpage(mode, set->latch);
1318                   bt_unpinlatch (set->latch);
1319                   continue;
1320                 }
1321         }
1322
1323         prevpage = set->latch->page_no;
1324         prevlatch = set->latch;
1325         prevmode = mode;
1326
1327         //  find key on page at this level
1328         //  and descend to requested level
1329
1330         if( set->page->kill )
1331           goto slideright;
1332
1333         if( slot = bt_findslot (set->page, key, len) ) {
1334           if( drill == lvl )
1335                 return slot;
1336
1337           while( slotptr(set->page, slot)->dead )
1338                 if( slot++ < set->page->cnt )
1339                         continue;
1340                 else
1341                         goto slideright;
1342
1343           page_no = bt_getid(valptr(set->page, slot)->value);
1344           drill--;
1345           continue;
1346         }
1347
1348         //  or slide right into next page
1349
1350 slideright:
1351         page_no = bt_getid(set->page->right);
1352
1353   } while( page_no );
1354
1355   // return error on end of right chain
1356
1357   bt->err = BTERR_struct;
1358   return 0;     // return error
1359 }
1360
1361 //      return page to free list
1362 //      page must be delete & write locked
1363
1364 void bt_freepage (BtDb *bt, BtPageSet *set)
1365 {
1366         //      lock allocation page
1367
1368         bt_spinwritelock (bt->mgr->lock);
1369
1370         //      store chain
1371
1372         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1373         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1374         set->latch->dirty = 1;
1375         set->page->free = 1;
1376
1377         // unlock released page
1378
1379         bt_unlockpage (BtLockDelete, set->latch);
1380         bt_unlockpage (BtLockWrite, set->latch);
1381         bt_unpinlatch (set->latch);
1382
1383         // unlock allocation page
1384
1385         bt_spinreleasewrite (bt->mgr->lock);
1386 }
1387
1388 //      a fence key was deleted from a page
1389 //      push new fence value upwards
1390
1391 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1392 {
1393 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1394 unsigned char value[BtId];
1395 BtKey* ptr;
1396 uint idx;
1397
1398         //      remove the old fence value
1399
1400         ptr = keyptr(set->page, set->page->cnt);
1401         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1402         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1403         set->latch->dirty = 1;
1404
1405         //  cache new fence value
1406
1407         ptr = keyptr(set->page, set->page->cnt);
1408         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1409
1410         bt_lockpage (BtLockParent, set->latch);
1411         bt_unlockpage (BtLockWrite, set->latch);
1412
1413         //      insert new (now smaller) fence key
1414
1415         bt_putid (value, set->latch->page_no);
1416         ptr = (BtKey*)leftkey;
1417
1418         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1419           return bt->err;
1420
1421         //      now delete old fence key
1422
1423         ptr = (BtKey*)rightkey;
1424
1425         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1426                 return bt->err;
1427
1428         bt_unlockpage (BtLockParent, set->latch);
1429         bt_unpinlatch(set->latch);
1430         return 0;
1431 }
1432
1433 //      root has a single child
1434 //      collapse a level from the tree
1435
1436 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1437 {
1438 BtPageSet child[1];
1439 uid page_no;
1440 uint idx;
1441
1442   // find the child entry and promote as new root contents
1443
1444   do {
1445         for( idx = 0; idx++ < root->page->cnt; )
1446           if( !slotptr(root->page, idx)->dead )
1447                 break;
1448
1449         page_no = bt_getid (valptr(root->page, idx)->value);
1450
1451         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1452                 child->page = bt_mappage (bt, child->latch);
1453         else
1454                 return bt->err;
1455
1456         bt_lockpage (BtLockDelete, child->latch);
1457         bt_lockpage (BtLockWrite, child->latch);
1458
1459         memcpy (root->page, child->page, bt->mgr->page_size);
1460         root->latch->dirty = 1;
1461
1462         bt_freepage (bt, child);
1463
1464   } while( root->page->lvl > 1 && root->page->act == 1 );
1465
1466   bt_unlockpage (BtLockWrite, root->latch);
1467   bt_unpinlatch (root->latch);
1468   return 0;
1469 }
1470
1471 //  delete a page and manage keys
1472 //  call with page writelocked
1473 //      returns with page unpinned
1474
1475 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1476 {
1477 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1478 unsigned char value[BtId];
1479 uint lvl = set->page->lvl;
1480 BtPageSet right[1];
1481 uid page_no;
1482 BtKey *ptr;
1483
1484         //      cache copy of fence key
1485         //      to post in parent
1486
1487         ptr = keyptr(set->page, set->page->cnt);
1488         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1489
1490         //      obtain lock on right page
1491
1492         page_no = bt_getid(set->page->right);
1493
1494         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1495                 right->page = bt_mappage (bt, right->latch);
1496         else
1497                 return 0;
1498
1499         bt_lockpage (BtLockWrite, right->latch);
1500
1501         // cache copy of key to update
1502
1503         ptr = keyptr(right->page, right->page->cnt);
1504         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1505
1506         if( right->page->kill )
1507                 return bt->err = BTERR_struct;
1508
1509         // pull contents of right peer into our empty page
1510
1511         memcpy (set->page, right->page, bt->mgr->page_size);
1512         set->latch->dirty = 1;
1513
1514         // mark right page deleted and point it to left page
1515         //      until we can post parent updates that remove access
1516         //      to the deleted page.
1517
1518         bt_putid (right->page->right, set->latch->page_no);
1519         right->latch->dirty = 1;
1520         right->page->kill = 1;
1521
1522         bt_lockpage (BtLockParent, right->latch);
1523         bt_unlockpage (BtLockWrite, right->latch);
1524
1525         bt_lockpage (BtLockParent, set->latch);
1526         bt_unlockpage (BtLockWrite, set->latch);
1527
1528         // redirect higher key directly to our new node contents
1529
1530         bt_putid (value, set->latch->page_no);
1531         ptr = (BtKey*)higherfence;
1532
1533         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1534           return bt->err;
1535
1536         //      delete old lower key to our node
1537
1538         ptr = (BtKey*)lowerfence;
1539
1540         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1541           return bt->err;
1542
1543         //      obtain delete and write locks to right node
1544
1545         bt_unlockpage (BtLockParent, right->latch);
1546         bt_lockpage (BtLockDelete, right->latch);
1547         bt_lockpage (BtLockWrite, right->latch);
1548         bt_freepage (bt, right);
1549
1550         bt_unlockpage (BtLockParent, set->latch);
1551         bt_unpinlatch (set->latch);
1552         bt->found = 1;
1553         return 0;
1554 }
1555
1556 //  find and delete key on page by marking delete flag bit
1557 //  if page becomes empty, delete it from the btree
1558
1559 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1560 {
1561 uint slot, idx, found, fence;
1562 BtPageSet set[1];
1563 BtKey *ptr;
1564 BtVal *val;
1565
1566         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1567                 ptr = keyptr(set->page, slot);
1568         else
1569                 return bt->err;
1570
1571         // if librarian slot, advance to real slot
1572
1573         if( slotptr(set->page, slot)->type == Librarian )
1574                 ptr = keyptr(set->page, ++slot);
1575
1576         fence = slot == set->page->cnt;
1577
1578         // if key is found delete it, otherwise ignore request
1579
1580         if( found = !keycmp (ptr, key, len) )
1581           if( found = slotptr(set->page, slot)->dead == 0 ) {
1582                 val = valptr(set->page,slot);
1583                 slotptr(set->page, slot)->dead = 1;
1584                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1585                 set->page->act--;
1586
1587                 // collapse empty slots beneath the fence
1588
1589                 while( idx = set->page->cnt - 1 )
1590                   if( slotptr(set->page, idx)->dead ) {
1591                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1592                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1593                   } else
1594                         break;
1595           }
1596
1597         //      did we delete a fence key in an upper level?
1598
1599         if( found && lvl && set->page->act && fence )
1600           if( bt_fixfence (bt, set, lvl) )
1601                 return bt->err;
1602           else
1603                 return bt->found = found, 0;
1604
1605         //      do we need to collapse root?
1606
1607         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1608           if( bt_collapseroot (bt, set) )
1609                 return bt->err;
1610           else
1611                 return bt->found = found, 0;
1612
1613         //      delete empty page
1614
1615         if( !set->page->act )
1616                 return bt_deletepage (bt, set);
1617
1618         set->latch->dirty = 1;
1619         bt_unlockpage(BtLockWrite, set->latch);
1620         bt_unpinlatch (set->latch);
1621         return bt->found = found, 0;
1622 }
1623
1624 BtKey *bt_foundkey (BtDb *bt)
1625 {
1626         return (BtKey*)(bt->key);
1627 }
1628
1629 //      advance to next slot
1630
1631 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1632 {
1633 BtLatchSet *prevlatch;
1634 uid page_no;
1635
1636         if( slot < set->page->cnt )
1637                 return slot + 1;
1638
1639         prevlatch = set->latch;
1640
1641         if( page_no = bt_getid(set->page->right) )
1642           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1643                 set->page = bt_mappage (bt, set->latch);
1644           else
1645                 return 0;
1646         else
1647           return bt->err = BTERR_struct, 0;
1648
1649         // obtain access lock using lock chaining with Access mode
1650
1651         bt_lockpage(BtLockAccess, set->latch);
1652
1653         bt_unlockpage(BtLockRead, prevlatch);
1654         bt_unpinlatch (prevlatch);
1655
1656         bt_lockpage(BtLockRead, set->latch);
1657         bt_unlockpage(BtLockAccess, set->latch);
1658         return 1;
1659 }
1660
1661 //      find unique key or first duplicate key in
1662 //      leaf level and return number of value bytes
1663 //      or (-1) if not found.  Setup key for bt_foundkey
1664
1665 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1666 {
1667 BtPageSet set[1];
1668 uint len, slot;
1669 int ret = -1;
1670 BtKey *ptr;
1671 BtVal *val;
1672
1673   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1674    do {
1675         ptr = keyptr(set->page, slot);
1676
1677         //      skip librarian slot place holder
1678
1679         if( slotptr(set->page, slot)->type == Librarian )
1680                 ptr = keyptr(set->page, ++slot);
1681
1682         //      return actual key found
1683
1684         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1685         len = ptr->len;
1686
1687         if( slotptr(set->page, slot)->type == Duplicate )
1688                 len -= BtId;
1689
1690         //      not there if we reach the stopper key
1691
1692         if( slot == set->page->cnt )
1693           if( !bt_getid (set->page->right) )
1694                 break;
1695
1696         // if key exists, return >= 0 value bytes copied
1697         //      otherwise return (-1)
1698
1699         if( slotptr(set->page, slot)->dead )
1700                 continue;
1701
1702         if( keylen == len )
1703           if( !memcmp (ptr->key, key, len) ) {
1704                 val = valptr (set->page,slot);
1705                 if( valmax > val->len )
1706                         valmax = val->len;
1707                 memcpy (value, val->value, valmax);
1708                 ret = valmax;
1709           }
1710
1711         break;
1712
1713    } while( slot = bt_findnext (bt, set, slot) );
1714
1715   bt_unlockpage (BtLockRead, set->latch);
1716   bt_unpinlatch (set->latch);
1717   return ret;
1718 }
1719
1720 //      check page for space available,
1721 //      clean if necessary and return
1722 //      0 - page needs splitting
1723 //      >0  new slot value
1724
1725 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1726 {
1727 uint nxt = bt->mgr->page_size;
1728 BtPage page = set->page;
1729 uint cnt = 0, idx = 0;
1730 uint max = page->cnt;
1731 uint newslot = max;
1732 BtKey *key;
1733 BtVal *val;
1734
1735         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1736                 return slot;
1737
1738         //      skip cleanup and proceed to split
1739         //      if there's not enough garbage
1740         //      to bother with.
1741
1742         if( page->garbage < nxt / 5 )
1743                 return 0;
1744
1745         memcpy (bt->frame, page, bt->mgr->page_size);
1746
1747         // skip page info and set rest of page to zero
1748
1749         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1750         set->latch->dirty = 1;
1751         page->garbage = 0;
1752         page->act = 0;
1753
1754         // clean up page first by
1755         // removing deleted keys
1756
1757         while( cnt++ < max ) {
1758                 if( cnt == slot )
1759                         newslot = idx + 2;
1760                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1761                         continue;
1762
1763                 // copy the value across
1764
1765                 val = valptr(bt->frame, cnt);
1766                 nxt -= val->len + sizeof(BtVal);
1767                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1768
1769                 // copy the key across
1770
1771                 key = keyptr(bt->frame, cnt);
1772                 nxt -= key->len + sizeof(BtKey);
1773                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1774
1775                 // make a librarian slot
1776
1777                 if( idx ) {
1778                         slotptr(page, ++idx)->off = nxt;
1779                         slotptr(page, idx)->type = Librarian;
1780                         slotptr(page, idx)->dead = 1;
1781                 }
1782
1783                 // set up the slot
1784
1785                 slotptr(page, ++idx)->off = nxt;
1786                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1787
1788                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1789                         page->act++;
1790         }
1791
1792         page->min = nxt;
1793         page->cnt = idx;
1794
1795         //      see if page has enough space now, or does it need splitting?
1796
1797         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1798                 return newslot;
1799
1800         return 0;
1801 }
1802
1803 // split the root and raise the height of the btree
1804
1805 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1806 {  
1807 unsigned char leftkey[BT_keyarray];
1808 uint nxt = bt->mgr->page_size;
1809 unsigned char value[BtId];
1810 BtPageSet left[1];
1811 uid left_page_no;
1812 BtKey *ptr;
1813 BtVal *val;
1814
1815         //      save left page fence key for new root
1816
1817         ptr = keyptr(root->page, root->page->cnt);
1818         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1819
1820         //  Obtain an empty page to use, and copy the current
1821         //  root contents into it, e.g. lower keys
1822
1823         if( bt_newpage(bt, left, root->page) )
1824                 return bt->err;
1825
1826         left_page_no = left->latch->page_no;
1827         bt_unpinlatch (left->latch);
1828
1829         // preserve the page info at the bottom
1830         // of higher keys and set rest to zero
1831
1832         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1833
1834         // insert stopper key at top of newroot page
1835         // and increase the root height
1836
1837         nxt -= BtId + sizeof(BtVal);
1838         bt_putid (value, right->page_no);
1839         val = (BtVal *)((unsigned char *)root->page + nxt);
1840         memcpy (val->value, value, BtId);
1841         val->len = BtId;
1842
1843         nxt -= 2 + sizeof(BtKey);
1844         slotptr(root->page, 2)->off = nxt;
1845         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1846         ptr->len = 2;
1847         ptr->key[0] = 0xff;
1848         ptr->key[1] = 0xff;
1849
1850         // insert lower keys page fence key on newroot page as first key
1851
1852         nxt -= BtId + sizeof(BtVal);
1853         bt_putid (value, left_page_no);
1854         val = (BtVal *)((unsigned char *)root->page + nxt);
1855         memcpy (val->value, value, BtId);
1856         val->len = BtId;
1857
1858         ptr = (BtKey *)leftkey;
1859         nxt -= ptr->len + sizeof(BtKey);
1860         slotptr(root->page, 1)->off = nxt;
1861         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1862         
1863         bt_putid(root->page->right, 0);
1864         root->page->min = nxt;          // reset lowest used offset and key count
1865         root->page->cnt = 2;
1866         root->page->act = 2;
1867         root->page->lvl++;
1868
1869         // release and unpin root pages
1870
1871         bt_unlockpage(BtLockWrite, root->latch);
1872         bt_unpinlatch (root->latch);
1873
1874         bt_unpinlatch (right);
1875         return 0;
1876 }
1877
1878 //  split already locked full node
1879 //      leave it locked.
1880 //      return pool entry for new right
1881 //      page, unlocked
1882
1883 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1884 {
1885 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1886 uint lvl = set->page->lvl;
1887 BtPageSet right[1];
1888 BtKey *key, *ptr;
1889 BtVal *val, *src;
1890 uid right2;
1891 uint prev;
1892
1893         //  split higher half of keys to bt->frame
1894
1895         memset (bt->frame, 0, bt->mgr->page_size);
1896         max = set->page->cnt;
1897         cnt = max / 2;
1898         idx = 0;
1899
1900         while( cnt++ < max ) {
1901                 if( slotptr(set->page, cnt)->dead && cnt < max )
1902                         continue;
1903                 src = valptr(set->page, cnt);
1904                 nxt -= src->len + sizeof(BtVal);
1905                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1906
1907                 key = keyptr(set->page, cnt);
1908                 nxt -= key->len + sizeof(BtKey);
1909                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1910                 memcpy (ptr, key, key->len + sizeof(BtKey));
1911
1912                 //      add librarian slot
1913
1914                 if( idx ) {
1915                         slotptr(bt->frame, ++idx)->off = nxt;
1916                         slotptr(bt->frame, idx)->type = Librarian;
1917                         slotptr(bt->frame, idx)->dead = 1;
1918                 }
1919
1920                 //  add actual slot
1921
1922                 slotptr(bt->frame, ++idx)->off = nxt;
1923                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1924
1925                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1926                         bt->frame->act++;
1927         }
1928
1929         bt->frame->bits = bt->mgr->page_bits;
1930         bt->frame->min = nxt;
1931         bt->frame->cnt = idx;
1932         bt->frame->lvl = lvl;
1933
1934         // link right node
1935
1936         if( set->latch->page_no > ROOT_page )
1937                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1938
1939         //      get new free page and write higher keys to it.
1940
1941         if( bt_newpage(bt, right, bt->frame) )
1942                 return 0;
1943
1944         memcpy (bt->frame, set->page, bt->mgr->page_size);
1945         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1946         set->latch->dirty = 1;
1947
1948         nxt = bt->mgr->page_size;
1949         set->page->garbage = 0;
1950         set->page->act = 0;
1951         max /= 2;
1952         cnt = 0;
1953         idx = 0;
1954
1955         if( slotptr(bt->frame, max)->type == Librarian )
1956                 max--;
1957
1958         //  assemble page of smaller keys
1959
1960         while( cnt++ < max ) {
1961                 if( slotptr(bt->frame, cnt)->dead )
1962                         continue;
1963                 val = valptr(bt->frame, cnt);
1964                 nxt -= val->len + sizeof(BtVal);
1965                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1966
1967                 key = keyptr(bt->frame, cnt);
1968                 nxt -= key->len + sizeof(BtKey);
1969                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1970
1971                 //      add librarian slot
1972
1973                 if( idx ) {
1974                         slotptr(set->page, ++idx)->off = nxt;
1975                         slotptr(set->page, idx)->type = Librarian;
1976                         slotptr(set->page, idx)->dead = 1;
1977                 }
1978
1979                 //      add actual slot
1980
1981                 slotptr(set->page, ++idx)->off = nxt;
1982                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1983                 set->page->act++;
1984         }
1985
1986         bt_putid(set->page->right, right->latch->page_no);
1987         set->page->min = nxt;
1988         set->page->cnt = idx;
1989
1990         return right->latch->entry;
1991 }
1992
1993 //      fix keys for newly split page
1994 //      call with page locked, return
1995 //      unlocked
1996
1997 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1998 {
1999 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2000 unsigned char value[BtId];
2001 uint lvl = set->page->lvl;
2002 BtPage page;
2003 BtKey *ptr;
2004
2005         // if current page is the root page, split it
2006
2007         if( set->latch->page_no == ROOT_page )
2008                 return bt_splitroot (bt, set, right);
2009
2010         ptr = keyptr(set->page, set->page->cnt);
2011         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2012
2013         page = bt_mappage (bt, right);
2014
2015         ptr = keyptr(page, page->cnt);
2016         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2017
2018         // insert new fences in their parent pages
2019
2020         bt_lockpage (BtLockParent, right);
2021
2022         bt_lockpage (BtLockParent, set->latch);
2023         bt_unlockpage (BtLockWrite, set->latch);
2024
2025         // insert new fence for reformulated left block of smaller keys
2026
2027         bt_putid (value, set->latch->page_no);
2028         ptr = (BtKey *)leftkey;
2029
2030         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2031                 return bt->err;
2032
2033         // switch fence for right block of larger keys to new right page
2034
2035         bt_putid (value, right->page_no);
2036         ptr = (BtKey *)rightkey;
2037
2038         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2039                 return bt->err;
2040
2041         bt_unlockpage (BtLockParent, set->latch);
2042         bt_unpinlatch (set->latch);
2043
2044         bt_unlockpage (BtLockParent, right);
2045         bt_unpinlatch (right);
2046         return 0;
2047 }
2048
2049 //      install new key and value onto page
2050 //      page must already be checked for
2051 //      adequate space
2052
2053 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2054 {
2055 uint idx, librarian;
2056 BtSlot *node;
2057 BtKey *ptr;
2058 BtVal *val;
2059
2060         //      if found slot > desired slot and previous slot
2061         //      is a librarian slot, use it
2062
2063         if( slot > 1 )
2064           if( slotptr(set->page, slot-1)->type == Librarian )
2065                 slot--;
2066
2067         // copy value onto page
2068
2069         set->page->min -= vallen + sizeof(BtVal);
2070         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2071         memcpy (val->value, value, vallen);
2072         val->len = vallen;
2073
2074         // copy key onto page
2075
2076         set->page->min -= keylen + sizeof(BtKey);
2077         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2078         memcpy (ptr->key, key, keylen);
2079         ptr->len = keylen;
2080         
2081         //      find first empty slot
2082
2083         for( idx = slot; idx < set->page->cnt; idx++ )
2084           if( slotptr(set->page, idx)->dead )
2085                 break;
2086
2087         // now insert key into array before slot
2088
2089         if( idx == set->page->cnt )
2090                 idx += 2, set->page->cnt += 2, librarian = 2;
2091         else
2092                 librarian = 1;
2093
2094         set->latch->dirty = 1;
2095         set->page->act++;
2096
2097         while( idx > slot + librarian - 1 )
2098                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2099
2100         //      add librarian slot
2101
2102         if( librarian > 1 ) {
2103                 node = slotptr(set->page, slot++);
2104                 node->off = set->page->min;
2105                 node->type = Librarian;
2106                 node->dead = 1;
2107         }
2108
2109         //      fill in new slot
2110
2111         node = slotptr(set->page, slot);
2112         node->off = set->page->min;
2113         node->type = type;
2114         node->dead = 0;
2115
2116         if( release ) {
2117                 bt_unlockpage (BtLockWrite, set->latch);
2118                 bt_unpinlatch (set->latch);
2119         }
2120
2121         return 0;
2122 }
2123
2124 //  Insert new key into the btree at given level.
2125 //      either add a new key or update/add an existing one
2126
2127 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2128 {
2129 unsigned char newkey[BT_keyarray];
2130 uint slot, idx, len, entry;
2131 BtPageSet set[1];
2132 BtKey *ptr, *ins;
2133 uid sequence;
2134 BtVal *val;
2135 uint type;
2136
2137   // set up the key we're working on
2138
2139   ins = (BtKey*)newkey;
2140   memcpy (ins->key, key, keylen);
2141   ins->len = keylen;
2142
2143   // is this a non-unique index value?
2144
2145   if( unique )
2146         type = Unique;
2147   else {
2148         type = Duplicate;
2149         sequence = bt_newdup (bt);
2150         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2151         ins->len += BtId;
2152   }
2153
2154   while( 1 ) { // find the page and slot for the current key
2155         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2156                 ptr = keyptr(set->page, slot);
2157         else {
2158                 if( !bt->err )
2159                         bt->err = BTERR_ovflw;
2160                 return bt->err;
2161         }
2162
2163         // if librarian slot == found slot, advance to real slot
2164
2165         if( slotptr(set->page, slot)->type == Librarian )
2166           if( !keycmp (ptr, key, keylen) )
2167                 ptr = keyptr(set->page, ++slot);
2168
2169         len = ptr->len;
2170
2171         if( slotptr(set->page, slot)->type == Duplicate )
2172                 len -= BtId;
2173
2174         //  if inserting a duplicate key or unique key
2175         //      check for adequate space on the page
2176         //      and insert the new key before slot.
2177
2178         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2179           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2180                 if( !(entry = bt_splitpage (bt, set)) )
2181                   return bt->err;
2182                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2183                   return bt->err;
2184                 else
2185                   continue;
2186
2187           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2188         }
2189
2190         // if key already exists, update value and return
2191
2192         val = valptr(set->page, slot);
2193
2194         if( val->len >= vallen ) {
2195                 if( slotptr(set->page, slot)->dead )
2196                         set->page->act++;
2197                 set->page->garbage += val->len - vallen;
2198                 set->latch->dirty = 1;
2199                 slotptr(set->page, slot)->dead = 0;
2200                 val->len = vallen;
2201                 memcpy (val->value, value, vallen);
2202                 bt_unlockpage(BtLockWrite, set->latch);
2203                 bt_unpinlatch (set->latch);
2204                 return 0;
2205         }
2206
2207         //      new update value doesn't fit in existing value area
2208
2209         if( !slotptr(set->page, slot)->dead )
2210                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2211         else {
2212                 slotptr(set->page, slot)->dead = 0;
2213                 set->page->act++;
2214         }
2215
2216         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2217           if( !(entry = bt_splitpage (bt, set)) )
2218                 return bt->err;
2219           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2220                 return bt->err;
2221           else
2222                 continue;
2223
2224         set->page->min -= vallen + sizeof(BtVal);
2225         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2226         memcpy (val->value, value, vallen);
2227         val->len = vallen;
2228
2229         set->latch->dirty = 1;
2230         set->page->min -= keylen + sizeof(BtKey);
2231         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2232         memcpy (ptr->key, key, keylen);
2233         ptr->len = keylen;
2234         
2235         slotptr(set->page, slot)->off = set->page->min;
2236         bt_unlockpage(BtLockWrite, set->latch);
2237         bt_unpinlatch (set->latch);
2238         return 0;
2239   }
2240   return 0;
2241 }
2242
2243 typedef struct {
2244         uint entry;                     // latch table entry number
2245         uint slot:30;           // page slot number
2246         uint reuse:1;           // reused previous page
2247         uint emptied:1;         // page was emptied
2248 } AtomicMod;
2249
2250 typedef struct {
2251         uid page_no;            // page number for split leaf
2252         void *next;                     // next key to insert
2253         uint entry;                     // latch table entry number
2254         unsigned char leafkey[BT_keyarray];
2255 } AtomicKey;
2256
2257 //      determine actual page where key is located
2258 //  return slot number with set page locked
2259
2260 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2261 {
2262 BtKey *key = keyptr(source,src);
2263 uint slot = locks[src].slot;
2264 uint entry;
2265
2266         if( src > 1 && locks[src].reuse )
2267           entry = locks[src-1].entry, slot = 0;
2268         else
2269           entry = locks[src].entry;
2270
2271         if( slot ) {
2272                 set->latch = bt->mgr->latchsets + entry;
2273                 set->page = bt_mappage (bt, set->latch);
2274                 return slot;
2275         }
2276
2277         //      is locks->reuse set?
2278         //      if so, find where our key
2279         //      is located on previous page or split pages
2280
2281         do {
2282                 set->latch = bt->mgr->latchsets + entry;
2283                 set->page = bt_mappage (bt, set->latch);
2284
2285                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2286                   if( locks[src].reuse )
2287                         locks[src].entry = entry;
2288                   return slot;
2289                 }
2290         } while( entry = set->latch->split );
2291
2292         bt->err = BTERR_atomic;
2293         return 0;
2294 }
2295
2296 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2297 {
2298 BtKey *key = keyptr(source, src);
2299 BtVal *val = valptr(source, src);
2300 BtLatchSet *latch;
2301 BtPageSet set[1];
2302 uint entry;
2303
2304   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2305         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2306           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2307
2308         if( entry = bt_splitpage (bt, set) )
2309           latch = bt->mgr->latchsets + entry;
2310         else
2311           return bt->err;
2312
2313         //      splice right page into split chain
2314         //      and WriteLock it.
2315
2316         latch->split = set->latch->split;
2317         set->latch->split = entry;
2318         bt_lockpage(BtLockWrite, latch);
2319   }
2320
2321   return bt->err = BTERR_atomic;
2322 }
2323
2324 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2325 {
2326 BtKey *key = keyptr(source, src);
2327 uint idx, entry, slot;
2328 BtPageSet set[1];
2329 BtKey *ptr;
2330 BtVal *val;
2331
2332         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2333                 slotptr(set->page, slot)->dead = 1;
2334         else
2335                 return bt->err = BTERR_struct;
2336
2337         ptr = keyptr(set->page, slot);
2338         val = valptr(set->page, slot);
2339
2340         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2341         set->page->act--;
2342
2343         // collapse empty slots beneath the fence
2344
2345         while( idx = set->page->cnt - 1 )
2346           if( slotptr(set->page, idx)->dead ) {
2347                 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2348                 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2349           } else
2350                 break;
2351
2352         set->latch->dirty = 1;
2353         return 0;
2354 }
2355
2356 //      qsort comparison function
2357
2358 int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2359 {
2360 BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
2361 BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
2362
2363         return keycmp (key1, key2->key, key2->len);
2364 }
2365
2366 //      atomic modification of a batch of keys.
2367
2368 //      return -1 if BTERR is set
2369 //      otherwise return slot number
2370 //      causing the key constraint violation
2371 //      or zero on successful completion.
2372
2373 int bt_atomicmods (BtDb *bt, BtPage source)
2374 {
2375 uint src, idx, slot, samepage, entry, next, split;
2376 AtomicKey *head, *tail, *leaf;
2377 BtPageSet set[1], prev[1];
2378 unsigned char value[BtId];
2379 BtKey *key, *ptr, *key2;
2380 BtLatchSet *latch;
2381 AtomicMod *locks;
2382 int result = 0;
2383 BtSlot temp[1];
2384 BtPage page;
2385 BtVal *val;
2386 int type;
2387
2388   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2389   head = NULL;
2390   tail = NULL;
2391
2392   // first sort the list of keys into order to
2393   //    prevent deadlocks between threads.
2394   qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
2395 /*
2396   for( src = 1; src++ < source->cnt; ) {
2397         *temp = *slotptr(source,src);
2398         key = keyptr (source,src);
2399
2400         for( idx = src; --idx; ) {
2401           key2 = keyptr (source,idx);
2402           if( keycmp (key, key2->key, key2->len) < 0 ) {
2403                 *slotptr(source,idx+1) = *slotptr(source,idx);
2404                 *slotptr(source,idx) = *temp;
2405           } else
2406                 break;
2407         }
2408   }
2409 */
2410   // Load the leafpage for each key
2411   // and determine any constraint violations
2412
2413   for( src = 0; src++ < source->cnt; ) {
2414         key = keyptr(source, src);
2415         val = valptr(source, src);
2416         slot = 0;
2417
2418         // first determine if this modification falls
2419         // on the same page as the previous modification
2420         //      note that the far right leaf page is a special case
2421
2422         if( samepage = src > 1 )
2423           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
2424                 slot = bt_findslot(set->page, key->key, key->len);
2425           else // release read on previous page
2426                 bt_unlockpage(BtLockRead, set->latch); 
2427
2428         if( !slot )
2429           if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
2430                 set->latch->split = 0;
2431           else
2432                 return -1;
2433
2434         if( slotptr(set->page, slot)->type == Librarian )
2435           ptr = keyptr(set->page, ++slot);
2436         else
2437           ptr = keyptr(set->page, slot);
2438
2439         if( !samepage ) {
2440           locks[src].entry = set->latch->entry;
2441           locks[src].slot = slot;
2442           locks[src].reuse = 0;
2443         } else {
2444           locks[src].entry = 0;
2445           locks[src].slot = 0;
2446           locks[src].reuse = 1;
2447         }
2448
2449         switch( slotptr(source, src)->type ) {
2450         case Duplicate:
2451         case Unique:
2452           if( !slotptr(set->page, slot)->dead )
2453            if( slot < set->page->cnt || bt_getid (set->page->right) )
2454             if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
2455
2456                   // return constraint violation if key already exists
2457
2458                   result = src;
2459
2460                   while( src ) {
2461                         if( locks[src].entry ) {
2462                           set->latch = bt->mgr->latchsets + locks[src].entry;
2463                           bt_unlockpage(BtLockAtomic, set->latch);
2464                           bt_unlockpage(BtLockRead, set->latch);
2465                           bt_unpinlatch (set->latch);
2466                         }
2467                         src--;
2468                   }
2469
2470                   return result;
2471                 }
2472
2473           break;
2474
2475         case Delete:
2476           if( !slotptr(set->page, slot)->dead )
2477            if( ptr->len == key->len )
2478                 if( !memcmp (ptr->key, key->key, key->len) )
2479               break;
2480
2481           //  Key to delete doesn't exist
2482
2483           result = src;
2484
2485           while( src ) {
2486                 if( locks[src].entry ) {
2487                   set->latch = bt->mgr->latchsets + locks[src].entry;
2488                   bt_unlockpage(BtLockAtomic, set->latch);
2489                   bt_unlockpage(BtLockRead, set->latch);
2490                   bt_unpinlatch (set->latch);
2491                 }
2492                 src--;
2493           }
2494
2495           return result;
2496         }
2497
2498   }
2499
2500   //  unlock last loadpage lock
2501
2502   if( source->cnt > 1 )
2503         bt_unlockpage(BtLockRead, set->latch);
2504
2505   //  obtain write lock for each page
2506
2507   for( src = 0; src++ < source->cnt; )
2508         if( locks[src].entry )
2509           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2510
2511   // insert or delete each key
2512
2513   for( src = 0; src++ < source->cnt; )
2514         if( slotptr(source,src)->type == Delete )
2515           if( bt_atomicdelete (bt, source, locks, src) )
2516                 return -1;
2517           else
2518                 continue;
2519         else
2520           if( bt_atomicinsert (bt, source, locks, src) )
2521                 return -1;
2522           else
2523                 continue;
2524
2525   // set ParentModification and release WriteLock
2526   // leave empty pages locked for later processing
2527   // build queue of keys to insert for split pages
2528
2529   for( src = 0; src++ < source->cnt; ) {
2530         if( locks[src].reuse )
2531           continue;
2532
2533         prev->latch = bt->mgr->latchsets + locks[src].entry;
2534         prev->page = bt_mappage (bt, prev->latch);
2535
2536         //  pick-up all splits from original page
2537
2538         split = next = prev->latch->split;
2539
2540         if( !prev->page->act )
2541           locks[src].emptied = 1;
2542
2543         while( entry = next ) {
2544           set->latch = bt->mgr->latchsets + entry;
2545           set->page = bt_mappage (bt, set->latch);
2546           next = set->latch->split;
2547           set->latch->split = 0;
2548
2549           // delete empty previous page
2550
2551           if( !prev->page->act ) {
2552                 memcpy (prev->page, set->page, bt->mgr->page_size);
2553                 bt_lockpage (BtLockDelete, set->latch);
2554                 bt_freepage (bt, set);
2555
2556                 prev->latch->dirty = 1;
2557
2558                 if( prev->page->act )
2559                   locks[src].emptied = 0;
2560
2561                 continue;
2562           }
2563
2564           // prev page is not emptied 
2565           locks[src].emptied = 0;
2566
2567           //  schedule previous fence key update
2568
2569           ptr = keyptr(prev->page,prev->page->cnt);
2570           leaf = malloc (sizeof(AtomicKey));
2571
2572           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2573           leaf->page_no = prev->latch->page_no;
2574           leaf->entry = prev->latch->entry;
2575           leaf->next = NULL;
2576
2577           if( tail )
2578                 tail->next = leaf;
2579           else
2580                 head = leaf;
2581
2582           tail = leaf;
2583
2584           // remove empty block from the split chain
2585
2586           if( !set->page->act ) {
2587                 memcpy (prev->page->right, set->page->right, BtId);
2588                 bt_lockpage (BtLockDelete, set->latch);
2589                 bt_freepage (bt, set);
2590                 continue;
2591           }
2592
2593           bt_lockpage(BtLockParent, prev->latch);
2594           bt_unlockpage(BtLockWrite, prev->latch);
2595           *prev = *set;
2596         }
2597
2598         //  was entire chain emptied?
2599
2600         if( !prev->page->act )
2601                 continue;
2602
2603         if( !split ) {
2604                 bt_unlockpage(BtLockWrite, prev->latch);
2605                 continue;
2606         }
2607
2608         //      Process last page split in chain
2609
2610         ptr = keyptr(prev->page,prev->page->cnt);
2611         leaf = malloc (sizeof(AtomicKey));
2612
2613         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2614         leaf->page_no = prev->latch->page_no;
2615         leaf->entry = prev->latch->entry;
2616         leaf->next = NULL;
2617
2618         if( tail )
2619           tail->next = leaf;
2620         else
2621           head = leaf;
2622
2623         tail = leaf;
2624
2625         bt_lockpage(BtLockParent, prev->latch);
2626         bt_unlockpage(BtLockWrite, prev->latch);
2627   }
2628   
2629   // remove Atomic locks and
2630   // process any empty pages
2631
2632   for( src = source->cnt; src; src-- ) {
2633         if( locks[src].reuse )
2634           continue;
2635
2636         set->latch = bt->mgr->latchsets + locks[src].entry;
2637         set->page = bt_mappage (bt, set->latch);
2638
2639         //  clear original page split field
2640
2641         split = set->latch->split;
2642         set->latch->split = 0;
2643         bt_unlockpage (BtLockAtomic, set->latch);
2644
2645         //  delete page emptied by our atomic action
2646
2647         if( locks[src].emptied )
2648           if( bt_deletepage (bt, set) )
2649                 return bt->err;
2650           else
2651                 continue;
2652
2653         if( !split )
2654                 bt_unpinlatch (set->latch);
2655   }
2656
2657   //  add keys for any pages split during action
2658
2659   if( leaf = head )
2660     do {
2661           ptr = (BtKey *)leaf->leafkey;
2662           bt_putid (value, leaf->page_no);
2663
2664           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2665                 return bt->err;
2666
2667           bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
2668           bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
2669           tail = leaf->next;
2670           free (leaf);
2671         } while( leaf = tail );
2672
2673   // return success
2674
2675   free (locks);
2676   return 0;
2677 }
2678
2679 //  return next slot on cursor page
2680 //  or slide cursor right into next page
2681
2682 uint bt_nextkey (BtDb *bt, uint slot)
2683 {
2684 BtPageSet set[1];
2685 uid right;
2686
2687   do {
2688         right = bt_getid(bt->cursor->right);
2689
2690         while( slot++ < bt->cursor->cnt )
2691           if( slotptr(bt->cursor,slot)->dead )
2692                 continue;
2693           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2694                 return slot;
2695           else
2696                 break;
2697
2698         if( !right )
2699                 break;
2700
2701         bt->cursor_page = right;
2702
2703         if( set->latch = bt_pinlatch (bt, right, 1) )
2704                 set->page = bt_mappage (bt, set->latch);
2705         else
2706                 return 0;
2707
2708     bt_lockpage(BtLockRead, set->latch);
2709
2710         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2711
2712         bt_unlockpage(BtLockRead, set->latch);
2713         bt_unpinlatch (set->latch);
2714         slot = 0;
2715
2716   } while( 1 );
2717
2718   return bt->err = 0;
2719 }
2720
2721 //  cache page of keys into cursor and return starting slot for given key
2722
2723 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2724 {
2725 BtPageSet set[1];
2726 uint slot;
2727
2728         // cache page for retrieval
2729
2730         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2731           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2732         else
2733           return 0;
2734
2735         bt->cursor_page = set->latch->page_no;
2736
2737         bt_unlockpage(BtLockRead, set->latch);
2738         bt_unpinlatch (set->latch);
2739         return slot;
2740 }
2741
2742 BtKey *bt_key(BtDb *bt, uint slot)
2743 {
2744         return keyptr(bt->cursor, slot);
2745 }
2746
2747 BtVal *bt_val(BtDb *bt, uint slot)
2748 {
2749         return valptr(bt->cursor,slot);
2750 }
2751
2752 #ifdef STANDALONE
2753
2754 #ifndef unix
2755 double getCpuTime(int type)
2756 {
2757 FILETIME crtime[1];
2758 FILETIME xittime[1];
2759 FILETIME systime[1];
2760 FILETIME usrtime[1];
2761 SYSTEMTIME timeconv[1];
2762 double ans = 0;
2763
2764         memset (timeconv, 0, sizeof(SYSTEMTIME));
2765
2766         switch( type ) {
2767         case 0:
2768                 GetSystemTimeAsFileTime (xittime);
2769                 FileTimeToSystemTime (xittime, timeconv);
2770                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2771                 break;
2772         case 1:
2773                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2774                 FileTimeToSystemTime (usrtime, timeconv);
2775                 break;
2776         case 2:
2777                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2778                 FileTimeToSystemTime (systime, timeconv);
2779                 break;
2780         }
2781
2782         ans += (double)timeconv->wHour * 3600;
2783         ans += (double)timeconv->wMinute * 60;
2784         ans += (double)timeconv->wSecond;
2785         ans += (double)timeconv->wMilliseconds / 1000;
2786         return ans;
2787 }
2788 #else
2789 #include <time.h>
2790 #include <sys/resource.h>
2791
2792 double getCpuTime(int type)
2793 {
2794 struct rusage used[1];
2795 struct timeval tv[1];
2796
2797         switch( type ) {
2798         case 0:
2799                 gettimeofday(tv, NULL);
2800                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2801
2802         case 1:
2803                 getrusage(RUSAGE_SELF, used);
2804                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2805
2806         case 2:
2807                 getrusage(RUSAGE_SELF, used);
2808                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2809         }
2810
2811         return 0;
2812 }
2813 #endif
2814
2815 void bt_poolaudit (BtMgr *mgr)
2816 {
2817 BtLatchSet *latch;
2818 uint slot = 0;
2819
2820         while( slot++ < mgr->latchdeployed ) {
2821                 latch = mgr->latchsets + slot;
2822
2823                 if( *latch->readwr->rin & MASK )
2824                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2825                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2826
2827                 if( *latch->access->rin & MASK )
2828                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2829                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2830
2831                 if( *latch->parent->rin & MASK )
2832                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2833                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2834
2835                 if( latch->pin & ~CLOCK_bit ) {
2836                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2837                         latch->pin = 0;
2838                 }
2839         }
2840 }
2841
2842 uint bt_latchaudit (BtDb *bt)
2843 {
2844 ushort idx, hashidx;
2845 uid next, page_no;
2846 BtLatchSet *latch;
2847 uint cnt = 0;
2848 BtKey *ptr;
2849
2850         if( *(ushort *)(bt->mgr->lock) )
2851                 fprintf(stderr, "Alloc page locked\n");
2852         *(ushort *)(bt->mgr->lock) = 0;
2853
2854         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2855                 latch = bt->mgr->latchsets + idx;
2856                 if( *latch->readwr->rin & MASK )
2857                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2858                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2859
2860                 if( *latch->access->rin & MASK )
2861                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2862                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2863
2864                 if( *latch->parent->rin & MASK )
2865                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2866                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2867
2868                 if( latch->pin ) {
2869                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2870                         latch->pin = 0;
2871                 }
2872         }
2873
2874         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2875           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2876                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2877
2878           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2879
2880           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2881                 latch = bt->mgr->latchsets + idx;
2882                 if( latch->pin )
2883                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2884           } while( idx = latch->next );
2885         }
2886
2887         page_no = LEAF_page;
2888
2889         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2890         uid off = page_no << bt->mgr->page_bits;
2891 #ifdef unix
2892           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2893 #else
2894         DWORD amt[1];
2895
2896           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2897
2898           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2899                 return bt->err = BTERR_map;
2900
2901           if( *amt <  bt->mgr->page_size )
2902                 return bt->err = BTERR_map;
2903 #endif
2904                 if( !bt->frame->free && !bt->frame->lvl )
2905                         cnt += bt->frame->act;
2906                 page_no++;
2907         }
2908                 
2909         cnt--;  // remove stopper key
2910         fprintf(stderr, " Total keys read %d\n", cnt);
2911
2912         bt_close (bt);
2913         return 0;
2914 }
2915
2916 typedef struct {
2917         char idx;
2918         char *type;
2919         char *infile;
2920         BtMgr *mgr;
2921         int num;
2922 } ThreadArg;
2923
2924 //  standalone program to index file of keys
2925 //  then list them onto std-out
2926
2927 #ifdef unix
2928 void *index_file (void *arg)
2929 #else
2930 uint __stdcall index_file (void *arg)
2931 #endif
2932 {
2933 int line = 0, found = 0, cnt = 0, unique;
2934 uid next, page_no = LEAF_page;  // start on first page of leaves
2935 unsigned char key[BT_maxkey];
2936 unsigned char txn[65536];
2937 ThreadArg *args = arg;
2938 int ch, len = 0, slot;
2939 BtPageSet set[1];
2940 uint nxt = 65536;
2941 BtPage page;
2942 BtKey *ptr;
2943 BtVal *val;
2944 BtDb *bt;
2945 FILE *in;
2946
2947         bt = bt_open (args->mgr);
2948         page = (BtPage)txn;
2949
2950         unique = (args->type[1] | 0x20) != 'd';
2951
2952         switch(args->type[0] | 0x20)
2953         {
2954         case 'a':
2955                 fprintf(stderr, "started latch mgr audit\n");
2956                 cnt = bt_latchaudit (bt);
2957                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2958                 break;
2959
2960         case 't':
2961                 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
2962                 if( in = fopen (args->infile, "rb") )
2963                   while( ch = getc(in), ch != EOF )
2964                         if( ch == '\n' )
2965                         {
2966                           line++;
2967                           nxt -= len - 10;
2968                           memcpy (txn + nxt, key + 10, len - 10);
2969                           nxt -= 1;
2970                           txn[nxt] = len - 10;
2971                           nxt -= 10;
2972                           memcpy (txn + nxt, key, 10);
2973                           nxt -= 1;
2974                           txn[nxt] = 10;
2975                           slotptr(page,++cnt)->off  = nxt;
2976                           slotptr(page,cnt)->type = Unique;
2977                           len = 0;
2978
2979                           if( cnt < 25 )
2980                                 continue;
2981
2982                           page->cnt = cnt;
2983                           page->act = cnt;
2984                           page->min = nxt;
2985
2986                           if( bt_atomicmods (bt, page) )
2987                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2988                           nxt = 65536;
2989                           cnt = 0;
2990
2991                         }
2992                         else if( len < BT_maxkey )
2993                                 key[len++] = ch;
2994                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2995                 break;
2996
2997         case 'p':
2998                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2999                 if( in = fopen (args->infile, "rb") )
3000                   while( ch = getc(in), ch != EOF )
3001                         if( ch == '\n' )
3002                         {
3003                           line++;
3004
3005                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3006                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3007                           len = 0;
3008                         }
3009                         else if( len < BT_maxkey )
3010                                 key[len++] = ch;
3011                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3012                 break;
3013
3014         case 'w':
3015                 fprintf(stderr, "started indexing for %s\n", args->infile);
3016                 if( in = fopen (args->infile, "r") )
3017                   while( ch = getc(in), ch != EOF )
3018                         if( ch == '\n' )
3019                         {
3020                           line++;
3021
3022                           if( args->num == 1 )
3023                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3024
3025                           else if( args->num )
3026                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3027
3028                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3029                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3030                           len = 0;
3031                         }
3032                         else if( len < BT_maxkey )
3033                                 key[len++] = ch;
3034                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3035                 break;
3036
3037         case 'd':
3038                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3039                 if( in = fopen (args->infile, "rb") )
3040                   while( ch = getc(in), ch != EOF )
3041                         if( ch == '\n' )
3042                         {
3043                           line++;
3044                           if( args->num == 1 )
3045                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3046
3047                           else if( args->num )
3048                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3049
3050                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3051                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3052                           ptr = (BtKey*)(bt->key);
3053                           found++;
3054
3055                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3056                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3057                           len = 0;
3058                         }
3059                         else if( len < BT_maxkey )
3060                                 key[len++] = ch;
3061                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3062                 break;
3063
3064         case 'f':
3065                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3066                 if( in = fopen (args->infile, "rb") )
3067                   while( ch = getc(in), ch != EOF )
3068                         if( ch == '\n' )
3069                         {
3070                           line++;
3071                           if( args->num == 1 )
3072                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3073
3074                           else if( args->num )
3075                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3076
3077                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3078                                 found++;
3079                           else if( bt->err )
3080                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3081                           len = 0;
3082                         }
3083                         else if( len < BT_maxkey )
3084                                 key[len++] = ch;
3085                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3086                 break;
3087
3088         case 's':
3089                 fprintf(stderr, "started scanning\n");
3090                 do {
3091                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3092                                 set->page = bt_mappage (bt, set->latch);
3093                         else
3094                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3095                         bt_lockpage (BtLockRead, set->latch);
3096                         next = bt_getid (set->page->right);
3097
3098                         for( slot = 0; slot++ < set->page->cnt; )
3099                          if( next || slot < set->page->cnt )
3100                           if( !slotptr(set->page, slot)->dead ) {
3101                                 ptr = keyptr(set->page, slot);
3102                                 len = ptr->len;
3103
3104                             if( slotptr(set->page, slot)->type == Duplicate )
3105                                         len -= BtId;
3106
3107                                 fwrite (ptr->key, len, 1, stdout);
3108                                 val = valptr(set->page, slot);
3109                                 fwrite (val->value, val->len, 1, stdout);
3110                                 fputc ('\n', stdout);
3111                                 cnt++;
3112                            }
3113
3114                         bt_unlockpage (BtLockRead, set->latch);
3115                         bt_unpinlatch (set->latch);
3116                 } while( page_no = next );
3117
3118                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3119                 break;
3120
3121         case 'c':
3122 #ifdef unix
3123                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3124 #endif
3125                 fprintf(stderr, "started counting\n");
3126                 page_no = LEAF_page;
3127
3128                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3129                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3130                                 break;
3131
3132                         if( !bt->frame->free && !bt->frame->lvl )
3133                                 cnt += bt->frame->act;
3134
3135                         bt->reads++;
3136                         page_no++;
3137                 }
3138                 
3139                 cnt--;  // remove stopper key
3140                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3141                 break;
3142         }
3143
3144         bt_close (bt);
3145 #ifdef unix
3146         return NULL;
3147 #else
3148         return 0;
3149 #endif
3150 }
3151
3152 typedef struct timeval timer;
3153
3154 int main (int argc, char **argv)
3155 {
3156 int idx, cnt, len, slot, err;
3157 int segsize, bits = 16;
3158 double start, stop;
3159 #ifdef unix
3160 pthread_t *threads;
3161 #else
3162 HANDLE *threads;
3163 #endif
3164 ThreadArg *args;
3165 uint poolsize = 0;
3166 float elapsed;
3167 int num = 0;
3168 char key[1];
3169 BtMgr *mgr;
3170 BtKey *ptr;
3171 BtDb *bt;
3172
3173         if( argc < 3 ) {
3174                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3175                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3176                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3177                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3178                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3179                 exit(0);
3180         }
3181
3182         start = getCpuTime(0);
3183
3184         if( argc > 3 )
3185                 bits = atoi(argv[3]);
3186
3187         if( argc > 4 )
3188                 poolsize = atoi(argv[4]);
3189
3190         if( !poolsize )
3191                 fprintf (stderr, "Warning: no mapped_pool\n");
3192
3193         if( argc > 5 )
3194                 num = atoi(argv[5]);
3195
3196         cnt = argc - 6;
3197 #ifdef unix
3198         threads = malloc (cnt * sizeof(pthread_t));
3199 #else
3200         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3201 #endif
3202         args = malloc (cnt * sizeof(ThreadArg));
3203
3204         mgr = bt_mgr ((argv[1]), bits, poolsize);
3205
3206         if( !mgr ) {
3207                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3208                 exit (1);
3209         }
3210
3211         //      fire off threads
3212
3213         for( idx = 0; idx < cnt; idx++ ) {
3214                 args[idx].infile = argv[idx + 6];
3215                 args[idx].type = argv[2];
3216                 args[idx].mgr = mgr;
3217                 args[idx].num = num;
3218                 args[idx].idx = idx;
3219 #ifdef unix
3220                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3221                         fprintf(stderr, "Error creating thread %d\n", err);
3222 #else
3223                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3224 #endif
3225         }
3226
3227         //      wait for termination
3228
3229 #ifdef unix
3230         for( idx = 0; idx < cnt; idx++ )
3231                 pthread_join (threads[idx], NULL);
3232 #else
3233         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3234
3235         for( idx = 0; idx < cnt; idx++ )
3236                 CloseHandle(threads[idx]);
3237
3238 #endif
3239         bt_poolaudit(mgr);
3240         bt_mgrclose (mgr);
3241
3242         elapsed = getCpuTime(0) - start;
3243         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3244         elapsed = getCpuTime(1);
3245         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3246         elapsed = getCpuTime(2);
3247         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3248 }
3249
3250 #endif  //STANDALONE