]> pd.if.org Git - btree/blob - threadskv8.c
Add re-entrant WOnly queued lock, remove bt_atomicload
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120         ushort tid;
121         ushort dup;
122 } WOLock;
123
124 #define PHID 0x1
125 #define PRES 0x2
126 #define MASK 0x3
127 #define RINC 0x4
128
129 //      definition for spin latch implementation
130
131 // exclusive is set for write access
132 // share is count of read accessors
133 // grant write lock when share == 0
134
135 volatile typedef struct {
136         ushort exclusive:1;
137         ushort pending:1;
138         ushort share:14;
139 } BtSpinLatch;
140
141 #define XCL 1
142 #define PEND 2
143 #define BOTH 3
144 #define SHARE 4
145
146 //  hash table entries
147
148 typedef struct {
149         volatile uint slot;             // Latch table entry at head of chain
150         BtSpinLatch latch[1];
151 } BtHashEntry;
152
153 //      latch manager table structure
154
155 typedef struct {
156         uid page_no;                    // latch set page number
157         RWLock readwr[1];               // read/write page lock
158         RWLock access[1];               // Access Intent/Page delete
159         WOLock parent[1];               // Posting of fence key in parent
160         WOLock atomic[1];               // Atomic update in progress
161         uint split;                             // right split page atomic insert
162         uint entry;                             // entry slot in latch table
163         uint next;                              // next entry in hash table chain
164         uint prev;                              // prev entry in hash table chain
165         volatile ushort pin;    // number of outstanding threads
166         ushort dirty:1;                 // page in cache is dirty
167 } BtLatchSet;
168
169 //      Define the length of the page record numbers
170
171 #define BtId 6
172
173 //      Page key slot definition.
174
175 //      Keys are marked dead, but remain on the page until
176 //      it cleanup is called. The fence key (highest key) for
177 //      a leaf page is always present, even after cleanup.
178
179 //      Slot types
180
181 //      In addition to the Unique keys that occupy slots
182 //      there are Librarian and Duplicate key
183 //      slots occupying the key slot array.
184
185 //      The Librarian slots are dead keys that
186 //      serve as filler, available to add new Unique
187 //      or Dup slots that are inserted into the B-tree.
188
189 //      The Duplicate slots have had their key bytes extended
190 //      by 6 bytes to contain a binary duplicate key uniqueifier.
191
192 typedef enum {
193         Unique,
194         Librarian,
195         Duplicate,
196         Delete,
197         Update
198 } BtSlotType;
199
200 typedef struct {
201         uint off:BT_maxbits;    // page offset for key start
202         uint type:3;                    // type of slot
203         uint dead:1;                    // set for deleted slot
204 } BtSlot;
205
206 //      The key structure occupies space at the upper end of
207 //      each page.  It's a length byte followed by the key
208 //      bytes.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char key[0];
213 } BtKey;
214
215 //      the value structure also occupies space at the upper
216 //      end of the page. Each key is immediately followed by a value.
217
218 typedef struct {
219         unsigned char len;              // this can be changed to a ushort or uint
220         unsigned char value[0];
221 } BtVal;
222
223 #define BT_maxkey       255             // maximum number of bytes in a key
224 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
225
226 //      The first part of an index page.
227 //      It is immediately followed
228 //      by the BtSlot array of keys.
229
230 //      note that this structure size
231 //      must be a multiple of 8 bytes
232 //      in order to place dups correctly.
233
234 typedef struct BtPage_ {
235         uint cnt;                                       // count of keys in page
236         uint act;                                       // count of active keys
237         uint min;                                       // next key offset
238         uint garbage;                           // page garbage in bytes
239         unsigned char bits:7;           // page size in bits
240         unsigned char free:1;           // page is on free chain
241         unsigned char lvl:7;            // level of page
242         unsigned char kill:1;           // page is being deleted
243         unsigned char right[BtId];      // page number to right
244         unsigned char left[BtId];       // page number to left
245         unsigned char filler[2];        // padding to multiple of 8
246 } *BtPage;
247
248 //  The loadpage interface object
249
250 typedef struct {
251         BtPage page;            // current page pointer
252         BtLatchSet *latch;      // current page latch set
253 } BtPageSet;
254
255 //      structure for latch manager on ALLOC_page
256
257 typedef struct {
258         struct BtPage_ alloc[1];        // next page_no in right ptr
259         unsigned long long dups[1];     // global duplicate key uniqueifier
260         unsigned char chain[BtId];      // head of free page_nos chain
261 } BtPageZero;
262
263 //      The object structure for Btree access
264
265 typedef struct {
266         uint page_size;                         // page size    
267         uint page_bits;                         // page size in bits    
268 #ifdef unix
269         int idx;
270 #else
271         HANDLE idx;
272 #endif
273         BtPageZero *pagezero;           // mapped allocation page
274         BtSpinLatch lock[1];            // allocation area lite latch
275         uint latchdeployed;                     // highest number of latch entries deployed
276         uint nlatchpage;                        // number of latch pages at BT_latch
277         uint latchtotal;                        // number of page latch entries
278         uint latchhash;                         // number of latch hash table slots
279         uint latchvictim;                       // next latch entry to examine
280         ushort thread_no[1];            // next thread number
281         BtHashEntry *hashtable;         // the buffer pool hash table entries
282         BtLatchSet *latchsets;          // mapped latch set from buffer pool
283         unsigned char *pagepool;        // mapped to the buffer pool pages
284 #ifndef unix
285         HANDLE halloc;                          // allocation handle
286         HANDLE hpool;                           // buffer pool handle
287 #endif
288 } BtMgr;
289
290 typedef struct {
291         BtMgr *mgr;                             // buffer manager for thread
292         BtPage cursor;                  // cached frame for start/next (never mapped)
293         BtPage frame;                   // spare frame for the page split (never mapped)
294         uid cursor_page;                                // current cursor page number   
295         unsigned char *mem;                             // frame, cursor, page memory buffer
296         unsigned char key[BT_keyarray]; // last found complete key
297         int found;                              // last delete or insert was found
298         int err;                                // last error
299         int reads, writes;              // number of reads and writes from the btree
300         ushort thread_no;               // thread number
301 } BtDb;
302
303 typedef enum {
304         BTERR_ok = 0,
305         BTERR_struct,
306         BTERR_ovflw,
307         BTERR_lock,
308         BTERR_map,
309         BTERR_read,
310         BTERR_wrt,
311         BTERR_atomic
312 } BTERR;
313
314 #define CLOCK_bit 0x8000
315
316 // B-Tree functions
317 extern void bt_close (BtDb *bt);
318 extern BtDb *bt_open (BtMgr *mgr);
319 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
320 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
321 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
322 extern BtKey *bt_foundkey (BtDb *bt);
323 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
324 extern uint bt_nextkey   (BtDb *bt, uint slot);
325
326 //      manager functions
327 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
328 void bt_mgrclose (BtMgr *mgr);
329
330 //  Helper functions to return slot values
331 //      from the cursor page.
332
333 extern BtKey *bt_key (BtDb *bt, uint slot);
334 extern BtVal *bt_val (BtDb *bt, uint slot);
335
336 //  The page is allocated from low and hi ends.
337 //  The key slots are allocated from the bottom,
338 //      while the text and value of the key
339 //  are allocated from the top.  When the two
340 //  areas meet, the page is split into two.
341
342 //  A key consists of a length byte, two bytes of
343 //  index number (0 - 65535), and up to 253 bytes
344 //  of key value.
345
346 //  Associated with each key is a value byte string
347 //      containing any value desired.
348
349 //  The b-tree root is always located at page 1.
350 //      The first leaf page of level zero is always
351 //      located on page 2.
352
353 //      The b-tree pages are linked with next
354 //      pointers to facilitate enumerators,
355 //      and provide for concurrency.
356
357 //      When to root page fills, it is split in two and
358 //      the tree height is raised by a new root at page
359 //      one with two keys.
360
361 //      Deleted keys are marked with a dead bit until
362 //      page cleanup. The fence key for a leaf node is
363 //      always present
364
365 //  To achieve maximum concurrency one page is locked at a time
366 //  as the tree is traversed to find leaf key in question. The right
367 //  page numbers are used in cases where the page is being split,
368 //      or consolidated.
369
370 //  Page 0 is dedicated to lock for new page extensions,
371 //      and chains empty pages together for reuse. It also
372 //      contains the latch manager hash table.
373
374 //      The ParentModification lock on a node is obtained to serialize posting
375 //      or changing the fence key for a node.
376
377 //      Empty pages are chained together through the ALLOC page and reused.
378
379 //      Access macros to address slot and key values from the page
380 //      Page slots use 1 based indexing.
381
382 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
383 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
384 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
385
386 void bt_putid(unsigned char *dest, uid id)
387 {
388 int i = BtId;
389
390         while( i-- )
391                 dest[i] = (unsigned char)id, id >>= 8;
392 }
393
394 uid bt_getid(unsigned char *src)
395 {
396 uid id = 0;
397 int i;
398
399         for( i = 0; i < BtId; i++ )
400                 id <<= 8, id |= *src++; 
401
402         return id;
403 }
404
405 uid bt_newdup (BtDb *bt)
406 {
407 #ifdef unix
408         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
409 #else
410         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
411 #endif
412 }
413
414 //      Write-Only Queue Lock
415
416 void WriteOLock (WOLock *lock, ushort tid)
417 {
418 ushort tix;
419
420         if( lock->tid == tid ) {
421                 lock->dup++;
422                 return;
423         }
424 #ifdef unix
425         tix = __sync_fetch_and_add (lock->ticket, 1);
426 #else
427         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
428 #endif
429         // wait for our ticket to come up
430
431         while( tix != lock->serving[0] )
432 #ifdef unix
433                 sched_yield();
434 #else
435                 SwitchToThread ();
436 #endif
437         lock->tid = tid;
438 }
439
440 void WriteORelease (WOLock *lock)
441 {
442         if( lock->dup ) {
443                 lock->dup--;
444                 return;
445         }
446
447         lock->tid = 0;
448         lock->serving[0]++;
449 }
450
451 //      Phase-Fair reader/writer lock implementation
452
453 void WriteLock (RWLock *lock)
454 {
455 ushort w, r, tix;
456
457 #ifdef unix
458         tix = __sync_fetch_and_add (lock->ticket, 1);
459 #else
460         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
461 #endif
462         // wait for our ticket to come up
463
464         while( tix != lock->serving[0] )
465 #ifdef unix
466                 sched_yield();
467 #else
468                 SwitchToThread ();
469 #endif
470
471         w = PRES | (tix & PHID);
472 #ifdef  unix
473         r = __sync_fetch_and_add (lock->rin, w);
474 #else
475         r = _InterlockedExchangeAdd16 (lock->rin, w);
476 #endif
477         while( r != *lock->rout )
478 #ifdef unix
479                 sched_yield();
480 #else
481                 SwitchToThread();
482 #endif
483 }
484
485 void WriteRelease (RWLock *lock)
486 {
487 #ifdef unix
488         __sync_fetch_and_and (lock->rin, ~MASK);
489 #else
490         _InterlockedAnd16 (lock->rin, ~MASK);
491 #endif
492         lock->serving[0]++;
493 }
494
495 void ReadLock (RWLock *lock)
496 {
497 ushort w;
498 #ifdef unix
499         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
500 #else
501         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
502 #endif
503         if( w )
504           while( w == (*lock->rin & MASK) )
505 #ifdef unix
506                 sched_yield ();
507 #else
508                 SwitchToThread ();
509 #endif
510 }
511
512 void ReadRelease (RWLock *lock)
513 {
514 #ifdef unix
515         __sync_fetch_and_add (lock->rout, RINC);
516 #else
517         _InterlockedExchangeAdd16 (lock->rout, RINC);
518 #endif
519 }
520
521 //      Spin Latch Manager
522
523 //      wait until write lock mode is clear
524 //      and add 1 to the share count
525
526 void bt_spinreadlock(BtSpinLatch *latch)
527 {
528 ushort prev;
529
530   do {
531 #ifdef unix
532         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
533 #else
534         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
535 #endif
536         //  see if exclusive request is granted or pending
537
538         if( !(prev & BOTH) )
539                 return;
540 #ifdef unix
541         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
542 #else
543         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
544 #endif
545 #ifdef  unix
546   } while( sched_yield(), 1 );
547 #else
548   } while( SwitchToThread(), 1 );
549 #endif
550 }
551
552 //      wait for other read and write latches to relinquish
553
554 void bt_spinwritelock(BtSpinLatch *latch)
555 {
556 ushort prev;
557
558   do {
559 #ifdef  unix
560         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
561 #else
562         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
563 #endif
564         if( !(prev & XCL) )
565           if( !(prev & ~BOTH) )
566                 return;
567           else
568 #ifdef unix
569                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
570 #else
571                 _InterlockedAnd16((ushort *)latch, ~XCL);
572 #endif
573 #ifdef  unix
574   } while( sched_yield(), 1 );
575 #else
576   } while( SwitchToThread(), 1 );
577 #endif
578 }
579
580 //      try to obtain write lock
581
582 //      return 1 if obtained,
583 //              0 otherwise
584
585 int bt_spinwritetry(BtSpinLatch *latch)
586 {
587 ushort prev;
588
589 #ifdef  unix
590         prev = __sync_fetch_and_or((ushort *)latch, XCL);
591 #else
592         prev = _InterlockedOr16((ushort *)latch, XCL);
593 #endif
594         //      take write access if all bits are clear
595
596         if( !(prev & XCL) )
597           if( !(prev & ~BOTH) )
598                 return 1;
599           else
600 #ifdef unix
601                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
602 #else
603                 _InterlockedAnd16((ushort *)latch, ~XCL);
604 #endif
605         return 0;
606 }
607
608 //      clear write mode
609
610 void bt_spinreleasewrite(BtSpinLatch *latch)
611 {
612 #ifdef unix
613         __sync_fetch_and_and((ushort *)latch, ~BOTH);
614 #else
615         _InterlockedAnd16((ushort *)latch, ~BOTH);
616 #endif
617 }
618
619 //      decrement reader count
620
621 void bt_spinreleaseread(BtSpinLatch *latch)
622 {
623 #ifdef unix
624         __sync_fetch_and_add((ushort *)latch, -SHARE);
625 #else
626         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
627 #endif
628 }
629
630 //      read page from permanent location in Btree file
631
632 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
633 {
634 off64_t off = page_no << mgr->page_bits;
635
636 #ifdef unix
637         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
638                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
639                 return BTERR_read;
640         }
641 #else
642 OVERLAPPED ovl[1];
643 uint amt[1];
644
645         memset (ovl, 0, sizeof(OVERLAPPED));
646         ovl->Offset = off;
647         ovl->OffsetHigh = off >> 32;
648
649         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
650                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
651                 return BTERR_read;
652         }
653         if( *amt <  mgr->page_size ) {
654                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
655                 return BTERR_read;
656         }
657 #endif
658         return 0;
659 }
660
661 //      write page to permanent location in Btree file
662 //      clear the dirty bit
663
664 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
665 {
666 off64_t off = page_no << mgr->page_bits;
667
668 #ifdef unix
669         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
670                 return BTERR_wrt;
671 #else
672 OVERLAPPED ovl[1];
673 uint amt[1];
674
675         memset (ovl, 0, sizeof(OVERLAPPED));
676         ovl->Offset = off;
677         ovl->OffsetHigh = off >> 32;
678
679         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
680                 return BTERR_wrt;
681
682         if( *amt <  mgr->page_size )
683                 return BTERR_wrt;
684 #endif
685         return 0;
686 }
687
688 //      link latch table entry into head of latch hash table
689
690 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
691 {
692 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
693 BtLatchSet *latch = bt->mgr->latchsets + slot;
694
695         if( latch->next = bt->mgr->hashtable[hashidx].slot )
696                 bt->mgr->latchsets[latch->next].prev = slot;
697
698         bt->mgr->hashtable[hashidx].slot = slot;
699         latch->page_no = page_no;
700         latch->entry = slot;
701         latch->split = 0;
702         latch->prev = 0;
703         latch->pin = 1;
704
705         if( loadit )
706           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
707                 return bt->err;
708           else
709                 bt->reads++;
710
711         return bt->err = 0;
712 }
713
714 //      set CLOCK bit in latch
715 //      decrement pin count
716
717 void bt_unpinlatch (BtLatchSet *latch)
718 {
719 #ifdef unix
720         if( ~latch->pin & CLOCK_bit )
721                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
722         __sync_fetch_and_add(&latch->pin, -1);
723 #else
724         if( ~latch->pin & CLOCK_bit )
725                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
726         _InterlockedDecrement16 (&latch->pin);
727 #endif
728 }
729
730 //  return the btree cached page address
731
732 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
733 {
734 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
735
736         return page;
737 }
738
739 //      find existing latchset or inspire new one
740 //      return with latchset pinned
741
742 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
743 {
744 uint hashidx = page_no % bt->mgr->latchhash;
745 BtLatchSet *latch;
746 uint slot, idx;
747 uint lvl, cnt;
748 off64_t off;
749 uint amt[1];
750 BtPage page;
751
752   //  try to find our entry
753
754   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
755
756   if( slot = bt->mgr->hashtable[hashidx].slot ) do
757   {
758         latch = bt->mgr->latchsets + slot;
759         if( page_no == latch->page_no )
760                 break;
761   } while( slot = latch->next );
762
763   //  found our entry
764   //  increment clock
765
766   if( slot ) {
767         latch = bt->mgr->latchsets + slot;
768 #ifdef unix
769         __sync_fetch_and_add(&latch->pin, 1);
770 #else
771         _InterlockedIncrement16 (&latch->pin);
772 #endif
773         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
774         return latch;
775   }
776
777         //  see if there are any unused pool entries
778 #ifdef unix
779         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
780 #else
781         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
782 #endif
783
784         if( slot < bt->mgr->latchtotal ) {
785                 latch = bt->mgr->latchsets + slot;
786                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
787                         return NULL;
788                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
789                 return latch;
790         }
791
792 #ifdef unix
793         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
794 #else
795         _InterlockedDecrement (&bt->mgr->latchdeployed);
796 #endif
797   //  find and reuse previous entry on victim
798
799   while( 1 ) {
800 #ifdef unix
801         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
802 #else
803         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
804 #endif
805         // try to get write lock on hash chain
806         //      skip entry if not obtained
807         //      or has outstanding pins
808
809         slot %= bt->mgr->latchtotal;
810
811         if( !slot )
812                 continue;
813
814         latch = bt->mgr->latchsets + slot;
815         idx = latch->page_no % bt->mgr->latchhash;
816
817         //      see we are on same chain as hashidx
818
819         if( idx == hashidx )
820                 continue;
821
822         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
823                 continue;
824
825         //  skip this slot if it is pinned
826         //      or the CLOCK bit is set
827
828         if( latch->pin ) {
829           if( latch->pin & CLOCK_bit ) {
830 #ifdef unix
831                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
832 #else
833                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
834 #endif
835           }
836           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
837           continue;
838         }
839
840         //  update permanent page area in btree from buffer pool
841
842         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
843
844         if( latch->dirty )
845           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
846                 return NULL;
847           else
848                 latch->dirty = 0, bt->writes++;
849
850         //  unlink our available slot from its hash chain
851
852         if( latch->prev )
853                 bt->mgr->latchsets[latch->prev].next = latch->next;
854         else
855                 bt->mgr->hashtable[idx].slot = latch->next;
856
857         if( latch->next )
858                 bt->mgr->latchsets[latch->next].prev = latch->prev;
859
860         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
861
862         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
863                 return NULL;
864
865         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
866         return latch;
867   }
868 }
869
870 void bt_mgrclose (BtMgr *mgr)
871 {
872 BtLatchSet *latch;
873 uint num = 0;
874 BtPage page;
875 uint slot;
876
877         //      flush dirty pool pages to the btree
878
879         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
880                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
881                 latch = mgr->latchsets + slot;
882
883                 if( latch->dirty ) {
884                         bt_writepage(mgr, page, latch->page_no);
885                         latch->dirty = 0, num++;
886                 }
887 //              madvise (page, mgr->page_size, MADV_DONTNEED);
888         }
889
890         fprintf(stderr, "%d buffer pool pages flushed\n", num);
891
892 #ifdef unix
893         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
894         munmap (mgr->pagezero, mgr->page_size);
895 #else
896         FlushViewOfFile(mgr->pagezero, 0);
897         UnmapViewOfFile(mgr->pagezero);
898         UnmapViewOfFile(mgr->hashtable);
899         CloseHandle(mgr->halloc);
900         CloseHandle(mgr->hpool);
901 #endif
902 #ifdef unix
903         close (mgr->idx);
904         free (mgr);
905 #else
906         FlushFileBuffers(mgr->idx);
907         CloseHandle(mgr->idx);
908         GlobalFree (mgr);
909 #endif
910 }
911
912 //      close and release memory
913
914 void bt_close (BtDb *bt)
915 {
916 #ifdef unix
917         if( bt->mem )
918                 free (bt->mem);
919 #else
920         if( bt->mem)
921                 VirtualFree (bt->mem, 0, MEM_RELEASE);
922 #endif
923         free (bt);
924 }
925
926 //  open/create new btree buffer manager
927
928 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
929 //              size of page pool (e.g. 262144)
930
931 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
932 {
933 uint lvl, attr, last, slot, idx;
934 uint nlatchpage, latchhash;
935 unsigned char value[BtId];
936 int flag, initit = 0;
937 BtPageZero *pagezero;
938 off64_t size;
939 uint amt[1];
940 BtMgr* mgr;
941 BtKey* key;
942 BtVal *val;
943
944         // determine sanity of page size and buffer pool
945
946         if( bits > BT_maxbits )
947                 bits = BT_maxbits;
948         else if( bits < BT_minbits )
949                 bits = BT_minbits;
950
951         if( nodemax < 16 ) {
952                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
953                 return NULL;
954         }
955
956 #ifdef unix
957         mgr = calloc (1, sizeof(BtMgr));
958
959         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
960
961         if( mgr->idx == -1 ) {
962                 fprintf (stderr, "Unable to open btree file\n");
963                 return free(mgr), NULL;
964         }
965 #else
966         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
967         attr = FILE_ATTRIBUTE_NORMAL;
968         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
969
970         if( mgr->idx == INVALID_HANDLE_VALUE )
971                 return GlobalFree(mgr), NULL;
972 #endif
973
974 #ifdef unix
975         pagezero = valloc (BT_maxpage);
976         *amt = 0;
977
978         // read minimum page size to get root info
979         //      to support raw disk partition files
980         //      check if bits == 0 on the disk.
981
982         if( size = lseek (mgr->idx, 0L, 2) )
983                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
984                         if( pagezero->alloc->bits )
985                                 bits = pagezero->alloc->bits;
986                         else
987                                 initit = 1;
988                 else
989                         return free(mgr), free(pagezero), NULL;
990         else
991                 initit = 1;
992 #else
993         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
994         size = GetFileSize(mgr->idx, amt);
995
996         if( size || *amt ) {
997                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
998                         return bt_mgrclose (mgr), NULL;
999                 bits = pagezero->alloc->bits;
1000         } else
1001                 initit = 1;
1002 #endif
1003
1004         mgr->page_size = 1 << bits;
1005         mgr->page_bits = bits;
1006
1007         //  calculate number of latch hash table entries
1008
1009         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1010         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1011
1012         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1013         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1014         mgr->latchtotal = nodemax;
1015
1016         if( !initit )
1017                 goto mgrlatch;
1018
1019         // initialize an empty b-tree with latch page, root page, page of leaves
1020         // and page(s) of latches and page pool cache
1021
1022         memset (pagezero, 0, 1 << bits);
1023         pagezero->alloc->bits = mgr->page_bits;
1024         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1025
1026         //  initialize left-most LEAF page in
1027         //      alloc->left.
1028
1029         bt_putid (pagezero->alloc->left, LEAF_page);
1030
1031         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1032                 fprintf (stderr, "Unable to create btree page zero\n");
1033                 return bt_mgrclose (mgr), NULL;
1034         }
1035
1036         memset (pagezero, 0, 1 << bits);
1037         pagezero->alloc->bits = mgr->page_bits;
1038
1039         for( lvl=MIN_lvl; lvl--; ) {
1040                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1041                 key = keyptr(pagezero->alloc, 1);
1042                 key->len = 2;           // create stopper key
1043                 key->key[0] = 0xff;
1044                 key->key[1] = 0xff;
1045
1046                 bt_putid(value, MIN_lvl - lvl + 1);
1047                 val = valptr(pagezero->alloc, 1);
1048                 val->len = lvl ? BtId : 0;
1049                 memcpy (val->value, value, val->len);
1050
1051                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1052                 pagezero->alloc->lvl = lvl;
1053                 pagezero->alloc->cnt = 1;
1054                 pagezero->alloc->act = 1;
1055
1056                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1057                         fprintf (stderr, "Unable to create btree page zero\n");
1058                         return bt_mgrclose (mgr), NULL;
1059                 }
1060         }
1061
1062 mgrlatch:
1063 #ifdef unix
1064         free (pagezero);
1065 #else
1066         VirtualFree (pagezero, 0, MEM_RELEASE);
1067 #endif
1068 #ifdef unix
1069         // mlock the pagezero page
1070
1071         flag = PROT_READ | PROT_WRITE;
1072         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1073         if( mgr->pagezero == MAP_FAILED ) {
1074                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1075                 return bt_mgrclose (mgr), NULL;
1076         }
1077         mlock (mgr->pagezero, mgr->page_size);
1078
1079         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1080         if( mgr->hashtable == MAP_FAILED ) {
1081                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1082                 return bt_mgrclose (mgr), NULL;
1083         }
1084 #else
1085         flag = PAGE_READWRITE;
1086         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1087         if( !mgr->halloc ) {
1088                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1089                 return bt_mgrclose (mgr), NULL;
1090         }
1091
1092         flag = FILE_MAP_WRITE;
1093         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1094         if( !mgr->pagezero ) {
1095                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1096                 return bt_mgrclose (mgr), NULL;
1097         }
1098
1099         flag = PAGE_READWRITE;
1100         size = (uid)mgr->nlatchpage << mgr->page_bits;
1101         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1102         if( !mgr->hpool ) {
1103                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1104                 return bt_mgrclose (mgr), NULL;
1105         }
1106
1107         flag = FILE_MAP_WRITE;
1108         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1109         if( !mgr->hashtable ) {
1110                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1111                 return bt_mgrclose (mgr), NULL;
1112         }
1113 #endif
1114
1115         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1116         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1117
1118         return mgr;
1119 }
1120
1121 //      open BTree access method
1122 //      based on buffer manager
1123
1124 BtDb *bt_open (BtMgr *mgr)
1125 {
1126 BtDb *bt = malloc (sizeof(*bt));
1127
1128         memset (bt, 0, sizeof(*bt));
1129         bt->mgr = mgr;
1130 #ifdef unix
1131         bt->mem = valloc (2 *mgr->page_size);
1132 #else
1133         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1134 #endif
1135         bt->frame = (BtPage)bt->mem;
1136         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1137 #ifdef unix
1138         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1139 #else
1140         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1141 #endif
1142         return bt;
1143 }
1144
1145 //  compare two keys, return > 0, = 0, or < 0
1146 //  =0: keys are same
1147 //  -1: key2 > key1
1148 //  +1: key2 < key1
1149 //  as the comparison value
1150
1151 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1152 {
1153 uint len1 = key1->len;
1154 int ans;
1155
1156         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1157                 return ans;
1158
1159         if( len1 > len2 )
1160                 return 1;
1161         if( len1 < len2 )
1162                 return -1;
1163
1164         return 0;
1165 }
1166
1167 // place write, read, or parent lock on requested page_no.
1168
1169 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1170 {
1171         switch( mode ) {
1172         case BtLockRead:
1173                 ReadLock (latch->readwr);
1174                 break;
1175         case BtLockWrite:
1176                 WriteLock (latch->readwr);
1177                 break;
1178         case BtLockAccess:
1179                 ReadLock (latch->access);
1180                 break;
1181         case BtLockDelete:
1182                 WriteLock (latch->access);
1183                 break;
1184         case BtLockParent:
1185                 WriteOLock (latch->parent, bt->thread_no);
1186                 break;
1187         case BtLockAtomic:
1188                 WriteOLock (latch->atomic, bt->thread_no);
1189                 break;
1190         case BtLockAtomic | BtLockRead:
1191                 WriteOLock (latch->atomic, bt->thread_no);
1192                 ReadLock (latch->readwr);
1193                 break;
1194         }
1195 }
1196
1197 // remove write, read, or parent lock on requested page
1198
1199 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1200 {
1201         switch( mode ) {
1202         case BtLockRead:
1203                 ReadRelease (latch->readwr);
1204                 break;
1205         case BtLockWrite:
1206                 WriteRelease (latch->readwr);
1207                 break;
1208         case BtLockAccess:
1209                 ReadRelease (latch->access);
1210                 break;
1211         case BtLockDelete:
1212                 WriteRelease (latch->access);
1213                 break;
1214         case BtLockParent:
1215                 WriteORelease (latch->parent);
1216                 break;
1217         case BtLockAtomic:
1218                 WriteORelease (latch->atomic);
1219                 break;
1220         case BtLockAtomic | BtLockRead:
1221                 WriteORelease (latch->atomic);
1222                 ReadRelease (latch->readwr);
1223                 break;
1224         }
1225 }
1226
1227 //      allocate a new page
1228 //      return with page latched, but unlocked.
1229
1230 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1231 {
1232 uid page_no;
1233 int blk;
1234
1235         //      lock allocation page
1236
1237         bt_spinwritelock(bt->mgr->lock);
1238
1239         // use empty chain first
1240         // else allocate empty page
1241
1242         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1243                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1244                         set->page = bt_mappage (bt, set->latch);
1245                 else
1246                         return bt->err = BTERR_struct, -1;
1247
1248                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1249                 bt_spinreleasewrite(bt->mgr->lock);
1250
1251                 memcpy (set->page, contents, bt->mgr->page_size);
1252                 set->latch->dirty = 1;
1253                 return 0;
1254         }
1255
1256         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1257         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1258
1259         // unlock allocation latch
1260
1261         bt_spinreleasewrite(bt->mgr->lock);
1262
1263         //      don't load cache from btree page
1264
1265         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1266                 set->page = bt_mappage (bt, set->latch);
1267         else
1268                 return bt->err = BTERR_struct;
1269
1270         memcpy (set->page, contents, bt->mgr->page_size);
1271         set->latch->dirty = 1;
1272         return 0;
1273 }
1274
1275 //  find slot in page for given key at a given level
1276
1277 int bt_findslot (BtPage page, unsigned char *key, uint len)
1278 {
1279 uint diff, higher = page->cnt, low = 1, slot;
1280 uint good = 0;
1281
1282         //        make stopper key an infinite fence value
1283
1284         if( bt_getid (page->right) )
1285                 higher++;
1286         else
1287                 good++;
1288
1289         //      low is the lowest candidate.
1290         //  loop ends when they meet
1291
1292         //  higher is already
1293         //      tested as .ge. the passed key.
1294
1295         while( diff = higher - low ) {
1296                 slot = low + ( diff >> 1 );
1297                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1298                         low = slot + 1;
1299                 else
1300                         higher = slot, good++;
1301         }
1302
1303         //      return zero if key is on right link page
1304
1305         return good ? higher : 0;
1306 }
1307
1308 //  find and load page at given level for given key
1309 //      leave page rd or wr locked as requested
1310
1311 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1312 {
1313 uid page_no = ROOT_page, prevpage = 0;
1314 uint drill = 0xff, slot;
1315 BtLatchSet *prevlatch;
1316 uint mode, prevmode;
1317
1318   //  start at root of btree and drill down
1319
1320   do {
1321         // determine lock mode of drill level
1322         mode = (drill == lvl) ? lock : BtLockRead; 
1323
1324         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1325           return 0;
1326
1327         // obtain access lock using lock chaining with Access mode
1328
1329         if( page_no > ROOT_page )
1330           bt_lockpage(bt, BtLockAccess, set->latch);
1331
1332         set->page = bt_mappage (bt, set->latch);
1333
1334         //      release & unpin parent or left sibling page
1335
1336         if( prevpage ) {
1337           bt_unlockpage(bt, prevmode, prevlatch);
1338           bt_unpinlatch (prevlatch);
1339           prevpage = 0;
1340         }
1341
1342         // obtain mode lock using lock chaining through AccessLock
1343
1344         bt_lockpage(bt, mode, set->latch);
1345
1346         if( set->page->free )
1347                 return bt->err = BTERR_struct, 0;
1348
1349         if( page_no > ROOT_page )
1350           bt_unlockpage(bt, BtLockAccess, set->latch);
1351
1352         // re-read and re-lock root after determining actual level of root
1353
1354         if( set->page->lvl != drill) {
1355                 if( set->latch->page_no != ROOT_page )
1356                         return bt->err = BTERR_struct, 0;
1357                         
1358                 drill = set->page->lvl;
1359
1360                 if( lock != BtLockRead && drill == lvl ) {
1361                   bt_unlockpage(bt, mode, set->latch);
1362                   bt_unpinlatch (set->latch);
1363                   continue;
1364                 }
1365         }
1366
1367         prevpage = set->latch->page_no;
1368         prevlatch = set->latch;
1369         prevmode = mode;
1370
1371         //  find key on page at this level
1372         //  and descend to requested level
1373
1374         if( !set->page->kill )
1375          if( slot = bt_findslot (set->page, key, len) ) {
1376           if( drill == lvl )
1377                 return slot;
1378
1379           // find next non-dead slot -- the fence key if nothing else
1380
1381           while( slotptr(set->page, slot)->dead )
1382                 if( slot++ < set->page->cnt )
1383                   continue;
1384                 else
1385                   return bt->err = BTERR_struct, 0;
1386
1387           page_no = bt_getid(valptr(set->page, slot)->value);
1388           drill--;
1389           continue;
1390          }
1391
1392         //  or slide right into next page
1393
1394         page_no = bt_getid(set->page->right);
1395   } while( page_no );
1396
1397   // return error on end of right chain
1398
1399   bt->err = BTERR_struct;
1400   return 0;     // return error
1401 }
1402
1403 //      return page to free list
1404 //      page must be delete & write locked
1405
1406 void bt_freepage (BtDb *bt, BtPageSet *set)
1407 {
1408         //      lock allocation page
1409
1410         bt_spinwritelock (bt->mgr->lock);
1411
1412         //      store chain
1413
1414         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1415         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1416         set->latch->dirty = 1;
1417         set->page->free = 1;
1418
1419         // unlock released page
1420
1421         bt_unlockpage (bt, BtLockDelete, set->latch);
1422         bt_unlockpage (bt, BtLockWrite, set->latch);
1423         bt_unpinlatch (set->latch);
1424
1425         // unlock allocation page
1426
1427         bt_spinreleasewrite (bt->mgr->lock);
1428 }
1429
1430 //      a fence key was deleted from a page
1431 //      push new fence value upwards
1432
1433 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1434 {
1435 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1436 unsigned char value[BtId];
1437 BtKey* ptr;
1438 uint idx;
1439
1440         //      remove the old fence value
1441
1442         ptr = keyptr(set->page, set->page->cnt);
1443         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1444         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1445         set->latch->dirty = 1;
1446
1447         //  cache new fence value
1448
1449         ptr = keyptr(set->page, set->page->cnt);
1450         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1451
1452         bt_lockpage (bt, BtLockParent, set->latch);
1453         bt_unlockpage (bt, BtLockWrite, set->latch);
1454
1455         //      insert new (now smaller) fence key
1456
1457         bt_putid (value, set->latch->page_no);
1458         ptr = (BtKey*)leftkey;
1459
1460         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1461           return bt->err;
1462
1463         //      now delete old fence key
1464
1465         ptr = (BtKey*)rightkey;
1466
1467         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1468                 return bt->err;
1469
1470         bt_unlockpage (bt, BtLockParent, set->latch);
1471         bt_unpinlatch(set->latch);
1472         return 0;
1473 }
1474
1475 //      root has a single child
1476 //      collapse a level from the tree
1477
1478 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1479 {
1480 BtPageSet child[1];
1481 uid page_no;
1482 uint idx;
1483
1484   // find the child entry and promote as new root contents
1485
1486   do {
1487         for( idx = 0; idx++ < root->page->cnt; )
1488           if( !slotptr(root->page, idx)->dead )
1489                 break;
1490
1491         page_no = bt_getid (valptr(root->page, idx)->value);
1492
1493         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1494                 child->page = bt_mappage (bt, child->latch);
1495         else
1496                 return bt->err;
1497
1498         bt_lockpage (bt, BtLockDelete, child->latch);
1499         bt_lockpage (bt, BtLockWrite, child->latch);
1500
1501         memcpy (root->page, child->page, bt->mgr->page_size);
1502         root->latch->dirty = 1;
1503
1504         bt_freepage (bt, child);
1505
1506   } while( root->page->lvl > 1 && root->page->act == 1 );
1507
1508   bt_unlockpage (bt, BtLockWrite, root->latch);
1509   bt_unpinlatch (root->latch);
1510   return 0;
1511 }
1512
1513 //  delete a page and manage keys
1514 //  call with page writelocked
1515 //      returns with page unpinned
1516
1517 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1518 {
1519 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1520 unsigned char value[BtId];
1521 uint lvl = set->page->lvl;
1522 BtPageSet right[1];
1523 uid page_no;
1524 BtKey *ptr;
1525
1526         //      cache copy of fence key
1527         //      to post in parent
1528
1529         ptr = keyptr(set->page, set->page->cnt);
1530         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1531
1532         //      obtain lock on right page
1533
1534         page_no = bt_getid(set->page->right);
1535
1536         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1537                 right->page = bt_mappage (bt, right->latch);
1538         else
1539                 return 0;
1540
1541         bt_lockpage (bt, BtLockWrite, right->latch);
1542
1543         // cache copy of key to update
1544
1545         ptr = keyptr(right->page, right->page->cnt);
1546         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1547
1548         if( right->page->kill )
1549                 return bt->err = BTERR_struct;
1550
1551         // pull contents of right peer into our empty page
1552
1553         memcpy (set->page, right->page, bt->mgr->page_size);
1554         set->latch->dirty = 1;
1555
1556         // mark right page deleted and point it to left page
1557         //      until we can post parent updates that remove access
1558         //      to the deleted page.
1559
1560         bt_putid (right->page->right, set->latch->page_no);
1561         right->latch->dirty = 1;
1562         right->page->kill = 1;
1563
1564         bt_lockpage (bt, BtLockParent, right->latch);
1565         bt_unlockpage (bt, BtLockWrite, right->latch);
1566
1567         bt_lockpage (bt, BtLockParent, set->latch);
1568         bt_unlockpage (bt, BtLockWrite, set->latch);
1569
1570         // redirect higher key directly to our new node contents
1571
1572         bt_putid (value, set->latch->page_no);
1573         ptr = (BtKey*)higherfence;
1574
1575         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1576           return bt->err;
1577
1578         //      delete old lower key to our node
1579
1580         ptr = (BtKey*)lowerfence;
1581
1582         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1583           return bt->err;
1584
1585         //      obtain delete and write locks to right node
1586
1587         bt_unlockpage (bt, BtLockParent, right->latch);
1588         bt_lockpage (bt, BtLockDelete, right->latch);
1589         bt_lockpage (bt, BtLockWrite, right->latch);
1590         bt_freepage (bt, right);
1591
1592         bt_unlockpage (bt, BtLockParent, set->latch);
1593         bt_unpinlatch (set->latch);
1594         return 0;
1595 }
1596
1597 //  find and delete key on page by marking delete flag bit
1598 //  if page becomes empty, delete it from the btree
1599
1600 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1601 {
1602 uint slot, idx, found, fence;
1603 BtPageSet set[1];
1604 BtKey *ptr;
1605 BtVal *val;
1606
1607         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1608                 ptr = keyptr(set->page, slot);
1609         else
1610                 return bt->err;
1611
1612         // if librarian slot, advance to real slot
1613
1614         if( slotptr(set->page, slot)->type == Librarian )
1615                 ptr = keyptr(set->page, ++slot);
1616
1617         fence = slot == set->page->cnt;
1618
1619         // if key is found delete it, otherwise ignore request
1620
1621         if( found = !keycmp (ptr, key, len) )
1622           if( found = slotptr(set->page, slot)->dead == 0 ) {
1623                 val = valptr(set->page,slot);
1624                 slotptr(set->page, slot)->dead = 1;
1625                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1626                 set->page->act--;
1627
1628                 // collapse empty slots beneath the fence
1629
1630                 while( idx = set->page->cnt - 1 )
1631                   if( slotptr(set->page, idx)->dead ) {
1632                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1633                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1634                   } else
1635                         break;
1636           }
1637
1638         //      did we delete a fence key in an upper level?
1639
1640         if( found && lvl && set->page->act && fence )
1641           if( bt_fixfence (bt, set, lvl) )
1642                 return bt->err;
1643           else
1644                 return 0;
1645
1646         //      do we need to collapse root?
1647
1648         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1649           if( bt_collapseroot (bt, set) )
1650                 return bt->err;
1651           else
1652                 return 0;
1653
1654         //      delete empty page
1655
1656         if( !set->page->act )
1657                 return bt_deletepage (bt, set);
1658
1659         set->latch->dirty = 1;
1660         bt_unlockpage(bt, BtLockWrite, set->latch);
1661         bt_unpinlatch (set->latch);
1662         return 0;
1663 }
1664
1665 BtKey *bt_foundkey (BtDb *bt)
1666 {
1667         return (BtKey*)(bt->key);
1668 }
1669
1670 //      advance to next slot
1671
1672 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1673 {
1674 BtLatchSet *prevlatch;
1675 uid page_no;
1676
1677         if( slot < set->page->cnt )
1678                 return slot + 1;
1679
1680         prevlatch = set->latch;
1681
1682         if( page_no = bt_getid(set->page->right) )
1683           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1684                 set->page = bt_mappage (bt, set->latch);
1685           else
1686                 return 0;
1687         else
1688           return bt->err = BTERR_struct, 0;
1689
1690         // obtain access lock using lock chaining with Access mode
1691
1692         bt_lockpage(bt, BtLockAccess, set->latch);
1693
1694         bt_unlockpage(bt, BtLockRead, prevlatch);
1695         bt_unpinlatch (prevlatch);
1696
1697         bt_lockpage(bt, BtLockRead, set->latch);
1698         bt_unlockpage(bt, BtLockAccess, set->latch);
1699         return 1;
1700 }
1701
1702 //      find unique key or first duplicate key in
1703 //      leaf level and return number of value bytes
1704 //      or (-1) if not found.  Setup key for bt_foundkey
1705
1706 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1707 {
1708 BtPageSet set[1];
1709 uint len, slot;
1710 int ret = -1;
1711 BtKey *ptr;
1712 BtVal *val;
1713
1714   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1715    do {
1716         ptr = keyptr(set->page, slot);
1717
1718         //      skip librarian slot place holder
1719
1720         if( slotptr(set->page, slot)->type == Librarian )
1721                 ptr = keyptr(set->page, ++slot);
1722
1723         //      return actual key found
1724
1725         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1726         len = ptr->len;
1727
1728         if( slotptr(set->page, slot)->type == Duplicate )
1729                 len -= BtId;
1730
1731         //      not there if we reach the stopper key
1732
1733         if( slot == set->page->cnt )
1734           if( !bt_getid (set->page->right) )
1735                 break;
1736
1737         // if key exists, return >= 0 value bytes copied
1738         //      otherwise return (-1)
1739
1740         if( slotptr(set->page, slot)->dead )
1741                 continue;
1742
1743         if( keylen == len )
1744           if( !memcmp (ptr->key, key, len) ) {
1745                 val = valptr (set->page,slot);
1746                 if( valmax > val->len )
1747                         valmax = val->len;
1748                 memcpy (value, val->value, valmax);
1749                 ret = valmax;
1750           }
1751
1752         break;
1753
1754    } while( slot = bt_findnext (bt, set, slot) );
1755
1756   bt_unlockpage (bt, BtLockRead, set->latch);
1757   bt_unpinlatch (set->latch);
1758   return ret;
1759 }
1760
1761 //      check page for space available,
1762 //      clean if necessary and return
1763 //      0 - page needs splitting
1764 //      >0  new slot value
1765
1766 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1767 {
1768 uint nxt = bt->mgr->page_size;
1769 BtPage page = set->page;
1770 uint cnt = 0, idx = 0;
1771 uint max = page->cnt;
1772 uint newslot = max;
1773 BtKey *key;
1774 BtVal *val;
1775
1776         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1777                 return slot;
1778
1779         //      skip cleanup and proceed to split
1780         //      if there's not enough garbage
1781         //      to bother with.
1782
1783         if( page->garbage < nxt / 5 )
1784                 return 0;
1785
1786         memcpy (bt->frame, page, bt->mgr->page_size);
1787
1788         // skip page info and set rest of page to zero
1789
1790         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1791         set->latch->dirty = 1;
1792         page->garbage = 0;
1793         page->act = 0;
1794
1795         // clean up page first by
1796         // removing deleted keys
1797
1798         while( cnt++ < max ) {
1799                 if( cnt == slot )
1800                         newslot = idx + 2;
1801
1802                 if( cnt < max || bt->frame->lvl )
1803                   if( slotptr(bt->frame,cnt)->dead )
1804                         continue;
1805
1806                 // copy the value across
1807
1808                 val = valptr(bt->frame, cnt);
1809                 nxt -= val->len + sizeof(BtVal);
1810                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1811
1812                 // copy the key across
1813
1814                 key = keyptr(bt->frame, cnt);
1815                 nxt -= key->len + sizeof(BtKey);
1816                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1817
1818                 // make a librarian slot
1819
1820                 slotptr(page, ++idx)->off = nxt;
1821                 slotptr(page, idx)->type = Librarian;
1822                 slotptr(page, idx)->dead = 1;
1823
1824                 // set up the slot
1825
1826                 slotptr(page, ++idx)->off = nxt;
1827                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1828
1829                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1830                         page->act++;
1831         }
1832
1833         page->min = nxt;
1834         page->cnt = idx;
1835
1836         //      see if page has enough space now, or does it need splitting?
1837
1838         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1839                 return newslot;
1840
1841         return 0;
1842 }
1843
1844 // split the root and raise the height of the btree
1845
1846 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1847 {  
1848 unsigned char leftkey[BT_keyarray];
1849 uint nxt = bt->mgr->page_size;
1850 unsigned char value[BtId];
1851 BtPageSet left[1];
1852 uid left_page_no;
1853 BtKey *ptr;
1854 BtVal *val;
1855
1856         //      save left page fence key for new root
1857
1858         ptr = keyptr(root->page, root->page->cnt);
1859         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1860
1861         //  Obtain an empty page to use, and copy the current
1862         //  root contents into it, e.g. lower keys
1863
1864         if( bt_newpage(bt, left, root->page) )
1865                 return bt->err;
1866
1867         left_page_no = left->latch->page_no;
1868         bt_unpinlatch (left->latch);
1869
1870         // preserve the page info at the bottom
1871         // of higher keys and set rest to zero
1872
1873         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1874
1875         // insert stopper key at top of newroot page
1876         // and increase the root height
1877
1878         nxt -= BtId + sizeof(BtVal);
1879         bt_putid (value, right->page_no);
1880         val = (BtVal *)((unsigned char *)root->page + nxt);
1881         memcpy (val->value, value, BtId);
1882         val->len = BtId;
1883
1884         nxt -= 2 + sizeof(BtKey);
1885         slotptr(root->page, 2)->off = nxt;
1886         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1887         ptr->len = 2;
1888         ptr->key[0] = 0xff;
1889         ptr->key[1] = 0xff;
1890
1891         // insert lower keys page fence key on newroot page as first key
1892
1893         nxt -= BtId + sizeof(BtVal);
1894         bt_putid (value, left_page_no);
1895         val = (BtVal *)((unsigned char *)root->page + nxt);
1896         memcpy (val->value, value, BtId);
1897         val->len = BtId;
1898
1899         ptr = (BtKey *)leftkey;
1900         nxt -= ptr->len + sizeof(BtKey);
1901         slotptr(root->page, 1)->off = nxt;
1902         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1903         
1904         bt_putid(root->page->right, 0);
1905         root->page->min = nxt;          // reset lowest used offset and key count
1906         root->page->cnt = 2;
1907         root->page->act = 2;
1908         root->page->lvl++;
1909
1910         // release and unpin root pages
1911
1912         bt_unlockpage(bt, BtLockWrite, root->latch);
1913         bt_unpinlatch (root->latch);
1914
1915         bt_unpinlatch (right);
1916         return 0;
1917 }
1918
1919 //  split already locked full node
1920 //      leave it locked.
1921 //      return pool entry for new right
1922 //      page, unlocked
1923
1924 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1925 {
1926 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1927 uint lvl = set->page->lvl;
1928 BtPageSet right[1];
1929 BtKey *key, *ptr;
1930 BtVal *val, *src;
1931 uid right2;
1932 uint prev;
1933
1934         //  split higher half of keys to bt->frame
1935
1936         memset (bt->frame, 0, bt->mgr->page_size);
1937         max = set->page->cnt;
1938         cnt = max / 2;
1939         idx = 0;
1940
1941         while( cnt++ < max ) {
1942                 if( cnt < max || set->page->lvl )
1943                   if( slotptr(set->page, cnt)->dead )
1944                         continue;
1945
1946                 src = valptr(set->page, cnt);
1947                 nxt -= src->len + sizeof(BtVal);
1948                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1949
1950                 key = keyptr(set->page, cnt);
1951                 nxt -= key->len + sizeof(BtKey);
1952                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1953                 memcpy (ptr, key, key->len + sizeof(BtKey));
1954
1955                 //      add librarian slot
1956
1957                 slotptr(bt->frame, ++idx)->off = nxt;
1958                 slotptr(bt->frame, idx)->type = Librarian;
1959                 slotptr(bt->frame, idx)->dead = 1;
1960
1961                 //  add actual slot
1962
1963                 slotptr(bt->frame, ++idx)->off = nxt;
1964                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1965
1966                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1967                         bt->frame->act++;
1968         }
1969
1970         bt->frame->bits = bt->mgr->page_bits;
1971         bt->frame->min = nxt;
1972         bt->frame->cnt = idx;
1973         bt->frame->lvl = lvl;
1974
1975         // link right node
1976
1977         if( set->latch->page_no > ROOT_page )
1978                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1979
1980         //      get new free page and write higher keys to it.
1981
1982         if( bt_newpage(bt, right, bt->frame) )
1983                 return 0;
1984
1985         memcpy (bt->frame, set->page, bt->mgr->page_size);
1986         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1987         set->latch->dirty = 1;
1988
1989         nxt = bt->mgr->page_size;
1990         set->page->garbage = 0;
1991         set->page->act = 0;
1992         max /= 2;
1993         cnt = 0;
1994         idx = 0;
1995
1996         if( slotptr(bt->frame, max)->type == Librarian )
1997                 max--;
1998
1999         //  assemble page of smaller keys
2000
2001         while( cnt++ < max ) {
2002                 if( slotptr(bt->frame, cnt)->dead )
2003                         continue;
2004                 val = valptr(bt->frame, cnt);
2005                 nxt -= val->len + sizeof(BtVal);
2006                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2007
2008                 key = keyptr(bt->frame, cnt);
2009                 nxt -= key->len + sizeof(BtKey);
2010                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2011
2012                 //      add librarian slot
2013
2014                 slotptr(set->page, ++idx)->off = nxt;
2015                 slotptr(set->page, idx)->type = Librarian;
2016                 slotptr(set->page, idx)->dead = 1;
2017
2018                 //      add actual slot
2019
2020                 slotptr(set->page, ++idx)->off = nxt;
2021                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2022                 set->page->act++;
2023         }
2024
2025         bt_putid(set->page->right, right->latch->page_no);
2026         set->page->min = nxt;
2027         set->page->cnt = idx;
2028
2029         return right->latch->entry;
2030 }
2031
2032 //      fix keys for newly split page
2033 //      call with page locked, return
2034 //      unlocked
2035
2036 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2037 {
2038 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2039 unsigned char value[BtId];
2040 uint lvl = set->page->lvl;
2041 BtPage page;
2042 BtKey *ptr;
2043
2044         // if current page is the root page, split it
2045
2046         if( set->latch->page_no == ROOT_page )
2047                 return bt_splitroot (bt, set, right);
2048
2049         ptr = keyptr(set->page, set->page->cnt);
2050         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2051
2052         page = bt_mappage (bt, right);
2053
2054         ptr = keyptr(page, page->cnt);
2055         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2056
2057         // insert new fences in their parent pages
2058
2059         bt_lockpage (bt, BtLockParent, right);
2060
2061         bt_lockpage (bt, BtLockParent, set->latch);
2062         bt_unlockpage (bt, BtLockWrite, set->latch);
2063
2064         // insert new fence for reformulated left block of smaller keys
2065
2066         bt_putid (value, set->latch->page_no);
2067         ptr = (BtKey *)leftkey;
2068
2069         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2070                 return bt->err;
2071
2072         // switch fence for right block of larger keys to new right page
2073
2074         bt_putid (value, right->page_no);
2075         ptr = (BtKey *)rightkey;
2076
2077         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2078                 return bt->err;
2079
2080         bt_unlockpage (bt, BtLockParent, set->latch);
2081         bt_unpinlatch (set->latch);
2082
2083         bt_unlockpage (bt, BtLockParent, right);
2084         bt_unpinlatch (right);
2085         return 0;
2086 }
2087
2088 //      install new key and value onto page
2089 //      page must already be checked for
2090 //      adequate space
2091
2092 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2093 {
2094 uint idx, librarian;
2095 BtSlot *node;
2096 BtKey *ptr;
2097 BtVal *val;
2098
2099         //      if found slot > desired slot and previous slot
2100         //      is a librarian slot, use it
2101
2102         if( slot > 1 )
2103           if( slotptr(set->page, slot-1)->type == Librarian )
2104                 slot--;
2105
2106         // copy value onto page
2107
2108         set->page->min -= vallen + sizeof(BtVal);
2109         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2110         memcpy (val->value, value, vallen);
2111         val->len = vallen;
2112
2113         // copy key onto page
2114
2115         set->page->min -= keylen + sizeof(BtKey);
2116         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2117         memcpy (ptr->key, key, keylen);
2118         ptr->len = keylen;
2119         
2120         //      find first empty slot
2121
2122         for( idx = slot; idx < set->page->cnt; idx++ )
2123           if( slotptr(set->page, idx)->dead )
2124                 break;
2125
2126         // now insert key into array before slot
2127
2128         if( idx == set->page->cnt )
2129                 idx += 2, set->page->cnt += 2, librarian = 2;
2130         else
2131                 librarian = 1;
2132
2133         set->latch->dirty = 1;
2134         set->page->act++;
2135
2136         while( idx > slot + librarian - 1 )
2137                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2138
2139         //      add librarian slot
2140
2141         if( librarian > 1 ) {
2142                 node = slotptr(set->page, slot++);
2143                 node->off = set->page->min;
2144                 node->type = Librarian;
2145                 node->dead = 1;
2146         }
2147
2148         //      fill in new slot
2149
2150         node = slotptr(set->page, slot);
2151         node->off = set->page->min;
2152         node->type = type;
2153         node->dead = 0;
2154
2155         if( release ) {
2156                 bt_unlockpage (bt, BtLockWrite, set->latch);
2157                 bt_unpinlatch (set->latch);
2158         }
2159
2160         return 0;
2161 }
2162
2163 //  Insert new key into the btree at given level.
2164 //      either add a new key or update/add an existing one
2165
2166 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2167 {
2168 unsigned char newkey[BT_keyarray];
2169 uint slot, idx, len, entry;
2170 BtPageSet set[1];
2171 BtKey *ptr, *ins;
2172 uid sequence;
2173 BtVal *val;
2174 uint type;
2175
2176   // set up the key we're working on
2177
2178   ins = (BtKey*)newkey;
2179   memcpy (ins->key, key, keylen);
2180   ins->len = keylen;
2181
2182   // is this a non-unique index value?
2183
2184   if( unique )
2185         type = Unique;
2186   else {
2187         type = Duplicate;
2188         sequence = bt_newdup (bt);
2189         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2190         ins->len += BtId;
2191   }
2192
2193   while( 1 ) { // find the page and slot for the current key
2194         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2195                 ptr = keyptr(set->page, slot);
2196         else {
2197                 if( !bt->err )
2198                         bt->err = BTERR_ovflw;
2199                 return bt->err;
2200         }
2201
2202         // if librarian slot == found slot, advance to real slot
2203
2204         if( slotptr(set->page, slot)->type == Librarian )
2205           if( !keycmp (ptr, key, keylen) )
2206                 ptr = keyptr(set->page, ++slot);
2207
2208         len = ptr->len;
2209
2210         if( slotptr(set->page, slot)->type == Duplicate )
2211                 len -= BtId;
2212
2213         //  if inserting a duplicate key or unique key
2214         //      check for adequate space on the page
2215         //      and insert the new key before slot.
2216
2217         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2218           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2219                 if( !(entry = bt_splitpage (bt, set)) )
2220                   return bt->err;
2221                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2222                   return bt->err;
2223                 else
2224                   continue;
2225
2226           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2227         }
2228
2229         // if key already exists, update value and return
2230
2231         val = valptr(set->page, slot);
2232
2233         if( val->len >= vallen ) {
2234                 if( slotptr(set->page, slot)->dead )
2235                         set->page->act++;
2236                 set->page->garbage += val->len - vallen;
2237                 set->latch->dirty = 1;
2238                 slotptr(set->page, slot)->dead = 0;
2239                 val->len = vallen;
2240                 memcpy (val->value, value, vallen);
2241                 bt_unlockpage(bt, BtLockWrite, set->latch);
2242                 bt_unpinlatch (set->latch);
2243                 return 0;
2244         }
2245
2246         //      new update value doesn't fit in existing value area
2247
2248         if( !slotptr(set->page, slot)->dead )
2249                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2250         else {
2251                 slotptr(set->page, slot)->dead = 0;
2252                 set->page->act++;
2253         }
2254
2255         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2256           if( !(entry = bt_splitpage (bt, set)) )
2257                 return bt->err;
2258           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2259                 return bt->err;
2260           else
2261                 continue;
2262
2263         set->page->min -= vallen + sizeof(BtVal);
2264         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2265         memcpy (val->value, value, vallen);
2266         val->len = vallen;
2267
2268         set->latch->dirty = 1;
2269         set->page->min -= keylen + sizeof(BtKey);
2270         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2271         memcpy (ptr->key, key, keylen);
2272         ptr->len = keylen;
2273         
2274         slotptr(set->page, slot)->off = set->page->min;
2275         bt_unlockpage(bt, BtLockWrite, set->latch);
2276         bt_unpinlatch (set->latch);
2277         return 0;
2278   }
2279   return 0;
2280 }
2281
2282 typedef struct {
2283         uint entry;                     // latch table entry number
2284         uint slot:31;           // page slot number
2285         uint reuse:1;           // reused previous page
2286 } AtomicTxn;
2287
2288 typedef struct {
2289         uid page_no;            // page number for split leaf
2290         void *next;                     // next key to insert
2291         uint entry:29;          // latch table entry number
2292         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2293         uint nounlock:1;        // don't unlock ParentModification
2294         unsigned char leafkey[BT_keyarray];
2295 } AtomicKey;
2296
2297 //      determine actual page where key is located
2298 //  return slot number
2299
2300 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2301 {
2302 BtKey *key = keyptr(source,src);
2303 uint slot = locks[src].slot;
2304 uint entry;
2305
2306         if( src > 1 && locks[src].reuse )
2307           entry = locks[src-1].entry, slot = 0;
2308         else
2309           entry = locks[src].entry;
2310
2311         if( slot ) {
2312                 set->latch = bt->mgr->latchsets + entry;
2313                 set->page = bt_mappage (bt, set->latch);
2314                 return slot;
2315         }
2316
2317         //      is locks->reuse set? or was slot zeroed?
2318         //      if so, find where our key is located 
2319         //      on current page or pages split on
2320         //      same page txn operations.
2321
2322         do {
2323                 set->latch = bt->mgr->latchsets + entry;
2324                 set->page = bt_mappage (bt, set->latch);
2325
2326                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2327                   if( slotptr(set->page, slot)->type == Librarian )
2328                         slot++;
2329                   if( locks[src].reuse )
2330                         locks[src].entry = entry;
2331                   return slot;
2332                 }
2333         } while( entry = set->latch->split );
2334
2335         bt->err = BTERR_atomic;
2336         return 0;
2337 }
2338
2339 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2340 {
2341 BtKey *key = keyptr(source, src);
2342 BtVal *val = valptr(source, src);
2343 BtLatchSet *latch;
2344 BtPageSet set[1];
2345 uint entry, slot;
2346
2347   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2348         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2349           return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2350
2351         if( entry = bt_splitpage (bt, set) )
2352           latch = bt->mgr->latchsets + entry;
2353         else
2354           return bt->err;
2355
2356         //      splice right page into split chain
2357         //      and WriteLock it.
2358
2359         bt_lockpage(bt, BtLockWrite, latch);
2360         latch->split = set->latch->split;
2361         set->latch->split = entry;
2362         locks[src].slot = 0;
2363   }
2364
2365   return bt->err = BTERR_atomic;
2366 }
2367
2368 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2369 {
2370 BtKey *key = keyptr(source, src);
2371 uint idx, entry, slot;
2372 BtPageSet set[1];
2373 BtKey *ptr;
2374 BtVal *val;
2375
2376         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2377           ptr = keyptr(set->page, slot);
2378         else
2379           return bt->err = BTERR_struct;
2380
2381         if( !keycmp (ptr, key->key, key->len) )
2382           if( !slotptr(set->page, slot)->dead )
2383                 slotptr(set->page, slot)->dead = 1;
2384           else
2385                 return 0;
2386         else
2387                 return 0;
2388
2389         val = valptr(set->page, slot);
2390         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2391         set->latch->dirty = 1;
2392         set->page->act--;
2393         bt->found++;
2394         return 0;
2395 }
2396
2397 //      delete an empty master page for a transaction
2398
2399 //      note that the far right page never empties because
2400 //      it always contains (at least) the infinite stopper key
2401 //      and that all pages that don't contain any keys are
2402 //      deleted, or are being held under Atomic lock.
2403
2404 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2405 {
2406 BtPageSet right[1], temp[1];
2407 unsigned char value[BtId];
2408 uid right_page_no;
2409 BtKey *ptr;
2410
2411         bt_lockpage(bt, BtLockWrite, prev->latch);
2412
2413         //      grab the right sibling
2414
2415         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2416                 right->page = bt_mappage (bt, right->latch);
2417         else
2418                 return bt->err;
2419
2420         bt_lockpage(bt, BtLockAtomic, right->latch);
2421         bt_lockpage(bt, BtLockWrite, right->latch);
2422
2423         //      and pull contents over empty page
2424         //      while preserving master's left link
2425
2426         memcpy (right->page->left, prev->page->left, BtId);
2427         memcpy (prev->page, right->page, bt->mgr->page_size);
2428
2429         //      forward seekers to old right sibling
2430         //      to new page location in set
2431
2432         bt_putid (right->page->right, prev->latch->page_no);
2433         right->latch->dirty = 1;
2434         right->page->kill = 1;
2435
2436         //      remove pointer to right page for searchers by
2437         //      changing right fence key to point to the master page
2438
2439         ptr = keyptr(right->page,right->page->cnt);
2440         bt_putid (value, prev->latch->page_no);
2441
2442         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2443                 return bt->err;
2444
2445         //  now that master page is in good shape we can
2446         //      remove its locks.
2447
2448         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2449         bt_unlockpage (bt, BtLockWrite, prev->latch);
2450
2451         //  fix master's right sibling's left pointer
2452         //      to remove scanner's poiner to the right page
2453
2454         if( right_page_no = bt_getid (prev->page->right) ) {
2455           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2456                 temp->page = bt_mappage (bt, temp->latch);
2457
2458           bt_lockpage (bt, BtLockWrite, temp->latch);
2459           bt_putid (temp->page->left, prev->latch->page_no);
2460           temp->latch->dirty = 1;
2461
2462           bt_unlockpage (bt, BtLockWrite, temp->latch);
2463           bt_unpinlatch (temp->latch);
2464         } else {        // master is now the far right page
2465           bt_spinwritelock (bt->mgr->lock);
2466           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2467           bt_spinreleasewrite(bt->mgr->lock);
2468         }
2469
2470         //      now that there are no pointers to the right page
2471         //      we can delete it after the last read access occurs
2472
2473         bt_unlockpage (bt, BtLockWrite, right->latch);
2474         bt_unlockpage (bt, BtLockAtomic, right->latch);
2475         bt_lockpage (bt, BtLockDelete, right->latch);
2476         bt_lockpage (bt, BtLockWrite, right->latch);
2477         bt_freepage (bt, right);
2478         return 0;
2479 }
2480
2481 //      atomic modification of a batch of keys.
2482
2483 //      return -1 if BTERR is set
2484 //      otherwise return slot number
2485 //      causing the key constraint violation
2486 //      or zero on successful completion.
2487
2488 int bt_atomictxn (BtDb *bt, BtPage source)
2489 {
2490 uint src, idx, slot, samepage, entry;
2491 AtomicKey *head, *tail, *leaf;
2492 BtPageSet set[1], prev[1];
2493 unsigned char value[BtId];
2494 BtKey *key, *ptr, *key2;
2495 BtLatchSet *latch;
2496 AtomicTxn *locks;
2497 int result = 0;
2498 BtSlot temp[1];
2499 BtPage page;
2500 BtVal *val;
2501 uid right;
2502 int type;
2503
2504   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2505   head = NULL;
2506   tail = NULL;
2507
2508   // stable sort the list of keys into order to
2509   //    prevent deadlocks between threads.
2510
2511   for( src = 1; src++ < source->cnt; ) {
2512         *temp = *slotptr(source,src);
2513         key = keyptr (source,src);
2514
2515         for( idx = src; --idx; ) {
2516           key2 = keyptr (source,idx);
2517           if( keycmp (key, key2->key, key2->len) < 0 ) {
2518                 *slotptr(source,idx+1) = *slotptr(source,idx);
2519                 *slotptr(source,idx) = *temp;
2520           } else
2521                 break;
2522         }
2523   }
2524
2525   // Load the leaf page for each key
2526   // group same page references with reuse bit
2527   // and determine any constraint violations
2528
2529   for( src = 0; src++ < source->cnt; ) {
2530         key = keyptr(source, src);
2531         slot = 0;
2532
2533         // first determine if this modification falls
2534         // on the same page as the previous modification
2535         //      note that the far right leaf page is a special case
2536
2537         if( samepage = src > 1 )
2538           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2539                 slot = bt_findslot(set->page, key->key, key->len);
2540           else
2541                 bt_unlockpage(bt, BtLockRead, set->latch); 
2542
2543         if( !slot )
2544           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2545                 set->latch->split = 0;
2546           else
2547                 return -1;
2548
2549         if( slotptr(set->page, slot)->type == Librarian )
2550           ptr = keyptr(set->page, ++slot);
2551         else
2552           ptr = keyptr(set->page, slot);
2553
2554         if( !samepage ) {
2555           locks[src].entry = set->latch->entry;
2556           locks[src].slot = slot;
2557           locks[src].reuse = 0;
2558         } else {
2559           locks[src].entry = 0;
2560           locks[src].slot = 0;
2561           locks[src].reuse = 1;
2562         }
2563
2564         switch( slotptr(source, src)->type ) {
2565         case Duplicate:
2566         case Unique:
2567           if( !slotptr(set->page, slot)->dead )
2568            if( slot < set->page->cnt || bt_getid (set->page->right) )
2569             if( !keycmp (ptr, key->key, key->len) ) {
2570
2571                   // return constraint violation if key already exists
2572
2573                   bt_unlockpage(bt, BtLockRead, set->latch);
2574                   result = src;
2575
2576                   while( src ) {
2577                         if( locks[src].entry ) {
2578                           set->latch = bt->mgr->latchsets + locks[src].entry;
2579                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2580                           bt_unpinlatch (set->latch);
2581                         }
2582                         src--;
2583                   }
2584                   free (locks);
2585                   return result;
2586                 }
2587           break;
2588         }
2589   }
2590
2591   //  unlock last loadpage lock
2592
2593   if( source->cnt )
2594         bt_unlockpage(bt, BtLockRead, set->latch);
2595
2596   //  obtain write lock for each master page
2597
2598   for( src = 0; src++ < source->cnt; )
2599         if( locks[src].reuse )
2600           continue;
2601         else
2602           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2603
2604   // insert or delete each key
2605   // process any splits or merges
2606   // release Write & Atomic latches
2607   // set ParentModifications and build
2608   // queue of keys to insert for split pages
2609   // or delete for deleted pages.
2610
2611   // run through txn list backwards
2612
2613   samepage = source->cnt + 1;
2614
2615   for( src = source->cnt; src; src-- ) {
2616         if( locks[src].reuse )
2617           continue;
2618
2619         //  perform the txn operations
2620         //      from smaller to larger on
2621         //  the same page
2622
2623         for( idx = src; idx < samepage; idx++ )
2624          switch( slotptr(source,idx)->type ) {
2625          case Delete:
2626           if( bt_atomicdelete (bt, source, locks, idx) )
2627                 return -1;
2628           break;
2629
2630         case Duplicate:
2631         case Unique:
2632           if( bt_atomicinsert (bt, source, locks, idx) )
2633                 return -1;
2634           break;
2635         }
2636
2637         //      after the same page operations have finished,
2638         //  process master page for splits or deletion.
2639
2640         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2641         prev->page = bt_mappage (bt, prev->latch);
2642         samepage = src;
2643
2644         //  pick-up all splits from master page
2645         //      each one is already WriteLocked.
2646
2647         entry = prev->latch->split;
2648
2649         while( entry ) {
2650           set->latch = bt->mgr->latchsets + entry;
2651           set->page = bt_mappage (bt, set->latch);
2652           entry = set->latch->split;
2653
2654           // delete empty master page by undoing its split
2655           //  (this is potentially another empty page)
2656           //  note that there are no new left pointers yet
2657
2658           if( !prev->page->act ) {
2659                 memcpy (set->page->left, prev->page->left, BtId);
2660                 memcpy (prev->page, set->page, bt->mgr->page_size);
2661                 bt_lockpage (bt, BtLockDelete, set->latch);
2662                 bt_freepage (bt, set);
2663
2664                 prev->latch->dirty = 1;
2665                 continue;
2666           }
2667
2668           // remove empty page from the split chain
2669
2670           if( !set->page->act ) {
2671                 memcpy (prev->page->right, set->page->right, BtId);
2672                 prev->latch->split = set->latch->split;
2673                 bt_lockpage (bt, BtLockDelete, set->latch);
2674                 bt_freepage (bt, set);
2675                 continue;
2676           }
2677
2678           //  schedule prev fence key update
2679
2680           ptr = keyptr(prev->page,prev->page->cnt);
2681           leaf = calloc (sizeof(AtomicKey), 1);
2682
2683           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2684           leaf->page_no = prev->latch->page_no;
2685           leaf->entry = prev->latch->entry;
2686           leaf->type = 0;
2687
2688           if( tail )
2689                 tail->next = leaf;
2690           else
2691                 head = leaf;
2692
2693           tail = leaf;
2694
2695           // splice in the left link into the split page
2696
2697           bt_putid (set->page->left, prev->latch->page_no);
2698           bt_lockpage(bt, BtLockParent, prev->latch);
2699           bt_unlockpage(bt, BtLockWrite, prev->latch);
2700           *prev = *set;
2701         }
2702
2703         //  update left pointer in next right page from last split page
2704         //      (if all splits were reversed, latch->split == 0)
2705
2706         if( latch->split ) {
2707           //  fix left pointer in master's original (now split)
2708           //  far right sibling or set rightmost page in page zero
2709
2710           if( right = bt_getid (prev->page->right) ) {
2711                 if( set->latch = bt_pinlatch (bt, right, 1) )
2712                   set->page = bt_mappage (bt, set->latch);
2713                 else
2714                   return -1;
2715
2716             bt_lockpage (bt, BtLockWrite, set->latch);
2717             bt_putid (set->page->left, prev->latch->page_no);
2718                 set->latch->dirty = 1;
2719             bt_unlockpage (bt, BtLockWrite, set->latch);
2720                 bt_unpinlatch (set->latch);
2721           } else {      // prev is rightmost page
2722             bt_spinwritelock (bt->mgr->lock);
2723                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2724             bt_spinreleasewrite(bt->mgr->lock);
2725           }
2726
2727           //  Process last page split in chain
2728
2729           ptr = keyptr(prev->page,prev->page->cnt);
2730           leaf = calloc (sizeof(AtomicKey), 1);
2731
2732           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2733           leaf->page_no = prev->latch->page_no;
2734           leaf->entry = prev->latch->entry;
2735           leaf->type = 0;
2736   
2737           if( tail )
2738                 tail->next = leaf;
2739           else
2740                 head = leaf;
2741
2742           tail = leaf;
2743
2744           bt_lockpage(bt, BtLockParent, prev->latch);
2745           bt_unlockpage(bt, BtLockWrite, prev->latch);
2746
2747           //  remove atomic lock on master page
2748
2749           bt_unlockpage(bt, BtLockAtomic, latch);
2750           continue;
2751         }
2752
2753         //  finished if prev page occupied (either master or final split)
2754
2755         if( prev->page->act ) {
2756           bt_unlockpage(bt, BtLockWrite, latch);
2757           bt_unlockpage(bt, BtLockAtomic, latch);
2758           bt_unpinlatch(latch);
2759           continue;
2760         }
2761
2762         // any and all splits were reversed, and the
2763         // master page located in prev is empty, delete it
2764         // by pulling over master's right sibling.
2765
2766         // Remove empty master's fence key
2767
2768         ptr = keyptr(prev->page,prev->page->cnt);
2769
2770         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2771                 return -1;
2772
2773         //      perform the remainder of the delete
2774         //      from the FIFO queue
2775
2776         leaf = calloc (sizeof(AtomicKey), 1);
2777
2778         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2779         leaf->page_no = prev->latch->page_no;
2780         leaf->entry = prev->latch->entry;
2781         leaf->nounlock = 1;
2782         leaf->type = 2;
2783   
2784         if( tail )
2785           tail->next = leaf;
2786         else
2787           head = leaf;
2788
2789         tail = leaf;
2790
2791         //      leave atomic lock in place until
2792         //      deletion completes in next phase.
2793
2794         bt_unlockpage(bt, BtLockWrite, prev->latch);
2795   }
2796   
2797   //  add & delete keys for any pages split or merged during transaction
2798
2799   if( leaf = head )
2800     do {
2801           set->latch = bt->mgr->latchsets + leaf->entry;
2802           set->page = bt_mappage (bt, set->latch);
2803
2804           bt_putid (value, leaf->page_no);
2805           ptr = (BtKey *)leaf->leafkey;
2806
2807           switch( leaf->type ) {
2808           case 0:       // insert key
2809             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2810                   return -1;
2811
2812                 break;
2813
2814           case 1:       // delete key
2815                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2816                   return -1;
2817
2818                 break;
2819
2820           case 2:       // free page
2821                 if( bt_atomicfree (bt, set) )
2822                   return -1;
2823
2824                 break;
2825           }
2826
2827           if( !leaf->nounlock )
2828             bt_unlockpage (bt, BtLockParent, set->latch);
2829
2830           bt_unpinlatch (set->latch);
2831           tail = leaf->next;
2832           free (leaf);
2833         } while( leaf = tail );
2834
2835   // return success
2836
2837   free (locks);
2838   return 0;
2839 }
2840
2841 //      set cursor to highest slot on highest page
2842
2843 uint bt_lastkey (BtDb *bt)
2844 {
2845 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2846 BtPageSet set[1];
2847
2848         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2849                 set->page = bt_mappage (bt, set->latch);
2850         else
2851                 return 0;
2852
2853     bt_lockpage(bt, BtLockRead, set->latch);
2854         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2855     bt_unlockpage(bt, BtLockRead, set->latch);
2856         bt_unpinlatch (set->latch);
2857
2858         bt->cursor_page = page_no;
2859         return bt->cursor->cnt;
2860 }
2861
2862 //      return previous slot on cursor page
2863
2864 uint bt_prevkey (BtDb *bt, uint slot)
2865 {
2866 uid ourright, next, us = bt->cursor_page;
2867 BtPageSet set[1];
2868
2869         if( --slot )
2870                 return slot;
2871
2872         ourright = bt_getid(bt->cursor->right);
2873
2874 goleft:
2875         if( !(next = bt_getid(bt->cursor->left)) )
2876                 return 0;
2877
2878 findourself:
2879         bt->cursor_page = next;
2880
2881         if( set->latch = bt_pinlatch (bt, next, 1) )
2882                 set->page = bt_mappage (bt, set->latch);
2883         else
2884                 return 0;
2885
2886     bt_lockpage(bt, BtLockRead, set->latch);
2887         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2888         bt_unlockpage(bt, BtLockRead, set->latch);
2889         bt_unpinlatch (set->latch);
2890         
2891         next = bt_getid (bt->cursor->right);
2892
2893         if( bt->cursor->kill )
2894                 goto findourself;
2895
2896         if( next != us )
2897           if( next == ourright )
2898                 goto goleft;
2899           else
2900                 goto findourself;
2901
2902         return bt->cursor->cnt;
2903 }
2904
2905 //  return next slot on cursor page
2906 //  or slide cursor right into next page
2907
2908 uint bt_nextkey (BtDb *bt, uint slot)
2909 {
2910 BtPageSet set[1];
2911 uid right;
2912
2913   do {
2914         right = bt_getid(bt->cursor->right);
2915
2916         while( slot++ < bt->cursor->cnt )
2917           if( slotptr(bt->cursor,slot)->dead )
2918                 continue;
2919           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2920                 return slot;
2921           else
2922                 break;
2923
2924         if( !right )
2925                 break;
2926
2927         bt->cursor_page = right;
2928
2929         if( set->latch = bt_pinlatch (bt, right, 1) )
2930                 set->page = bt_mappage (bt, set->latch);
2931         else
2932                 return 0;
2933
2934     bt_lockpage(bt, BtLockRead, set->latch);
2935
2936         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2937
2938         bt_unlockpage(bt, BtLockRead, set->latch);
2939         bt_unpinlatch (set->latch);
2940         slot = 0;
2941
2942   } while( 1 );
2943
2944   return bt->err = 0;
2945 }
2946
2947 //  cache page of keys into cursor and return starting slot for given key
2948
2949 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2950 {
2951 BtPageSet set[1];
2952 uint slot;
2953
2954         // cache page for retrieval
2955
2956         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2957           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2958         else
2959           return 0;
2960
2961         bt->cursor_page = set->latch->page_no;
2962
2963         bt_unlockpage(bt, BtLockRead, set->latch);
2964         bt_unpinlatch (set->latch);
2965         return slot;
2966 }
2967
2968 BtKey *bt_key(BtDb *bt, uint slot)
2969 {
2970         return keyptr(bt->cursor, slot);
2971 }
2972
2973 BtVal *bt_val(BtDb *bt, uint slot)
2974 {
2975         return valptr(bt->cursor,slot);
2976 }
2977
2978 #ifdef STANDALONE
2979
2980 #ifndef unix
2981 double getCpuTime(int type)
2982 {
2983 FILETIME crtime[1];
2984 FILETIME xittime[1];
2985 FILETIME systime[1];
2986 FILETIME usrtime[1];
2987 SYSTEMTIME timeconv[1];
2988 double ans = 0;
2989
2990         memset (timeconv, 0, sizeof(SYSTEMTIME));
2991
2992         switch( type ) {
2993         case 0:
2994                 GetSystemTimeAsFileTime (xittime);
2995                 FileTimeToSystemTime (xittime, timeconv);
2996                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2997                 break;
2998         case 1:
2999                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3000                 FileTimeToSystemTime (usrtime, timeconv);
3001                 break;
3002         case 2:
3003                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3004                 FileTimeToSystemTime (systime, timeconv);
3005                 break;
3006         }
3007
3008         ans += (double)timeconv->wHour * 3600;
3009         ans += (double)timeconv->wMinute * 60;
3010         ans += (double)timeconv->wSecond;
3011         ans += (double)timeconv->wMilliseconds / 1000;
3012         return ans;
3013 }
3014 #else
3015 #include <time.h>
3016 #include <sys/resource.h>
3017
3018 double getCpuTime(int type)
3019 {
3020 struct rusage used[1];
3021 struct timeval tv[1];
3022
3023         switch( type ) {
3024         case 0:
3025                 gettimeofday(tv, NULL);
3026                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3027
3028         case 1:
3029                 getrusage(RUSAGE_SELF, used);
3030                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3031
3032         case 2:
3033                 getrusage(RUSAGE_SELF, used);
3034                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3035         }
3036
3037         return 0;
3038 }
3039 #endif
3040
3041 void bt_poolaudit (BtMgr *mgr)
3042 {
3043 BtLatchSet *latch;
3044 uint slot = 0;
3045
3046         while( slot++ < mgr->latchdeployed ) {
3047                 latch = mgr->latchsets + slot;
3048
3049                 if( *latch->readwr->rin & MASK )
3050                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3051                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3052
3053                 if( *latch->access->rin & MASK )
3054                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3055                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3056
3057                 if( *latch->parent->ticket != *latch->parent->serving )
3058                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3059                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3060
3061                 if( latch->pin & ~CLOCK_bit ) {
3062                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3063                         latch->pin = 0;
3064                 }
3065         }
3066 }
3067
3068 uint bt_latchaudit (BtDb *bt)
3069 {
3070 ushort idx, hashidx;
3071 uid next, page_no;
3072 BtLatchSet *latch;
3073 uint cnt = 0;
3074 BtKey *ptr;
3075
3076         if( *(ushort *)(bt->mgr->lock) )
3077                 fprintf(stderr, "Alloc page locked\n");
3078         *(ushort *)(bt->mgr->lock) = 0;
3079
3080         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3081                 latch = bt->mgr->latchsets + idx;
3082                 if( *latch->readwr->rin & MASK )
3083                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3084                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3085
3086                 if( *latch->access->rin & MASK )
3087                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3088                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3089
3090                 if( *latch->parent->ticket != *latch->parent->serving )
3091                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3092                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3093
3094                 if( latch->pin ) {
3095                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3096                         latch->pin = 0;
3097                 }
3098         }
3099
3100         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3101           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3102                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3103
3104           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3105
3106           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3107                 latch = bt->mgr->latchsets + idx;
3108                 if( latch->pin )
3109                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3110           } while( idx = latch->next );
3111         }
3112
3113         page_no = LEAF_page;
3114
3115         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3116         uid off = page_no << bt->mgr->page_bits;
3117 #ifdef unix
3118           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3119 #else
3120         DWORD amt[1];
3121
3122           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3123
3124           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3125                 return bt->err = BTERR_map;
3126
3127           if( *amt <  bt->mgr->page_size )
3128                 return bt->err = BTERR_map;
3129 #endif
3130                 if( !bt->frame->free && !bt->frame->lvl )
3131                         cnt += bt->frame->act;
3132                 page_no++;
3133         }
3134                 
3135         cnt--;  // remove stopper key
3136         fprintf(stderr, " Total keys read %d\n", cnt);
3137
3138         bt_close (bt);
3139         return 0;
3140 }
3141
3142 typedef struct {
3143         char idx;
3144         char *type;
3145         char *infile;
3146         BtMgr *mgr;
3147         int num;
3148 } ThreadArg;
3149
3150 //  standalone program to index file of keys
3151 //  then list them onto std-out
3152
3153 #ifdef unix
3154 void *index_file (void *arg)
3155 #else
3156 uint __stdcall index_file (void *arg)
3157 #endif
3158 {
3159 int line = 0, found = 0, cnt = 0, idx;
3160 uid next, page_no = LEAF_page;  // start on first page of leaves
3161 int ch, len = 0, slot, type = 0;
3162 unsigned char key[BT_maxkey];
3163 unsigned char txn[65536];
3164 ThreadArg *args = arg;
3165 BtPageSet set[1];
3166 uint nxt = 65536;
3167 BtPage page;
3168 BtKey *ptr;
3169 BtVal *val;
3170 BtDb *bt;
3171 FILE *in;
3172
3173         bt = bt_open (args->mgr);
3174         page = (BtPage)txn;
3175
3176         if( args->idx < strlen (args->type) )
3177                 ch = args->type[args->idx];
3178         else
3179                 ch = args->type[strlen(args->type) - 1];
3180
3181         switch(ch | 0x20)
3182         {
3183         case 'a':
3184                 fprintf(stderr, "started latch mgr audit\n");
3185                 cnt = bt_latchaudit (bt);
3186                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3187                 break;
3188
3189         case 'd':
3190                 type = Delete;
3191
3192         case 'p':
3193                 if( !type )
3194                         type = Unique;
3195
3196                 if( args->num )
3197                  if( type == Delete )
3198                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3199                  else
3200                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3201                 else
3202                  if( type == Delete )
3203                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3204                  else
3205                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3206
3207                 if( in = fopen (args->infile, "rb") )
3208                   while( ch = getc(in), ch != EOF )
3209                         if( ch == '\n' )
3210                         {
3211                           line++;
3212
3213                           if( !args->num ) {
3214                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3215                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3216                             len = 0;
3217                                 continue;
3218                           }
3219
3220                           nxt -= len - 10;
3221                           memcpy (txn + nxt, key + 10, len - 10);
3222                           nxt -= 1;
3223                           txn[nxt] = len - 10;
3224                           nxt -= 10;
3225                           memcpy (txn + nxt, key, 10);
3226                           nxt -= 1;
3227                           txn[nxt] = 10;
3228                           slotptr(page,++cnt)->off  = nxt;
3229                           slotptr(page,cnt)->type = type;
3230                           len = 0;
3231
3232                           if( cnt < args->num )
3233                                 continue;
3234
3235                           page->cnt = cnt;
3236                           page->act = cnt;
3237                           page->min = nxt;
3238
3239                           if( bt_atomictxn (bt, page) )
3240                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3241                           nxt = sizeof(txn);
3242                           cnt = 0;
3243
3244                         }
3245                         else if( len < BT_maxkey )
3246                                 key[len++] = ch;
3247                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3248                 break;
3249
3250         case 'w':
3251                 fprintf(stderr, "started indexing for %s\n", args->infile);
3252                 if( in = fopen (args->infile, "r") )
3253                   while( ch = getc(in), ch != EOF )
3254                         if( ch == '\n' )
3255                         {
3256                           line++;
3257
3258                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3259                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3260                           len = 0;
3261                         }
3262                         else if( len < BT_maxkey )
3263                                 key[len++] = ch;
3264                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3265                 break;
3266
3267         case 'f':
3268                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3269                 if( in = fopen (args->infile, "rb") )
3270                   while( ch = getc(in), ch != EOF )
3271                         if( ch == '\n' )
3272                         {
3273                           line++;
3274                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3275                                 found++;
3276                           else if( bt->err )
3277                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3278                           len = 0;
3279                         }
3280                         else if( len < BT_maxkey )
3281                                 key[len++] = ch;
3282                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3283                 break;
3284
3285         case 's':
3286                 fprintf(stderr, "started scanning\n");
3287                 do {
3288                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3289                                 set->page = bt_mappage (bt, set->latch);
3290                         else
3291                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3292                         bt_lockpage (bt, BtLockRead, set->latch);
3293                         next = bt_getid (set->page->right);
3294
3295                         for( slot = 0; slot++ < set->page->cnt; )
3296                          if( next || slot < set->page->cnt )
3297                           if( !slotptr(set->page, slot)->dead ) {
3298                                 ptr = keyptr(set->page, slot);
3299                                 len = ptr->len;
3300
3301                             if( slotptr(set->page, slot)->type == Duplicate )
3302                                         len -= BtId;
3303
3304                                 fwrite (ptr->key, len, 1, stdout);
3305                                 val = valptr(set->page, slot);
3306                                 fwrite (val->value, val->len, 1, stdout);
3307                                 fputc ('\n', stdout);
3308                                 cnt++;
3309                            }
3310
3311                         bt_unlockpage (bt, BtLockRead, set->latch);
3312                         bt_unpinlatch (set->latch);
3313                 } while( page_no = next );
3314
3315                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3316                 break;
3317
3318         case 'r':
3319                 fprintf(stderr, "started reverse scan\n");
3320                 if( slot = bt_lastkey (bt) )
3321                    while( slot = bt_prevkey (bt, slot) ) {
3322                         if( slotptr(bt->cursor, slot)->dead )
3323                           continue;
3324
3325                         ptr = keyptr(bt->cursor, slot);
3326                         len = ptr->len;
3327
3328                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3329                                 len -= BtId;
3330
3331                         fwrite (ptr->key, len, 1, stdout);
3332                         val = valptr(bt->cursor, slot);
3333                         fwrite (val->value, val->len, 1, stdout);
3334                         fputc ('\n', stdout);
3335                         cnt++;
3336                   }
3337
3338                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3339                 break;
3340
3341         case 'c':
3342 #ifdef unix
3343                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3344 #endif
3345                 fprintf(stderr, "started counting\n");
3346                 page_no = LEAF_page;
3347
3348                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3349                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3350                                 break;
3351
3352                         if( !bt->frame->free && !bt->frame->lvl )
3353                                 cnt += bt->frame->act;
3354
3355                         bt->reads++;
3356                         page_no++;
3357                 }
3358                 
3359                 cnt--;  // remove stopper key
3360                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3361                 break;
3362         }
3363
3364         bt_close (bt);
3365 #ifdef unix
3366         return NULL;
3367 #else
3368         return 0;
3369 #endif
3370 }
3371
3372 typedef struct timeval timer;
3373
3374 int main (int argc, char **argv)
3375 {
3376 int idx, cnt, len, slot, err;
3377 int segsize, bits = 16;
3378 double start, stop;
3379 #ifdef unix
3380 pthread_t *threads;
3381 #else
3382 HANDLE *threads;
3383 #endif
3384 ThreadArg *args;
3385 uint poolsize = 0;
3386 float elapsed;
3387 int num = 0;
3388 char key[1];
3389 BtMgr *mgr;
3390 BtKey *ptr;
3391 BtDb *bt;
3392
3393         if( argc < 3 ) {
3394                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3395                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3396                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3397                 fprintf (stderr, "  page_bits is the page size in bits\n");
3398                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3399                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3400                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3401                 exit(0);
3402         }
3403
3404         start = getCpuTime(0);
3405
3406         if( argc > 3 )
3407                 bits = atoi(argv[3]);
3408
3409         if( argc > 4 )
3410                 poolsize = atoi(argv[4]);
3411
3412         if( !poolsize )
3413                 fprintf (stderr, "Warning: no mapped_pool\n");
3414
3415         if( argc > 5 )
3416                 num = atoi(argv[5]);
3417
3418         cnt = argc - 6;
3419 #ifdef unix
3420         threads = malloc (cnt * sizeof(pthread_t));
3421 #else
3422         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3423 #endif
3424         args = malloc (cnt * sizeof(ThreadArg));
3425
3426         mgr = bt_mgr ((argv[1]), bits, poolsize);
3427
3428         if( !mgr ) {
3429                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3430                 exit (1);
3431         }
3432
3433         //      fire off threads
3434
3435         for( idx = 0; idx < cnt; idx++ ) {
3436                 args[idx].infile = argv[idx + 6];
3437                 args[idx].type = argv[2];
3438                 args[idx].mgr = mgr;
3439                 args[idx].num = num;
3440                 args[idx].idx = idx;
3441 #ifdef unix
3442                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3443                         fprintf(stderr, "Error creating thread %d\n", err);
3444 #else
3445                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3446 #endif
3447         }
3448
3449         //      wait for termination
3450
3451 #ifdef unix
3452         for( idx = 0; idx < cnt; idx++ )
3453                 pthread_join (threads[idx], NULL);
3454 #else
3455         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3456
3457         for( idx = 0; idx < cnt; idx++ )
3458                 CloseHandle(threads[idx]);
3459
3460 #endif
3461         bt_poolaudit(mgr);
3462         bt_mgrclose (mgr);
3463
3464         elapsed = getCpuTime(0) - start;
3465         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3466         elapsed = getCpuTime(1);
3467         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3468         elapsed = getCpuTime(2);
3469         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3470 }
3471
3472 #endif  //STANDALONE