]> pd.if.org Git - btree/blob - threadskv8.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 #define PHID 0x1
116 #define PRES 0x2
117 #define MASK 0x3
118 #define RINC 0x4
119
120 //      definition for spin latch implementation
121
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
125
126 volatile typedef struct {
127         ushort exclusive:1;
128         ushort pending:1;
129         ushort share:14;
130 } BtSpinLatch;
131
132 #define XCL 1
133 #define PEND 2
134 #define BOTH 3
135 #define SHARE 4
136
137 //      write only reentrant lock
138
139 typedef struct {
140         BtSpinLatch xcl[1];
141         volatile ushort tid[1];
142         volatile ushort dup[1];
143 } WOLock;
144
145 //  hash table entries
146
147 typedef struct {
148         volatile uint slot;             // Latch table entry at head of chain
149         BtSpinLatch latch[1];
150 } BtHashEntry;
151
152 //      latch manager table structure
153
154 typedef struct {
155         uid page_no;                    // latch set page number
156         RWLock readwr[1];               // read/write page lock
157         RWLock access[1];               // Access Intent/Page delete
158         WOLock parent[1];               // Posting of fence key in parent
159         WOLock atomic[1];               // Atomic update in progress
160         uint split;                             // right split page atomic insert
161         uint entry;                             // entry slot in latch table
162         uint next;                              // next entry in hash table chain
163         uint prev;                              // prev entry in hash table chain
164         volatile ushort pin;    // number of outstanding threads
165         ushort dirty:1;                 // page in cache is dirty
166 } BtLatchSet;
167
168 //      Define the length of the page record numbers
169
170 #define BtId 6
171
172 //      Page key slot definition.
173
174 //      Keys are marked dead, but remain on the page until
175 //      it cleanup is called. The fence key (highest key) for
176 //      a leaf page is always present, even after cleanup.
177
178 //      Slot types
179
180 //      In addition to the Unique keys that occupy slots
181 //      there are Librarian and Duplicate key
182 //      slots occupying the key slot array.
183
184 //      The Librarian slots are dead keys that
185 //      serve as filler, available to add new Unique
186 //      or Dup slots that are inserted into the B-tree.
187
188 //      The Duplicate slots have had their key bytes extended
189 //      by 6 bytes to contain a binary duplicate key uniqueifier.
190
191 typedef enum {
192         Unique,
193         Librarian,
194         Duplicate,
195         Delete,
196         Update
197 } BtSlotType;
198
199 typedef struct {
200         uint off:BT_maxbits;    // page offset for key start
201         uint type:3;                    // type of slot
202         uint dead:1;                    // set for deleted slot
203 } BtSlot;
204
205 //      The key structure occupies space at the upper end of
206 //      each page.  It's a length byte followed by the key
207 //      bytes.
208
209 typedef struct {
210         unsigned char len;              // this can be changed to a ushort or uint
211         unsigned char key[0];
212 } BtKey;
213
214 //      the value structure also occupies space at the upper
215 //      end of the page. Each key is immediately followed by a value.
216
217 typedef struct {
218         unsigned char len;              // this can be changed to a ushort or uint
219         unsigned char value[0];
220 } BtVal;
221
222 #define BT_maxkey       255             // maximum number of bytes in a key
223 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
224
225 //      The first part of an index page.
226 //      It is immediately followed
227 //      by the BtSlot array of keys.
228
229 //      note that this structure size
230 //      must be a multiple of 8 bytes
231 //      in order to place dups correctly.
232
233 typedef struct BtPage_ {
234         uint cnt;                                       // count of keys in page
235         uint act;                                       // count of active keys
236         uint min;                                       // next key offset
237         uint garbage;                           // page garbage in bytes
238         unsigned char bits:7;           // page size in bits
239         unsigned char free:1;           // page is on free chain
240         unsigned char lvl:7;            // level of page
241         unsigned char kill:1;           // page is being deleted
242         unsigned char right[BtId];      // page number to right
243         unsigned char left[BtId];       // page number to left
244         unsigned char filler[2];        // padding to multiple of 8
245 } *BtPage;
246
247 //  The loadpage interface object
248
249 typedef struct {
250         BtPage page;            // current page pointer
251         BtLatchSet *latch;      // current page latch set
252 } BtPageSet;
253
254 //      structure for latch manager on ALLOC_page
255
256 typedef struct {
257         struct BtPage_ alloc[1];        // next page_no in right ptr
258         unsigned long long dups[1];     // global duplicate key uniqueifier
259         unsigned char chain[BtId];      // head of free page_nos chain
260 } BtPageZero;
261
262 //      The object structure for Btree access
263
264 typedef struct {
265         uint page_size;                         // page size    
266         uint page_bits;                         // page size in bits    
267 #ifdef unix
268         int idx;
269 #else
270         HANDLE idx;
271 #endif
272         BtPageZero *pagezero;           // mapped allocation page
273         BtSpinLatch lock[1];            // allocation area lite latch
274         uint latchdeployed;                     // highest number of latch entries deployed
275         uint nlatchpage;                        // number of latch pages at BT_latch
276         uint latchtotal;                        // number of page latch entries
277         uint latchhash;                         // number of latch hash table slots
278         uint latchvictim;                       // next latch entry to examine
279         ushort thread_no[1];            // next thread number
280         BtHashEntry *hashtable;         // the buffer pool hash table entries
281         BtLatchSet *latchsets;          // mapped latch set from buffer pool
282         unsigned char *pagepool;        // mapped to the buffer pool pages
283 #ifndef unix
284         HANDLE halloc;                          // allocation handle
285         HANDLE hpool;                           // buffer pool handle
286 #endif
287 } BtMgr;
288
289 typedef struct {
290         BtMgr *mgr;                             // buffer manager for thread
291         BtPage cursor;                  // cached frame for start/next (never mapped)
292         BtPage frame;                   // spare frame for the page split (never mapped)
293         uid cursor_page;                                // current cursor page number   
294         unsigned char *mem;                             // frame, cursor, page memory buffer
295         unsigned char key[BT_keyarray]; // last found complete key
296         int found;                              // last delete or insert was found
297         int err;                                // last error
298         int reads, writes;              // number of reads and writes from the btree
299         ushort thread_no;               // thread number
300 } BtDb;
301
302 typedef enum {
303         BTERR_ok = 0,
304         BTERR_struct,
305         BTERR_ovflw,
306         BTERR_lock,
307         BTERR_map,
308         BTERR_read,
309         BTERR_wrt,
310         BTERR_atomic
311 } BTERR;
312
313 #define CLOCK_bit 0x8000
314
315 // B-Tree functions
316 extern void bt_close (BtDb *bt);
317 extern BtDb *bt_open (BtMgr *mgr);
318 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
319 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
320 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
321 extern BtKey *bt_foundkey (BtDb *bt);
322 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
323 extern uint bt_nextkey   (BtDb *bt, uint slot);
324
325 //      manager functions
326 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
327 void bt_mgrclose (BtMgr *mgr);
328
329 //  Helper functions to return slot values
330 //      from the cursor page.
331
332 extern BtKey *bt_key (BtDb *bt, uint slot);
333 extern BtVal *bt_val (BtDb *bt, uint slot);
334
335 //  The page is allocated from low and hi ends.
336 //  The key slots are allocated from the bottom,
337 //      while the text and value of the key
338 //  are allocated from the top.  When the two
339 //  areas meet, the page is split into two.
340
341 //  A key consists of a length byte, two bytes of
342 //  index number (0 - 65535), and up to 253 bytes
343 //  of key value.
344
345 //  Associated with each key is a value byte string
346 //      containing any value desired.
347
348 //  The b-tree root is always located at page 1.
349 //      The first leaf page of level zero is always
350 //      located on page 2.
351
352 //      The b-tree pages are linked with next
353 //      pointers to facilitate enumerators,
354 //      and provide for concurrency.
355
356 //      When to root page fills, it is split in two and
357 //      the tree height is raised by a new root at page
358 //      one with two keys.
359
360 //      Deleted keys are marked with a dead bit until
361 //      page cleanup. The fence key for a leaf node is
362 //      always present
363
364 //  To achieve maximum concurrency one page is locked at a time
365 //  as the tree is traversed to find leaf key in question. The right
366 //  page numbers are used in cases where the page is being split,
367 //      or consolidated.
368
369 //  Page 0 is dedicated to lock for new page extensions,
370 //      and chains empty pages together for reuse. It also
371 //      contains the latch manager hash table.
372
373 //      The ParentModification lock on a node is obtained to serialize posting
374 //      or changing the fence key for a node.
375
376 //      Empty pages are chained together through the ALLOC page and reused.
377
378 //      Access macros to address slot and key values from the page
379 //      Page slots use 1 based indexing.
380
381 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
382 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
383 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
384
385 void bt_putid(unsigned char *dest, uid id)
386 {
387 int i = BtId;
388
389         while( i-- )
390                 dest[i] = (unsigned char)id, id >>= 8;
391 }
392
393 uid bt_getid(unsigned char *src)
394 {
395 uid id = 0;
396 int i;
397
398         for( i = 0; i < BtId; i++ )
399                 id <<= 8, id |= *src++; 
400
401         return id;
402 }
403
404 uid bt_newdup (BtDb *bt)
405 {
406 #ifdef unix
407         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
408 #else
409         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
410 #endif
411 }
412
413 void bt_spinreleasewrite(BtSpinLatch *latch);
414 void bt_spinwritelock(BtSpinLatch *latch);
415
416 //      Write-Only Reentrant Lock
417
418 void WriteOLock (WOLock *lock, ushort tid)
419 {
420   while( 1 ) {
421         bt_spinwritelock(lock->xcl);
422
423         if( *lock->tid == tid ) {
424                 *lock->dup += 1;
425                 bt_spinreleasewrite(lock->xcl);
426                 return;
427         }
428         if( !*lock->tid ) {
429                 *lock->tid = tid;
430                 bt_spinreleasewrite(lock->xcl);
431                 return;
432         }
433         bt_spinreleasewrite(lock->xcl);
434         sched_yield();
435   }
436 }
437
438 void WriteORelease (WOLock *lock)
439 {
440         if( *lock->dup ) {
441                 *lock->dup -= 1;
442                 return;
443         }
444
445         *lock->tid = 0;
446 }
447
448 //      Phase-Fair reader/writer lock implementation
449
450 void WriteLock (RWLock *lock)
451 {
452 ushort w, r, tix;
453
454 #ifdef unix
455         tix = __sync_fetch_and_add (lock->ticket, 1);
456 #else
457         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
458 #endif
459         // wait for our ticket to come up
460
461         while( tix != lock->serving[0] )
462 #ifdef unix
463                 sched_yield();
464 #else
465                 SwitchToThread ();
466 #endif
467
468         w = PRES | (tix & PHID);
469 #ifdef  unix
470         r = __sync_fetch_and_add (lock->rin, w);
471 #else
472         r = _InterlockedExchangeAdd16 (lock->rin, w);
473 #endif
474         while( r != *lock->rout )
475 #ifdef unix
476                 sched_yield();
477 #else
478                 SwitchToThread();
479 #endif
480 }
481
482 void WriteRelease (RWLock *lock)
483 {
484 #ifdef unix
485         __sync_fetch_and_and (lock->rin, ~MASK);
486 #else
487         _InterlockedAnd16 (lock->rin, ~MASK);
488 #endif
489         lock->serving[0]++;
490 }
491
492 void ReadLock (RWLock *lock)
493 {
494 ushort w;
495 #ifdef unix
496         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
497 #else
498         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
499 #endif
500         if( w )
501           while( w == (*lock->rin & MASK) )
502 #ifdef unix
503                 sched_yield ();
504 #else
505                 SwitchToThread ();
506 #endif
507 }
508
509 void ReadRelease (RWLock *lock)
510 {
511 #ifdef unix
512         __sync_fetch_and_add (lock->rout, RINC);
513 #else
514         _InterlockedExchangeAdd16 (lock->rout, RINC);
515 #endif
516 }
517
518 //      Spin Latch Manager
519
520 //      wait until write lock mode is clear
521 //      and add 1 to the share count
522
523 void bt_spinreadlock(BtSpinLatch *latch)
524 {
525 ushort prev;
526
527   do {
528 #ifdef unix
529         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
530 #else
531         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
532 #endif
533         //  see if exclusive request is granted or pending
534
535         if( !(prev & BOTH) )
536                 return;
537 #ifdef unix
538         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
539 #else
540         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
541 #endif
542 #ifdef  unix
543   } while( sched_yield(), 1 );
544 #else
545   } while( SwitchToThread(), 1 );
546 #endif
547 }
548
549 //      wait for other read and write latches to relinquish
550
551 void bt_spinwritelock(BtSpinLatch *latch)
552 {
553 ushort prev;
554
555   do {
556 #ifdef  unix
557         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
558 #else
559         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
560 #endif
561         if( !(prev & XCL) )
562           if( !(prev & ~BOTH) )
563                 return;
564           else
565 #ifdef unix
566                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
567 #else
568                 _InterlockedAnd16((ushort *)latch, ~XCL);
569 #endif
570 #ifdef  unix
571   } while( sched_yield(), 1 );
572 #else
573   } while( SwitchToThread(), 1 );
574 #endif
575 }
576
577 //      try to obtain write lock
578
579 //      return 1 if obtained,
580 //              0 otherwise
581
582 int bt_spinwritetry(BtSpinLatch *latch)
583 {
584 ushort prev;
585
586 #ifdef  unix
587         prev = __sync_fetch_and_or((ushort *)latch, XCL);
588 #else
589         prev = _InterlockedOr16((ushort *)latch, XCL);
590 #endif
591         //      take write access if all bits are clear
592
593         if( !(prev & XCL) )
594           if( !(prev & ~BOTH) )
595                 return 1;
596           else
597 #ifdef unix
598                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
599 #else
600                 _InterlockedAnd16((ushort *)latch, ~XCL);
601 #endif
602         return 0;
603 }
604
605 //      clear write mode
606
607 void bt_spinreleasewrite(BtSpinLatch *latch)
608 {
609 #ifdef unix
610         __sync_fetch_and_and((ushort *)latch, ~BOTH);
611 #else
612         _InterlockedAnd16((ushort *)latch, ~BOTH);
613 #endif
614 }
615
616 //      decrement reader count
617
618 void bt_spinreleaseread(BtSpinLatch *latch)
619 {
620 #ifdef unix
621         __sync_fetch_and_add((ushort *)latch, -SHARE);
622 #else
623         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
624 #endif
625 }
626
627 //      read page from permanent location in Btree file
628
629 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
630 {
631 off64_t off = page_no << mgr->page_bits;
632
633 #ifdef unix
634         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
635                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
636                 return BTERR_read;
637         }
638 #else
639 OVERLAPPED ovl[1];
640 uint amt[1];
641
642         memset (ovl, 0, sizeof(OVERLAPPED));
643         ovl->Offset = off;
644         ovl->OffsetHigh = off >> 32;
645
646         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
647                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
648                 return BTERR_read;
649         }
650         if( *amt <  mgr->page_size ) {
651                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
652                 return BTERR_read;
653         }
654 #endif
655         return 0;
656 }
657
658 //      write page to permanent location in Btree file
659 //      clear the dirty bit
660
661 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
662 {
663 off64_t off = page_no << mgr->page_bits;
664
665 #ifdef unix
666         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
667                 return BTERR_wrt;
668 #else
669 OVERLAPPED ovl[1];
670 uint amt[1];
671
672         memset (ovl, 0, sizeof(OVERLAPPED));
673         ovl->Offset = off;
674         ovl->OffsetHigh = off >> 32;
675
676         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
677                 return BTERR_wrt;
678
679         if( *amt <  mgr->page_size )
680                 return BTERR_wrt;
681 #endif
682         return 0;
683 }
684
685 //      link latch table entry into head of latch hash table
686
687 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
688 {
689 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
690 BtLatchSet *latch = bt->mgr->latchsets + slot;
691
692         if( latch->next = bt->mgr->hashtable[hashidx].slot )
693                 bt->mgr->latchsets[latch->next].prev = slot;
694
695         bt->mgr->hashtable[hashidx].slot = slot;
696         latch->page_no = page_no;
697         latch->entry = slot;
698         latch->split = 0;
699         latch->prev = 0;
700         latch->pin = 1;
701
702         if( loadit )
703           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
704                 return bt->err;
705           else
706                 bt->reads++;
707
708         return bt->err = 0;
709 }
710
711 //      set CLOCK bit in latch
712 //      decrement pin count
713
714 void bt_unpinlatch (BtLatchSet *latch)
715 {
716 #ifdef unix
717         if( ~latch->pin & CLOCK_bit )
718                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
719         __sync_fetch_and_add(&latch->pin, -1);
720 #else
721         if( ~latch->pin & CLOCK_bit )
722                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
723         _InterlockedDecrement16 (&latch->pin);
724 #endif
725 }
726
727 //  return the btree cached page address
728
729 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
730 {
731 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
732
733         return page;
734 }
735
736 //      find existing latchset or inspire new one
737 //      return with latchset pinned
738
739 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
740 {
741 uint hashidx = page_no % bt->mgr->latchhash;
742 BtLatchSet *latch;
743 uint slot, idx;
744 uint lvl, cnt;
745 off64_t off;
746 uint amt[1];
747 BtPage page;
748
749   //  try to find our entry
750
751   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
752
753   if( slot = bt->mgr->hashtable[hashidx].slot ) do
754   {
755         latch = bt->mgr->latchsets + slot;
756         if( page_no == latch->page_no )
757                 break;
758   } while( slot = latch->next );
759
760   //  found our entry
761   //  increment clock
762
763   if( slot ) {
764         latch = bt->mgr->latchsets + slot;
765 #ifdef unix
766         __sync_fetch_and_add(&latch->pin, 1);
767 #else
768         _InterlockedIncrement16 (&latch->pin);
769 #endif
770         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
771         return latch;
772   }
773
774         //  see if there are any unused pool entries
775 #ifdef unix
776         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
777 #else
778         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
779 #endif
780
781         if( slot < bt->mgr->latchtotal ) {
782                 latch = bt->mgr->latchsets + slot;
783                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
784                         return NULL;
785                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
786                 return latch;
787         }
788
789 #ifdef unix
790         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
791 #else
792         _InterlockedDecrement (&bt->mgr->latchdeployed);
793 #endif
794   //  find and reuse previous entry on victim
795
796   while( 1 ) {
797 #ifdef unix
798         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
799 #else
800         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
801 #endif
802         // try to get write lock on hash chain
803         //      skip entry if not obtained
804         //      or has outstanding pins
805
806         slot %= bt->mgr->latchtotal;
807
808         if( !slot )
809                 continue;
810
811         latch = bt->mgr->latchsets + slot;
812         idx = latch->page_no % bt->mgr->latchhash;
813
814         //      see we are on same chain as hashidx
815
816         if( idx == hashidx )
817                 continue;
818
819         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
820                 continue;
821
822         //  skip this slot if it is pinned
823         //      or the CLOCK bit is set
824
825         if( latch->pin ) {
826           if( latch->pin & CLOCK_bit ) {
827 #ifdef unix
828                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
829 #else
830                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
831 #endif
832           }
833           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
834           continue;
835         }
836
837         //  update permanent page area in btree from buffer pool
838
839         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
840
841         if( latch->dirty )
842           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
843                 return NULL;
844           else
845                 latch->dirty = 0, bt->writes++;
846
847         //  unlink our available slot from its hash chain
848
849         if( latch->prev )
850                 bt->mgr->latchsets[latch->prev].next = latch->next;
851         else
852                 bt->mgr->hashtable[idx].slot = latch->next;
853
854         if( latch->next )
855                 bt->mgr->latchsets[latch->next].prev = latch->prev;
856
857         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
858
859         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
860                 return NULL;
861
862         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
863         return latch;
864   }
865 }
866
867 void bt_mgrclose (BtMgr *mgr)
868 {
869 BtLatchSet *latch;
870 uint num = 0;
871 BtPage page;
872 uint slot;
873
874         //      flush dirty pool pages to the btree
875
876         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
877                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
878                 latch = mgr->latchsets + slot;
879
880                 if( latch->dirty ) {
881                         bt_writepage(mgr, page, latch->page_no);
882                         latch->dirty = 0, num++;
883                 }
884 //              madvise (page, mgr->page_size, MADV_DONTNEED);
885         }
886
887         fprintf(stderr, "%d buffer pool pages flushed\n", num);
888
889 #ifdef unix
890         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
891         munmap (mgr->pagezero, mgr->page_size);
892 #else
893         FlushViewOfFile(mgr->pagezero, 0);
894         UnmapViewOfFile(mgr->pagezero);
895         UnmapViewOfFile(mgr->hashtable);
896         CloseHandle(mgr->halloc);
897         CloseHandle(mgr->hpool);
898 #endif
899 #ifdef unix
900         close (mgr->idx);
901         free (mgr);
902 #else
903         FlushFileBuffers(mgr->idx);
904         CloseHandle(mgr->idx);
905         GlobalFree (mgr);
906 #endif
907 }
908
909 //      close and release memory
910
911 void bt_close (BtDb *bt)
912 {
913 #ifdef unix
914         if( bt->mem )
915                 free (bt->mem);
916 #else
917         if( bt->mem)
918                 VirtualFree (bt->mem, 0, MEM_RELEASE);
919 #endif
920         free (bt);
921 }
922
923 //  open/create new btree buffer manager
924
925 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
926 //              size of page pool (e.g. 262144)
927
928 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
929 {
930 uint lvl, attr, last, slot, idx;
931 uint nlatchpage, latchhash;
932 unsigned char value[BtId];
933 int flag, initit = 0;
934 BtPageZero *pagezero;
935 off64_t size;
936 uint amt[1];
937 BtMgr* mgr;
938 BtKey* key;
939 BtVal *val;
940
941         // determine sanity of page size and buffer pool
942
943         if( bits > BT_maxbits )
944                 bits = BT_maxbits;
945         else if( bits < BT_minbits )
946                 bits = BT_minbits;
947
948         if( nodemax < 16 ) {
949                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
950                 return NULL;
951         }
952
953 #ifdef unix
954         mgr = calloc (1, sizeof(BtMgr));
955
956         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
957
958         if( mgr->idx == -1 ) {
959                 fprintf (stderr, "Unable to open btree file\n");
960                 return free(mgr), NULL;
961         }
962 #else
963         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
964         attr = FILE_ATTRIBUTE_NORMAL;
965         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
966
967         if( mgr->idx == INVALID_HANDLE_VALUE )
968                 return GlobalFree(mgr), NULL;
969 #endif
970
971 #ifdef unix
972         pagezero = valloc (BT_maxpage);
973         *amt = 0;
974
975         // read minimum page size to get root info
976         //      to support raw disk partition files
977         //      check if bits == 0 on the disk.
978
979         if( size = lseek (mgr->idx, 0L, 2) )
980                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
981                         if( pagezero->alloc->bits )
982                                 bits = pagezero->alloc->bits;
983                         else
984                                 initit = 1;
985                 else
986                         return free(mgr), free(pagezero), NULL;
987         else
988                 initit = 1;
989 #else
990         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
991         size = GetFileSize(mgr->idx, amt);
992
993         if( size || *amt ) {
994                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
995                         return bt_mgrclose (mgr), NULL;
996                 bits = pagezero->alloc->bits;
997         } else
998                 initit = 1;
999 #endif
1000
1001         mgr->page_size = 1 << bits;
1002         mgr->page_bits = bits;
1003
1004         //  calculate number of latch hash table entries
1005
1006         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1007         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1008
1009         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1010         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1011         mgr->latchtotal = nodemax;
1012
1013         if( !initit )
1014                 goto mgrlatch;
1015
1016         // initialize an empty b-tree with latch page, root page, page of leaves
1017         // and page(s) of latches and page pool cache
1018
1019         memset (pagezero, 0, 1 << bits);
1020         pagezero->alloc->bits = mgr->page_bits;
1021         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1022
1023         //  initialize left-most LEAF page in
1024         //      alloc->left.
1025
1026         bt_putid (pagezero->alloc->left, LEAF_page);
1027
1028         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1029                 fprintf (stderr, "Unable to create btree page zero\n");
1030                 return bt_mgrclose (mgr), NULL;
1031         }
1032
1033         memset (pagezero, 0, 1 << bits);
1034         pagezero->alloc->bits = mgr->page_bits;
1035
1036         for( lvl=MIN_lvl; lvl--; ) {
1037                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1038                 key = keyptr(pagezero->alloc, 1);
1039                 key->len = 2;           // create stopper key
1040                 key->key[0] = 0xff;
1041                 key->key[1] = 0xff;
1042
1043                 bt_putid(value, MIN_lvl - lvl + 1);
1044                 val = valptr(pagezero->alloc, 1);
1045                 val->len = lvl ? BtId : 0;
1046                 memcpy (val->value, value, val->len);
1047
1048                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1049                 pagezero->alloc->lvl = lvl;
1050                 pagezero->alloc->cnt = 1;
1051                 pagezero->alloc->act = 1;
1052
1053                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1054                         fprintf (stderr, "Unable to create btree page zero\n");
1055                         return bt_mgrclose (mgr), NULL;
1056                 }
1057         }
1058
1059 mgrlatch:
1060 #ifdef unix
1061         free (pagezero);
1062 #else
1063         VirtualFree (pagezero, 0, MEM_RELEASE);
1064 #endif
1065 #ifdef unix
1066         // mlock the pagezero page
1067
1068         flag = PROT_READ | PROT_WRITE;
1069         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1070         if( mgr->pagezero == MAP_FAILED ) {
1071                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1072                 return bt_mgrclose (mgr), NULL;
1073         }
1074         mlock (mgr->pagezero, mgr->page_size);
1075
1076         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1077         if( mgr->hashtable == MAP_FAILED ) {
1078                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1079                 return bt_mgrclose (mgr), NULL;
1080         }
1081 #else
1082         flag = PAGE_READWRITE;
1083         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1084         if( !mgr->halloc ) {
1085                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1086                 return bt_mgrclose (mgr), NULL;
1087         }
1088
1089         flag = FILE_MAP_WRITE;
1090         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1091         if( !mgr->pagezero ) {
1092                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1093                 return bt_mgrclose (mgr), NULL;
1094         }
1095
1096         flag = PAGE_READWRITE;
1097         size = (uid)mgr->nlatchpage << mgr->page_bits;
1098         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1099         if( !mgr->hpool ) {
1100                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1101                 return bt_mgrclose (mgr), NULL;
1102         }
1103
1104         flag = FILE_MAP_WRITE;
1105         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1106         if( !mgr->hashtable ) {
1107                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1108                 return bt_mgrclose (mgr), NULL;
1109         }
1110 #endif
1111
1112         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1113         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1114
1115         return mgr;
1116 }
1117
1118 //      open BTree access method
1119 //      based on buffer manager
1120
1121 BtDb *bt_open (BtMgr *mgr)
1122 {
1123 BtDb *bt = malloc (sizeof(*bt));
1124
1125         memset (bt, 0, sizeof(*bt));
1126         bt->mgr = mgr;
1127 #ifdef unix
1128         bt->mem = valloc (2 *mgr->page_size);
1129 #else
1130         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1131 #endif
1132         bt->frame = (BtPage)bt->mem;
1133         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1134 #ifdef unix
1135         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1136 #else
1137         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1138 #endif
1139         return bt;
1140 }
1141
1142 //  compare two keys, return > 0, = 0, or < 0
1143 //  =0: keys are same
1144 //  -1: key2 > key1
1145 //  +1: key2 < key1
1146 //  as the comparison value
1147
1148 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1149 {
1150 uint len1 = key1->len;
1151 int ans;
1152
1153         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1154                 return ans;
1155
1156         if( len1 > len2 )
1157                 return 1;
1158         if( len1 < len2 )
1159                 return -1;
1160
1161         return 0;
1162 }
1163
1164 // place write, read, or parent lock on requested page_no.
1165
1166 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1167 {
1168         switch( mode ) {
1169         case BtLockRead:
1170                 ReadLock (latch->readwr);
1171                 break;
1172         case BtLockWrite:
1173                 WriteLock (latch->readwr);
1174                 break;
1175         case BtLockAccess:
1176                 ReadLock (latch->access);
1177                 break;
1178         case BtLockDelete:
1179                 WriteLock (latch->access);
1180                 break;
1181         case BtLockParent:
1182                 WriteOLock (latch->parent, bt->thread_no);
1183                 break;
1184         case BtLockAtomic:
1185                 WriteOLock (latch->atomic, bt->thread_no);
1186                 break;
1187         case BtLockAtomic | BtLockRead:
1188                 WriteOLock (latch->atomic, bt->thread_no);
1189                 ReadLock (latch->readwr);
1190                 break;
1191         }
1192 }
1193
1194 // remove write, read, or parent lock on requested page
1195
1196 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1197 {
1198         switch( mode ) {
1199         case BtLockRead:
1200                 ReadRelease (latch->readwr);
1201                 break;
1202         case BtLockWrite:
1203                 WriteRelease (latch->readwr);
1204                 break;
1205         case BtLockAccess:
1206                 ReadRelease (latch->access);
1207                 break;
1208         case BtLockDelete:
1209                 WriteRelease (latch->access);
1210                 break;
1211         case BtLockParent:
1212                 WriteORelease (latch->parent);
1213                 break;
1214         case BtLockAtomic:
1215                 WriteORelease (latch->atomic);
1216                 break;
1217         case BtLockAtomic | BtLockRead:
1218                 WriteORelease (latch->atomic);
1219                 ReadRelease (latch->readwr);
1220                 break;
1221         }
1222 }
1223
1224 //      allocate a new page
1225 //      return with page latched, but unlocked.
1226
1227 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1228 {
1229 uid page_no;
1230 int blk;
1231
1232         //      lock allocation page
1233
1234         bt_spinwritelock(bt->mgr->lock);
1235
1236         // use empty chain first
1237         // else allocate empty page
1238
1239         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1240                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1241                         set->page = bt_mappage (bt, set->latch);
1242                 else
1243                         return bt->err = BTERR_struct, -1;
1244
1245                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1246                 bt_spinreleasewrite(bt->mgr->lock);
1247
1248                 memcpy (set->page, contents, bt->mgr->page_size);
1249                 set->latch->dirty = 1;
1250                 return 0;
1251         }
1252
1253         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1254         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1255
1256         // unlock allocation latch
1257
1258         bt_spinreleasewrite(bt->mgr->lock);
1259
1260         //      don't load cache from btree page
1261
1262         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1263                 set->page = bt_mappage (bt, set->latch);
1264         else
1265                 return bt->err = BTERR_struct;
1266
1267         memcpy (set->page, contents, bt->mgr->page_size);
1268         set->latch->dirty = 1;
1269         return 0;
1270 }
1271
1272 //  find slot in page for given key at a given level
1273
1274 int bt_findslot (BtPage page, unsigned char *key, uint len)
1275 {
1276 uint diff, higher = page->cnt, low = 1, slot;
1277 uint good = 0;
1278
1279         //        make stopper key an infinite fence value
1280
1281         if( bt_getid (page->right) )
1282                 higher++;
1283         else
1284                 good++;
1285
1286         //      low is the lowest candidate.
1287         //  loop ends when they meet
1288
1289         //  higher is already
1290         //      tested as .ge. the passed key.
1291
1292         while( diff = higher - low ) {
1293                 slot = low + ( diff >> 1 );
1294                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1295                         low = slot + 1;
1296                 else
1297                         higher = slot, good++;
1298         }
1299
1300         //      return zero if key is on right link page
1301
1302         return good ? higher : 0;
1303 }
1304
1305 //  find and load page at given level for given key
1306 //      leave page rd or wr locked as requested
1307
1308 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1309 {
1310 uid page_no = ROOT_page, prevpage = 0;
1311 uint drill = 0xff, slot;
1312 BtLatchSet *prevlatch;
1313 uint mode, prevmode;
1314
1315   //  start at root of btree and drill down
1316
1317   do {
1318         // determine lock mode of drill level
1319         mode = (drill == lvl) ? lock : BtLockRead; 
1320
1321         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1322           return 0;
1323
1324         // obtain access lock using lock chaining with Access mode
1325
1326         if( page_no > ROOT_page )
1327           bt_lockpage(bt, BtLockAccess, set->latch);
1328
1329         set->page = bt_mappage (bt, set->latch);
1330
1331         //      release & unpin parent or left sibling page
1332
1333         if( prevpage ) {
1334           bt_unlockpage(bt, prevmode, prevlatch);
1335           bt_unpinlatch (prevlatch);
1336           prevpage = 0;
1337         }
1338
1339         // obtain mode lock using lock chaining through AccessLock
1340
1341         bt_lockpage(bt, mode, set->latch);
1342
1343         if( set->page->free )
1344                 return bt->err = BTERR_struct, 0;
1345
1346         if( page_no > ROOT_page )
1347           bt_unlockpage(bt, BtLockAccess, set->latch);
1348
1349         // re-read and re-lock root after determining actual level of root
1350
1351         if( set->page->lvl != drill) {
1352                 if( set->latch->page_no != ROOT_page )
1353                         return bt->err = BTERR_struct, 0;
1354                         
1355                 drill = set->page->lvl;
1356
1357                 if( lock != BtLockRead && drill == lvl ) {
1358                   bt_unlockpage(bt, mode, set->latch);
1359                   bt_unpinlatch (set->latch);
1360                   continue;
1361                 }
1362         }
1363
1364         prevpage = set->latch->page_no;
1365         prevlatch = set->latch;
1366         prevmode = mode;
1367
1368         //  find key on page at this level
1369         //  and descend to requested level
1370
1371         if( !set->page->kill )
1372          if( slot = bt_findslot (set->page, key, len) ) {
1373           if( drill == lvl )
1374                 return slot;
1375
1376           // find next non-dead slot -- the fence key if nothing else
1377
1378           while( slotptr(set->page, slot)->dead )
1379                 if( slot++ < set->page->cnt )
1380                   continue;
1381                 else
1382                   return bt->err = BTERR_struct, 0;
1383
1384           page_no = bt_getid(valptr(set->page, slot)->value);
1385           drill--;
1386           continue;
1387          }
1388
1389         //  or slide right into next page
1390
1391         page_no = bt_getid(set->page->right);
1392   } while( page_no );
1393
1394   // return error on end of right chain
1395
1396   bt->err = BTERR_struct;
1397   return 0;     // return error
1398 }
1399
1400 //      return page to free list
1401 //      page must be delete & write locked
1402
1403 void bt_freepage (BtDb *bt, BtPageSet *set)
1404 {
1405         //      lock allocation page
1406
1407         bt_spinwritelock (bt->mgr->lock);
1408
1409         //      store chain
1410
1411         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1412         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1413         set->latch->dirty = 1;
1414         set->page->free = 1;
1415
1416         // unlock released page
1417
1418         bt_unlockpage (bt, BtLockDelete, set->latch);
1419         bt_unlockpage (bt, BtLockWrite, set->latch);
1420         bt_unpinlatch (set->latch);
1421
1422         // unlock allocation page
1423
1424         bt_spinreleasewrite (bt->mgr->lock);
1425 }
1426
1427 //      a fence key was deleted from a page
1428 //      push new fence value upwards
1429
1430 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1431 {
1432 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1433 unsigned char value[BtId];
1434 BtKey* ptr;
1435 uint idx;
1436
1437         //      remove the old fence value
1438
1439         ptr = keyptr(set->page, set->page->cnt);
1440         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1441         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1442         set->latch->dirty = 1;
1443
1444         //  cache new fence value
1445
1446         ptr = keyptr(set->page, set->page->cnt);
1447         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1448
1449         bt_lockpage (bt, BtLockParent, set->latch);
1450         bt_unlockpage (bt, BtLockWrite, set->latch);
1451
1452         //      insert new (now smaller) fence key
1453
1454         bt_putid (value, set->latch->page_no);
1455         ptr = (BtKey*)leftkey;
1456
1457         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1458           return bt->err;
1459
1460         //      now delete old fence key
1461
1462         ptr = (BtKey*)rightkey;
1463
1464         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1465                 return bt->err;
1466
1467         bt_unlockpage (bt, BtLockParent, set->latch);
1468         bt_unpinlatch(set->latch);
1469         return 0;
1470 }
1471
1472 //      root has a single child
1473 //      collapse a level from the tree
1474
1475 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1476 {
1477 BtPageSet child[1];
1478 uid page_no;
1479 uint idx;
1480
1481   // find the child entry and promote as new root contents
1482
1483   do {
1484         for( idx = 0; idx++ < root->page->cnt; )
1485           if( !slotptr(root->page, idx)->dead )
1486                 break;
1487
1488         page_no = bt_getid (valptr(root->page, idx)->value);
1489
1490         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1491                 child->page = bt_mappage (bt, child->latch);
1492         else
1493                 return bt->err;
1494
1495         bt_lockpage (bt, BtLockDelete, child->latch);
1496         bt_lockpage (bt, BtLockWrite, child->latch);
1497
1498         memcpy (root->page, child->page, bt->mgr->page_size);
1499         root->latch->dirty = 1;
1500
1501         bt_freepage (bt, child);
1502
1503   } while( root->page->lvl > 1 && root->page->act == 1 );
1504
1505   bt_unlockpage (bt, BtLockWrite, root->latch);
1506   bt_unpinlatch (root->latch);
1507   return 0;
1508 }
1509
1510 //  delete a page and manage keys
1511 //  call with page writelocked
1512 //      returns with page unpinned
1513
1514 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1515 {
1516 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1517 unsigned char value[BtId];
1518 uint lvl = set->page->lvl;
1519 BtPageSet right[1];
1520 uid page_no;
1521 BtKey *ptr;
1522
1523         //      cache copy of fence key
1524         //      to post in parent
1525
1526         ptr = keyptr(set->page, set->page->cnt);
1527         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1528
1529         //      obtain lock on right page
1530
1531         page_no = bt_getid(set->page->right);
1532
1533         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1534                 right->page = bt_mappage (bt, right->latch);
1535         else
1536                 return 0;
1537
1538         bt_lockpage (bt, BtLockWrite, right->latch);
1539
1540         // cache copy of key to update
1541
1542         ptr = keyptr(right->page, right->page->cnt);
1543         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1544
1545         if( right->page->kill )
1546                 return bt->err = BTERR_struct;
1547
1548         // pull contents of right peer into our empty page
1549
1550         memcpy (set->page, right->page, bt->mgr->page_size);
1551         set->latch->dirty = 1;
1552
1553         // mark right page deleted and point it to left page
1554         //      until we can post parent updates that remove access
1555         //      to the deleted page.
1556
1557         bt_putid (right->page->right, set->latch->page_no);
1558         right->latch->dirty = 1;
1559         right->page->kill = 1;
1560
1561         bt_lockpage (bt, BtLockParent, right->latch);
1562         bt_unlockpage (bt, BtLockWrite, right->latch);
1563
1564         bt_lockpage (bt, BtLockParent, set->latch);
1565         bt_unlockpage (bt, BtLockWrite, set->latch);
1566
1567         // redirect higher key directly to our new node contents
1568
1569         bt_putid (value, set->latch->page_no);
1570         ptr = (BtKey*)higherfence;
1571
1572         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1573           return bt->err;
1574
1575         //      delete old lower key to our node
1576
1577         ptr = (BtKey*)lowerfence;
1578
1579         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1580           return bt->err;
1581
1582         //      obtain delete and write locks to right node
1583
1584         bt_unlockpage (bt, BtLockParent, right->latch);
1585         bt_lockpage (bt, BtLockDelete, right->latch);
1586         bt_lockpage (bt, BtLockWrite, right->latch);
1587         bt_freepage (bt, right);
1588
1589         bt_unlockpage (bt, BtLockParent, set->latch);
1590         bt_unpinlatch (set->latch);
1591         return 0;
1592 }
1593
1594 //  find and delete key on page by marking delete flag bit
1595 //  if page becomes empty, delete it from the btree
1596
1597 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1598 {
1599 uint slot, idx, found, fence;
1600 BtPageSet set[1];
1601 BtKey *ptr;
1602 BtVal *val;
1603
1604         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1605                 ptr = keyptr(set->page, slot);
1606         else
1607                 return bt->err;
1608
1609         // if librarian slot, advance to real slot
1610
1611         if( slotptr(set->page, slot)->type == Librarian )
1612                 ptr = keyptr(set->page, ++slot);
1613
1614         fence = slot == set->page->cnt;
1615
1616         // if key is found delete it, otherwise ignore request
1617
1618         if( found = !keycmp (ptr, key, len) )
1619           if( found = slotptr(set->page, slot)->dead == 0 ) {
1620                 val = valptr(set->page,slot);
1621                 slotptr(set->page, slot)->dead = 1;
1622                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1623                 set->page->act--;
1624
1625                 // collapse empty slots beneath the fence
1626
1627                 while( idx = set->page->cnt - 1 )
1628                   if( slotptr(set->page, idx)->dead ) {
1629                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1630                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1631                   } else
1632                         break;
1633           }
1634
1635         //      did we delete a fence key in an upper level?
1636
1637         if( found && lvl && set->page->act && fence )
1638           if( bt_fixfence (bt, set, lvl) )
1639                 return bt->err;
1640           else
1641                 return 0;
1642
1643         //      do we need to collapse root?
1644
1645         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1646           if( bt_collapseroot (bt, set) )
1647                 return bt->err;
1648           else
1649                 return 0;
1650
1651         //      delete empty page
1652
1653         if( !set->page->act )
1654                 return bt_deletepage (bt, set);
1655
1656         set->latch->dirty = 1;
1657         bt_unlockpage(bt, BtLockWrite, set->latch);
1658         bt_unpinlatch (set->latch);
1659         return 0;
1660 }
1661
1662 BtKey *bt_foundkey (BtDb *bt)
1663 {
1664         return (BtKey*)(bt->key);
1665 }
1666
1667 //      advance to next slot
1668
1669 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1670 {
1671 BtLatchSet *prevlatch;
1672 uid page_no;
1673
1674         if( slot < set->page->cnt )
1675                 return slot + 1;
1676
1677         prevlatch = set->latch;
1678
1679         if( page_no = bt_getid(set->page->right) )
1680           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1681                 set->page = bt_mappage (bt, set->latch);
1682           else
1683                 return 0;
1684         else
1685           return bt->err = BTERR_struct, 0;
1686
1687         // obtain access lock using lock chaining with Access mode
1688
1689         bt_lockpage(bt, BtLockAccess, set->latch);
1690
1691         bt_unlockpage(bt, BtLockRead, prevlatch);
1692         bt_unpinlatch (prevlatch);
1693
1694         bt_lockpage(bt, BtLockRead, set->latch);
1695         bt_unlockpage(bt, BtLockAccess, set->latch);
1696         return 1;
1697 }
1698
1699 //      find unique key or first duplicate key in
1700 //      leaf level and return number of value bytes
1701 //      or (-1) if not found.  Setup key for bt_foundkey
1702
1703 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1704 {
1705 BtPageSet set[1];
1706 uint len, slot;
1707 int ret = -1;
1708 BtKey *ptr;
1709 BtVal *val;
1710
1711   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1712    do {
1713         ptr = keyptr(set->page, slot);
1714
1715         //      skip librarian slot place holder
1716
1717         if( slotptr(set->page, slot)->type == Librarian )
1718                 ptr = keyptr(set->page, ++slot);
1719
1720         //      return actual key found
1721
1722         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1723         len = ptr->len;
1724
1725         if( slotptr(set->page, slot)->type == Duplicate )
1726                 len -= BtId;
1727
1728         //      not there if we reach the stopper key
1729
1730         if( slot == set->page->cnt )
1731           if( !bt_getid (set->page->right) )
1732                 break;
1733
1734         // if key exists, return >= 0 value bytes copied
1735         //      otherwise return (-1)
1736
1737         if( slotptr(set->page, slot)->dead )
1738                 continue;
1739
1740         if( keylen == len )
1741           if( !memcmp (ptr->key, key, len) ) {
1742                 val = valptr (set->page,slot);
1743                 if( valmax > val->len )
1744                         valmax = val->len;
1745                 memcpy (value, val->value, valmax);
1746                 ret = valmax;
1747           }
1748
1749         break;
1750
1751    } while( slot = bt_findnext (bt, set, slot) );
1752
1753   bt_unlockpage (bt, BtLockRead, set->latch);
1754   bt_unpinlatch (set->latch);
1755   return ret;
1756 }
1757
1758 //      check page for space available,
1759 //      clean if necessary and return
1760 //      0 - page needs splitting
1761 //      >0  new slot value
1762
1763 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1764 {
1765 uint nxt = bt->mgr->page_size;
1766 BtPage page = set->page;
1767 uint cnt = 0, idx = 0;
1768 uint max = page->cnt;
1769 uint newslot = max;
1770 BtKey *key;
1771 BtVal *val;
1772
1773         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1774                 return slot;
1775
1776         //      skip cleanup and proceed to split
1777         //      if there's not enough garbage
1778         //      to bother with.
1779
1780         if( page->garbage < nxt / 5 )
1781                 return 0;
1782
1783         memcpy (bt->frame, page, bt->mgr->page_size);
1784
1785         // skip page info and set rest of page to zero
1786
1787         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1788         set->latch->dirty = 1;
1789         page->garbage = 0;
1790         page->act = 0;
1791
1792         // clean up page first by
1793         // removing deleted keys
1794
1795         while( cnt++ < max ) {
1796                 if( cnt == slot )
1797                         newslot = idx + 2;
1798
1799                 if( cnt < max || bt->frame->lvl )
1800                   if( slotptr(bt->frame,cnt)->dead )
1801                         continue;
1802
1803                 // copy the value across
1804
1805                 val = valptr(bt->frame, cnt);
1806                 nxt -= val->len + sizeof(BtVal);
1807                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1808
1809                 // copy the key across
1810
1811                 key = keyptr(bt->frame, cnt);
1812                 nxt -= key->len + sizeof(BtKey);
1813                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1814
1815                 // make a librarian slot
1816
1817                 slotptr(page, ++idx)->off = nxt;
1818                 slotptr(page, idx)->type = Librarian;
1819                 slotptr(page, idx)->dead = 1;
1820
1821                 // set up the slot
1822
1823                 slotptr(page, ++idx)->off = nxt;
1824                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1825
1826                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1827                         page->act++;
1828         }
1829
1830         page->min = nxt;
1831         page->cnt = idx;
1832
1833         //      see if page has enough space now, or does it need splitting?
1834
1835         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1836                 return newslot;
1837
1838         return 0;
1839 }
1840
1841 // split the root and raise the height of the btree
1842
1843 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1844 {  
1845 unsigned char leftkey[BT_keyarray];
1846 uint nxt = bt->mgr->page_size;
1847 unsigned char value[BtId];
1848 BtPageSet left[1];
1849 uid left_page_no;
1850 BtKey *ptr;
1851 BtVal *val;
1852
1853         //      save left page fence key for new root
1854
1855         ptr = keyptr(root->page, root->page->cnt);
1856         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1857
1858         //  Obtain an empty page to use, and copy the current
1859         //  root contents into it, e.g. lower keys
1860
1861         if( bt_newpage(bt, left, root->page) )
1862                 return bt->err;
1863
1864         left_page_no = left->latch->page_no;
1865         bt_unpinlatch (left->latch);
1866
1867         // preserve the page info at the bottom
1868         // of higher keys and set rest to zero
1869
1870         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1871
1872         // insert stopper key at top of newroot page
1873         // and increase the root height
1874
1875         nxt -= BtId + sizeof(BtVal);
1876         bt_putid (value, right->page_no);
1877         val = (BtVal *)((unsigned char *)root->page + nxt);
1878         memcpy (val->value, value, BtId);
1879         val->len = BtId;
1880
1881         nxt -= 2 + sizeof(BtKey);
1882         slotptr(root->page, 2)->off = nxt;
1883         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1884         ptr->len = 2;
1885         ptr->key[0] = 0xff;
1886         ptr->key[1] = 0xff;
1887
1888         // insert lower keys page fence key on newroot page as first key
1889
1890         nxt -= BtId + sizeof(BtVal);
1891         bt_putid (value, left_page_no);
1892         val = (BtVal *)((unsigned char *)root->page + nxt);
1893         memcpy (val->value, value, BtId);
1894         val->len = BtId;
1895
1896         ptr = (BtKey *)leftkey;
1897         nxt -= ptr->len + sizeof(BtKey);
1898         slotptr(root->page, 1)->off = nxt;
1899         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1900         
1901         bt_putid(root->page->right, 0);
1902         root->page->min = nxt;          // reset lowest used offset and key count
1903         root->page->cnt = 2;
1904         root->page->act = 2;
1905         root->page->lvl++;
1906
1907         // release and unpin root pages
1908
1909         bt_unlockpage(bt, BtLockWrite, root->latch);
1910         bt_unpinlatch (root->latch);
1911
1912         bt_unpinlatch (right);
1913         return 0;
1914 }
1915
1916 //  split already locked full node
1917 //      leave it locked.
1918 //      return pool entry for new right
1919 //      page, unlocked
1920
1921 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1922 {
1923 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1924 uint lvl = set->page->lvl;
1925 BtPageSet right[1];
1926 BtKey *key, *ptr;
1927 BtVal *val, *src;
1928 uid right2;
1929 uint prev;
1930
1931         //  split higher half of keys to bt->frame
1932
1933         memset (bt->frame, 0, bt->mgr->page_size);
1934         max = set->page->cnt;
1935         cnt = max / 2;
1936         idx = 0;
1937
1938         while( cnt++ < max ) {
1939                 if( cnt < max || set->page->lvl )
1940                   if( slotptr(set->page, cnt)->dead )
1941                         continue;
1942
1943                 src = valptr(set->page, cnt);
1944                 nxt -= src->len + sizeof(BtVal);
1945                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1946
1947                 key = keyptr(set->page, cnt);
1948                 nxt -= key->len + sizeof(BtKey);
1949                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1950                 memcpy (ptr, key, key->len + sizeof(BtKey));
1951
1952                 //      add librarian slot
1953
1954                 slotptr(bt->frame, ++idx)->off = nxt;
1955                 slotptr(bt->frame, idx)->type = Librarian;
1956                 slotptr(bt->frame, idx)->dead = 1;
1957
1958                 //  add actual slot
1959
1960                 slotptr(bt->frame, ++idx)->off = nxt;
1961                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1962
1963                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1964                         bt->frame->act++;
1965         }
1966
1967         bt->frame->bits = bt->mgr->page_bits;
1968         bt->frame->min = nxt;
1969         bt->frame->cnt = idx;
1970         bt->frame->lvl = lvl;
1971
1972         // link right node
1973
1974         if( set->latch->page_no > ROOT_page )
1975                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1976
1977         //      get new free page and write higher keys to it.
1978
1979         if( bt_newpage(bt, right, bt->frame) )
1980                 return 0;
1981
1982         memcpy (bt->frame, set->page, bt->mgr->page_size);
1983         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1984         set->latch->dirty = 1;
1985
1986         nxt = bt->mgr->page_size;
1987         set->page->garbage = 0;
1988         set->page->act = 0;
1989         max /= 2;
1990         cnt = 0;
1991         idx = 0;
1992
1993         if( slotptr(bt->frame, max)->type == Librarian )
1994                 max--;
1995
1996         //  assemble page of smaller keys
1997
1998         while( cnt++ < max ) {
1999                 if( slotptr(bt->frame, cnt)->dead )
2000                         continue;
2001                 val = valptr(bt->frame, cnt);
2002                 nxt -= val->len + sizeof(BtVal);
2003                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2004
2005                 key = keyptr(bt->frame, cnt);
2006                 nxt -= key->len + sizeof(BtKey);
2007                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2008
2009                 //      add librarian slot
2010
2011                 slotptr(set->page, ++idx)->off = nxt;
2012                 slotptr(set->page, idx)->type = Librarian;
2013                 slotptr(set->page, idx)->dead = 1;
2014
2015                 //      add actual slot
2016
2017                 slotptr(set->page, ++idx)->off = nxt;
2018                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2019                 set->page->act++;
2020         }
2021
2022         bt_putid(set->page->right, right->latch->page_no);
2023         set->page->min = nxt;
2024         set->page->cnt = idx;
2025
2026         return right->latch->entry;
2027 }
2028
2029 //      fix keys for newly split page
2030 //      call with page locked, return
2031 //      unlocked
2032
2033 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2034 {
2035 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2036 unsigned char value[BtId];
2037 uint lvl = set->page->lvl;
2038 BtPage page;
2039 BtKey *ptr;
2040
2041         // if current page is the root page, split it
2042
2043         if( set->latch->page_no == ROOT_page )
2044                 return bt_splitroot (bt, set, right);
2045
2046         ptr = keyptr(set->page, set->page->cnt);
2047         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2048
2049         page = bt_mappage (bt, right);
2050
2051         ptr = keyptr(page, page->cnt);
2052         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2053
2054         // insert new fences in their parent pages
2055
2056         bt_lockpage (bt, BtLockParent, right);
2057
2058         bt_lockpage (bt, BtLockParent, set->latch);
2059         bt_unlockpage (bt, BtLockWrite, set->latch);
2060
2061         // insert new fence for reformulated left block of smaller keys
2062
2063         bt_putid (value, set->latch->page_no);
2064         ptr = (BtKey *)leftkey;
2065
2066         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2067                 return bt->err;
2068
2069         // switch fence for right block of larger keys to new right page
2070
2071         bt_putid (value, right->page_no);
2072         ptr = (BtKey *)rightkey;
2073
2074         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2075                 return bt->err;
2076
2077         bt_unlockpage (bt, BtLockParent, set->latch);
2078         bt_unpinlatch (set->latch);
2079
2080         bt_unlockpage (bt, BtLockParent, right);
2081         bt_unpinlatch (right);
2082         return 0;
2083 }
2084
2085 //      install new key and value onto page
2086 //      page must already be checked for
2087 //      adequate space
2088
2089 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2090 {
2091 uint idx, librarian;
2092 BtSlot *node;
2093 BtKey *ptr;
2094 BtVal *val;
2095
2096         //      if found slot > desired slot and previous slot
2097         //      is a librarian slot, use it
2098
2099         if( slot > 1 )
2100           if( slotptr(set->page, slot-1)->type == Librarian )
2101                 slot--;
2102
2103         // copy value onto page
2104
2105         set->page->min -= vallen + sizeof(BtVal);
2106         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2107         memcpy (val->value, value, vallen);
2108         val->len = vallen;
2109
2110         // copy key onto page
2111
2112         set->page->min -= keylen + sizeof(BtKey);
2113         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2114         memcpy (ptr->key, key, keylen);
2115         ptr->len = keylen;
2116         
2117         //      find first empty slot
2118
2119         for( idx = slot; idx < set->page->cnt; idx++ )
2120           if( slotptr(set->page, idx)->dead )
2121                 break;
2122
2123         // now insert key into array before slot
2124
2125         if( idx == set->page->cnt )
2126                 idx += 2, set->page->cnt += 2, librarian = 2;
2127         else
2128                 librarian = 1;
2129
2130         set->latch->dirty = 1;
2131         set->page->act++;
2132
2133         while( idx > slot + librarian - 1 )
2134                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2135
2136         //      add librarian slot
2137
2138         if( librarian > 1 ) {
2139                 node = slotptr(set->page, slot++);
2140                 node->off = set->page->min;
2141                 node->type = Librarian;
2142                 node->dead = 1;
2143         }
2144
2145         //      fill in new slot
2146
2147         node = slotptr(set->page, slot);
2148         node->off = set->page->min;
2149         node->type = type;
2150         node->dead = 0;
2151
2152         if( release ) {
2153                 bt_unlockpage (bt, BtLockWrite, set->latch);
2154                 bt_unpinlatch (set->latch);
2155         }
2156
2157         return 0;
2158 }
2159
2160 //  Insert new key into the btree at given level.
2161 //      either add a new key or update/add an existing one
2162
2163 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2164 {
2165 unsigned char newkey[BT_keyarray];
2166 uint slot, idx, len, entry;
2167 BtPageSet set[1];
2168 BtKey *ptr, *ins;
2169 uid sequence;
2170 BtVal *val;
2171 uint type;
2172
2173   // set up the key we're working on
2174
2175   ins = (BtKey*)newkey;
2176   memcpy (ins->key, key, keylen);
2177   ins->len = keylen;
2178
2179   // is this a non-unique index value?
2180
2181   if( unique )
2182         type = Unique;
2183   else {
2184         type = Duplicate;
2185         sequence = bt_newdup (bt);
2186         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2187         ins->len += BtId;
2188   }
2189
2190   while( 1 ) { // find the page and slot for the current key
2191         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2192                 ptr = keyptr(set->page, slot);
2193         else {
2194                 if( !bt->err )
2195                         bt->err = BTERR_ovflw;
2196                 return bt->err;
2197         }
2198
2199         // if librarian slot == found slot, advance to real slot
2200
2201         if( slotptr(set->page, slot)->type == Librarian )
2202           if( !keycmp (ptr, key, keylen) )
2203                 ptr = keyptr(set->page, ++slot);
2204
2205         len = ptr->len;
2206
2207         if( slotptr(set->page, slot)->type == Duplicate )
2208                 len -= BtId;
2209
2210         //  if inserting a duplicate key or unique key
2211         //      check for adequate space on the page
2212         //      and insert the new key before slot.
2213
2214         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2215           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2216                 if( !(entry = bt_splitpage (bt, set)) )
2217                   return bt->err;
2218                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2219                   return bt->err;
2220                 else
2221                   continue;
2222
2223           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2224         }
2225
2226         // if key already exists, update value and return
2227
2228         val = valptr(set->page, slot);
2229
2230         if( val->len >= vallen ) {
2231                 if( slotptr(set->page, slot)->dead )
2232                         set->page->act++;
2233                 set->page->garbage += val->len - vallen;
2234                 set->latch->dirty = 1;
2235                 slotptr(set->page, slot)->dead = 0;
2236                 val->len = vallen;
2237                 memcpy (val->value, value, vallen);
2238                 bt_unlockpage(bt, BtLockWrite, set->latch);
2239                 bt_unpinlatch (set->latch);
2240                 return 0;
2241         }
2242
2243         //      new update value doesn't fit in existing value area
2244
2245         if( !slotptr(set->page, slot)->dead )
2246                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2247         else {
2248                 slotptr(set->page, slot)->dead = 0;
2249                 set->page->act++;
2250         }
2251
2252         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2253           if( !(entry = bt_splitpage (bt, set)) )
2254                 return bt->err;
2255           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2256                 return bt->err;
2257           else
2258                 continue;
2259
2260         set->page->min -= vallen + sizeof(BtVal);
2261         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2262         memcpy (val->value, value, vallen);
2263         val->len = vallen;
2264
2265         set->latch->dirty = 1;
2266         set->page->min -= keylen + sizeof(BtKey);
2267         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2268         memcpy (ptr->key, key, keylen);
2269         ptr->len = keylen;
2270         
2271         slotptr(set->page, slot)->off = set->page->min;
2272         bt_unlockpage(bt, BtLockWrite, set->latch);
2273         bt_unpinlatch (set->latch);
2274         return 0;
2275   }
2276   return 0;
2277 }
2278
2279 typedef struct {
2280         uint entry;                     // latch table entry number
2281         uint slot:31;           // page slot number
2282         uint reuse:1;           // reused previous page
2283 } AtomicTxn;
2284
2285 typedef struct {
2286         uid page_no;            // page number for split leaf
2287         void *next;                     // next key to insert
2288         uint entry:29;          // latch table entry number
2289         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2290         uint nounlock:1;        // don't unlock ParentModification
2291         unsigned char leafkey[BT_keyarray];
2292 } AtomicKey;
2293
2294 //      determine actual page where key is located
2295 //  return slot number
2296
2297 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2298 {
2299 BtKey *key = keyptr(source,src);
2300 uint slot = locks[src].slot;
2301 uint entry;
2302
2303         if( src > 1 && locks[src].reuse )
2304           entry = locks[src-1].entry, slot = 0;
2305         else
2306           entry = locks[src].entry;
2307
2308         if( slot ) {
2309                 set->latch = bt->mgr->latchsets + entry;
2310                 set->page = bt_mappage (bt, set->latch);
2311                 return slot;
2312         }
2313
2314         //      is locks->reuse set? or was slot zeroed?
2315         //      if so, find where our key is located 
2316         //      on current page or pages split on
2317         //      same page txn operations.
2318
2319         do {
2320                 set->latch = bt->mgr->latchsets + entry;
2321                 set->page = bt_mappage (bt, set->latch);
2322
2323                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2324                   if( slotptr(set->page, slot)->type == Librarian )
2325                         slot++;
2326                   if( locks[src].reuse )
2327                         locks[src].entry = entry;
2328                   return slot;
2329                 }
2330         } while( entry = set->latch->split );
2331
2332         bt->err = BTERR_atomic;
2333         return 0;
2334 }
2335
2336 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2337 {
2338 BtKey *key = keyptr(source, src);
2339 BtVal *val = valptr(source, src);
2340 BtLatchSet *latch;
2341 BtPageSet set[1];
2342 uint entry, slot;
2343
2344   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2345         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2346           return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2347
2348         if( entry = bt_splitpage (bt, set) )
2349           latch = bt->mgr->latchsets + entry;
2350         else
2351           return bt->err;
2352
2353         //      splice right page into split chain
2354         //      and WriteLock it.
2355
2356         bt_lockpage(bt, BtLockWrite, latch);
2357         latch->split = set->latch->split;
2358         set->latch->split = entry;
2359         locks[src].slot = 0;
2360   }
2361
2362   return bt->err = BTERR_atomic;
2363 }
2364
2365 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2366 {
2367 BtKey *key = keyptr(source, src);
2368 uint idx, entry, slot;
2369 BtPageSet set[1];
2370 BtKey *ptr;
2371 BtVal *val;
2372
2373         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2374           ptr = keyptr(set->page, slot);
2375         else
2376           return bt->err = BTERR_struct;
2377
2378         if( !keycmp (ptr, key->key, key->len) )
2379           if( !slotptr(set->page, slot)->dead )
2380                 slotptr(set->page, slot)->dead = 1;
2381           else
2382                 return 0;
2383         else
2384                 return 0;
2385
2386         val = valptr(set->page, slot);
2387         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2388         set->latch->dirty = 1;
2389         set->page->act--;
2390         bt->found++;
2391         return 0;
2392 }
2393
2394 //      delete an empty master page for a transaction
2395
2396 //      note that the far right page never empties because
2397 //      it always contains (at least) the infinite stopper key
2398 //      and that all pages that don't contain any keys are
2399 //      deleted, or are being held under Atomic lock.
2400
2401 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2402 {
2403 BtPageSet right[1], temp[1];
2404 unsigned char value[BtId];
2405 uid right_page_no;
2406 BtKey *ptr;
2407
2408         bt_lockpage(bt, BtLockWrite, prev->latch);
2409
2410         //      grab the right sibling
2411
2412         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2413                 right->page = bt_mappage (bt, right->latch);
2414         else
2415                 return bt->err;
2416
2417         bt_lockpage(bt, BtLockAtomic, right->latch);
2418         bt_lockpage(bt, BtLockWrite, right->latch);
2419
2420         //      and pull contents over empty page
2421         //      while preserving master's left link
2422
2423         memcpy (right->page->left, prev->page->left, BtId);
2424         memcpy (prev->page, right->page, bt->mgr->page_size);
2425
2426         //      forward seekers to old right sibling
2427         //      to new page location in set
2428
2429         bt_putid (right->page->right, prev->latch->page_no);
2430         right->latch->dirty = 1;
2431         right->page->kill = 1;
2432
2433         //      remove pointer to right page for searchers by
2434         //      changing right fence key to point to the master page
2435
2436         ptr = keyptr(right->page,right->page->cnt);
2437         bt_putid (value, prev->latch->page_no);
2438
2439         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2440                 return bt->err;
2441
2442         //  now that master page is in good shape we can
2443         //      remove its locks.
2444
2445         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2446         bt_unlockpage (bt, BtLockWrite, prev->latch);
2447
2448         //  fix master's right sibling's left pointer
2449         //      to remove scanner's poiner to the right page
2450
2451         if( right_page_no = bt_getid (prev->page->right) ) {
2452           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2453                 temp->page = bt_mappage (bt, temp->latch);
2454
2455           bt_lockpage (bt, BtLockWrite, temp->latch);
2456           bt_putid (temp->page->left, prev->latch->page_no);
2457           temp->latch->dirty = 1;
2458
2459           bt_unlockpage (bt, BtLockWrite, temp->latch);
2460           bt_unpinlatch (temp->latch);
2461         } else {        // master is now the far right page
2462           bt_spinwritelock (bt->mgr->lock);
2463           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2464           bt_spinreleasewrite(bt->mgr->lock);
2465         }
2466
2467         //      now that there are no pointers to the right page
2468         //      we can delete it after the last read access occurs
2469
2470         bt_unlockpage (bt, BtLockWrite, right->latch);
2471         bt_unlockpage (bt, BtLockAtomic, right->latch);
2472         bt_lockpage (bt, BtLockDelete, right->latch);
2473         bt_lockpage (bt, BtLockWrite, right->latch);
2474         bt_freepage (bt, right);
2475         return 0;
2476 }
2477
2478 //      atomic modification of a batch of keys.
2479
2480 //      return -1 if BTERR is set
2481 //      otherwise return slot number
2482 //      causing the key constraint violation
2483 //      or zero on successful completion.
2484
2485 int bt_atomictxn (BtDb *bt, BtPage source)
2486 {
2487 uint src, idx, slot, samepage, entry;
2488 AtomicKey *head, *tail, *leaf;
2489 BtPageSet set[1], prev[1];
2490 unsigned char value[BtId];
2491 BtKey *key, *ptr, *key2;
2492 BtLatchSet *latch;
2493 AtomicTxn *locks;
2494 int result = 0;
2495 BtSlot temp[1];
2496 BtPage page;
2497 BtVal *val;
2498 uid right;
2499 int type;
2500
2501   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2502   head = NULL;
2503   tail = NULL;
2504
2505   // stable sort the list of keys into order to
2506   //    prevent deadlocks between threads.
2507
2508   for( src = 1; src++ < source->cnt; ) {
2509         *temp = *slotptr(source,src);
2510         key = keyptr (source,src);
2511
2512         for( idx = src; --idx; ) {
2513           key2 = keyptr (source,idx);
2514           if( keycmp (key, key2->key, key2->len) < 0 ) {
2515                 *slotptr(source,idx+1) = *slotptr(source,idx);
2516                 *slotptr(source,idx) = *temp;
2517           } else
2518                 break;
2519         }
2520   }
2521
2522   // Load the leaf page for each key
2523   // group same page references with reuse bit
2524   // and determine any constraint violations
2525
2526   for( src = 0; src++ < source->cnt; ) {
2527         key = keyptr(source, src);
2528         slot = 0;
2529
2530         // first determine if this modification falls
2531         // on the same page as the previous modification
2532         //      note that the far right leaf page is a special case
2533
2534         if( samepage = src > 1 )
2535           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2536                 slot = bt_findslot(set->page, key->key, key->len);
2537           else
2538                 bt_unlockpage(bt, BtLockRead, set->latch); 
2539
2540         if( !slot )
2541           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2542                 set->latch->split = 0;
2543           else
2544                 return -1;
2545
2546         if( slotptr(set->page, slot)->type == Librarian )
2547           ptr = keyptr(set->page, ++slot);
2548         else
2549           ptr = keyptr(set->page, slot);
2550
2551         if( !samepage ) {
2552           locks[src].entry = set->latch->entry;
2553           locks[src].slot = slot;
2554           locks[src].reuse = 0;
2555         } else {
2556           locks[src].entry = 0;
2557           locks[src].slot = 0;
2558           locks[src].reuse = 1;
2559         }
2560
2561         switch( slotptr(source, src)->type ) {
2562         case Duplicate:
2563         case Unique:
2564           if( !slotptr(set->page, slot)->dead )
2565            if( slot < set->page->cnt || bt_getid (set->page->right) )
2566             if( !keycmp (ptr, key->key, key->len) ) {
2567
2568                   // return constraint violation if key already exists
2569
2570                   bt_unlockpage(bt, BtLockRead, set->latch);
2571                   result = src;
2572
2573                   while( src ) {
2574                         if( locks[src].entry ) {
2575                           set->latch = bt->mgr->latchsets + locks[src].entry;
2576                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2577                           bt_unpinlatch (set->latch);
2578                         }
2579                         src--;
2580                   }
2581                   free (locks);
2582                   return result;
2583                 }
2584           break;
2585         }
2586   }
2587
2588   //  unlock last loadpage lock
2589
2590   if( source->cnt )
2591         bt_unlockpage(bt, BtLockRead, set->latch);
2592
2593   //  obtain write lock for each master page
2594
2595   for( src = 0; src++ < source->cnt; )
2596         if( locks[src].reuse )
2597           continue;
2598         else
2599           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2600
2601   // insert or delete each key
2602   // process any splits or merges
2603   // release Write & Atomic latches
2604   // set ParentModifications and build
2605   // queue of keys to insert for split pages
2606   // or delete for deleted pages.
2607
2608   // run through txn list backwards
2609
2610   samepage = source->cnt + 1;
2611
2612   for( src = source->cnt; src; src-- ) {
2613         if( locks[src].reuse )
2614           continue;
2615
2616         //  perform the txn operations
2617         //      from smaller to larger on
2618         //  the same page
2619
2620         for( idx = src; idx < samepage; idx++ )
2621          switch( slotptr(source,idx)->type ) {
2622          case Delete:
2623           if( bt_atomicdelete (bt, source, locks, idx) )
2624                 return -1;
2625           break;
2626
2627         case Duplicate:
2628         case Unique:
2629           if( bt_atomicinsert (bt, source, locks, idx) )
2630                 return -1;
2631           break;
2632         }
2633
2634         //      after the same page operations have finished,
2635         //  process master page for splits or deletion.
2636
2637         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2638         prev->page = bt_mappage (bt, prev->latch);
2639         samepage = src;
2640
2641         //  pick-up all splits from master page
2642         //      each one is already WriteLocked.
2643
2644         entry = prev->latch->split;
2645
2646         while( entry ) {
2647           set->latch = bt->mgr->latchsets + entry;
2648           set->page = bt_mappage (bt, set->latch);
2649           entry = set->latch->split;
2650
2651           // delete empty master page by undoing its split
2652           //  (this is potentially another empty page)
2653           //  note that there are no new left pointers yet
2654
2655           if( !prev->page->act ) {
2656                 memcpy (set->page->left, prev->page->left, BtId);
2657                 memcpy (prev->page, set->page, bt->mgr->page_size);
2658                 bt_lockpage (bt, BtLockDelete, set->latch);
2659                 bt_freepage (bt, set);
2660
2661                 prev->latch->dirty = 1;
2662                 continue;
2663           }
2664
2665           // remove empty page from the split chain
2666
2667           if( !set->page->act ) {
2668                 memcpy (prev->page->right, set->page->right, BtId);
2669                 prev->latch->split = set->latch->split;
2670                 bt_lockpage (bt, BtLockDelete, set->latch);
2671                 bt_freepage (bt, set);
2672                 continue;
2673           }
2674
2675           //  schedule prev fence key update
2676
2677           ptr = keyptr(prev->page,prev->page->cnt);
2678           leaf = calloc (sizeof(AtomicKey), 1);
2679
2680           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2681           leaf->page_no = prev->latch->page_no;
2682           leaf->entry = prev->latch->entry;
2683           leaf->type = 0;
2684
2685           if( tail )
2686                 tail->next = leaf;
2687           else
2688                 head = leaf;
2689
2690           tail = leaf;
2691
2692           // splice in the left link into the split page
2693
2694           bt_putid (set->page->left, prev->latch->page_no);
2695           bt_lockpage(bt, BtLockParent, prev->latch);
2696           bt_unlockpage(bt, BtLockWrite, prev->latch);
2697           *prev = *set;
2698         }
2699
2700         //  update left pointer in next right page from last split page
2701         //      (if all splits were reversed, latch->split == 0)
2702
2703         if( latch->split ) {
2704           //  fix left pointer in master's original (now split)
2705           //  far right sibling or set rightmost page in page zero
2706
2707           if( right = bt_getid (prev->page->right) ) {
2708                 if( set->latch = bt_pinlatch (bt, right, 1) )
2709                   set->page = bt_mappage (bt, set->latch);
2710                 else
2711                   return -1;
2712
2713             bt_lockpage (bt, BtLockWrite, set->latch);
2714             bt_putid (set->page->left, prev->latch->page_no);
2715                 set->latch->dirty = 1;
2716             bt_unlockpage (bt, BtLockWrite, set->latch);
2717                 bt_unpinlatch (set->latch);
2718           } else {      // prev is rightmost page
2719             bt_spinwritelock (bt->mgr->lock);
2720                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2721             bt_spinreleasewrite(bt->mgr->lock);
2722           }
2723
2724           //  Process last page split in chain
2725
2726           ptr = keyptr(prev->page,prev->page->cnt);
2727           leaf = calloc (sizeof(AtomicKey), 1);
2728
2729           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2730           leaf->page_no = prev->latch->page_no;
2731           leaf->entry = prev->latch->entry;
2732           leaf->type = 0;
2733   
2734           if( tail )
2735                 tail->next = leaf;
2736           else
2737                 head = leaf;
2738
2739           tail = leaf;
2740
2741           bt_lockpage(bt, BtLockParent, prev->latch);
2742           bt_unlockpage(bt, BtLockWrite, prev->latch);
2743
2744           //  remove atomic lock on master page
2745
2746           bt_unlockpage(bt, BtLockAtomic, latch);
2747           continue;
2748         }
2749
2750         //  finished if prev page occupied (either master or final split)
2751
2752         if( prev->page->act ) {
2753           bt_unlockpage(bt, BtLockWrite, latch);
2754           bt_unlockpage(bt, BtLockAtomic, latch);
2755           bt_unpinlatch(latch);
2756           continue;
2757         }
2758
2759         // any and all splits were reversed, and the
2760         // master page located in prev is empty, delete it
2761         // by pulling over master's right sibling.
2762
2763         // Remove empty master's fence key
2764
2765         ptr = keyptr(prev->page,prev->page->cnt);
2766
2767         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2768                 return -1;
2769
2770         //      perform the remainder of the delete
2771         //      from the FIFO queue
2772
2773         leaf = calloc (sizeof(AtomicKey), 1);
2774
2775         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2776         leaf->page_no = prev->latch->page_no;
2777         leaf->entry = prev->latch->entry;
2778         leaf->nounlock = 1;
2779         leaf->type = 2;
2780   
2781         if( tail )
2782           tail->next = leaf;
2783         else
2784           head = leaf;
2785
2786         tail = leaf;
2787
2788         //      leave atomic lock in place until
2789         //      deletion completes in next phase.
2790
2791         bt_unlockpage(bt, BtLockWrite, prev->latch);
2792   }
2793   
2794   //  add & delete keys for any pages split or merged during transaction
2795
2796   if( leaf = head )
2797     do {
2798           set->latch = bt->mgr->latchsets + leaf->entry;
2799           set->page = bt_mappage (bt, set->latch);
2800
2801           bt_putid (value, leaf->page_no);
2802           ptr = (BtKey *)leaf->leafkey;
2803
2804           switch( leaf->type ) {
2805           case 0:       // insert key
2806             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2807                   return -1;
2808
2809                 break;
2810
2811           case 1:       // delete key
2812                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2813                   return -1;
2814
2815                 break;
2816
2817           case 2:       // free page
2818                 if( bt_atomicfree (bt, set) )
2819                   return -1;
2820
2821                 break;
2822           }
2823
2824           if( !leaf->nounlock )
2825             bt_unlockpage (bt, BtLockParent, set->latch);
2826
2827           bt_unpinlatch (set->latch);
2828           tail = leaf->next;
2829           free (leaf);
2830         } while( leaf = tail );
2831
2832   // return success
2833
2834   free (locks);
2835   return 0;
2836 }
2837
2838 //      set cursor to highest slot on highest page
2839
2840 uint bt_lastkey (BtDb *bt)
2841 {
2842 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2843 BtPageSet set[1];
2844
2845         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2846                 set->page = bt_mappage (bt, set->latch);
2847         else
2848                 return 0;
2849
2850     bt_lockpage(bt, BtLockRead, set->latch);
2851         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2852     bt_unlockpage(bt, BtLockRead, set->latch);
2853         bt_unpinlatch (set->latch);
2854
2855         bt->cursor_page = page_no;
2856         return bt->cursor->cnt;
2857 }
2858
2859 //      return previous slot on cursor page
2860
2861 uint bt_prevkey (BtDb *bt, uint slot)
2862 {
2863 uid ourright, next, us = bt->cursor_page;
2864 BtPageSet set[1];
2865
2866         if( --slot )
2867                 return slot;
2868
2869         ourright = bt_getid(bt->cursor->right);
2870
2871 goleft:
2872         if( !(next = bt_getid(bt->cursor->left)) )
2873                 return 0;
2874
2875 findourself:
2876         bt->cursor_page = next;
2877
2878         if( set->latch = bt_pinlatch (bt, next, 1) )
2879                 set->page = bt_mappage (bt, set->latch);
2880         else
2881                 return 0;
2882
2883     bt_lockpage(bt, BtLockRead, set->latch);
2884         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2885         bt_unlockpage(bt, BtLockRead, set->latch);
2886         bt_unpinlatch (set->latch);
2887         
2888         next = bt_getid (bt->cursor->right);
2889
2890         if( bt->cursor->kill )
2891                 goto findourself;
2892
2893         if( next != us )
2894           if( next == ourright )
2895                 goto goleft;
2896           else
2897                 goto findourself;
2898
2899         return bt->cursor->cnt;
2900 }
2901
2902 //  return next slot on cursor page
2903 //  or slide cursor right into next page
2904
2905 uint bt_nextkey (BtDb *bt, uint slot)
2906 {
2907 BtPageSet set[1];
2908 uid right;
2909
2910   do {
2911         right = bt_getid(bt->cursor->right);
2912
2913         while( slot++ < bt->cursor->cnt )
2914           if( slotptr(bt->cursor,slot)->dead )
2915                 continue;
2916           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2917                 return slot;
2918           else
2919                 break;
2920
2921         if( !right )
2922                 break;
2923
2924         bt->cursor_page = right;
2925
2926         if( set->latch = bt_pinlatch (bt, right, 1) )
2927                 set->page = bt_mappage (bt, set->latch);
2928         else
2929                 return 0;
2930
2931     bt_lockpage(bt, BtLockRead, set->latch);
2932
2933         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2934
2935         bt_unlockpage(bt, BtLockRead, set->latch);
2936         bt_unpinlatch (set->latch);
2937         slot = 0;
2938
2939   } while( 1 );
2940
2941   return bt->err = 0;
2942 }
2943
2944 //  cache page of keys into cursor and return starting slot for given key
2945
2946 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2947 {
2948 BtPageSet set[1];
2949 uint slot;
2950
2951         // cache page for retrieval
2952
2953         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2954           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2955         else
2956           return 0;
2957
2958         bt->cursor_page = set->latch->page_no;
2959
2960         bt_unlockpage(bt, BtLockRead, set->latch);
2961         bt_unpinlatch (set->latch);
2962         return slot;
2963 }
2964
2965 BtKey *bt_key(BtDb *bt, uint slot)
2966 {
2967         return keyptr(bt->cursor, slot);
2968 }
2969
2970 BtVal *bt_val(BtDb *bt, uint slot)
2971 {
2972         return valptr(bt->cursor,slot);
2973 }
2974
2975 #ifdef STANDALONE
2976
2977 #ifndef unix
2978 double getCpuTime(int type)
2979 {
2980 FILETIME crtime[1];
2981 FILETIME xittime[1];
2982 FILETIME systime[1];
2983 FILETIME usrtime[1];
2984 SYSTEMTIME timeconv[1];
2985 double ans = 0;
2986
2987         memset (timeconv, 0, sizeof(SYSTEMTIME));
2988
2989         switch( type ) {
2990         case 0:
2991                 GetSystemTimeAsFileTime (xittime);
2992                 FileTimeToSystemTime (xittime, timeconv);
2993                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2994                 break;
2995         case 1:
2996                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2997                 FileTimeToSystemTime (usrtime, timeconv);
2998                 break;
2999         case 2:
3000                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3001                 FileTimeToSystemTime (systime, timeconv);
3002                 break;
3003         }
3004
3005         ans += (double)timeconv->wHour * 3600;
3006         ans += (double)timeconv->wMinute * 60;
3007         ans += (double)timeconv->wSecond;
3008         ans += (double)timeconv->wMilliseconds / 1000;
3009         return ans;
3010 }
3011 #else
3012 #include <time.h>
3013 #include <sys/resource.h>
3014
3015 double getCpuTime(int type)
3016 {
3017 struct rusage used[1];
3018 struct timeval tv[1];
3019
3020         switch( type ) {
3021         case 0:
3022                 gettimeofday(tv, NULL);
3023                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3024
3025         case 1:
3026                 getrusage(RUSAGE_SELF, used);
3027                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3028
3029         case 2:
3030                 getrusage(RUSAGE_SELF, used);
3031                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3032         }
3033
3034         return 0;
3035 }
3036 #endif
3037
3038 void bt_poolaudit (BtMgr *mgr)
3039 {
3040 BtLatchSet *latch;
3041 uint slot = 0;
3042
3043         while( slot++ < mgr->latchdeployed ) {
3044                 latch = mgr->latchsets + slot;
3045
3046                 if( *latch->readwr->rin & MASK )
3047                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3048                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3049
3050                 if( *latch->access->rin & MASK )
3051                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3052                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3053
3054                 if( *latch->parent->tid )
3055                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3056                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3057
3058                 if( latch->pin & ~CLOCK_bit ) {
3059                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3060                         latch->pin = 0;
3061                 }
3062         }
3063 }
3064
3065 uint bt_latchaudit (BtDb *bt)
3066 {
3067 ushort idx, hashidx;
3068 uid next, page_no;
3069 BtLatchSet *latch;
3070 uint cnt = 0;
3071 BtKey *ptr;
3072
3073         if( *(ushort *)(bt->mgr->lock) )
3074                 fprintf(stderr, "Alloc page locked\n");
3075         *(ushort *)(bt->mgr->lock) = 0;
3076
3077         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3078                 latch = bt->mgr->latchsets + idx;
3079                 if( *latch->readwr->rin & MASK )
3080                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3081                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3082
3083                 if( *latch->access->rin & MASK )
3084                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3085                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3086
3087                 if( *latch->parent->tid )
3088                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3089                 memset ((ushort *)latch->parent, 0, sizeof(WOLock));
3090
3091                 if( latch->pin ) {
3092                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3093                         latch->pin = 0;
3094                 }
3095         }
3096
3097         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3098           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3099                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3100
3101           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3102
3103           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3104                 latch = bt->mgr->latchsets + idx;
3105                 if( latch->pin )
3106                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3107           } while( idx = latch->next );
3108         }
3109
3110         page_no = LEAF_page;
3111
3112         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3113         uid off = page_no << bt->mgr->page_bits;
3114 #ifdef unix
3115           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3116 #else
3117         DWORD amt[1];
3118
3119           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3120
3121           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3122                 return bt->err = BTERR_map;
3123
3124           if( *amt <  bt->mgr->page_size )
3125                 return bt->err = BTERR_map;
3126 #endif
3127                 if( !bt->frame->free && !bt->frame->lvl )
3128                         cnt += bt->frame->act;
3129                 page_no++;
3130         }
3131                 
3132         cnt--;  // remove stopper key
3133         fprintf(stderr, " Total keys read %d\n", cnt);
3134
3135         bt_close (bt);
3136         return 0;
3137 }
3138
3139 typedef struct {
3140         char idx;
3141         char *type;
3142         char *infile;
3143         BtMgr *mgr;
3144         int num;
3145 } ThreadArg;
3146
3147 //  standalone program to index file of keys
3148 //  then list them onto std-out
3149
3150 #ifdef unix
3151 void *index_file (void *arg)
3152 #else
3153 uint __stdcall index_file (void *arg)
3154 #endif
3155 {
3156 int line = 0, found = 0, cnt = 0, idx;
3157 uid next, page_no = LEAF_page;  // start on first page of leaves
3158 int ch, len = 0, slot, type = 0;
3159 unsigned char key[BT_maxkey];
3160 unsigned char txn[65536];
3161 ThreadArg *args = arg;
3162 BtPageSet set[1];
3163 uint nxt = 65536;
3164 BtPage page;
3165 BtKey *ptr;
3166 BtVal *val;
3167 BtDb *bt;
3168 FILE *in;
3169
3170         bt = bt_open (args->mgr);
3171         page = (BtPage)txn;
3172
3173         if( args->idx < strlen (args->type) )
3174                 ch = args->type[args->idx];
3175         else
3176                 ch = args->type[strlen(args->type) - 1];
3177
3178         switch(ch | 0x20)
3179         {
3180         case 'a':
3181                 fprintf(stderr, "started latch mgr audit\n");
3182                 cnt = bt_latchaudit (bt);
3183                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3184                 break;
3185
3186         case 'd':
3187                 type = Delete;
3188
3189         case 'p':
3190                 if( !type )
3191                         type = Unique;
3192
3193                 if( args->num )
3194                  if( type == Delete )
3195                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3196                  else
3197                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3198                 else
3199                  if( type == Delete )
3200                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3201                  else
3202                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3203
3204                 if( in = fopen (args->infile, "rb") )
3205                   while( ch = getc(in), ch != EOF )
3206                         if( ch == '\n' )
3207                         {
3208                           line++;
3209
3210                           if( !args->num ) {
3211                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3212                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3213                             len = 0;
3214                                 continue;
3215                           }
3216
3217                           nxt -= len - 10;
3218                           memcpy (txn + nxt, key + 10, len - 10);
3219                           nxt -= 1;
3220                           txn[nxt] = len - 10;
3221                           nxt -= 10;
3222                           memcpy (txn + nxt, key, 10);
3223                           nxt -= 1;
3224                           txn[nxt] = 10;
3225                           slotptr(page,++cnt)->off  = nxt;
3226                           slotptr(page,cnt)->type = type;
3227                           len = 0;
3228
3229                           if( cnt < args->num )
3230                                 continue;
3231
3232                           page->cnt = cnt;
3233                           page->act = cnt;
3234                           page->min = nxt;
3235
3236                           if( bt_atomictxn (bt, page) )
3237                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3238                           nxt = sizeof(txn);
3239                           cnt = 0;
3240
3241                         }
3242                         else if( len < BT_maxkey )
3243                                 key[len++] = ch;
3244                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3245                 break;
3246
3247         case 'w':
3248                 fprintf(stderr, "started indexing for %s\n", args->infile);
3249                 if( in = fopen (args->infile, "r") )
3250                   while( ch = getc(in), ch != EOF )
3251                         if( ch == '\n' )
3252                         {
3253                           line++;
3254
3255                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3256                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3257                           len = 0;
3258                         }
3259                         else if( len < BT_maxkey )
3260                                 key[len++] = ch;
3261                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3262                 break;
3263
3264         case 'f':
3265                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3266                 if( in = fopen (args->infile, "rb") )
3267                   while( ch = getc(in), ch != EOF )
3268                         if( ch == '\n' )
3269                         {
3270                           line++;
3271                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3272                                 found++;
3273                           else if( bt->err )
3274                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3275                           len = 0;
3276                         }
3277                         else if( len < BT_maxkey )
3278                                 key[len++] = ch;
3279                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3280                 break;
3281
3282         case 's':
3283                 fprintf(stderr, "started scanning\n");
3284                 do {
3285                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3286                                 set->page = bt_mappage (bt, set->latch);
3287                         else
3288                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3289                         bt_lockpage (bt, BtLockRead, set->latch);
3290                         next = bt_getid (set->page->right);
3291
3292                         for( slot = 0; slot++ < set->page->cnt; )
3293                          if( next || slot < set->page->cnt )
3294                           if( !slotptr(set->page, slot)->dead ) {
3295                                 ptr = keyptr(set->page, slot);
3296                                 len = ptr->len;
3297
3298                             if( slotptr(set->page, slot)->type == Duplicate )
3299                                         len -= BtId;
3300
3301                                 fwrite (ptr->key, len, 1, stdout);
3302                                 val = valptr(set->page, slot);
3303                                 fwrite (val->value, val->len, 1, stdout);
3304                                 fputc ('\n', stdout);
3305                                 cnt++;
3306                            }
3307
3308                         bt_unlockpage (bt, BtLockRead, set->latch);
3309                         bt_unpinlatch (set->latch);
3310                 } while( page_no = next );
3311
3312                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3313                 break;
3314
3315         case 'r':
3316                 fprintf(stderr, "started reverse scan\n");
3317                 if( slot = bt_lastkey (bt) )
3318                    while( slot = bt_prevkey (bt, slot) ) {
3319                         if( slotptr(bt->cursor, slot)->dead )
3320                           continue;
3321
3322                         ptr = keyptr(bt->cursor, slot);
3323                         len = ptr->len;
3324
3325                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3326                                 len -= BtId;
3327
3328                         fwrite (ptr->key, len, 1, stdout);
3329                         val = valptr(bt->cursor, slot);
3330                         fwrite (val->value, val->len, 1, stdout);
3331                         fputc ('\n', stdout);
3332                         cnt++;
3333                   }
3334
3335                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3336                 break;
3337
3338         case 'c':
3339 #ifdef unix
3340                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3341 #endif
3342                 fprintf(stderr, "started counting\n");
3343                 page_no = LEAF_page;
3344
3345                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3346                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3347                                 break;
3348
3349                         if( !bt->frame->free && !bt->frame->lvl )
3350                                 cnt += bt->frame->act;
3351
3352                         bt->reads++;
3353                         page_no++;
3354                 }
3355                 
3356                 cnt--;  // remove stopper key
3357                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3358                 break;
3359         }
3360
3361         bt_close (bt);
3362 #ifdef unix
3363         return NULL;
3364 #else
3365         return 0;
3366 #endif
3367 }
3368
3369 typedef struct timeval timer;
3370
3371 int main (int argc, char **argv)
3372 {
3373 int idx, cnt, len, slot, err;
3374 int segsize, bits = 16;
3375 double start, stop;
3376 #ifdef unix
3377 pthread_t *threads;
3378 #else
3379 HANDLE *threads;
3380 #endif
3381 ThreadArg *args;
3382 uint poolsize = 0;
3383 float elapsed;
3384 int num = 0;
3385 char key[1];
3386 BtMgr *mgr;
3387 BtKey *ptr;
3388 BtDb *bt;
3389
3390         if( argc < 3 ) {
3391                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3392                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3393                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3394                 fprintf (stderr, "  page_bits is the page size in bits\n");
3395                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3396                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3397                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3398                 exit(0);
3399         }
3400
3401         start = getCpuTime(0);
3402
3403         if( argc > 3 )
3404                 bits = atoi(argv[3]);
3405
3406         if( argc > 4 )
3407                 poolsize = atoi(argv[4]);
3408
3409         if( !poolsize )
3410                 fprintf (stderr, "Warning: no mapped_pool\n");
3411
3412         if( argc > 5 )
3413                 num = atoi(argv[5]);
3414
3415         cnt = argc - 6;
3416 #ifdef unix
3417         threads = malloc (cnt * sizeof(pthread_t));
3418 #else
3419         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3420 #endif
3421         args = malloc (cnt * sizeof(ThreadArg));
3422
3423         mgr = bt_mgr ((argv[1]), bits, poolsize);
3424
3425         if( !mgr ) {
3426                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3427                 exit (1);
3428         }
3429
3430         //      fire off threads
3431
3432         for( idx = 0; idx < cnt; idx++ ) {
3433                 args[idx].infile = argv[idx + 6];
3434                 args[idx].type = argv[2];
3435                 args[idx].mgr = mgr;
3436                 args[idx].num = num;
3437                 args[idx].idx = idx;
3438 #ifdef unix
3439                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3440                         fprintf(stderr, "Error creating thread %d\n", err);
3441 #else
3442                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3443 #endif
3444         }
3445
3446         //      wait for termination
3447
3448 #ifdef unix
3449         for( idx = 0; idx < cnt; idx++ )
3450                 pthread_join (threads[idx], NULL);
3451 #else
3452         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3453
3454         for( idx = 0; idx < cnt; idx++ )
3455                 CloseHandle(threads[idx]);
3456
3457 #endif
3458         bt_poolaudit(mgr);
3459         bt_mgrclose (mgr);
3460
3461         elapsed = getCpuTime(0) - start;
3462         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3463         elapsed = getCpuTime(1);
3464         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3465         elapsed = getCpuTime(2);
3466         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3467 }
3468
3469 #endif  //STANDALONE