]> pd.if.org Git - btree/blob - threadskv8.c
clean up atomic delete deadlock problem
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 #define PHID 0x1
116 #define PRES 0x2
117 #define MASK 0x3
118 #define RINC 0x4
119
120 //      definition for spin latch implementation
121
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
125
126 volatile typedef struct {
127         ushort exclusive:1;
128         ushort pending:1;
129         ushort share:14;
130 } BtSpinLatch;
131
132 #define XCL 1
133 #define PEND 2
134 #define BOTH 3
135 #define SHARE 4
136
137 //  hash table entries
138
139 typedef struct {
140         volatile uint slot;             // Latch table entry at head of chain
141         BtSpinLatch latch[1];
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock atomic[1];               // Atomic update in progress
152         uint split;                             // right split page atomic insert
153         uint entry;                             // entry slot in latch table
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of outstanding threads
157         ushort dirty:1;                 // page in cache is dirty
158 } BtLatchSet;
159
160 //      Define the length of the page record numbers
161
162 #define BtId 6
163
164 //      Page key slot definition.
165
166 //      Keys are marked dead, but remain on the page until
167 //      it cleanup is called. The fence key (highest key) for
168 //      a leaf page is always present, even after cleanup.
169
170 //      Slot types
171
172 //      In addition to the Unique keys that occupy slots
173 //      there are Librarian and Duplicate key
174 //      slots occupying the key slot array.
175
176 //      The Librarian slots are dead keys that
177 //      serve as filler, available to add new Unique
178 //      or Dup slots that are inserted into the B-tree.
179
180 //      The Duplicate slots have had their key bytes extended
181 //      by 6 bytes to contain a binary duplicate key uniqueifier.
182
183 typedef enum {
184         Unique,
185         Librarian,
186         Duplicate,
187         Delete,
188         Update
189 } BtSlotType;
190
191 typedef struct {
192         uint off:BT_maxbits;    // page offset for key start
193         uint type:3;                    // type of slot
194         uint dead:1;                    // set for deleted slot
195 } BtSlot;
196
197 //      The key structure occupies space at the upper end of
198 //      each page.  It's a length byte followed by the key
199 //      bytes.
200
201 typedef struct {
202         unsigned char len;              // this can be changed to a ushort or uint
203         unsigned char key[0];
204 } BtKey;
205
206 //      the value structure also occupies space at the upper
207 //      end of the page. Each key is immediately followed by a value.
208
209 typedef struct {
210         unsigned char len;              // this can be changed to a ushort or uint
211         unsigned char value[0];
212 } BtVal;
213
214 #define BT_maxkey       255             // maximum number of bytes in a key
215 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
216
217 //      The first part of an index page.
218 //      It is immediately followed
219 //      by the BtSlot array of keys.
220
221 //      note that this structure size
222 //      must be a multiple of 8 bytes
223 //      in order to place dups correctly.
224
225 typedef struct BtPage_ {
226         uint cnt;                                       // count of keys in page
227         uint act;                                       // count of active keys
228         uint min;                                       // next key offset
229         uint garbage;                           // page garbage in bytes
230         unsigned char bits:7;           // page size in bits
231         unsigned char free:1;           // page is on free chain
232         unsigned char lvl:7;            // level of page
233         unsigned char kill:1;           // page is being deleted
234         unsigned char right[BtId];      // page number to right
235         unsigned char left[BtId];       // page number to left
236         unsigned char filler[2];        // padding to multiple of 8
237 } *BtPage;
238
239 //  The loadpage interface object
240
241 typedef struct {
242         BtPage page;            // current page pointer
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];        // next page_no in right ptr
250         unsigned long long dups[1];     // global duplicate key uniqueifier
251         unsigned char chain[BtId];      // head of free page_nos chain
252 } BtPageZero;
253
254 //      The object structure for Btree access
255
256 typedef struct {
257         uint page_size;                         // page size    
258         uint page_bits;                         // page size in bits    
259 #ifdef unix
260         int idx;
261 #else
262         HANDLE idx;
263 #endif
264         BtPageZero *pagezero;           // mapped allocation page
265         BtSpinLatch lock[1];            // allocation area lite latch
266         uint latchdeployed;                     // highest number of latch entries deployed
267         uint nlatchpage;                        // number of latch pages at BT_latch
268         uint latchtotal;                        // number of page latch entries
269         uint latchhash;                         // number of latch hash table slots
270         uint latchvictim;                       // next latch entry to examine
271         BtHashEntry *hashtable;         // the buffer pool hash table entries
272         BtLatchSet *latchsets;          // mapped latch set from buffer pool
273         unsigned char *pagepool;        // mapped to the buffer pool pages
274 #ifndef unix
275         HANDLE halloc;                          // allocation handle
276         HANDLE hpool;                           // buffer pool handle
277 #endif
278 } BtMgr;
279
280 typedef struct {
281         BtMgr *mgr;                             // buffer manager for thread
282         BtPage cursor;                  // cached frame for start/next (never mapped)
283         BtPage frame;                   // spare frame for the page split (never mapped)
284         uid cursor_page;                                // current cursor page number   
285         unsigned char *mem;                             // frame, cursor, page memory buffer
286         unsigned char key[BT_keyarray]; // last found complete key
287         int found;                              // last delete or insert was found
288         int err;                                // last error
289         int reads, writes;              // number of reads and writes from the btree
290 } BtDb;
291
292 typedef enum {
293         BTERR_ok = 0,
294         BTERR_struct,
295         BTERR_ovflw,
296         BTERR_lock,
297         BTERR_map,
298         BTERR_read,
299         BTERR_wrt,
300         BTERR_atomic
301 } BTERR;
302
303 #define CLOCK_bit 0x8000
304
305 // B-Tree functions
306 extern void bt_close (BtDb *bt);
307 extern BtDb *bt_open (BtMgr *mgr);
308 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
309 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
310 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
311 extern BtKey *bt_foundkey (BtDb *bt);
312 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
313 extern uint bt_nextkey   (BtDb *bt, uint slot);
314
315 //      manager functions
316 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
317 void bt_mgrclose (BtMgr *mgr);
318
319 //  Helper functions to return slot values
320 //      from the cursor page.
321
322 extern BtKey *bt_key (BtDb *bt, uint slot);
323 extern BtVal *bt_val (BtDb *bt, uint slot);
324
325 //  The page is allocated from low and hi ends.
326 //  The key slots are allocated from the bottom,
327 //      while the text and value of the key
328 //  are allocated from the top.  When the two
329 //  areas meet, the page is split into two.
330
331 //  A key consists of a length byte, two bytes of
332 //  index number (0 - 65535), and up to 253 bytes
333 //  of key value.
334
335 //  Associated with each key is a value byte string
336 //      containing any value desired.
337
338 //  The b-tree root is always located at page 1.
339 //      The first leaf page of level zero is always
340 //      located on page 2.
341
342 //      The b-tree pages are linked with next
343 //      pointers to facilitate enumerators,
344 //      and provide for concurrency.
345
346 //      When to root page fills, it is split in two and
347 //      the tree height is raised by a new root at page
348 //      one with two keys.
349
350 //      Deleted keys are marked with a dead bit until
351 //      page cleanup. The fence key for a leaf node is
352 //      always present
353
354 //  To achieve maximum concurrency one page is locked at a time
355 //  as the tree is traversed to find leaf key in question. The right
356 //  page numbers are used in cases where the page is being split,
357 //      or consolidated.
358
359 //  Page 0 is dedicated to lock for new page extensions,
360 //      and chains empty pages together for reuse. It also
361 //      contains the latch manager hash table.
362
363 //      The ParentModification lock on a node is obtained to serialize posting
364 //      or changing the fence key for a node.
365
366 //      Empty pages are chained together through the ALLOC page and reused.
367
368 //      Access macros to address slot and key values from the page
369 //      Page slots use 1 based indexing.
370
371 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
372 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
373 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
374
375 void bt_putid(unsigned char *dest, uid id)
376 {
377 int i = BtId;
378
379         while( i-- )
380                 dest[i] = (unsigned char)id, id >>= 8;
381 }
382
383 uid bt_getid(unsigned char *src)
384 {
385 uid id = 0;
386 int i;
387
388         for( i = 0; i < BtId; i++ )
389                 id <<= 8, id |= *src++; 
390
391         return id;
392 }
393
394 uid bt_newdup (BtDb *bt)
395 {
396 #ifdef unix
397         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
398 #else
399         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
400 #endif
401 }
402
403 //      Phase-Fair reader/writer lock implementation
404
405 void WriteLock (RWLock *lock)
406 {
407 ushort w, r, tix;
408
409 #ifdef unix
410         tix = __sync_fetch_and_add (lock->ticket, 1);
411 #else
412         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
413 #endif
414         // wait for our ticket to come up
415
416         while( tix != lock->serving[0] )
417 #ifdef unix
418                 sched_yield();
419 #else
420                 SwitchToThread ();
421 #endif
422
423         w = PRES | (tix & PHID);
424 #ifdef  unix
425         r = __sync_fetch_and_add (lock->rin, w);
426 #else
427         r = _InterlockedExchangeAdd16 (lock->rin, w);
428 #endif
429         while( r != *lock->rout )
430 #ifdef unix
431                 sched_yield();
432 #else
433                 SwitchToThread();
434 #endif
435 }
436
437 void WriteRelease (RWLock *lock)
438 {
439 #ifdef unix
440         __sync_fetch_and_and (lock->rin, ~MASK);
441 #else
442         _InterlockedAnd16 (lock->rin, ~MASK);
443 #endif
444         lock->serving[0]++;
445 }
446
447 void ReadLock (RWLock *lock)
448 {
449 ushort w;
450 #ifdef unix
451         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
452 #else
453         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
454 #endif
455         if( w )
456           while( w == (*lock->rin & MASK) )
457 #ifdef unix
458                 sched_yield ();
459 #else
460                 SwitchToThread ();
461 #endif
462 }
463
464 void ReadRelease (RWLock *lock)
465 {
466 #ifdef unix
467         __sync_fetch_and_add (lock->rout, RINC);
468 #else
469         _InterlockedExchangeAdd16 (lock->rout, RINC);
470 #endif
471 }
472
473 //      Spin Latch Manager
474
475 //      wait until write lock mode is clear
476 //      and add 1 to the share count
477
478 void bt_spinreadlock(BtSpinLatch *latch)
479 {
480 ushort prev;
481
482   do {
483 #ifdef unix
484         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
485 #else
486         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
487 #endif
488         //  see if exclusive request is granted or pending
489
490         if( !(prev & BOTH) )
491                 return;
492 #ifdef unix
493         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
494 #else
495         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
496 #endif
497 #ifdef  unix
498   } while( sched_yield(), 1 );
499 #else
500   } while( SwitchToThread(), 1 );
501 #endif
502 }
503
504 //      wait for other read and write latches to relinquish
505
506 void bt_spinwritelock(BtSpinLatch *latch)
507 {
508 ushort prev;
509
510   do {
511 #ifdef  unix
512         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
513 #else
514         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
515 #endif
516         if( !(prev & XCL) )
517           if( !(prev & ~BOTH) )
518                 return;
519           else
520 #ifdef unix
521                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
522 #else
523                 _InterlockedAnd16((ushort *)latch, ~XCL);
524 #endif
525 #ifdef  unix
526   } while( sched_yield(), 1 );
527 #else
528   } while( SwitchToThread(), 1 );
529 #endif
530 }
531
532 //      try to obtain write lock
533
534 //      return 1 if obtained,
535 //              0 otherwise
536
537 int bt_spinwritetry(BtSpinLatch *latch)
538 {
539 ushort prev;
540
541 #ifdef  unix
542         prev = __sync_fetch_and_or((ushort *)latch, XCL);
543 #else
544         prev = _InterlockedOr16((ushort *)latch, XCL);
545 #endif
546         //      take write access if all bits are clear
547
548         if( !(prev & XCL) )
549           if( !(prev & ~BOTH) )
550                 return 1;
551           else
552 #ifdef unix
553                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
554 #else
555                 _InterlockedAnd16((ushort *)latch, ~XCL);
556 #endif
557         return 0;
558 }
559
560 //      clear write mode
561
562 void bt_spinreleasewrite(BtSpinLatch *latch)
563 {
564 #ifdef unix
565         __sync_fetch_and_and((ushort *)latch, ~BOTH);
566 #else
567         _InterlockedAnd16((ushort *)latch, ~BOTH);
568 #endif
569 }
570
571 //      decrement reader count
572
573 void bt_spinreleaseread(BtSpinLatch *latch)
574 {
575 #ifdef unix
576         __sync_fetch_and_add((ushort *)latch, -SHARE);
577 #else
578         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
579 #endif
580 }
581
582 //      read page from permanent location in Btree file
583
584 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
585 {
586 off64_t off = page_no << mgr->page_bits;
587
588 #ifdef unix
589         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
590                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
591                 return BTERR_read;
592         }
593 #else
594 OVERLAPPED ovl[1];
595 uint amt[1];
596
597         memset (ovl, 0, sizeof(OVERLAPPED));
598         ovl->Offset = off;
599         ovl->OffsetHigh = off >> 32;
600
601         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
602                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
603                 return BTERR_read;
604         }
605         if( *amt <  mgr->page_size ) {
606                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
607                 return BTERR_read;
608         }
609 #endif
610         return 0;
611 }
612
613 //      write page to permanent location in Btree file
614 //      clear the dirty bit
615
616 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
617 {
618 off64_t off = page_no << mgr->page_bits;
619
620 #ifdef unix
621         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
622                 return BTERR_wrt;
623 #else
624 OVERLAPPED ovl[1];
625 uint amt[1];
626
627         memset (ovl, 0, sizeof(OVERLAPPED));
628         ovl->Offset = off;
629         ovl->OffsetHigh = off >> 32;
630
631         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
632                 return BTERR_wrt;
633
634         if( *amt <  mgr->page_size )
635                 return BTERR_wrt;
636 #endif
637         return 0;
638 }
639
640 //      link latch table entry into head of latch hash table
641
642 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
643 {
644 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
645 BtLatchSet *latch = bt->mgr->latchsets + slot;
646
647         if( latch->next = bt->mgr->hashtable[hashidx].slot )
648                 bt->mgr->latchsets[latch->next].prev = slot;
649
650         bt->mgr->hashtable[hashidx].slot = slot;
651         latch->page_no = page_no;
652         latch->entry = slot;
653         latch->split = 0;
654         latch->prev = 0;
655         latch->pin = 1;
656
657         if( loadit )
658           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
659                 return bt->err;
660           else
661                 bt->reads++;
662
663         return bt->err = 0;
664 }
665
666 //      set CLOCK bit in latch
667 //      decrement pin count
668
669 void bt_unpinlatch (BtLatchSet *latch)
670 {
671 #ifdef unix
672         if( ~latch->pin & CLOCK_bit )
673                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
674         __sync_fetch_and_add(&latch->pin, -1);
675 #else
676         if( ~latch->pin & CLOCK_bit )
677                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
678         _InterlockedDecrement16 (&latch->pin);
679 #endif
680 }
681
682 //  return the btree cached page address
683
684 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
685 {
686 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
687
688         return page;
689 }
690
691 //      find existing latchset or inspire new one
692 //      return with latchset pinned
693
694 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
695 {
696 uint hashidx = page_no % bt->mgr->latchhash;
697 BtLatchSet *latch;
698 uint slot, idx;
699 uint lvl, cnt;
700 off64_t off;
701 uint amt[1];
702 BtPage page;
703
704   //  try to find our entry
705
706   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
707
708   if( slot = bt->mgr->hashtable[hashidx].slot ) do
709   {
710         latch = bt->mgr->latchsets + slot;
711         if( page_no == latch->page_no )
712                 break;
713   } while( slot = latch->next );
714
715   //  found our entry
716   //  increment clock
717
718   if( slot ) {
719         latch = bt->mgr->latchsets + slot;
720 #ifdef unix
721         __sync_fetch_and_add(&latch->pin, 1);
722 #else
723         _InterlockedIncrement16 (&latch->pin);
724 #endif
725         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
726         return latch;
727   }
728
729         //  see if there are any unused pool entries
730 #ifdef unix
731         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
732 #else
733         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
734 #endif
735
736         if( slot < bt->mgr->latchtotal ) {
737                 latch = bt->mgr->latchsets + slot;
738                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
739                         return NULL;
740                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
741                 return latch;
742         }
743
744 #ifdef unix
745         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
746 #else
747         _InterlockedDecrement (&bt->mgr->latchdeployed);
748 #endif
749   //  find and reuse previous entry on victim
750
751   while( 1 ) {
752 #ifdef unix
753         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
754 #else
755         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
756 #endif
757         // try to get write lock on hash chain
758         //      skip entry if not obtained
759         //      or has outstanding pins
760
761         slot %= bt->mgr->latchtotal;
762
763         if( !slot )
764                 continue;
765
766         latch = bt->mgr->latchsets + slot;
767         idx = latch->page_no % bt->mgr->latchhash;
768
769         //      see we are on same chain as hashidx
770
771         if( idx == hashidx )
772                 continue;
773
774         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
775                 continue;
776
777         //  skip this slot if it is pinned
778         //      or the CLOCK bit is set
779
780         if( latch->pin ) {
781           if( latch->pin & CLOCK_bit ) {
782 #ifdef unix
783                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
784 #else
785                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
786 #endif
787           }
788           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
789           continue;
790         }
791
792         //  update permanent page area in btree from buffer pool
793
794         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
795
796         if( latch->dirty )
797           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
798                 return NULL;
799           else
800                 latch->dirty = 0, bt->writes++;
801
802         //  unlink our available slot from its hash chain
803
804         if( latch->prev )
805                 bt->mgr->latchsets[latch->prev].next = latch->next;
806         else
807                 bt->mgr->hashtable[idx].slot = latch->next;
808
809         if( latch->next )
810                 bt->mgr->latchsets[latch->next].prev = latch->prev;
811
812         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
813
814         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
815                 return NULL;
816
817         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
818         return latch;
819   }
820 }
821
822 void bt_mgrclose (BtMgr *mgr)
823 {
824 BtLatchSet *latch;
825 uint num = 0;
826 BtPage page;
827 uint slot;
828
829         //      flush dirty pool pages to the btree
830
831         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
832                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
833                 latch = mgr->latchsets + slot;
834
835                 if( latch->dirty ) {
836                         bt_writepage(mgr, page, latch->page_no);
837                         latch->dirty = 0, num++;
838                 }
839 //              madvise (page, mgr->page_size, MADV_DONTNEED);
840         }
841
842         fprintf(stderr, "%d buffer pool pages flushed\n", num);
843
844 #ifdef unix
845         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
846         munmap (mgr->pagezero, mgr->page_size);
847 #else
848         FlushViewOfFile(mgr->pagezero, 0);
849         UnmapViewOfFile(mgr->pagezero);
850         UnmapViewOfFile(mgr->hashtable);
851         CloseHandle(mgr->halloc);
852         CloseHandle(mgr->hpool);
853 #endif
854 #ifdef unix
855         close (mgr->idx);
856         free (mgr);
857 #else
858         FlushFileBuffers(mgr->idx);
859         CloseHandle(mgr->idx);
860         GlobalFree (mgr);
861 #endif
862 }
863
864 //      close and release memory
865
866 void bt_close (BtDb *bt)
867 {
868 #ifdef unix
869         if( bt->mem )
870                 free (bt->mem);
871 #else
872         if( bt->mem)
873                 VirtualFree (bt->mem, 0, MEM_RELEASE);
874 #endif
875         free (bt);
876 }
877
878 //  open/create new btree buffer manager
879
880 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
881 //              size of page pool (e.g. 262144)
882
883 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
884 {
885 uint lvl, attr, last, slot, idx;
886 uint nlatchpage, latchhash;
887 unsigned char value[BtId];
888 int flag, initit = 0;
889 BtPageZero *pagezero;
890 off64_t size;
891 uint amt[1];
892 BtMgr* mgr;
893 BtKey* key;
894 BtVal *val;
895
896         // determine sanity of page size and buffer pool
897
898         if( bits > BT_maxbits )
899                 bits = BT_maxbits;
900         else if( bits < BT_minbits )
901                 bits = BT_minbits;
902
903         if( nodemax < 16 ) {
904                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
905                 return NULL;
906         }
907
908 #ifdef unix
909         mgr = calloc (1, sizeof(BtMgr));
910
911         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
912
913         if( mgr->idx == -1 ) {
914                 fprintf (stderr, "Unable to open btree file\n");
915                 return free(mgr), NULL;
916         }
917 #else
918         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
919         attr = FILE_ATTRIBUTE_NORMAL;
920         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
921
922         if( mgr->idx == INVALID_HANDLE_VALUE )
923                 return GlobalFree(mgr), NULL;
924 #endif
925
926 #ifdef unix
927         pagezero = valloc (BT_maxpage);
928         *amt = 0;
929
930         // read minimum page size to get root info
931         //      to support raw disk partition files
932         //      check if bits == 0 on the disk.
933
934         if( size = lseek (mgr->idx, 0L, 2) )
935                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
936                         if( pagezero->alloc->bits )
937                                 bits = pagezero->alloc->bits;
938                         else
939                                 initit = 1;
940                 else
941                         return free(mgr), free(pagezero), NULL;
942         else
943                 initit = 1;
944 #else
945         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
946         size = GetFileSize(mgr->idx, amt);
947
948         if( size || *amt ) {
949                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
950                         return bt_mgrclose (mgr), NULL;
951                 bits = pagezero->alloc->bits;
952         } else
953                 initit = 1;
954 #endif
955
956         mgr->page_size = 1 << bits;
957         mgr->page_bits = bits;
958
959         //  calculate number of latch hash table entries
960
961         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
962         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
963
964         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
965         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
966         mgr->latchtotal = nodemax;
967
968         if( !initit )
969                 goto mgrlatch;
970
971         // initialize an empty b-tree with latch page, root page, page of leaves
972         // and page(s) of latches and page pool cache
973
974         memset (pagezero, 0, 1 << bits);
975         pagezero->alloc->bits = mgr->page_bits;
976         bt_putid(pagezero->alloc->right, MIN_lvl+1);
977
978         //  initialize left-most LEAF page in
979         //      alloc->left.
980
981         bt_putid (pagezero->alloc->left, LEAF_page);
982
983         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
984                 fprintf (stderr, "Unable to create btree page zero\n");
985                 return bt_mgrclose (mgr), NULL;
986         }
987
988         memset (pagezero, 0, 1 << bits);
989         pagezero->alloc->bits = mgr->page_bits;
990
991         for( lvl=MIN_lvl; lvl--; ) {
992                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
993                 key = keyptr(pagezero->alloc, 1);
994                 key->len = 2;           // create stopper key
995                 key->key[0] = 0xff;
996                 key->key[1] = 0xff;
997
998                 bt_putid(value, MIN_lvl - lvl + 1);
999                 val = valptr(pagezero->alloc, 1);
1000                 val->len = lvl ? BtId : 0;
1001                 memcpy (val->value, value, val->len);
1002
1003                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1004                 pagezero->alloc->lvl = lvl;
1005                 pagezero->alloc->cnt = 1;
1006                 pagezero->alloc->act = 1;
1007
1008                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1009                         fprintf (stderr, "Unable to create btree page zero\n");
1010                         return bt_mgrclose (mgr), NULL;
1011                 }
1012         }
1013
1014 mgrlatch:
1015 #ifdef unix
1016         free (pagezero);
1017 #else
1018         VirtualFree (pagezero, 0, MEM_RELEASE);
1019 #endif
1020 #ifdef unix
1021         // mlock the pagezero page
1022
1023         flag = PROT_READ | PROT_WRITE;
1024         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1025         if( mgr->pagezero == MAP_FAILED ) {
1026                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1027                 return bt_mgrclose (mgr), NULL;
1028         }
1029         mlock (mgr->pagezero, mgr->page_size);
1030
1031         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1032         if( mgr->hashtable == MAP_FAILED ) {
1033                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1034                 return bt_mgrclose (mgr), NULL;
1035         }
1036 #else
1037         flag = PAGE_READWRITE;
1038         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1039         if( !mgr->halloc ) {
1040                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1041                 return bt_mgrclose (mgr), NULL;
1042         }
1043
1044         flag = FILE_MAP_WRITE;
1045         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1046         if( !mgr->pagezero ) {
1047                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1048                 return bt_mgrclose (mgr), NULL;
1049         }
1050
1051         flag = PAGE_READWRITE;
1052         size = (uid)mgr->nlatchpage << mgr->page_bits;
1053         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1054         if( !mgr->hpool ) {
1055                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058
1059         flag = FILE_MAP_WRITE;
1060         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1061         if( !mgr->hashtable ) {
1062                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1063                 return bt_mgrclose (mgr), NULL;
1064         }
1065 #endif
1066
1067         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1068         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1069
1070         return mgr;
1071 }
1072
1073 //      open BTree access method
1074 //      based on buffer manager
1075
1076 BtDb *bt_open (BtMgr *mgr)
1077 {
1078 BtDb *bt = malloc (sizeof(*bt));
1079
1080         memset (bt, 0, sizeof(*bt));
1081         bt->mgr = mgr;
1082 #ifdef unix
1083         bt->mem = valloc (2 *mgr->page_size);
1084 #else
1085         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1086 #endif
1087         bt->frame = (BtPage)bt->mem;
1088         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1089         return bt;
1090 }
1091
1092 //  compare two keys, return > 0, = 0, or < 0
1093 //  =0: keys are same
1094 //  -1: key2 > key1
1095 //  +1: key2 < key1
1096 //  as the comparison value
1097
1098 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1099 {
1100 uint len1 = key1->len;
1101 int ans;
1102
1103         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1104                 return ans;
1105
1106         if( len1 > len2 )
1107                 return 1;
1108         if( len1 < len2 )
1109                 return -1;
1110
1111         return 0;
1112 }
1113
1114 // place write, read, or parent lock on requested page_no.
1115
1116 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1117 {
1118         switch( mode ) {
1119         case BtLockRead:
1120                 ReadLock (latch->readwr);
1121                 break;
1122         case BtLockWrite:
1123                 WriteLock (latch->readwr);
1124                 break;
1125         case BtLockAccess:
1126                 ReadLock (latch->access);
1127                 break;
1128         case BtLockDelete:
1129                 WriteLock (latch->access);
1130                 break;
1131         case BtLockParent:
1132                 WriteLock (latch->parent);
1133                 break;
1134         case BtLockAtomic:
1135                 WriteLock (latch->atomic);
1136                 break;
1137         }
1138 }
1139
1140 // remove write, read, or parent lock on requested page
1141
1142 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1143 {
1144         switch( mode ) {
1145         case BtLockRead:
1146                 ReadRelease (latch->readwr);
1147                 break;
1148         case BtLockWrite:
1149                 WriteRelease (latch->readwr);
1150                 break;
1151         case BtLockAccess:
1152                 ReadRelease (latch->access);
1153                 break;
1154         case BtLockDelete:
1155                 WriteRelease (latch->access);
1156                 break;
1157         case BtLockParent:
1158                 WriteRelease (latch->parent);
1159                 break;
1160         case BtLockAtomic:
1161                 WriteRelease (latch->atomic);
1162                 break;
1163         }
1164 }
1165
1166 //      allocate a new page
1167 //      return with page latched, but unlocked.
1168
1169 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1170 {
1171 uid page_no;
1172 int blk;
1173
1174         //      lock allocation page
1175
1176         bt_spinwritelock(bt->mgr->lock);
1177
1178         // use empty chain first
1179         // else allocate empty page
1180
1181         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1182                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1183                         set->page = bt_mappage (bt, set->latch);
1184                 else
1185                         return bt->err = BTERR_struct, -1;
1186
1187                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1188                 bt_spinreleasewrite(bt->mgr->lock);
1189
1190                 memcpy (set->page, contents, bt->mgr->page_size);
1191                 set->latch->dirty = 1;
1192                 return 0;
1193         }
1194
1195         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1196         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1197
1198         // unlock allocation latch
1199
1200         bt_spinreleasewrite(bt->mgr->lock);
1201
1202         //      don't load cache from btree page
1203
1204         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1205                 set->page = bt_mappage (bt, set->latch);
1206         else
1207                 return bt->err = BTERR_struct;
1208
1209         memcpy (set->page, contents, bt->mgr->page_size);
1210         set->latch->dirty = 1;
1211         return 0;
1212 }
1213
1214 //  find slot in page for given key at a given level
1215
1216 int bt_findslot (BtPage page, unsigned char *key, uint len)
1217 {
1218 uint diff, higher = page->cnt, low = 1, slot;
1219 uint good = 0;
1220
1221         //        make stopper key an infinite fence value
1222
1223         if( bt_getid (page->right) )
1224                 higher++;
1225         else
1226                 good++;
1227
1228         //      low is the lowest candidate.
1229         //  loop ends when they meet
1230
1231         //  higher is already
1232         //      tested as .ge. the passed key.
1233
1234         while( diff = higher - low ) {
1235                 slot = low + ( diff >> 1 );
1236                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1237                         low = slot + 1;
1238                 else
1239                         higher = slot, good++;
1240         }
1241
1242         //      return zero if key is on right link page
1243
1244         return good ? higher : 0;
1245 }
1246
1247 //  find and load page at given level for given key
1248 //      leave page rd or wr locked as requested
1249
1250 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1251 {
1252 uid page_no = ROOT_page, prevpage = 0;
1253 uint drill = 0xff, slot;
1254 BtLatchSet *prevlatch;
1255 uint mode, prevmode;
1256
1257   //  start at root of btree and drill down
1258
1259   do {
1260         // determine lock mode of drill level
1261         mode = (drill == lvl) ? lock : BtLockRead; 
1262
1263         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1264           return 0;
1265
1266         // obtain access lock using lock chaining with Access mode
1267
1268         if( page_no > ROOT_page )
1269           bt_lockpage(BtLockAccess, set->latch);
1270
1271         set->page = bt_mappage (bt, set->latch);
1272
1273         //      release & unpin parent or left sibling page
1274
1275         if( prevpage ) {
1276           bt_unlockpage(prevmode, prevlatch);
1277           bt_unpinlatch (prevlatch);
1278           prevpage = 0;
1279         }
1280
1281         // obtain mode lock using lock chaining through AccessLock
1282
1283         bt_lockpage(mode, set->latch);
1284
1285         if( set->page->free )
1286                 return bt->err = BTERR_struct, 0;
1287
1288         if( page_no > ROOT_page )
1289           bt_unlockpage(BtLockAccess, set->latch);
1290
1291         // re-read and re-lock root after determining actual level of root
1292
1293         if( set->page->lvl != drill) {
1294                 if( set->latch->page_no != ROOT_page )
1295                         return bt->err = BTERR_struct, 0;
1296                         
1297                 drill = set->page->lvl;
1298
1299                 if( lock != BtLockRead && drill == lvl ) {
1300                   bt_unlockpage(mode, set->latch);
1301                   bt_unpinlatch (set->latch);
1302                   continue;
1303                 }
1304         }
1305
1306         prevpage = set->latch->page_no;
1307         prevlatch = set->latch;
1308         prevmode = mode;
1309
1310         //  find key on page at this level
1311         //  and descend to requested level
1312
1313         if( set->page->kill )
1314           goto slideright;
1315
1316         if( slot = bt_findslot (set->page, key, len) ) {
1317           if( drill == lvl )
1318                 return slot;
1319
1320           // find next non-dead slot -- the fence key if nothing else
1321
1322           while( slotptr(set->page, slot)->dead )
1323                 if( slot++ < set->page->cnt )
1324                   continue;
1325                 else
1326                   return bt->err = BTERR_struct, 0;
1327
1328           page_no = bt_getid(valptr(set->page, slot)->value);
1329           drill--;
1330           continue;
1331         }
1332
1333         //  or slide right into next page
1334
1335 slideright:
1336         page_no = bt_getid(set->page->right);
1337
1338   } while( page_no );
1339
1340   // return error on end of right chain
1341
1342   bt->err = BTERR_struct;
1343   return 0;     // return error
1344 }
1345
1346 //      return page to free list
1347 //      page must be delete & write locked
1348
1349 void bt_freepage (BtDb *bt, BtPageSet *set)
1350 {
1351         //      lock allocation page
1352
1353         bt_spinwritelock (bt->mgr->lock);
1354
1355         //      store chain
1356
1357         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1358         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1359         set->latch->dirty = 1;
1360         set->page->free = 1;
1361
1362         // unlock released page
1363
1364         bt_unlockpage (BtLockDelete, set->latch);
1365         bt_unlockpage (BtLockWrite, set->latch);
1366
1367         // unlock allocation page
1368
1369         bt_spinreleasewrite (bt->mgr->lock);
1370 }
1371
1372 //      a fence key was deleted from a page
1373 //      push new fence value upwards
1374
1375 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1376 {
1377 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1378 unsigned char value[BtId];
1379 BtKey* ptr;
1380 uint idx;
1381
1382         //      remove the old fence value
1383
1384         ptr = keyptr(set->page, set->page->cnt);
1385         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1386         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1387         set->latch->dirty = 1;
1388
1389         //  cache new fence value
1390
1391         ptr = keyptr(set->page, set->page->cnt);
1392         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1393
1394         bt_lockpage (BtLockParent, set->latch);
1395         bt_unlockpage (BtLockWrite, set->latch);
1396
1397         //      insert new (now smaller) fence key
1398
1399         bt_putid (value, set->latch->page_no);
1400         ptr = (BtKey*)leftkey;
1401
1402         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1403           return bt->err;
1404
1405         //      now delete old fence key
1406
1407         ptr = (BtKey*)rightkey;
1408
1409         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1410                 return bt->err;
1411
1412         bt_unlockpage (BtLockParent, set->latch);
1413         bt_unpinlatch(set->latch);
1414         return 0;
1415 }
1416
1417 //      root has a single child
1418 //      collapse a level from the tree
1419
1420 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1421 {
1422 BtPageSet child[1];
1423 uid page_no;
1424 uint idx;
1425
1426   // find the child entry and promote as new root contents
1427
1428   do {
1429         for( idx = 0; idx++ < root->page->cnt; )
1430           if( !slotptr(root->page, idx)->dead )
1431                 break;
1432
1433         page_no = bt_getid (valptr(root->page, idx)->value);
1434
1435         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1436                 child->page = bt_mappage (bt, child->latch);
1437         else
1438                 return bt->err;
1439
1440         bt_lockpage (BtLockDelete, child->latch);
1441         bt_lockpage (BtLockWrite, child->latch);
1442
1443         memcpy (root->page, child->page, bt->mgr->page_size);
1444         root->latch->dirty = 1;
1445
1446         bt_freepage (bt, child);
1447         bt_unpinlatch (child->latch);
1448
1449   } while( root->page->lvl > 1 && root->page->act == 1 );
1450
1451   bt_unlockpage (BtLockWrite, root->latch);
1452   bt_unpinlatch (root->latch);
1453   return 0;
1454 }
1455
1456 //  delete a page and manage keys
1457 //  call with page writelocked
1458 //      returns with page unpinned
1459
1460 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1461 {
1462 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1463 unsigned char value[BtId];
1464 uint lvl = set->page->lvl;
1465 BtPageSet right[1];
1466 uid page_no;
1467 BtKey *ptr;
1468
1469         //      cache copy of fence key
1470         //      to post in parent
1471
1472         ptr = keyptr(set->page, set->page->cnt);
1473         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1474
1475         //      obtain lock on right page
1476
1477         page_no = bt_getid(set->page->right);
1478
1479         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1480                 right->page = bt_mappage (bt, right->latch);
1481         else
1482                 return 0;
1483
1484         bt_lockpage (BtLockWrite, right->latch);
1485
1486         // cache copy of key to update
1487
1488         ptr = keyptr(right->page, right->page->cnt);
1489         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1490
1491         if( right->page->kill )
1492                 return bt->err = BTERR_struct;
1493
1494         // pull contents of right peer into our empty page
1495
1496         memcpy (set->page, right->page, bt->mgr->page_size);
1497         set->latch->dirty = 1;
1498
1499         // mark right page deleted and point it to left page
1500         //      until we can post parent updates that remove access
1501         //      to the deleted page.
1502
1503         bt_putid (right->page->right, set->latch->page_no);
1504         right->latch->dirty = 1;
1505         right->page->kill = 1;
1506
1507         bt_lockpage (BtLockParent, right->latch);
1508         bt_unlockpage (BtLockWrite, right->latch);
1509
1510         bt_lockpage (BtLockParent, set->latch);
1511         bt_unlockpage (BtLockWrite, set->latch);
1512
1513         // redirect higher key directly to our new node contents
1514
1515         bt_putid (value, set->latch->page_no);
1516         ptr = (BtKey*)higherfence;
1517
1518         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1519           return bt->err;
1520
1521         //      delete old lower key to our node
1522
1523         ptr = (BtKey*)lowerfence;
1524
1525         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1526           return bt->err;
1527
1528         //      obtain delete and write locks to right node
1529
1530         bt_unlockpage (BtLockParent, right->latch);
1531         bt_lockpage (BtLockDelete, right->latch);
1532         bt_lockpage (BtLockWrite, right->latch);
1533         bt_freepage (bt, right);
1534         bt_unpinlatch (right->latch);
1535
1536         bt_unlockpage (BtLockParent, set->latch);
1537         bt_unpinlatch (set->latch);
1538         bt->found = 1;
1539         return 0;
1540 }
1541
1542 //  find and delete key on page by marking delete flag bit
1543 //  if page becomes empty, delete it from the btree
1544
1545 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1546 {
1547 uint slot, idx, found, fence;
1548 BtPageSet set[1];
1549 BtKey *ptr;
1550 BtVal *val;
1551
1552         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1553                 ptr = keyptr(set->page, slot);
1554         else
1555                 return bt->err;
1556
1557         // if librarian slot, advance to real slot
1558
1559         if( slotptr(set->page, slot)->type == Librarian )
1560                 ptr = keyptr(set->page, ++slot);
1561
1562         fence = slot == set->page->cnt;
1563
1564         // if key is found delete it, otherwise ignore request
1565
1566         if( found = !keycmp (ptr, key, len) )
1567           if( found = slotptr(set->page, slot)->dead == 0 ) {
1568                 val = valptr(set->page,slot);
1569                 slotptr(set->page, slot)->dead = 1;
1570                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1571                 set->page->act--;
1572
1573                 // collapse empty slots beneath the fence
1574
1575                 while( idx = set->page->cnt - 1 )
1576                   if( slotptr(set->page, idx)->dead ) {
1577                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1578                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1579                   } else
1580                         break;
1581           }
1582
1583         //      did we delete a fence key in an upper level?
1584
1585         if( found && lvl && set->page->act && fence )
1586           if( bt_fixfence (bt, set, lvl) )
1587                 return bt->err;
1588           else
1589                 return bt->found = found, 0;
1590
1591         //      do we need to collapse root?
1592
1593         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1594           if( bt_collapseroot (bt, set) )
1595                 return bt->err;
1596           else
1597                 return bt->found = found, 0;
1598
1599         //      delete empty page
1600
1601         if( !set->page->act )
1602                 return bt_deletepage (bt, set);
1603
1604         set->latch->dirty = 1;
1605         bt_unlockpage(BtLockWrite, set->latch);
1606         bt_unpinlatch (set->latch);
1607         return bt->found = found, 0;
1608 }
1609
1610 BtKey *bt_foundkey (BtDb *bt)
1611 {
1612         return (BtKey*)(bt->key);
1613 }
1614
1615 //      advance to next slot
1616
1617 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1618 {
1619 BtLatchSet *prevlatch;
1620 uid page_no;
1621
1622         if( slot < set->page->cnt )
1623                 return slot + 1;
1624
1625         prevlatch = set->latch;
1626
1627         if( page_no = bt_getid(set->page->right) )
1628           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1629                 set->page = bt_mappage (bt, set->latch);
1630           else
1631                 return 0;
1632         else
1633           return bt->err = BTERR_struct, 0;
1634
1635         // obtain access lock using lock chaining with Access mode
1636
1637         bt_lockpage(BtLockAccess, set->latch);
1638
1639         bt_unlockpage(BtLockRead, prevlatch);
1640         bt_unpinlatch (prevlatch);
1641
1642         bt_lockpage(BtLockRead, set->latch);
1643         bt_unlockpage(BtLockAccess, set->latch);
1644         return 1;
1645 }
1646
1647 //      find unique key or first duplicate key in
1648 //      leaf level and return number of value bytes
1649 //      or (-1) if not found.  Setup key for bt_foundkey
1650
1651 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1652 {
1653 BtPageSet set[1];
1654 uint len, slot;
1655 int ret = -1;
1656 BtKey *ptr;
1657 BtVal *val;
1658
1659   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1660    do {
1661         ptr = keyptr(set->page, slot);
1662
1663         //      skip librarian slot place holder
1664
1665         if( slotptr(set->page, slot)->type == Librarian )
1666                 ptr = keyptr(set->page, ++slot);
1667
1668         //      return actual key found
1669
1670         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1671         len = ptr->len;
1672
1673         if( slotptr(set->page, slot)->type == Duplicate )
1674                 len -= BtId;
1675
1676         //      not there if we reach the stopper key
1677
1678         if( slot == set->page->cnt )
1679           if( !bt_getid (set->page->right) )
1680                 break;
1681
1682         // if key exists, return >= 0 value bytes copied
1683         //      otherwise return (-1)
1684
1685         if( slotptr(set->page, slot)->dead )
1686                 continue;
1687
1688         if( keylen == len )
1689           if( !memcmp (ptr->key, key, len) ) {
1690                 val = valptr (set->page,slot);
1691                 if( valmax > val->len )
1692                         valmax = val->len;
1693                 memcpy (value, val->value, valmax);
1694                 ret = valmax;
1695           }
1696
1697         break;
1698
1699    } while( slot = bt_findnext (bt, set, slot) );
1700
1701   bt_unlockpage (BtLockRead, set->latch);
1702   bt_unpinlatch (set->latch);
1703   return ret;
1704 }
1705
1706 //      check page for space available,
1707 //      clean if necessary and return
1708 //      0 - page needs splitting
1709 //      >0  new slot value
1710
1711 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1712 {
1713 uint nxt = bt->mgr->page_size;
1714 BtPage page = set->page;
1715 uint cnt = 0, idx = 0;
1716 uint max = page->cnt;
1717 uint newslot = max;
1718 BtKey *key;
1719 BtVal *val;
1720
1721         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1722                 return slot;
1723
1724         //      skip cleanup and proceed to split
1725         //      if there's not enough garbage
1726         //      to bother with.
1727
1728         if( page->garbage < nxt / 5 )
1729                 return 0;
1730
1731         memcpy (bt->frame, page, bt->mgr->page_size);
1732
1733         // skip page info and set rest of page to zero
1734
1735         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1736         set->latch->dirty = 1;
1737         page->garbage = 0;
1738         page->act = 0;
1739
1740         // clean up page first by
1741         // removing deleted keys
1742
1743         while( cnt++ < max ) {
1744                 if( cnt == slot )
1745                         newslot = idx + 2;
1746                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1747                         continue;
1748
1749                 // copy the value across
1750
1751                 val = valptr(bt->frame, cnt);
1752                 nxt -= val->len + sizeof(BtVal);
1753                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1754
1755                 // copy the key across
1756
1757                 key = keyptr(bt->frame, cnt);
1758                 nxt -= key->len + sizeof(BtKey);
1759                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1760
1761                 // make a librarian slot
1762
1763                 if( idx ) {
1764                         slotptr(page, ++idx)->off = nxt;
1765                         slotptr(page, idx)->type = Librarian;
1766                         slotptr(page, idx)->dead = 1;
1767                 }
1768
1769                 // set up the slot
1770
1771                 slotptr(page, ++idx)->off = nxt;
1772                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1773
1774                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1775                         page->act++;
1776         }
1777
1778         page->min = nxt;
1779         page->cnt = idx;
1780
1781         //      see if page has enough space now, or does it need splitting?
1782
1783         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1784                 return newslot;
1785
1786         return 0;
1787 }
1788
1789 // split the root and raise the height of the btree
1790
1791 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1792 {  
1793 unsigned char leftkey[BT_keyarray];
1794 uint nxt = bt->mgr->page_size;
1795 unsigned char value[BtId];
1796 BtPageSet left[1];
1797 uid left_page_no;
1798 BtKey *ptr;
1799 BtVal *val;
1800
1801         //      save left page fence key for new root
1802
1803         ptr = keyptr(root->page, root->page->cnt);
1804         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1805
1806         //  Obtain an empty page to use, and copy the current
1807         //  root contents into it, e.g. lower keys
1808
1809         if( bt_newpage(bt, left, root->page) )
1810                 return bt->err;
1811
1812         left_page_no = left->latch->page_no;
1813         bt_unpinlatch (left->latch);
1814
1815         // preserve the page info at the bottom
1816         // of higher keys and set rest to zero
1817
1818         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1819
1820         // insert stopper key at top of newroot page
1821         // and increase the root height
1822
1823         nxt -= BtId + sizeof(BtVal);
1824         bt_putid (value, right->page_no);
1825         val = (BtVal *)((unsigned char *)root->page + nxt);
1826         memcpy (val->value, value, BtId);
1827         val->len = BtId;
1828
1829         nxt -= 2 + sizeof(BtKey);
1830         slotptr(root->page, 2)->off = nxt;
1831         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1832         ptr->len = 2;
1833         ptr->key[0] = 0xff;
1834         ptr->key[1] = 0xff;
1835
1836         // insert lower keys page fence key on newroot page as first key
1837
1838         nxt -= BtId + sizeof(BtVal);
1839         bt_putid (value, left_page_no);
1840         val = (BtVal *)((unsigned char *)root->page + nxt);
1841         memcpy (val->value, value, BtId);
1842         val->len = BtId;
1843
1844         ptr = (BtKey *)leftkey;
1845         nxt -= ptr->len + sizeof(BtKey);
1846         slotptr(root->page, 1)->off = nxt;
1847         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1848         
1849         bt_putid(root->page->right, 0);
1850         root->page->min = nxt;          // reset lowest used offset and key count
1851         root->page->cnt = 2;
1852         root->page->act = 2;
1853         root->page->lvl++;
1854
1855         // release and unpin root pages
1856
1857         bt_unlockpage(BtLockWrite, root->latch);
1858         bt_unpinlatch (root->latch);
1859
1860         bt_unpinlatch (right);
1861         return 0;
1862 }
1863
1864 //  split already locked full node
1865 //      leave it locked.
1866 //      return pool entry for new right
1867 //      page, unlocked
1868
1869 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1870 {
1871 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1872 uint lvl = set->page->lvl;
1873 BtPageSet right[1];
1874 BtKey *key, *ptr;
1875 BtVal *val, *src;
1876 uid right2;
1877 uint prev;
1878
1879         //  split higher half of keys to bt->frame
1880
1881         memset (bt->frame, 0, bt->mgr->page_size);
1882         max = set->page->cnt;
1883         cnt = max / 2;
1884         idx = 0;
1885
1886         while( cnt++ < max ) {
1887                 if( slotptr(set->page, cnt)->dead && cnt < max )
1888                         continue;
1889                 src = valptr(set->page, cnt);
1890                 nxt -= src->len + sizeof(BtVal);
1891                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1892
1893                 key = keyptr(set->page, cnt);
1894                 nxt -= key->len + sizeof(BtKey);
1895                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1896                 memcpy (ptr, key, key->len + sizeof(BtKey));
1897
1898                 //      add librarian slot
1899
1900                 if( idx ) {
1901                         slotptr(bt->frame, ++idx)->off = nxt;
1902                         slotptr(bt->frame, idx)->type = Librarian;
1903                         slotptr(bt->frame, idx)->dead = 1;
1904                 }
1905
1906                 //  add actual slot
1907
1908                 slotptr(bt->frame, ++idx)->off = nxt;
1909                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1910
1911                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1912                         bt->frame->act++;
1913         }
1914
1915         bt->frame->bits = bt->mgr->page_bits;
1916         bt->frame->min = nxt;
1917         bt->frame->cnt = idx;
1918         bt->frame->lvl = lvl;
1919
1920         // link right node
1921
1922         if( set->latch->page_no > ROOT_page )
1923                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1924
1925         //      get new free page and write higher keys to it.
1926
1927         if( bt_newpage(bt, right, bt->frame) )
1928                 return 0;
1929
1930         memcpy (bt->frame, set->page, bt->mgr->page_size);
1931         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1932         set->latch->dirty = 1;
1933
1934         nxt = bt->mgr->page_size;
1935         set->page->garbage = 0;
1936         set->page->act = 0;
1937         max /= 2;
1938         cnt = 0;
1939         idx = 0;
1940
1941         if( slotptr(bt->frame, max)->type == Librarian )
1942                 max--;
1943
1944         //  assemble page of smaller keys
1945
1946         while( cnt++ < max ) {
1947                 if( slotptr(bt->frame, cnt)->dead )
1948                         continue;
1949                 val = valptr(bt->frame, cnt);
1950                 nxt -= val->len + sizeof(BtVal);
1951                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1952
1953                 key = keyptr(bt->frame, cnt);
1954                 nxt -= key->len + sizeof(BtKey);
1955                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1956
1957                 //      add librarian slot
1958
1959                 if( idx ) {
1960                         slotptr(set->page, ++idx)->off = nxt;
1961                         slotptr(set->page, idx)->type = Librarian;
1962                         slotptr(set->page, idx)->dead = 1;
1963                 }
1964
1965                 //      add actual slot
1966
1967                 slotptr(set->page, ++idx)->off = nxt;
1968                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1969                 set->page->act++;
1970         }
1971
1972         bt_putid(set->page->right, right->latch->page_no);
1973         set->page->min = nxt;
1974         set->page->cnt = idx;
1975
1976         return right->latch->entry;
1977 }
1978
1979 //      fix keys for newly split page
1980 //      call with page locked, return
1981 //      unlocked
1982
1983 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1984 {
1985 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1986 unsigned char value[BtId];
1987 uint lvl = set->page->lvl;
1988 BtPage page;
1989 BtKey *ptr;
1990
1991         // if current page is the root page, split it
1992
1993         if( set->latch->page_no == ROOT_page )
1994                 return bt_splitroot (bt, set, right);
1995
1996         ptr = keyptr(set->page, set->page->cnt);
1997         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1998
1999         page = bt_mappage (bt, right);
2000
2001         ptr = keyptr(page, page->cnt);
2002         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2003
2004         // insert new fences in their parent pages
2005
2006         bt_lockpage (BtLockParent, right);
2007
2008         bt_lockpage (BtLockParent, set->latch);
2009         bt_unlockpage (BtLockWrite, set->latch);
2010
2011         // insert new fence for reformulated left block of smaller keys
2012
2013         bt_putid (value, set->latch->page_no);
2014         ptr = (BtKey *)leftkey;
2015
2016         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2017                 return bt->err;
2018
2019         // switch fence for right block of larger keys to new right page
2020
2021         bt_putid (value, right->page_no);
2022         ptr = (BtKey *)rightkey;
2023
2024         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2025                 return bt->err;
2026
2027         bt_unlockpage (BtLockParent, set->latch);
2028         bt_unpinlatch (set->latch);
2029
2030         bt_unlockpage (BtLockParent, right);
2031         bt_unpinlatch (right);
2032         return 0;
2033 }
2034
2035 //      install new key and value onto page
2036 //      page must already be checked for
2037 //      adequate space
2038
2039 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2040 {
2041 uint idx, librarian;
2042 BtSlot *node;
2043 BtKey *ptr;
2044 BtVal *val;
2045
2046         //      if found slot > desired slot and previous slot
2047         //      is a librarian slot, use it
2048
2049         if( slot > 1 )
2050           if( slotptr(set->page, slot-1)->type == Librarian )
2051                 slot--;
2052
2053         // copy value onto page
2054
2055         set->page->min -= vallen + sizeof(BtVal);
2056         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2057         memcpy (val->value, value, vallen);
2058         val->len = vallen;
2059
2060         // copy key onto page
2061
2062         set->page->min -= keylen + sizeof(BtKey);
2063         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2064         memcpy (ptr->key, key, keylen);
2065         ptr->len = keylen;
2066         
2067         //      find first empty slot
2068
2069         for( idx = slot; idx < set->page->cnt; idx++ )
2070           if( slotptr(set->page, idx)->dead )
2071                 break;
2072
2073         // now insert key into array before slot
2074
2075         if( idx == set->page->cnt )
2076                 idx += 2, set->page->cnt += 2, librarian = 2;
2077         else
2078                 librarian = 1;
2079
2080         set->latch->dirty = 1;
2081         set->page->act++;
2082
2083         while( idx > slot + librarian - 1 )
2084                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2085
2086         //      add librarian slot
2087
2088         if( librarian > 1 ) {
2089                 node = slotptr(set->page, slot++);
2090                 node->off = set->page->min;
2091                 node->type = Librarian;
2092                 node->dead = 1;
2093         }
2094
2095         //      fill in new slot
2096
2097         node = slotptr(set->page, slot);
2098         node->off = set->page->min;
2099         node->type = type;
2100         node->dead = 0;
2101
2102         if( release ) {
2103                 bt_unlockpage (BtLockWrite, set->latch);
2104                 bt_unpinlatch (set->latch);
2105         }
2106
2107         return 0;
2108 }
2109
2110 //  Insert new key into the btree at given level.
2111 //      either add a new key or update/add an existing one
2112
2113 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2114 {
2115 unsigned char newkey[BT_keyarray];
2116 uint slot, idx, len, entry;
2117 BtPageSet set[1];
2118 BtKey *ptr, *ins;
2119 uid sequence;
2120 BtVal *val;
2121 uint type;
2122
2123   // set up the key we're working on
2124
2125   ins = (BtKey*)newkey;
2126   memcpy (ins->key, key, keylen);
2127   ins->len = keylen;
2128
2129   // is this a non-unique index value?
2130
2131   if( unique )
2132         type = Unique;
2133   else {
2134         type = Duplicate;
2135         sequence = bt_newdup (bt);
2136         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2137         ins->len += BtId;
2138   }
2139
2140   while( 1 ) { // find the page and slot for the current key
2141         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2142                 ptr = keyptr(set->page, slot);
2143         else {
2144                 if( !bt->err )
2145                         bt->err = BTERR_ovflw;
2146                 return bt->err;
2147         }
2148
2149         // if librarian slot == found slot, advance to real slot
2150
2151         if( slotptr(set->page, slot)->type == Librarian )
2152           if( !keycmp (ptr, key, keylen) )
2153                 ptr = keyptr(set->page, ++slot);
2154
2155         len = ptr->len;
2156
2157         if( slotptr(set->page, slot)->type == Duplicate )
2158                 len -= BtId;
2159
2160         //  if inserting a duplicate key or unique key
2161         //      check for adequate space on the page
2162         //      and insert the new key before slot.
2163
2164         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2165           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2166                 if( !(entry = bt_splitpage (bt, set)) )
2167                   return bt->err;
2168                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2169                   return bt->err;
2170                 else
2171                   continue;
2172
2173           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2174         }
2175
2176         // if key already exists, update value and return
2177
2178         val = valptr(set->page, slot);
2179
2180         if( val->len >= vallen ) {
2181                 if( slotptr(set->page, slot)->dead )
2182                         set->page->act++;
2183                 set->page->garbage += val->len - vallen;
2184                 set->latch->dirty = 1;
2185                 slotptr(set->page, slot)->dead = 0;
2186                 val->len = vallen;
2187                 memcpy (val->value, value, vallen);
2188                 bt_unlockpage(BtLockWrite, set->latch);
2189                 bt_unpinlatch (set->latch);
2190                 return 0;
2191         }
2192
2193         //      new update value doesn't fit in existing value area
2194
2195         if( !slotptr(set->page, slot)->dead )
2196                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2197         else {
2198                 slotptr(set->page, slot)->dead = 0;
2199                 set->page->act++;
2200         }
2201
2202         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2203           if( !(entry = bt_splitpage (bt, set)) )
2204                 return bt->err;
2205           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2206                 return bt->err;
2207           else
2208                 continue;
2209
2210         set->page->min -= vallen + sizeof(BtVal);
2211         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2212         memcpy (val->value, value, vallen);
2213         val->len = vallen;
2214
2215         set->latch->dirty = 1;
2216         set->page->min -= keylen + sizeof(BtKey);
2217         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2218         memcpy (ptr->key, key, keylen);
2219         ptr->len = keylen;
2220         
2221         slotptr(set->page, slot)->off = set->page->min;
2222         bt_unlockpage(BtLockWrite, set->latch);
2223         bt_unpinlatch (set->latch);
2224         return 0;
2225   }
2226   return 0;
2227 }
2228
2229 typedef struct {
2230         uint entry;                     // latch table entry number
2231         uint slot:31;           // page slot number
2232         uint reuse:1;           // reused previous page
2233 } AtomicMod;
2234
2235 typedef struct {
2236         uid page_no;            // page number for split leaf
2237         void *next;                     // next key to insert
2238         uint entry:29;          // latch table entry number
2239         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2240         uint nounlock:1;        // don't unlock ParentModification
2241         unsigned char leafkey[BT_keyarray];
2242 } AtomicKey;
2243
2244 //  find and load leaf page for given key
2245 //      leave page Atomic locked and Read locked.
2246
2247 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2248 {
2249 BtLatchSet *prevlatch;
2250 uid page_no;
2251 uint slot;
2252
2253   //  find level zero page
2254
2255   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2256         return 0;
2257
2258   // find next non-dead entry on this page
2259   //    it will be the fence key if nothing else
2260
2261   while( slotptr(set->page, slot)->dead )
2262         if( slot++ < set->page->cnt )
2263           continue;
2264         else
2265           return bt->err = BTERR_struct, 0;
2266
2267   page_no = bt_getid(valptr(set->page, slot)->value);
2268   prevlatch = set->latch;
2269
2270   while( page_no ) {
2271         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2272           set->page = bt_mappage (bt, set->latch);
2273         else
2274           return 0;
2275
2276         if( set->page->free || set->page->lvl )
2277                 return bt->err = BTERR_struct, 0;
2278
2279         // obtain read lock using lock chaining with Access mode
2280         //      release & unpin parent/left sibling page
2281
2282         bt_lockpage(BtLockAccess, set->latch);
2283
2284         bt_unlockpage(BtLockRead, prevlatch);
2285         bt_unpinlatch (prevlatch);
2286
2287         bt_lockpage(BtLockRead, set->latch);
2288         bt_unlockpage(BtLockAccess, set->latch);
2289
2290         //  find key on page at this level
2291         //  and descend to requested level
2292
2293         if( !set->page->kill )
2294          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2295           bt_unlockpage(BtLockRead, set->latch);
2296           bt_lockpage(BtLockAtomic, set->latch);
2297           bt_lockpage(BtLockAccess, set->latch);
2298           bt_lockpage(BtLockRead, set->latch);
2299           bt_unlockpage(BtLockAccess, set->latch);
2300
2301           if( slot = bt_findslot (set->page, key, len) )
2302                 return slot;
2303
2304           bt_unlockpage(BtLockAtomic, set->latch);
2305           }
2306
2307         //  slide right into next page
2308
2309         page_no = bt_getid(set->page->right);
2310         prevlatch = set->latch;
2311   }
2312
2313   // return error on end of right chain
2314
2315   bt->err = BTERR_struct;
2316   return 0;     // return error
2317 }
2318
2319 //      determine actual page where key is located
2320 //  return slot number
2321
2322 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2323 {
2324 BtKey *key = keyptr(source,src);
2325 uint slot = locks[src].slot;
2326 uint entry;
2327
2328         if( src > 1 && locks[src].reuse )
2329           entry = locks[src-1].entry, slot = 0;
2330         else
2331           entry = locks[src].entry;
2332
2333         if( slot ) {
2334                 set->latch = bt->mgr->latchsets + entry;
2335                 set->page = bt_mappage (bt, set->latch);
2336                 return slot;
2337         }
2338
2339         //      is locks->reuse set?
2340         //      if so, find where our key
2341         //      is located on previous page or split pages
2342
2343         do {
2344                 set->latch = bt->mgr->latchsets + entry;
2345                 set->page = bt_mappage (bt, set->latch);
2346
2347                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2348                   if( locks[src].reuse )
2349                         locks[src].entry = entry;
2350                   return slot;
2351                 }
2352         } while( entry = set->latch->split );
2353
2354         bt->err = BTERR_atomic;
2355         return 0;
2356 }
2357
2358 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2359 {
2360 BtKey *key = keyptr(source, src);
2361 BtVal *val = valptr(source, src);
2362 BtLatchSet *latch;
2363 BtPageSet set[1];
2364 uint entry;
2365
2366   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2367         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2368           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2369
2370         if( entry = bt_splitpage (bt, set) )
2371           latch = bt->mgr->latchsets + entry;
2372         else
2373           return bt->err;
2374
2375         //      splice right page into split chain
2376         //      and WriteLock it.
2377
2378         latch->split = set->latch->split;
2379         set->latch->split = entry;
2380         bt_lockpage(BtLockWrite, latch);
2381   }
2382
2383   return bt->err = BTERR_atomic;
2384 }
2385
2386 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2387 {
2388 BtKey *key = keyptr(source, src);
2389 uint idx, entry, slot;
2390 BtPageSet set[1];
2391 BtKey *ptr;
2392 BtVal *val;
2393
2394         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2395                 slotptr(set->page, slot)->dead = 1;
2396         else
2397                 return bt->err = BTERR_struct;
2398
2399         ptr = keyptr(set->page, slot);
2400         val = valptr(set->page, slot);
2401
2402         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2403         set->latch->dirty = 1;
2404         set->page->act--;
2405         return 0;
2406 }
2407
2408 uint bt_parentmatch (AtomicKey *head, uint entry)
2409 {
2410 uint cnt = 0;
2411
2412         if( head )
2413           do if( head->entry == entry )
2414                 head->nounlock = 1, cnt++;
2415           while( head = head->next );
2416
2417         return cnt;
2418 }
2419
2420 //      atomic modification of a batch of keys.
2421
2422 //      return -1 if BTERR is set
2423 //      otherwise return slot number
2424 //      causing the key constraint violation
2425 //      or zero on successful completion.
2426
2427 int bt_atomictxn (BtDb *bt, BtPage source)
2428 {
2429 uint src, idx, slot, samepage, entry;
2430 AtomicKey *head, *tail, *leaf;
2431 BtPageSet set[1], prev[1];
2432 unsigned char value[BtId];
2433 BtKey *key, *ptr, *key2;
2434 BtLatchSet *latch;
2435 AtomicMod *locks;
2436 int result = 0;
2437 BtSlot temp[1];
2438 BtPage page;
2439 BtVal *val;
2440 uid right;
2441 int type;
2442
2443   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2444   head = NULL;
2445   tail = NULL;
2446
2447   // stable sort the list of keys into order to
2448   //    prevent deadlocks between threads.
2449
2450   for( src = 1; src++ < source->cnt; ) {
2451         *temp = *slotptr(source,src);
2452         key = keyptr (source,src);
2453
2454         for( idx = src; --idx; ) {
2455           key2 = keyptr (source,idx);
2456           if( keycmp (key, key2->key, key2->len) < 0 ) {
2457                 *slotptr(source,idx+1) = *slotptr(source,idx);
2458                 *slotptr(source,idx) = *temp;
2459           } else
2460                 break;
2461         }
2462   }
2463
2464   // Load the leaf page for each key
2465   // group same page references with reuse bit
2466   // and determine any constraint violations
2467
2468   for( src = 0; src++ < source->cnt; ) {
2469         key = keyptr(source, src);
2470         slot = 0;
2471
2472         // first determine if this modification falls
2473         // on the same page as the previous modification
2474         //      note that the far right leaf page is a special case
2475
2476         if( samepage = src > 1 )
2477           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2478                 slot = bt_findslot(set->page, key->key, key->len);
2479           else // release read on previous page
2480                 bt_unlockpage(BtLockRead, set->latch); 
2481
2482         if( !slot )
2483           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2484                 set->latch->split = 0;
2485           else
2486                 return -1;
2487
2488         if( slotptr(set->page, slot)->type == Librarian )
2489           ptr = keyptr(set->page, ++slot);
2490         else
2491           ptr = keyptr(set->page, slot);
2492
2493         if( !samepage ) {
2494           locks[src].entry = set->latch->entry;
2495           locks[src].slot = slot;
2496           locks[src].reuse = 0;
2497         } else {
2498           locks[src].entry = 0;
2499           locks[src].slot = 0;
2500           locks[src].reuse = 1;
2501         }
2502
2503         switch( slotptr(source, src)->type ) {
2504         case Duplicate:
2505         case Unique:
2506           if( !slotptr(set->page, slot)->dead )
2507            if( slot < set->page->cnt || bt_getid (set->page->right) )
2508             if( !keycmp (ptr, key->key, key->len) ) {
2509
2510                   // return constraint violation if key already exists
2511
2512                   bt_unlockpage(BtLockRead, set->latch);
2513                   result = src;
2514
2515                   while( src ) {
2516                         if( locks[src].entry ) {
2517                           set->latch = bt->mgr->latchsets + locks[src].entry;
2518                           bt_unlockpage(BtLockAtomic, set->latch);
2519                           bt_unpinlatch (set->latch);
2520                         }
2521                         src--;
2522                   }
2523                   free (locks);
2524                   return result;
2525                 }
2526           break;
2527         }
2528   }
2529
2530   //  unlock last loadpage lock
2531
2532   if( source->cnt > 1 )
2533         bt_unlockpage(BtLockRead, set->latch);
2534
2535   //  obtain write lock for each master page
2536
2537   for( src = 0; src++ < source->cnt; )
2538         if( locks[src].reuse )
2539           continue;
2540         else
2541           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2542
2543   // insert or delete each key
2544   // process any splits or merges
2545   // release Write & Atomic latches
2546   // set ParentModifications and build
2547   // queue of keys to insert for split pages
2548   // or delete for deleted pages.
2549
2550   // run through txn list backwards
2551
2552   samepage = source->cnt + 1;
2553
2554   for( src = source->cnt; src; src-- ) {
2555         if( locks[src].reuse )
2556           continue;
2557
2558         //  perform the txn operations
2559         //      from smaller to larger on
2560         //  the same page
2561
2562         for( idx = src; idx < samepage; idx++ )
2563          switch( slotptr(source,idx)->type ) {
2564          case Delete:
2565           if( bt_atomicdelete (bt, source, locks, idx) )
2566                 return -1;
2567           break;
2568
2569         case Duplicate:
2570         case Unique:
2571           if( bt_atomicinsert (bt, source, locks, idx) )
2572                 return -1;
2573           break;
2574         }
2575
2576         //      after the same page operations have finished,
2577         //  process master page for splits or deletion.
2578
2579         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2580         prev->page = bt_mappage (bt, prev->latch);
2581         samepage = src;
2582
2583         //  pick-up all splits from master page
2584
2585         entry = prev->latch->split;
2586
2587         while( entry ) {
2588           set->latch = bt->mgr->latchsets + entry;
2589           set->page = bt_mappage (bt, set->latch);
2590           entry = set->latch->split;
2591
2592           // delete empty master page
2593
2594           if( !prev->page->act ) {
2595                 memcpy (set->page->left, prev->page->left, BtId);
2596                 memcpy (prev->page, set->page, bt->mgr->page_size);
2597                 bt_lockpage (BtLockDelete, set->latch);
2598                 bt_freepage (bt, set);
2599                 bt_unpinlatch (set->latch);
2600
2601                 prev->latch->dirty = 1;
2602                 continue;
2603           }
2604
2605           // remove empty page from the split chain
2606
2607           if( !set->page->act ) {
2608                 memcpy (prev->page->right, set->page->right, BtId);
2609                 prev->latch->split = set->latch->split;
2610                 bt_lockpage (BtLockDelete, set->latch);
2611                 bt_freepage (bt, set);
2612                 bt_unpinlatch (set->latch);
2613                 continue;
2614           }
2615
2616           //  schedule prev fence key update
2617
2618           ptr = keyptr(prev->page,prev->page->cnt);
2619           leaf = calloc (sizeof(AtomicKey), 1);
2620
2621           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2622           leaf->page_no = prev->latch->page_no;
2623           leaf->entry = prev->latch->entry;
2624           leaf->type = 0;
2625
2626           if( tail )
2627                 tail->next = leaf;
2628           else
2629                 head = leaf;
2630
2631           tail = leaf;
2632
2633           bt_putid (set->page->left, prev->latch->page_no);
2634           bt_lockpage(BtLockParent, prev->latch);
2635           bt_unlockpage(BtLockWrite, prev->latch);
2636           *prev = *set;
2637         }
2638
2639         //  update left pointer in next right page
2640         //      if we did any now non-empty splits
2641
2642         if( latch->split ) {
2643           //  fix left pointer in master's original right sibling
2644           //    or set rightmost page in page zero
2645
2646           if( right = bt_getid (prev->page->right) ) {
2647                 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2648                   set->page = bt_mappage (bt, set->latch);
2649                 else
2650                   return -1;
2651
2652             bt_lockpage (BtLockWrite, set->latch);
2653             bt_putid (set->page->left, prev->latch->page_no);
2654                 set->latch->dirty = 1;
2655             bt_unlockpage (BtLockWrite, set->latch);
2656                 bt_unpinlatch (set->latch);
2657           } else
2658                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2659
2660           //  Process last page split in chain
2661
2662           ptr = keyptr(prev->page,prev->page->cnt);
2663           leaf = calloc (sizeof(AtomicKey), 1);
2664
2665           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2666           leaf->page_no = prev->latch->page_no;
2667           leaf->entry = prev->latch->entry;
2668           leaf->type = 0;
2669   
2670           if( tail )
2671                 tail->next = leaf;
2672           else
2673                 head = leaf;
2674
2675           tail = leaf;
2676
2677           bt_lockpage(BtLockParent, prev->latch);
2678           bt_unlockpage(BtLockWrite, prev->latch);
2679           bt_unlockpage(BtLockAtomic, latch);
2680           continue;
2681         }
2682
2683         //  finished if prev page occupied (either master or final split)
2684
2685         if( prev->page->act ) {
2686           bt_unlockpage(BtLockWrite, latch);
2687           bt_unlockpage(BtLockAtomic, latch);
2688           bt_unpinlatch(latch);
2689           continue;
2690         }
2691
2692         // any splits were reversed, and the
2693         // master page located in prev is empty, delete it
2694         // by pulling over master's right sibling.
2695         // Delete empty master's fence
2696
2697         ptr = keyptr(prev->page,prev->page->cnt);
2698         leaf = calloc (sizeof(AtomicKey), 1);
2699
2700         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2701         leaf->page_no = prev->latch->page_no;
2702         leaf->entry = prev->latch->entry;
2703         leaf->type = 1;
2704   
2705         if( tail )
2706           tail->next = leaf;
2707         else
2708           head = leaf;
2709
2710         tail = leaf;
2711
2712         bt_lockpage(BtLockParent, prev->latch);
2713         bt_unlockpage(BtLockWrite, prev->latch);
2714
2715         // grab master's right sibling
2716
2717         if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2718                 set->page = bt_mappage (bt, set->latch);
2719         else
2720                 return -1;
2721
2722         bt_lockpage(BtLockWrite, set->latch);
2723
2724         //      pull contents over empty page
2725
2726         memcpy (set->page->left, prev->page->left, BtId);
2727         memcpy (prev->page, set->page, bt->mgr->page_size);
2728
2729         bt_putid (set->page->right, prev->latch->page_no);
2730         set->latch->dirty = 1;
2731         set->page->kill = 1;
2732
2733         //      add new fence key for new master page contents, delete
2734         //      right sibling after parent posts the new master fence.
2735
2736         ptr = keyptr(set->page,set->page->cnt);
2737         leaf = calloc (sizeof(AtomicKey), 1);
2738
2739         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2740         leaf->page_no = prev->latch->page_no;
2741         leaf->entry = set->latch->entry;
2742         leaf->type = 2;
2743   
2744         //  see if right sibling is not yet in the FIFO
2745
2746         if( !bt_parentmatch (head, leaf->entry) )
2747           bt_lockpage(BtLockParent, set->latch);
2748
2749         bt_unlockpage(BtLockWrite, set->latch);
2750
2751         if( tail )
2752           tail->next = leaf;
2753         else
2754           head = leaf;
2755
2756         tail = leaf;
2757
2758         //  fix master's far right sibling's left pointer
2759
2760         if( right = bt_getid (set->page->right) ) {
2761           if( set->latch = bt_pinlatch (bt, right, 1) )
2762                 set->page = bt_mappage (bt, set->latch);
2763
2764           bt_lockpage (BtLockWrite, set->latch);
2765           bt_putid (set->page->left, prev->latch->page_no);
2766           set->latch->dirty = 1;
2767
2768           bt_unlockpage (BtLockWrite, set->latch);
2769           bt_unpinlatch (set->latch);
2770         }
2771
2772         bt_unlockpage(BtLockAtomic, latch);
2773   }
2774   
2775   //  add & delete keys for any pages split or merged during transaction
2776
2777   if( leaf = head )
2778     do {
2779           set->latch = bt->mgr->latchsets + leaf->entry;
2780           set->page = bt_mappage (bt, set->latch);
2781
2782           bt_putid (value, leaf->page_no);
2783           ptr = (BtKey *)leaf->leafkey;
2784
2785           switch( leaf->type ) {
2786           case 0:       // insert key
2787             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2788                   return -1;
2789
2790                 break;
2791
2792           case 1:       // delete key
2793                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2794                   return -1;
2795
2796                 break;
2797
2798           case 2:       // insert key & free
2799             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2800                   return -1;
2801
2802                 bt_lockpage (BtLockDelete, set->latch);
2803                 bt_lockpage (BtLockWrite, set->latch);
2804                 bt_freepage (bt, set);
2805                 break;
2806           }
2807
2808           if( !leaf->nounlock )
2809             bt_unlockpage (BtLockParent, set->latch);
2810
2811           bt_unpinlatch (set->latch);
2812           tail = leaf->next;
2813           free (leaf);
2814         } while( leaf = tail );
2815
2816   // return success
2817
2818   free (locks);
2819   return 0;
2820 }
2821
2822 //      set cursor to highest slot on highest page
2823
2824 uint bt_lastkey (BtDb *bt)
2825 {
2826 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2827 BtPageSet set[1];
2828 uint slot;
2829
2830         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2831                 set->page = bt_mappage (bt, set->latch);
2832         else
2833                 return 0;
2834
2835     bt_lockpage(BtLockRead, set->latch);
2836
2837         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2838         slot = set->page->cnt;
2839
2840     bt_unlockpage(BtLockRead, set->latch);
2841         bt_unpinlatch (set->latch);
2842         return slot;
2843 }
2844
2845 //      return previous slot on cursor page
2846
2847 uint bt_prevkey (BtDb *bt, uint slot)
2848 {
2849 uid ourright, next, us = bt->cursor_page;
2850 BtPageSet set[1];
2851
2852         if( --slot )
2853                 return slot;
2854
2855         ourright = bt_getid(bt->cursor->right);
2856
2857 goleft:
2858         if( !(next = bt_getid(bt->cursor->left)) )
2859                 return 0;
2860
2861 findourself:
2862         bt->cursor_page = next;
2863
2864         if( set->latch = bt_pinlatch (bt, next, 1) )
2865                 set->page = bt_mappage (bt, set->latch);
2866         else
2867                 return 0;
2868
2869     bt_lockpage(BtLockRead, set->latch);
2870         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2871         bt_unlockpage(BtLockRead, set->latch);
2872         bt_unpinlatch (set->latch);
2873         
2874         next = bt_getid (bt->cursor->right);
2875
2876         if( next != us )
2877           if( next == ourright )
2878                 goto goleft;
2879           else
2880                 goto findourself;
2881
2882         return bt->cursor->cnt;
2883 }
2884
2885 //  return next slot on cursor page
2886 //  or slide cursor right into next page
2887
2888 uint bt_nextkey (BtDb *bt, uint slot)
2889 {
2890 BtPageSet set[1];
2891 uid right;
2892
2893   do {
2894         right = bt_getid(bt->cursor->right);
2895
2896         while( slot++ < bt->cursor->cnt )
2897           if( slotptr(bt->cursor,slot)->dead )
2898                 continue;
2899           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2900                 return slot;
2901           else
2902                 break;
2903
2904         if( !right )
2905                 break;
2906
2907         bt->cursor_page = right;
2908
2909         if( set->latch = bt_pinlatch (bt, right, 1) )
2910                 set->page = bt_mappage (bt, set->latch);
2911         else
2912                 return 0;
2913
2914     bt_lockpage(BtLockRead, set->latch);
2915
2916         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2917
2918         bt_unlockpage(BtLockRead, set->latch);
2919         bt_unpinlatch (set->latch);
2920         slot = 0;
2921
2922   } while( 1 );
2923
2924   return bt->err = 0;
2925 }
2926
2927 //  cache page of keys into cursor and return starting slot for given key
2928
2929 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2930 {
2931 BtPageSet set[1];
2932 uint slot;
2933
2934         // cache page for retrieval
2935
2936         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2937           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2938         else
2939           return 0;
2940
2941         bt->cursor_page = set->latch->page_no;
2942
2943         bt_unlockpage(BtLockRead, set->latch);
2944         bt_unpinlatch (set->latch);
2945         return slot;
2946 }
2947
2948 BtKey *bt_key(BtDb *bt, uint slot)
2949 {
2950         return keyptr(bt->cursor, slot);
2951 }
2952
2953 BtVal *bt_val(BtDb *bt, uint slot)
2954 {
2955         return valptr(bt->cursor,slot);
2956 }
2957
2958 #ifdef STANDALONE
2959
2960 #ifndef unix
2961 double getCpuTime(int type)
2962 {
2963 FILETIME crtime[1];
2964 FILETIME xittime[1];
2965 FILETIME systime[1];
2966 FILETIME usrtime[1];
2967 SYSTEMTIME timeconv[1];
2968 double ans = 0;
2969
2970         memset (timeconv, 0, sizeof(SYSTEMTIME));
2971
2972         switch( type ) {
2973         case 0:
2974                 GetSystemTimeAsFileTime (xittime);
2975                 FileTimeToSystemTime (xittime, timeconv);
2976                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2977                 break;
2978         case 1:
2979                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2980                 FileTimeToSystemTime (usrtime, timeconv);
2981                 break;
2982         case 2:
2983                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2984                 FileTimeToSystemTime (systime, timeconv);
2985                 break;
2986         }
2987
2988         ans += (double)timeconv->wHour * 3600;
2989         ans += (double)timeconv->wMinute * 60;
2990         ans += (double)timeconv->wSecond;
2991         ans += (double)timeconv->wMilliseconds / 1000;
2992         return ans;
2993 }
2994 #else
2995 #include <time.h>
2996 #include <sys/resource.h>
2997
2998 double getCpuTime(int type)
2999 {
3000 struct rusage used[1];
3001 struct timeval tv[1];
3002
3003         switch( type ) {
3004         case 0:
3005                 gettimeofday(tv, NULL);
3006                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3007
3008         case 1:
3009                 getrusage(RUSAGE_SELF, used);
3010                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3011
3012         case 2:
3013                 getrusage(RUSAGE_SELF, used);
3014                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3015         }
3016
3017         return 0;
3018 }
3019 #endif
3020
3021 void bt_poolaudit (BtMgr *mgr)
3022 {
3023 BtLatchSet *latch;
3024 uint slot = 0;
3025
3026         while( slot++ < mgr->latchdeployed ) {
3027                 latch = mgr->latchsets + slot;
3028
3029                 if( *latch->readwr->rin & MASK )
3030                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3031                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3032
3033                 if( *latch->access->rin & MASK )
3034                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3035                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3036
3037                 if( *latch->parent->rin & MASK )
3038                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3039                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3040
3041                 if( latch->pin & ~CLOCK_bit ) {
3042                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3043                         latch->pin = 0;
3044                 }
3045         }
3046 }
3047
3048 uint bt_latchaudit (BtDb *bt)
3049 {
3050 ushort idx, hashidx;
3051 uid next, page_no;
3052 BtLatchSet *latch;
3053 uint cnt = 0;
3054 BtKey *ptr;
3055
3056         if( *(ushort *)(bt->mgr->lock) )
3057                 fprintf(stderr, "Alloc page locked\n");
3058         *(ushort *)(bt->mgr->lock) = 0;
3059
3060         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3061                 latch = bt->mgr->latchsets + idx;
3062                 if( *latch->readwr->rin & MASK )
3063                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3064                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3065
3066                 if( *latch->access->rin & MASK )
3067                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3068                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3069
3070                 if( *latch->parent->rin & MASK )
3071                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3072                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3073
3074                 if( latch->pin ) {
3075                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3076                         latch->pin = 0;
3077                 }
3078         }
3079
3080         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3081           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3082                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3083
3084           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3085
3086           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3087                 latch = bt->mgr->latchsets + idx;
3088                 if( latch->pin )
3089                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3090           } while( idx = latch->next );
3091         }
3092
3093         page_no = LEAF_page;
3094
3095         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3096         uid off = page_no << bt->mgr->page_bits;
3097 #ifdef unix
3098           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3099 #else
3100         DWORD amt[1];
3101
3102           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3103
3104           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3105                 return bt->err = BTERR_map;
3106
3107           if( *amt <  bt->mgr->page_size )
3108                 return bt->err = BTERR_map;
3109 #endif
3110                 if( !bt->frame->free && !bt->frame->lvl )
3111                         cnt += bt->frame->act;
3112                 page_no++;
3113         }
3114                 
3115         cnt--;  // remove stopper key
3116         fprintf(stderr, " Total keys read %d\n", cnt);
3117
3118         bt_close (bt);
3119         return 0;
3120 }
3121
3122 typedef struct {
3123         char idx;
3124         char *type;
3125         char *infile;
3126         BtMgr *mgr;
3127         int num;
3128 } ThreadArg;
3129
3130 //  standalone program to index file of keys
3131 //  then list them onto std-out
3132
3133 #ifdef unix
3134 void *index_file (void *arg)
3135 #else
3136 uint __stdcall index_file (void *arg)
3137 #endif
3138 {
3139 int line = 0, found = 0, cnt = 0, unique;
3140 uid next, page_no = LEAF_page;  // start on first page of leaves
3141 unsigned char key[BT_maxkey];
3142 unsigned char txn[65536];
3143 ThreadArg *args = arg;
3144 int ch, len = 0, slot;
3145 BtPageSet set[1];
3146 uint nxt = 65536;
3147 BtPage page;
3148 BtKey *ptr;
3149 BtVal *val;
3150 BtDb *bt;
3151 FILE *in;
3152
3153         bt = bt_open (args->mgr);
3154         page = (BtPage)txn;
3155
3156         unique = (args->type[1] | 0x20) != 'd';
3157
3158         switch(args->type[0] | 0x20)
3159         {
3160         case 'a':
3161                 fprintf(stderr, "started latch mgr audit\n");
3162                 cnt = bt_latchaudit (bt);
3163                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3164                 break;
3165
3166         case 't':
3167                 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
3168                 if( in = fopen (args->infile, "rb") )
3169                   while( ch = getc(in), ch != EOF )
3170                         if( ch == '\n' )
3171                         {
3172                           line++;
3173                           nxt -= len - 10;
3174                           memcpy (txn + nxt, key + 10, len - 10);
3175                           nxt -= 1;
3176                           txn[nxt] = len - 10;
3177                           nxt -= 10;
3178                           memcpy (txn + nxt, key, 10);
3179                           nxt -= 1;
3180                           txn[nxt] = 10;
3181                           slotptr(page,++cnt)->off  = nxt;
3182                           slotptr(page,cnt)->type = Delete;
3183                           len = 0;
3184
3185                           if( cnt < args->num )
3186                                 continue;
3187
3188                           page->cnt = cnt;
3189                           page->act = cnt;
3190                           page->min = nxt;
3191
3192                           if( bt_atomictxn (bt, page) )
3193                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3194                           nxt = 65536;
3195                           cnt = 0;
3196
3197                         }
3198                         else if( len < BT_maxkey )
3199                                 key[len++] = ch;
3200                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3201                 break;
3202
3203         case 'p':
3204                 fprintf(stderr, "started pennysort for %s\n", args->infile);
3205                 if( in = fopen (args->infile, "rb") )
3206                   while( ch = getc(in), ch != EOF )
3207                         if( ch == '\n' )
3208                         {
3209                           line++;
3210
3211                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3212                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3213                           len = 0;
3214                         }
3215                         else if( len < BT_maxkey )
3216                                 key[len++] = ch;
3217                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3218                 break;
3219
3220         case 'w':
3221                 fprintf(stderr, "started indexing for %s\n", args->infile);
3222                 if( in = fopen (args->infile, "r") )
3223                   while( ch = getc(in), ch != EOF )
3224                         if( ch == '\n' )
3225                         {
3226                           line++;
3227
3228                           if( args->num == 1 )
3229                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3230
3231                           else if( args->num )
3232                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3233
3234                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3235                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3236                           len = 0;
3237                         }
3238                         else if( len < BT_maxkey )
3239                                 key[len++] = ch;
3240                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3241                 break;
3242
3243         case 'd':
3244                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3245                 if( in = fopen (args->infile, "rb") )
3246                   while( ch = getc(in), ch != EOF )
3247                         if( ch == '\n' )
3248                         {
3249                           line++;
3250                           if( args->num == 1 )
3251                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3252
3253                           else if( args->num )
3254                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3255
3256                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3257                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3258                           ptr = (BtKey*)(bt->key);
3259                           found++;
3260
3261                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3262                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3263                           len = 0;
3264                         }
3265                         else if( len < BT_maxkey )
3266                                 key[len++] = ch;
3267                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3268                 break;
3269
3270         case 'f':
3271                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3272                 if( in = fopen (args->infile, "rb") )
3273                   while( ch = getc(in), ch != EOF )
3274                         if( ch == '\n' )
3275                         {
3276                           line++;
3277                           if( args->num == 1 )
3278                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3279
3280                           else if( args->num )
3281                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3282
3283                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3284                                 found++;
3285                           else if( bt->err )
3286                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3287                           len = 0;
3288                         }
3289                         else if( len < BT_maxkey )
3290                                 key[len++] = ch;
3291                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3292                 break;
3293
3294         case 's':
3295                 fprintf(stderr, "started scanning\n");
3296                 do {
3297                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3298                                 set->page = bt_mappage (bt, set->latch);
3299                         else
3300                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3301                         bt_lockpage (BtLockRead, set->latch);
3302                         next = bt_getid (set->page->right);
3303
3304                         for( slot = 0; slot++ < set->page->cnt; )
3305                          if( next || slot < set->page->cnt )
3306                           if( !slotptr(set->page, slot)->dead ) {
3307                                 ptr = keyptr(set->page, slot);
3308                                 len = ptr->len;
3309
3310                             if( slotptr(set->page, slot)->type == Duplicate )
3311                                         len -= BtId;
3312
3313                                 fwrite (ptr->key, len, 1, stdout);
3314                                 val = valptr(set->page, slot);
3315                                 fwrite (val->value, val->len, 1, stdout);
3316                                 fputc ('\n', stdout);
3317                                 cnt++;
3318                            }
3319
3320                         bt_unlockpage (BtLockRead, set->latch);
3321                         bt_unpinlatch (set->latch);
3322                 } while( page_no = next );
3323
3324                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3325                 break;
3326
3327         case 'c':
3328 #ifdef unix
3329                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3330 #endif
3331                 fprintf(stderr, "started counting\n");
3332                 page_no = LEAF_page;
3333
3334                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3335                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3336                                 break;
3337
3338                         if( !bt->frame->free && !bt->frame->lvl )
3339                                 cnt += bt->frame->act;
3340
3341                         bt->reads++;
3342                         page_no++;
3343                 }
3344                 
3345                 cnt--;  // remove stopper key
3346                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3347                 break;
3348         }
3349
3350         bt_close (bt);
3351 #ifdef unix
3352         return NULL;
3353 #else
3354         return 0;
3355 #endif
3356 }
3357
3358 typedef struct timeval timer;
3359
3360 int main (int argc, char **argv)
3361 {
3362 int idx, cnt, len, slot, err;
3363 int segsize, bits = 16;
3364 double start, stop;
3365 #ifdef unix
3366 pthread_t *threads;
3367 #else
3368 HANDLE *threads;
3369 #endif
3370 ThreadArg *args;
3371 uint poolsize = 0;
3372 float elapsed;
3373 int num = 0;
3374 char key[1];
3375 BtMgr *mgr;
3376 BtKey *ptr;
3377 BtDb *bt;
3378
3379         if( argc < 3 ) {
3380                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3381                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3382                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3383                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3384                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3385                 exit(0);
3386         }
3387
3388         start = getCpuTime(0);
3389
3390         if( argc > 3 )
3391                 bits = atoi(argv[3]);
3392
3393         if( argc > 4 )
3394                 poolsize = atoi(argv[4]);
3395
3396         if( !poolsize )
3397                 fprintf (stderr, "Warning: no mapped_pool\n");
3398
3399         if( argc > 5 )
3400                 num = atoi(argv[5]);
3401
3402         cnt = argc - 6;
3403 #ifdef unix
3404         threads = malloc (cnt * sizeof(pthread_t));
3405 #else
3406         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3407 #endif
3408         args = malloc (cnt * sizeof(ThreadArg));
3409
3410         mgr = bt_mgr ((argv[1]), bits, poolsize);
3411
3412         if( !mgr ) {
3413                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3414                 exit (1);
3415         }
3416
3417         //      fire off threads
3418
3419         for( idx = 0; idx < cnt; idx++ ) {
3420                 args[idx].infile = argv[idx + 6];
3421                 args[idx].type = argv[2];
3422                 args[idx].mgr = mgr;
3423                 args[idx].num = num;
3424                 args[idx].idx = idx;
3425 #ifdef unix
3426                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3427                         fprintf(stderr, "Error creating thread %d\n", err);
3428 #else
3429                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3430 #endif
3431         }
3432
3433         //      wait for termination
3434
3435 #ifdef unix
3436         for( idx = 0; idx < cnt; idx++ )
3437                 pthread_join (threads[idx], NULL);
3438 #else
3439         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3440
3441         for( idx = 0; idx < cnt; idx++ )
3442                 CloseHandle(threads[idx]);
3443
3444 #endif
3445         bt_poolaudit(mgr);
3446         bt_mgrclose (mgr);
3447
3448         elapsed = getCpuTime(0) - start;
3449         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3450         elapsed = getCpuTime(1);
3451         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3452         elapsed = getCpuTime(2);
3453         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3454 }
3455
3456 #endif  //STANDALONE