]> pd.if.org Git - btree/blob - threadskv8.c
Fix a couple of bugs in atomicfree
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         BtHashEntry *hashtable;         // the buffer pool hash table entries
279         BtLatchSet *latchsets;          // mapped latch set from buffer pool
280         unsigned char *pagepool;        // mapped to the buffer pool pages
281 #ifndef unix
282         HANDLE halloc;                          // allocation handle
283         HANDLE hpool;                           // buffer pool handle
284 #endif
285 } BtMgr;
286
287 typedef struct {
288         BtMgr *mgr;                             // buffer manager for thread
289         BtPage cursor;                  // cached frame for start/next (never mapped)
290         BtPage frame;                   // spare frame for the page split (never mapped)
291         uid cursor_page;                                // current cursor page number   
292         unsigned char *mem;                             // frame, cursor, page memory buffer
293         unsigned char key[BT_keyarray]; // last found complete key
294         int found;                              // last delete or insert was found
295         int err;                                // last error
296         int reads, writes;              // number of reads and writes from the btree
297 } BtDb;
298
299 typedef enum {
300         BTERR_ok = 0,
301         BTERR_struct,
302         BTERR_ovflw,
303         BTERR_lock,
304         BTERR_map,
305         BTERR_read,
306         BTERR_wrt,
307         BTERR_atomic
308 } BTERR;
309
310 #define CLOCK_bit 0x8000
311
312 // B-Tree functions
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey   (BtDb *bt, uint slot);
321
322 //      manager functions
323 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
324 void bt_mgrclose (BtMgr *mgr);
325
326 //  Helper functions to return slot values
327 //      from the cursor page.
328
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
331
332 //  The page is allocated from low and hi ends.
333 //  The key slots are allocated from the bottom,
334 //      while the text and value of the key
335 //  are allocated from the top.  When the two
336 //  areas meet, the page is split into two.
337
338 //  A key consists of a length byte, two bytes of
339 //  index number (0 - 65535), and up to 253 bytes
340 //  of key value.
341
342 //  Associated with each key is a value byte string
343 //      containing any value desired.
344
345 //  The b-tree root is always located at page 1.
346 //      The first leaf page of level zero is always
347 //      located on page 2.
348
349 //      The b-tree pages are linked with next
350 //      pointers to facilitate enumerators,
351 //      and provide for concurrency.
352
353 //      When to root page fills, it is split in two and
354 //      the tree height is raised by a new root at page
355 //      one with two keys.
356
357 //      Deleted keys are marked with a dead bit until
358 //      page cleanup. The fence key for a leaf node is
359 //      always present
360
361 //  To achieve maximum concurrency one page is locked at a time
362 //  as the tree is traversed to find leaf key in question. The right
363 //  page numbers are used in cases where the page is being split,
364 //      or consolidated.
365
366 //  Page 0 is dedicated to lock for new page extensions,
367 //      and chains empty pages together for reuse. It also
368 //      contains the latch manager hash table.
369
370 //      The ParentModification lock on a node is obtained to serialize posting
371 //      or changing the fence key for a node.
372
373 //      Empty pages are chained together through the ALLOC page and reused.
374
375 //      Access macros to address slot and key values from the page
376 //      Page slots use 1 based indexing.
377
378 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
379 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
380 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
381
382 void bt_putid(unsigned char *dest, uid id)
383 {
384 int i = BtId;
385
386         while( i-- )
387                 dest[i] = (unsigned char)id, id >>= 8;
388 }
389
390 uid bt_getid(unsigned char *src)
391 {
392 uid id = 0;
393 int i;
394
395         for( i = 0; i < BtId; i++ )
396                 id <<= 8, id |= *src++; 
397
398         return id;
399 }
400
401 uid bt_newdup (BtDb *bt)
402 {
403 #ifdef unix
404         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
405 #else
406         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
407 #endif
408 }
409
410 //      Write-Only Queue Lock
411
412 void WriteOLock (WOLock *lock)
413 {
414 ushort tix;
415 #ifdef unix
416         tix = __sync_fetch_and_add (lock->ticket, 1);
417 #else
418         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
419 #endif
420         // wait for our ticket to come up
421
422         while( tix != lock->serving[0] )
423 #ifdef unix
424                 sched_yield();
425 #else
426                 SwitchToThread ();
427 #endif
428 }
429
430 void WriteORelease (WOLock *lock)
431 {
432         lock->serving[0]++;
433 }
434
435 //      Phase-Fair reader/writer lock implementation
436
437 void WriteLock (RWLock *lock)
438 {
439 ushort w, r, tix;
440
441 #ifdef unix
442         tix = __sync_fetch_and_add (lock->ticket, 1);
443 #else
444         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
445 #endif
446         // wait for our ticket to come up
447
448         while( tix != lock->serving[0] )
449 #ifdef unix
450                 sched_yield();
451 #else
452                 SwitchToThread ();
453 #endif
454
455         w = PRES | (tix & PHID);
456 #ifdef  unix
457         r = __sync_fetch_and_add (lock->rin, w);
458 #else
459         r = _InterlockedExchangeAdd16 (lock->rin, w);
460 #endif
461         while( r != *lock->rout )
462 #ifdef unix
463                 sched_yield();
464 #else
465                 SwitchToThread();
466 #endif
467 }
468
469 void WriteRelease (RWLock *lock)
470 {
471 #ifdef unix
472         __sync_fetch_and_and (lock->rin, ~MASK);
473 #else
474         _InterlockedAnd16 (lock->rin, ~MASK);
475 #endif
476         lock->serving[0]++;
477 }
478
479 void ReadLock (RWLock *lock)
480 {
481 ushort w;
482 #ifdef unix
483         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
484 #else
485         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
486 #endif
487         if( w )
488           while( w == (*lock->rin & MASK) )
489 #ifdef unix
490                 sched_yield ();
491 #else
492                 SwitchToThread ();
493 #endif
494 }
495
496 void ReadRelease (RWLock *lock)
497 {
498 #ifdef unix
499         __sync_fetch_and_add (lock->rout, RINC);
500 #else
501         _InterlockedExchangeAdd16 (lock->rout, RINC);
502 #endif
503 }
504
505 //      Spin Latch Manager
506
507 //      wait until write lock mode is clear
508 //      and add 1 to the share count
509
510 void bt_spinreadlock(BtSpinLatch *latch)
511 {
512 ushort prev;
513
514   do {
515 #ifdef unix
516         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
517 #else
518         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
519 #endif
520         //  see if exclusive request is granted or pending
521
522         if( !(prev & BOTH) )
523                 return;
524 #ifdef unix
525         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
526 #else
527         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
528 #endif
529 #ifdef  unix
530   } while( sched_yield(), 1 );
531 #else
532   } while( SwitchToThread(), 1 );
533 #endif
534 }
535
536 //      wait for other read and write latches to relinquish
537
538 void bt_spinwritelock(BtSpinLatch *latch)
539 {
540 ushort prev;
541
542   do {
543 #ifdef  unix
544         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
545 #else
546         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
547 #endif
548         if( !(prev & XCL) )
549           if( !(prev & ~BOTH) )
550                 return;
551           else
552 #ifdef unix
553                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
554 #else
555                 _InterlockedAnd16((ushort *)latch, ~XCL);
556 #endif
557 #ifdef  unix
558   } while( sched_yield(), 1 );
559 #else
560   } while( SwitchToThread(), 1 );
561 #endif
562 }
563
564 //      try to obtain write lock
565
566 //      return 1 if obtained,
567 //              0 otherwise
568
569 int bt_spinwritetry(BtSpinLatch *latch)
570 {
571 ushort prev;
572
573 #ifdef  unix
574         prev = __sync_fetch_and_or((ushort *)latch, XCL);
575 #else
576         prev = _InterlockedOr16((ushort *)latch, XCL);
577 #endif
578         //      take write access if all bits are clear
579
580         if( !(prev & XCL) )
581           if( !(prev & ~BOTH) )
582                 return 1;
583           else
584 #ifdef unix
585                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
586 #else
587                 _InterlockedAnd16((ushort *)latch, ~XCL);
588 #endif
589         return 0;
590 }
591
592 //      clear write mode
593
594 void bt_spinreleasewrite(BtSpinLatch *latch)
595 {
596 #ifdef unix
597         __sync_fetch_and_and((ushort *)latch, ~BOTH);
598 #else
599         _InterlockedAnd16((ushort *)latch, ~BOTH);
600 #endif
601 }
602
603 //      decrement reader count
604
605 void bt_spinreleaseread(BtSpinLatch *latch)
606 {
607 #ifdef unix
608         __sync_fetch_and_add((ushort *)latch, -SHARE);
609 #else
610         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
611 #endif
612 }
613
614 //      read page from permanent location in Btree file
615
616 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
617 {
618 off64_t off = page_no << mgr->page_bits;
619
620 #ifdef unix
621         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
622                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
623                 return BTERR_read;
624         }
625 #else
626 OVERLAPPED ovl[1];
627 uint amt[1];
628
629         memset (ovl, 0, sizeof(OVERLAPPED));
630         ovl->Offset = off;
631         ovl->OffsetHigh = off >> 32;
632
633         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
634                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
635                 return BTERR_read;
636         }
637         if( *amt <  mgr->page_size ) {
638                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
639                 return BTERR_read;
640         }
641 #endif
642         return 0;
643 }
644
645 //      write page to permanent location in Btree file
646 //      clear the dirty bit
647
648 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
649 {
650 off64_t off = page_no << mgr->page_bits;
651
652 #ifdef unix
653         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
654                 return BTERR_wrt;
655 #else
656 OVERLAPPED ovl[1];
657 uint amt[1];
658
659         memset (ovl, 0, sizeof(OVERLAPPED));
660         ovl->Offset = off;
661         ovl->OffsetHigh = off >> 32;
662
663         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
664                 return BTERR_wrt;
665
666         if( *amt <  mgr->page_size )
667                 return BTERR_wrt;
668 #endif
669         return 0;
670 }
671
672 //      link latch table entry into head of latch hash table
673
674 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
675 {
676 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
677 BtLatchSet *latch = bt->mgr->latchsets + slot;
678
679         if( latch->next = bt->mgr->hashtable[hashidx].slot )
680                 bt->mgr->latchsets[latch->next].prev = slot;
681
682         bt->mgr->hashtable[hashidx].slot = slot;
683         latch->page_no = page_no;
684         latch->entry = slot;
685         latch->split = 0;
686         latch->prev = 0;
687         latch->pin = 1;
688
689         if( loadit )
690           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
691                 return bt->err;
692           else
693                 bt->reads++;
694
695         return bt->err = 0;
696 }
697
698 //      set CLOCK bit in latch
699 //      decrement pin count
700
701 void bt_unpinlatch (BtLatchSet *latch)
702 {
703 #ifdef unix
704         if( ~latch->pin & CLOCK_bit )
705                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
706         __sync_fetch_and_add(&latch->pin, -1);
707 #else
708         if( ~latch->pin & CLOCK_bit )
709                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
710         _InterlockedDecrement16 (&latch->pin);
711 #endif
712 }
713
714 //  return the btree cached page address
715
716 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
717 {
718 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
719
720         return page;
721 }
722
723 //      find existing latchset or inspire new one
724 //      return with latchset pinned
725
726 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
727 {
728 uint hashidx = page_no % bt->mgr->latchhash;
729 BtLatchSet *latch;
730 uint slot, idx;
731 uint lvl, cnt;
732 off64_t off;
733 uint amt[1];
734 BtPage page;
735
736   //  try to find our entry
737
738   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
739
740   if( slot = bt->mgr->hashtable[hashidx].slot ) do
741   {
742         latch = bt->mgr->latchsets + slot;
743         if( page_no == latch->page_no )
744                 break;
745   } while( slot = latch->next );
746
747   //  found our entry
748   //  increment clock
749
750   if( slot ) {
751         latch = bt->mgr->latchsets + slot;
752 #ifdef unix
753         __sync_fetch_and_add(&latch->pin, 1);
754 #else
755         _InterlockedIncrement16 (&latch->pin);
756 #endif
757         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
758         return latch;
759   }
760
761         //  see if there are any unused pool entries
762 #ifdef unix
763         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
764 #else
765         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
766 #endif
767
768         if( slot < bt->mgr->latchtotal ) {
769                 latch = bt->mgr->latchsets + slot;
770                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
771                         return NULL;
772                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
773                 return latch;
774         }
775
776 #ifdef unix
777         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
778 #else
779         _InterlockedDecrement (&bt->mgr->latchdeployed);
780 #endif
781   //  find and reuse previous entry on victim
782
783   while( 1 ) {
784 #ifdef unix
785         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
786 #else
787         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
788 #endif
789         // try to get write lock on hash chain
790         //      skip entry if not obtained
791         //      or has outstanding pins
792
793         slot %= bt->mgr->latchtotal;
794
795         if( !slot )
796                 continue;
797
798         latch = bt->mgr->latchsets + slot;
799         idx = latch->page_no % bt->mgr->latchhash;
800
801         //      see we are on same chain as hashidx
802
803         if( idx == hashidx )
804                 continue;
805
806         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
807                 continue;
808
809         //  skip this slot if it is pinned
810         //      or the CLOCK bit is set
811
812         if( latch->pin ) {
813           if( latch->pin & CLOCK_bit ) {
814 #ifdef unix
815                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
816 #else
817                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
818 #endif
819           }
820           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
821           continue;
822         }
823
824         //  update permanent page area in btree from buffer pool
825
826         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
827
828         if( latch->dirty )
829           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
830                 return NULL;
831           else
832                 latch->dirty = 0, bt->writes++;
833
834         //  unlink our available slot from its hash chain
835
836         if( latch->prev )
837                 bt->mgr->latchsets[latch->prev].next = latch->next;
838         else
839                 bt->mgr->hashtable[idx].slot = latch->next;
840
841         if( latch->next )
842                 bt->mgr->latchsets[latch->next].prev = latch->prev;
843
844         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
845
846         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
847                 return NULL;
848
849         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
850         return latch;
851   }
852 }
853
854 void bt_mgrclose (BtMgr *mgr)
855 {
856 BtLatchSet *latch;
857 uint num = 0;
858 BtPage page;
859 uint slot;
860
861         //      flush dirty pool pages to the btree
862
863         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
864                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
865                 latch = mgr->latchsets + slot;
866
867                 if( latch->dirty ) {
868                         bt_writepage(mgr, page, latch->page_no);
869                         latch->dirty = 0, num++;
870                 }
871 //              madvise (page, mgr->page_size, MADV_DONTNEED);
872         }
873
874         fprintf(stderr, "%d buffer pool pages flushed\n", num);
875
876 #ifdef unix
877         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
878         munmap (mgr->pagezero, mgr->page_size);
879 #else
880         FlushViewOfFile(mgr->pagezero, 0);
881         UnmapViewOfFile(mgr->pagezero);
882         UnmapViewOfFile(mgr->hashtable);
883         CloseHandle(mgr->halloc);
884         CloseHandle(mgr->hpool);
885 #endif
886 #ifdef unix
887         close (mgr->idx);
888         free (mgr);
889 #else
890         FlushFileBuffers(mgr->idx);
891         CloseHandle(mgr->idx);
892         GlobalFree (mgr);
893 #endif
894 }
895
896 //      close and release memory
897
898 void bt_close (BtDb *bt)
899 {
900 #ifdef unix
901         if( bt->mem )
902                 free (bt->mem);
903 #else
904         if( bt->mem)
905                 VirtualFree (bt->mem, 0, MEM_RELEASE);
906 #endif
907         free (bt);
908 }
909
910 //  open/create new btree buffer manager
911
912 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
913 //              size of page pool (e.g. 262144)
914
915 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
916 {
917 uint lvl, attr, last, slot, idx;
918 uint nlatchpage, latchhash;
919 unsigned char value[BtId];
920 int flag, initit = 0;
921 BtPageZero *pagezero;
922 off64_t size;
923 uint amt[1];
924 BtMgr* mgr;
925 BtKey* key;
926 BtVal *val;
927
928         // determine sanity of page size and buffer pool
929
930         if( bits > BT_maxbits )
931                 bits = BT_maxbits;
932         else if( bits < BT_minbits )
933                 bits = BT_minbits;
934
935         if( nodemax < 16 ) {
936                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
937                 return NULL;
938         }
939
940 #ifdef unix
941         mgr = calloc (1, sizeof(BtMgr));
942
943         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
944
945         if( mgr->idx == -1 ) {
946                 fprintf (stderr, "Unable to open btree file\n");
947                 return free(mgr), NULL;
948         }
949 #else
950         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
951         attr = FILE_ATTRIBUTE_NORMAL;
952         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
953
954         if( mgr->idx == INVALID_HANDLE_VALUE )
955                 return GlobalFree(mgr), NULL;
956 #endif
957
958 #ifdef unix
959         pagezero = valloc (BT_maxpage);
960         *amt = 0;
961
962         // read minimum page size to get root info
963         //      to support raw disk partition files
964         //      check if bits == 0 on the disk.
965
966         if( size = lseek (mgr->idx, 0L, 2) )
967                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
968                         if( pagezero->alloc->bits )
969                                 bits = pagezero->alloc->bits;
970                         else
971                                 initit = 1;
972                 else
973                         return free(mgr), free(pagezero), NULL;
974         else
975                 initit = 1;
976 #else
977         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
978         size = GetFileSize(mgr->idx, amt);
979
980         if( size || *amt ) {
981                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
982                         return bt_mgrclose (mgr), NULL;
983                 bits = pagezero->alloc->bits;
984         } else
985                 initit = 1;
986 #endif
987
988         mgr->page_size = 1 << bits;
989         mgr->page_bits = bits;
990
991         //  calculate number of latch hash table entries
992
993         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
994         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
995
996         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
997         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
998         mgr->latchtotal = nodemax;
999
1000         if( !initit )
1001                 goto mgrlatch;
1002
1003         // initialize an empty b-tree with latch page, root page, page of leaves
1004         // and page(s) of latches and page pool cache
1005
1006         memset (pagezero, 0, 1 << bits);
1007         pagezero->alloc->bits = mgr->page_bits;
1008         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1009
1010         //  initialize left-most LEAF page in
1011         //      alloc->left.
1012
1013         bt_putid (pagezero->alloc->left, LEAF_page);
1014
1015         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1016                 fprintf (stderr, "Unable to create btree page zero\n");
1017                 return bt_mgrclose (mgr), NULL;
1018         }
1019
1020         memset (pagezero, 0, 1 << bits);
1021         pagezero->alloc->bits = mgr->page_bits;
1022
1023         for( lvl=MIN_lvl; lvl--; ) {
1024                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1025                 key = keyptr(pagezero->alloc, 1);
1026                 key->len = 2;           // create stopper key
1027                 key->key[0] = 0xff;
1028                 key->key[1] = 0xff;
1029
1030                 bt_putid(value, MIN_lvl - lvl + 1);
1031                 val = valptr(pagezero->alloc, 1);
1032                 val->len = lvl ? BtId : 0;
1033                 memcpy (val->value, value, val->len);
1034
1035                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1036                 pagezero->alloc->lvl = lvl;
1037                 pagezero->alloc->cnt = 1;
1038                 pagezero->alloc->act = 1;
1039
1040                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1041                         fprintf (stderr, "Unable to create btree page zero\n");
1042                         return bt_mgrclose (mgr), NULL;
1043                 }
1044         }
1045
1046 mgrlatch:
1047 #ifdef unix
1048         free (pagezero);
1049 #else
1050         VirtualFree (pagezero, 0, MEM_RELEASE);
1051 #endif
1052 #ifdef unix
1053         // mlock the pagezero page
1054
1055         flag = PROT_READ | PROT_WRITE;
1056         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1057         if( mgr->pagezero == MAP_FAILED ) {
1058                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1059                 return bt_mgrclose (mgr), NULL;
1060         }
1061         mlock (mgr->pagezero, mgr->page_size);
1062
1063         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1064         if( mgr->hashtable == MAP_FAILED ) {
1065                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1066                 return bt_mgrclose (mgr), NULL;
1067         }
1068 #else
1069         flag = PAGE_READWRITE;
1070         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1071         if( !mgr->halloc ) {
1072                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1073                 return bt_mgrclose (mgr), NULL;
1074         }
1075
1076         flag = FILE_MAP_WRITE;
1077         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1078         if( !mgr->pagezero ) {
1079                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1080                 return bt_mgrclose (mgr), NULL;
1081         }
1082
1083         flag = PAGE_READWRITE;
1084         size = (uid)mgr->nlatchpage << mgr->page_bits;
1085         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1086         if( !mgr->hpool ) {
1087                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1088                 return bt_mgrclose (mgr), NULL;
1089         }
1090
1091         flag = FILE_MAP_WRITE;
1092         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1093         if( !mgr->hashtable ) {
1094                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1095                 return bt_mgrclose (mgr), NULL;
1096         }
1097 #endif
1098
1099         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1100         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1101
1102         return mgr;
1103 }
1104
1105 //      open BTree access method
1106 //      based on buffer manager
1107
1108 BtDb *bt_open (BtMgr *mgr)
1109 {
1110 BtDb *bt = malloc (sizeof(*bt));
1111
1112         memset (bt, 0, sizeof(*bt));
1113         bt->mgr = mgr;
1114 #ifdef unix
1115         bt->mem = valloc (2 *mgr->page_size);
1116 #else
1117         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1118 #endif
1119         bt->frame = (BtPage)bt->mem;
1120         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1121         return bt;
1122 }
1123
1124 //  compare two keys, return > 0, = 0, or < 0
1125 //  =0: keys are same
1126 //  -1: key2 > key1
1127 //  +1: key2 < key1
1128 //  as the comparison value
1129
1130 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1131 {
1132 uint len1 = key1->len;
1133 int ans;
1134
1135         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1136                 return ans;
1137
1138         if( len1 > len2 )
1139                 return 1;
1140         if( len1 < len2 )
1141                 return -1;
1142
1143         return 0;
1144 }
1145
1146 // place write, read, or parent lock on requested page_no.
1147
1148 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1149 {
1150         switch( mode ) {
1151         case BtLockRead:
1152                 ReadLock (latch->readwr);
1153                 break;
1154         case BtLockWrite:
1155                 WriteLock (latch->readwr);
1156                 break;
1157         case BtLockAccess:
1158                 ReadLock (latch->access);
1159                 break;
1160         case BtLockDelete:
1161                 WriteLock (latch->access);
1162                 break;
1163         case BtLockParent:
1164                 WriteOLock (latch->parent);
1165                 break;
1166         case BtLockAtomic:
1167                 WriteOLock (latch->atomic);
1168                 break;
1169         }
1170 }
1171
1172 // remove write, read, or parent lock on requested page
1173
1174 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1175 {
1176         switch( mode ) {
1177         case BtLockRead:
1178                 ReadRelease (latch->readwr);
1179                 break;
1180         case BtLockWrite:
1181                 WriteRelease (latch->readwr);
1182                 break;
1183         case BtLockAccess:
1184                 ReadRelease (latch->access);
1185                 break;
1186         case BtLockDelete:
1187                 WriteRelease (latch->access);
1188                 break;
1189         case BtLockParent:
1190                 WriteORelease (latch->parent);
1191                 break;
1192         case BtLockAtomic:
1193                 WriteORelease (latch->atomic);
1194                 break;
1195         }
1196 }
1197
1198 //      allocate a new page
1199 //      return with page latched, but unlocked.
1200
1201 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1202 {
1203 uid page_no;
1204 int blk;
1205
1206         //      lock allocation page
1207
1208         bt_spinwritelock(bt->mgr->lock);
1209
1210         // use empty chain first
1211         // else allocate empty page
1212
1213         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1214                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1215                         set->page = bt_mappage (bt, set->latch);
1216                 else
1217                         return bt->err = BTERR_struct, -1;
1218
1219                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1220                 bt_spinreleasewrite(bt->mgr->lock);
1221
1222                 memcpy (set->page, contents, bt->mgr->page_size);
1223                 set->latch->dirty = 1;
1224                 return 0;
1225         }
1226
1227         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1228         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1229
1230         // unlock allocation latch
1231
1232         bt_spinreleasewrite(bt->mgr->lock);
1233
1234         //      don't load cache from btree page
1235
1236         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1237                 set->page = bt_mappage (bt, set->latch);
1238         else
1239                 return bt->err = BTERR_struct;
1240
1241         memcpy (set->page, contents, bt->mgr->page_size);
1242         set->latch->dirty = 1;
1243         return 0;
1244 }
1245
1246 //  find slot in page for given key at a given level
1247
1248 int bt_findslot (BtPage page, unsigned char *key, uint len)
1249 {
1250 uint diff, higher = page->cnt, low = 1, slot;
1251 uint good = 0;
1252
1253         //        make stopper key an infinite fence value
1254
1255         if( bt_getid (page->right) )
1256                 higher++;
1257         else
1258                 good++;
1259
1260         //      low is the lowest candidate.
1261         //  loop ends when they meet
1262
1263         //  higher is already
1264         //      tested as .ge. the passed key.
1265
1266         while( diff = higher - low ) {
1267                 slot = low + ( diff >> 1 );
1268                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1269                         low = slot + 1;
1270                 else
1271                         higher = slot, good++;
1272         }
1273
1274         //      return zero if key is on right link page
1275
1276         return good ? higher : 0;
1277 }
1278
1279 //  find and load page at given level for given key
1280 //      leave page rd or wr locked as requested
1281
1282 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1283 {
1284 uid page_no = ROOT_page, prevpage = 0;
1285 uint drill = 0xff, slot;
1286 BtLatchSet *prevlatch;
1287 uint mode, prevmode;
1288
1289   //  start at root of btree and drill down
1290
1291   do {
1292         // determine lock mode of drill level
1293         mode = (drill == lvl) ? lock : BtLockRead; 
1294
1295         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1296           return 0;
1297
1298         // obtain access lock using lock chaining with Access mode
1299
1300         if( page_no > ROOT_page )
1301           bt_lockpage(BtLockAccess, set->latch);
1302
1303         set->page = bt_mappage (bt, set->latch);
1304
1305         //      release & unpin parent or left sibling page
1306
1307         if( prevpage ) {
1308           bt_unlockpage(prevmode, prevlatch);
1309           bt_unpinlatch (prevlatch);
1310           prevpage = 0;
1311         }
1312
1313         // obtain mode lock using lock chaining through AccessLock
1314
1315         bt_lockpage(mode, set->latch);
1316
1317         if( set->page->free )
1318                 return bt->err = BTERR_struct, 0;
1319
1320         if( page_no > ROOT_page )
1321           bt_unlockpage(BtLockAccess, set->latch);
1322
1323         // re-read and re-lock root after determining actual level of root
1324
1325         if( set->page->lvl != drill) {
1326                 if( set->latch->page_no != ROOT_page )
1327                         return bt->err = BTERR_struct, 0;
1328                         
1329                 drill = set->page->lvl;
1330
1331                 if( lock != BtLockRead && drill == lvl ) {
1332                   bt_unlockpage(mode, set->latch);
1333                   bt_unpinlatch (set->latch);
1334                   continue;
1335                 }
1336         }
1337
1338         prevpage = set->latch->page_no;
1339         prevlatch = set->latch;
1340         prevmode = mode;
1341
1342         //  find key on page at this level
1343         //  and descend to requested level
1344
1345         if( set->page->kill )
1346           goto slideright;
1347
1348         if( slot = bt_findslot (set->page, key, len) ) {
1349           if( drill == lvl )
1350                 return slot;
1351
1352           // find next non-dead slot -- the fence key if nothing else
1353
1354           while( slotptr(set->page, slot)->dead )
1355                 if( slot++ < set->page->cnt )
1356                   continue;
1357                 else
1358                   return bt->err = BTERR_struct, 0;
1359
1360           page_no = bt_getid(valptr(set->page, slot)->value);
1361           drill--;
1362           continue;
1363         }
1364
1365         //  or slide right into next page
1366
1367 slideright:
1368         page_no = bt_getid(set->page->right);
1369
1370   } while( page_no );
1371
1372   // return error on end of right chain
1373
1374   bt->err = BTERR_struct;
1375   return 0;     // return error
1376 }
1377
1378 //      return page to free list
1379 //      page must be delete & write locked
1380
1381 void bt_freepage (BtDb *bt, BtPageSet *set)
1382 {
1383         //      lock allocation page
1384
1385         bt_spinwritelock (bt->mgr->lock);
1386
1387         //      store chain
1388
1389         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1390         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1391         set->latch->dirty = 1;
1392         set->page->free = 1;
1393
1394         // unlock released page
1395
1396         bt_unlockpage (BtLockDelete, set->latch);
1397         bt_unlockpage (BtLockWrite, set->latch);
1398         bt_unpinlatch (set->latch);
1399
1400         // unlock allocation page
1401
1402         bt_spinreleasewrite (bt->mgr->lock);
1403 }
1404
1405 //      a fence key was deleted from a page
1406 //      push new fence value upwards
1407
1408 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1409 {
1410 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1411 unsigned char value[BtId];
1412 BtKey* ptr;
1413 uint idx;
1414
1415         //      remove the old fence value
1416
1417         ptr = keyptr(set->page, set->page->cnt);
1418         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1419         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1420         set->latch->dirty = 1;
1421
1422         //  cache new fence value
1423
1424         ptr = keyptr(set->page, set->page->cnt);
1425         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1426
1427         bt_lockpage (BtLockParent, set->latch);
1428         bt_unlockpage (BtLockWrite, set->latch);
1429
1430         //      insert new (now smaller) fence key
1431
1432         bt_putid (value, set->latch->page_no);
1433         ptr = (BtKey*)leftkey;
1434
1435         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1436           return bt->err;
1437
1438         //      now delete old fence key
1439
1440         ptr = (BtKey*)rightkey;
1441
1442         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1443                 return bt->err;
1444
1445         bt_unlockpage (BtLockParent, set->latch);
1446         bt_unpinlatch(set->latch);
1447         return 0;
1448 }
1449
1450 //      root has a single child
1451 //      collapse a level from the tree
1452
1453 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1454 {
1455 BtPageSet child[1];
1456 uid page_no;
1457 uint idx;
1458
1459   // find the child entry and promote as new root contents
1460
1461   do {
1462         for( idx = 0; idx++ < root->page->cnt; )
1463           if( !slotptr(root->page, idx)->dead )
1464                 break;
1465
1466         page_no = bt_getid (valptr(root->page, idx)->value);
1467
1468         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1469                 child->page = bt_mappage (bt, child->latch);
1470         else
1471                 return bt->err;
1472
1473         bt_lockpage (BtLockDelete, child->latch);
1474         bt_lockpage (BtLockWrite, child->latch);
1475
1476         memcpy (root->page, child->page, bt->mgr->page_size);
1477         root->latch->dirty = 1;
1478
1479         bt_freepage (bt, child);
1480
1481   } while( root->page->lvl > 1 && root->page->act == 1 );
1482
1483   bt_unlockpage (BtLockWrite, root->latch);
1484   bt_unpinlatch (root->latch);
1485   return 0;
1486 }
1487
1488 //  delete a page and manage keys
1489 //  call with page writelocked
1490 //      returns with page unpinned
1491
1492 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1493 {
1494 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1495 unsigned char value[BtId];
1496 uint lvl = set->page->lvl;
1497 BtPageSet right[1];
1498 uid page_no;
1499 BtKey *ptr;
1500
1501         //      cache copy of fence key
1502         //      to post in parent
1503
1504         ptr = keyptr(set->page, set->page->cnt);
1505         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1506
1507         //      obtain lock on right page
1508
1509         page_no = bt_getid(set->page->right);
1510
1511         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1512                 right->page = bt_mappage (bt, right->latch);
1513         else
1514                 return 0;
1515
1516         bt_lockpage (BtLockWrite, right->latch);
1517
1518         // cache copy of key to update
1519
1520         ptr = keyptr(right->page, right->page->cnt);
1521         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1522
1523         if( right->page->kill )
1524                 return bt->err = BTERR_struct;
1525
1526         // pull contents of right peer into our empty page
1527
1528         memcpy (set->page, right->page, bt->mgr->page_size);
1529         set->latch->dirty = 1;
1530
1531         // mark right page deleted and point it to left page
1532         //      until we can post parent updates that remove access
1533         //      to the deleted page.
1534
1535         bt_putid (right->page->right, set->latch->page_no);
1536         right->latch->dirty = 1;
1537         right->page->kill = 1;
1538
1539         bt_lockpage (BtLockParent, right->latch);
1540         bt_unlockpage (BtLockWrite, right->latch);
1541
1542         bt_lockpage (BtLockParent, set->latch);
1543         bt_unlockpage (BtLockWrite, set->latch);
1544
1545         // redirect higher key directly to our new node contents
1546
1547         bt_putid (value, set->latch->page_no);
1548         ptr = (BtKey*)higherfence;
1549
1550         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1551           return bt->err;
1552
1553         //      delete old lower key to our node
1554
1555         ptr = (BtKey*)lowerfence;
1556
1557         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1558           return bt->err;
1559
1560         //      obtain delete and write locks to right node
1561
1562         bt_unlockpage (BtLockParent, right->latch);
1563         bt_lockpage (BtLockDelete, right->latch);
1564         bt_lockpage (BtLockWrite, right->latch);
1565         bt_freepage (bt, right);
1566
1567         bt_unlockpage (BtLockParent, set->latch);
1568         bt_unpinlatch (set->latch);
1569         bt->found = 1;
1570         return 0;
1571 }
1572
1573 //  find and delete key on page by marking delete flag bit
1574 //  if page becomes empty, delete it from the btree
1575
1576 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1577 {
1578 uint slot, idx, found, fence;
1579 BtPageSet set[1];
1580 BtKey *ptr;
1581 BtVal *val;
1582
1583         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1584                 ptr = keyptr(set->page, slot);
1585         else
1586                 return bt->err;
1587
1588         // if librarian slot, advance to real slot
1589
1590         if( slotptr(set->page, slot)->type == Librarian )
1591                 ptr = keyptr(set->page, ++slot);
1592
1593         fence = slot == set->page->cnt;
1594
1595         // if key is found delete it, otherwise ignore request
1596
1597         if( found = !keycmp (ptr, key, len) )
1598           if( found = slotptr(set->page, slot)->dead == 0 ) {
1599                 val = valptr(set->page,slot);
1600                 slotptr(set->page, slot)->dead = 1;
1601                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1602                 set->page->act--;
1603
1604                 // collapse empty slots beneath the fence
1605
1606                 while( idx = set->page->cnt - 1 )
1607                   if( slotptr(set->page, idx)->dead ) {
1608                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1609                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1610                   } else
1611                         break;
1612           }
1613
1614         //      did we delete a fence key in an upper level?
1615
1616         if( found && lvl && set->page->act && fence )
1617           if( bt_fixfence (bt, set, lvl) )
1618                 return bt->err;
1619           else
1620                 return bt->found = found, 0;
1621
1622         //      do we need to collapse root?
1623
1624         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1625           if( bt_collapseroot (bt, set) )
1626                 return bt->err;
1627           else
1628                 return bt->found = found, 0;
1629
1630         //      delete empty page
1631
1632         if( !set->page->act )
1633                 return bt_deletepage (bt, set);
1634
1635         set->latch->dirty = 1;
1636         bt_unlockpage(BtLockWrite, set->latch);
1637         bt_unpinlatch (set->latch);
1638         return bt->found = found, 0;
1639 }
1640
1641 BtKey *bt_foundkey (BtDb *bt)
1642 {
1643         return (BtKey*)(bt->key);
1644 }
1645
1646 //      advance to next slot
1647
1648 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1649 {
1650 BtLatchSet *prevlatch;
1651 uid page_no;
1652
1653         if( slot < set->page->cnt )
1654                 return slot + 1;
1655
1656         prevlatch = set->latch;
1657
1658         if( page_no = bt_getid(set->page->right) )
1659           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1660                 set->page = bt_mappage (bt, set->latch);
1661           else
1662                 return 0;
1663         else
1664           return bt->err = BTERR_struct, 0;
1665
1666         // obtain access lock using lock chaining with Access mode
1667
1668         bt_lockpage(BtLockAccess, set->latch);
1669
1670         bt_unlockpage(BtLockRead, prevlatch);
1671         bt_unpinlatch (prevlatch);
1672
1673         bt_lockpage(BtLockRead, set->latch);
1674         bt_unlockpage(BtLockAccess, set->latch);
1675         return 1;
1676 }
1677
1678 //      find unique key or first duplicate key in
1679 //      leaf level and return number of value bytes
1680 //      or (-1) if not found.  Setup key for bt_foundkey
1681
1682 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1683 {
1684 BtPageSet set[1];
1685 uint len, slot;
1686 int ret = -1;
1687 BtKey *ptr;
1688 BtVal *val;
1689
1690   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1691    do {
1692         ptr = keyptr(set->page, slot);
1693
1694         //      skip librarian slot place holder
1695
1696         if( slotptr(set->page, slot)->type == Librarian )
1697                 ptr = keyptr(set->page, ++slot);
1698
1699         //      return actual key found
1700
1701         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1702         len = ptr->len;
1703
1704         if( slotptr(set->page, slot)->type == Duplicate )
1705                 len -= BtId;
1706
1707         //      not there if we reach the stopper key
1708
1709         if( slot == set->page->cnt )
1710           if( !bt_getid (set->page->right) )
1711                 break;
1712
1713         // if key exists, return >= 0 value bytes copied
1714         //      otherwise return (-1)
1715
1716         if( slotptr(set->page, slot)->dead )
1717                 continue;
1718
1719         if( keylen == len )
1720           if( !memcmp (ptr->key, key, len) ) {
1721                 val = valptr (set->page,slot);
1722                 if( valmax > val->len )
1723                         valmax = val->len;
1724                 memcpy (value, val->value, valmax);
1725                 ret = valmax;
1726           }
1727
1728         break;
1729
1730    } while( slot = bt_findnext (bt, set, slot) );
1731
1732   bt_unlockpage (BtLockRead, set->latch);
1733   bt_unpinlatch (set->latch);
1734   return ret;
1735 }
1736
1737 //      check page for space available,
1738 //      clean if necessary and return
1739 //      0 - page needs splitting
1740 //      >0  new slot value
1741
1742 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1743 {
1744 uint nxt = bt->mgr->page_size;
1745 BtPage page = set->page;
1746 uint cnt = 0, idx = 0;
1747 uint max = page->cnt;
1748 uint newslot = max;
1749 BtKey *key;
1750 BtVal *val;
1751
1752         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1753                 return slot;
1754
1755         //      skip cleanup and proceed to split
1756         //      if there's not enough garbage
1757         //      to bother with.
1758
1759         if( page->garbage < nxt / 5 )
1760                 return 0;
1761
1762         memcpy (bt->frame, page, bt->mgr->page_size);
1763
1764         // skip page info and set rest of page to zero
1765
1766         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1767         set->latch->dirty = 1;
1768         page->garbage = 0;
1769         page->act = 0;
1770
1771         // clean up page first by
1772         // removing deleted keys
1773
1774         while( cnt++ < max ) {
1775                 if( cnt == slot )
1776                         newslot = idx + 2;
1777                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1778                         continue;
1779
1780                 // copy the value across
1781
1782                 val = valptr(bt->frame, cnt);
1783                 nxt -= val->len + sizeof(BtVal);
1784                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1785
1786                 // copy the key across
1787
1788                 key = keyptr(bt->frame, cnt);
1789                 nxt -= key->len + sizeof(BtKey);
1790                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1791
1792                 // make a librarian slot
1793
1794                 if( idx ) {
1795                         slotptr(page, ++idx)->off = nxt;
1796                         slotptr(page, idx)->type = Librarian;
1797                         slotptr(page, idx)->dead = 1;
1798                 }
1799
1800                 // set up the slot
1801
1802                 slotptr(page, ++idx)->off = nxt;
1803                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1804
1805                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1806                         page->act++;
1807         }
1808
1809         page->min = nxt;
1810         page->cnt = idx;
1811
1812         //      see if page has enough space now, or does it need splitting?
1813
1814         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1815                 return newslot;
1816
1817         return 0;
1818 }
1819
1820 // split the root and raise the height of the btree
1821
1822 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1823 {  
1824 unsigned char leftkey[BT_keyarray];
1825 uint nxt = bt->mgr->page_size;
1826 unsigned char value[BtId];
1827 BtPageSet left[1];
1828 uid left_page_no;
1829 BtKey *ptr;
1830 BtVal *val;
1831
1832         //      save left page fence key for new root
1833
1834         ptr = keyptr(root->page, root->page->cnt);
1835         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1836
1837         //  Obtain an empty page to use, and copy the current
1838         //  root contents into it, e.g. lower keys
1839
1840         if( bt_newpage(bt, left, root->page) )
1841                 return bt->err;
1842
1843         left_page_no = left->latch->page_no;
1844         bt_unpinlatch (left->latch);
1845
1846         // preserve the page info at the bottom
1847         // of higher keys and set rest to zero
1848
1849         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1850
1851         // insert stopper key at top of newroot page
1852         // and increase the root height
1853
1854         nxt -= BtId + sizeof(BtVal);
1855         bt_putid (value, right->page_no);
1856         val = (BtVal *)((unsigned char *)root->page + nxt);
1857         memcpy (val->value, value, BtId);
1858         val->len = BtId;
1859
1860         nxt -= 2 + sizeof(BtKey);
1861         slotptr(root->page, 2)->off = nxt;
1862         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1863         ptr->len = 2;
1864         ptr->key[0] = 0xff;
1865         ptr->key[1] = 0xff;
1866
1867         // insert lower keys page fence key on newroot page as first key
1868
1869         nxt -= BtId + sizeof(BtVal);
1870         bt_putid (value, left_page_no);
1871         val = (BtVal *)((unsigned char *)root->page + nxt);
1872         memcpy (val->value, value, BtId);
1873         val->len = BtId;
1874
1875         ptr = (BtKey *)leftkey;
1876         nxt -= ptr->len + sizeof(BtKey);
1877         slotptr(root->page, 1)->off = nxt;
1878         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1879         
1880         bt_putid(root->page->right, 0);
1881         root->page->min = nxt;          // reset lowest used offset and key count
1882         root->page->cnt = 2;
1883         root->page->act = 2;
1884         root->page->lvl++;
1885
1886         // release and unpin root pages
1887
1888         bt_unlockpage(BtLockWrite, root->latch);
1889         bt_unpinlatch (root->latch);
1890
1891         bt_unpinlatch (right);
1892         return 0;
1893 }
1894
1895 //  split already locked full node
1896 //      leave it locked.
1897 //      return pool entry for new right
1898 //      page, unlocked
1899
1900 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1901 {
1902 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1903 uint lvl = set->page->lvl;
1904 BtPageSet right[1];
1905 BtKey *key, *ptr;
1906 BtVal *val, *src;
1907 uid right2;
1908 uint prev;
1909
1910         //  split higher half of keys to bt->frame
1911
1912         memset (bt->frame, 0, bt->mgr->page_size);
1913         max = set->page->cnt;
1914         cnt = max / 2;
1915         idx = 0;
1916
1917         while( cnt++ < max ) {
1918                 if( slotptr(set->page, cnt)->dead && cnt < max )
1919                         continue;
1920                 src = valptr(set->page, cnt);
1921                 nxt -= src->len + sizeof(BtVal);
1922                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1923
1924                 key = keyptr(set->page, cnt);
1925                 nxt -= key->len + sizeof(BtKey);
1926                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1927                 memcpy (ptr, key, key->len + sizeof(BtKey));
1928
1929                 //      add librarian slot
1930
1931                 if( idx ) {
1932                         slotptr(bt->frame, ++idx)->off = nxt;
1933                         slotptr(bt->frame, idx)->type = Librarian;
1934                         slotptr(bt->frame, idx)->dead = 1;
1935                 }
1936
1937                 //  add actual slot
1938
1939                 slotptr(bt->frame, ++idx)->off = nxt;
1940                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1941
1942                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1943                         bt->frame->act++;
1944         }
1945
1946         bt->frame->bits = bt->mgr->page_bits;
1947         bt->frame->min = nxt;
1948         bt->frame->cnt = idx;
1949         bt->frame->lvl = lvl;
1950
1951         // link right node
1952
1953         if( set->latch->page_no > ROOT_page )
1954                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1955
1956         //      get new free page and write higher keys to it.
1957
1958         if( bt_newpage(bt, right, bt->frame) )
1959                 return 0;
1960
1961         memcpy (bt->frame, set->page, bt->mgr->page_size);
1962         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1963         set->latch->dirty = 1;
1964
1965         nxt = bt->mgr->page_size;
1966         set->page->garbage = 0;
1967         set->page->act = 0;
1968         max /= 2;
1969         cnt = 0;
1970         idx = 0;
1971
1972         if( slotptr(bt->frame, max)->type == Librarian )
1973                 max--;
1974
1975         //  assemble page of smaller keys
1976
1977         while( cnt++ < max ) {
1978                 if( slotptr(bt->frame, cnt)->dead )
1979                         continue;
1980                 val = valptr(bt->frame, cnt);
1981                 nxt -= val->len + sizeof(BtVal);
1982                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1983
1984                 key = keyptr(bt->frame, cnt);
1985                 nxt -= key->len + sizeof(BtKey);
1986                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1987
1988                 //      add librarian slot
1989
1990                 if( idx ) {
1991                         slotptr(set->page, ++idx)->off = nxt;
1992                         slotptr(set->page, idx)->type = Librarian;
1993                         slotptr(set->page, idx)->dead = 1;
1994                 }
1995
1996                 //      add actual slot
1997
1998                 slotptr(set->page, ++idx)->off = nxt;
1999                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2000                 set->page->act++;
2001         }
2002
2003         bt_putid(set->page->right, right->latch->page_no);
2004         set->page->min = nxt;
2005         set->page->cnt = idx;
2006
2007         return right->latch->entry;
2008 }
2009
2010 //      fix keys for newly split page
2011 //      call with page locked, return
2012 //      unlocked
2013
2014 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2015 {
2016 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2017 unsigned char value[BtId];
2018 uint lvl = set->page->lvl;
2019 BtPage page;
2020 BtKey *ptr;
2021
2022         // if current page is the root page, split it
2023
2024         if( set->latch->page_no == ROOT_page )
2025                 return bt_splitroot (bt, set, right);
2026
2027         ptr = keyptr(set->page, set->page->cnt);
2028         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2029
2030         page = bt_mappage (bt, right);
2031
2032         ptr = keyptr(page, page->cnt);
2033         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2034
2035         // insert new fences in their parent pages
2036
2037         bt_lockpage (BtLockParent, right);
2038
2039         bt_lockpage (BtLockParent, set->latch);
2040         bt_unlockpage (BtLockWrite, set->latch);
2041
2042         // insert new fence for reformulated left block of smaller keys
2043
2044         bt_putid (value, set->latch->page_no);
2045         ptr = (BtKey *)leftkey;
2046
2047         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2048                 return bt->err;
2049
2050         // switch fence for right block of larger keys to new right page
2051
2052         bt_putid (value, right->page_no);
2053         ptr = (BtKey *)rightkey;
2054
2055         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2056                 return bt->err;
2057
2058         bt_unlockpage (BtLockParent, set->latch);
2059         bt_unpinlatch (set->latch);
2060
2061         bt_unlockpage (BtLockParent, right);
2062         bt_unpinlatch (right);
2063         return 0;
2064 }
2065
2066 //      install new key and value onto page
2067 //      page must already be checked for
2068 //      adequate space
2069
2070 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2071 {
2072 uint idx, librarian;
2073 BtSlot *node;
2074 BtKey *ptr;
2075 BtVal *val;
2076
2077         //      if found slot > desired slot and previous slot
2078         //      is a librarian slot, use it
2079
2080         if( slot > 1 )
2081           if( slotptr(set->page, slot-1)->type == Librarian )
2082                 slot--;
2083
2084         // copy value onto page
2085
2086         set->page->min -= vallen + sizeof(BtVal);
2087         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2088         memcpy (val->value, value, vallen);
2089         val->len = vallen;
2090
2091         // copy key onto page
2092
2093         set->page->min -= keylen + sizeof(BtKey);
2094         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2095         memcpy (ptr->key, key, keylen);
2096         ptr->len = keylen;
2097         
2098         //      find first empty slot
2099
2100         for( idx = slot; idx < set->page->cnt; idx++ )
2101           if( slotptr(set->page, idx)->dead )
2102                 break;
2103
2104         // now insert key into array before slot
2105
2106         if( idx == set->page->cnt )
2107                 idx += 2, set->page->cnt += 2, librarian = 2;
2108         else
2109                 librarian = 1;
2110
2111         set->latch->dirty = 1;
2112         set->page->act++;
2113
2114         while( idx > slot + librarian - 1 )
2115                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2116
2117         //      add librarian slot
2118
2119         if( librarian > 1 ) {
2120                 node = slotptr(set->page, slot++);
2121                 node->off = set->page->min;
2122                 node->type = Librarian;
2123                 node->dead = 1;
2124         }
2125
2126         //      fill in new slot
2127
2128         node = slotptr(set->page, slot);
2129         node->off = set->page->min;
2130         node->type = type;
2131         node->dead = 0;
2132
2133         if( release ) {
2134                 bt_unlockpage (BtLockWrite, set->latch);
2135                 bt_unpinlatch (set->latch);
2136         }
2137
2138         return 0;
2139 }
2140
2141 //  Insert new key into the btree at given level.
2142 //      either add a new key or update/add an existing one
2143
2144 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2145 {
2146 unsigned char newkey[BT_keyarray];
2147 uint slot, idx, len, entry;
2148 BtPageSet set[1];
2149 BtKey *ptr, *ins;
2150 uid sequence;
2151 BtVal *val;
2152 uint type;
2153
2154   // set up the key we're working on
2155
2156   ins = (BtKey*)newkey;
2157   memcpy (ins->key, key, keylen);
2158   ins->len = keylen;
2159
2160   // is this a non-unique index value?
2161
2162   if( unique )
2163         type = Unique;
2164   else {
2165         type = Duplicate;
2166         sequence = bt_newdup (bt);
2167         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2168         ins->len += BtId;
2169   }
2170
2171   while( 1 ) { // find the page and slot for the current key
2172         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2173                 ptr = keyptr(set->page, slot);
2174         else {
2175                 if( !bt->err )
2176                         bt->err = BTERR_ovflw;
2177                 return bt->err;
2178         }
2179
2180         // if librarian slot == found slot, advance to real slot
2181
2182         if( slotptr(set->page, slot)->type == Librarian )
2183           if( !keycmp (ptr, key, keylen) )
2184                 ptr = keyptr(set->page, ++slot);
2185
2186         len = ptr->len;
2187
2188         if( slotptr(set->page, slot)->type == Duplicate )
2189                 len -= BtId;
2190
2191         //  if inserting a duplicate key or unique key
2192         //      check for adequate space on the page
2193         //      and insert the new key before slot.
2194
2195         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2196           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2197                 if( !(entry = bt_splitpage (bt, set)) )
2198                   return bt->err;
2199                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2200                   return bt->err;
2201                 else
2202                   continue;
2203
2204           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2205         }
2206
2207         // if key already exists, update value and return
2208
2209         val = valptr(set->page, slot);
2210
2211         if( val->len >= vallen ) {
2212                 if( slotptr(set->page, slot)->dead )
2213                         set->page->act++;
2214                 set->page->garbage += val->len - vallen;
2215                 set->latch->dirty = 1;
2216                 slotptr(set->page, slot)->dead = 0;
2217                 val->len = vallen;
2218                 memcpy (val->value, value, vallen);
2219                 bt_unlockpage(BtLockWrite, set->latch);
2220                 bt_unpinlatch (set->latch);
2221                 return 0;
2222         }
2223
2224         //      new update value doesn't fit in existing value area
2225
2226         if( !slotptr(set->page, slot)->dead )
2227                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2228         else {
2229                 slotptr(set->page, slot)->dead = 0;
2230                 set->page->act++;
2231         }
2232
2233         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2234           if( !(entry = bt_splitpage (bt, set)) )
2235                 return bt->err;
2236           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2237                 return bt->err;
2238           else
2239                 continue;
2240
2241         set->page->min -= vallen + sizeof(BtVal);
2242         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2243         memcpy (val->value, value, vallen);
2244         val->len = vallen;
2245
2246         set->latch->dirty = 1;
2247         set->page->min -= keylen + sizeof(BtKey);
2248         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2249         memcpy (ptr->key, key, keylen);
2250         ptr->len = keylen;
2251         
2252         slotptr(set->page, slot)->off = set->page->min;
2253         bt_unlockpage(BtLockWrite, set->latch);
2254         bt_unpinlatch (set->latch);
2255         return 0;
2256   }
2257   return 0;
2258 }
2259
2260 typedef struct {
2261         uint entry;                     // latch table entry number
2262         uint slot:31;           // page slot number
2263         uint reuse:1;           // reused previous page
2264 } AtomicMod;
2265
2266 typedef struct {
2267         uid page_no;            // page number for split leaf
2268         void *next;                     // next key to insert
2269         uint entry:29;          // latch table entry number
2270         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2271         uint nounlock:1;        // don't unlock ParentModification
2272         unsigned char leafkey[BT_keyarray];
2273 } AtomicKey;
2274
2275 //  find and load leaf page for given key
2276 //      leave page Atomic locked and Read locked.
2277
2278 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2279 {
2280 BtLatchSet *prevlatch;
2281 uid page_no;
2282 uint slot;
2283
2284   //  find level zero page
2285
2286   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2287         return 0;
2288
2289   // find next non-dead entry on this page
2290   //    it will be the fence key if nothing else
2291
2292   while( slotptr(set->page, slot)->dead )
2293         if( slot++ < set->page->cnt )
2294           continue;
2295         else
2296           return bt->err = BTERR_struct, 0;
2297
2298   page_no = bt_getid(valptr(set->page, slot)->value);
2299   prevlatch = set->latch;
2300
2301   while( page_no ) {
2302         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2303           set->page = bt_mappage (bt, set->latch);
2304         else
2305           return 0;
2306
2307         // obtain read lock using lock chaining with Access mode
2308         //      release & unpin parent/left sibling page
2309
2310         bt_lockpage(BtLockAccess, set->latch);
2311
2312         bt_unlockpage(BtLockRead, prevlatch);
2313         bt_unpinlatch (prevlatch);
2314
2315         bt_lockpage(BtLockRead, set->latch);
2316         bt_unlockpage(BtLockAccess, set->latch);
2317
2318         //  find key on page at this level
2319         //  and descend to requested level
2320
2321         if( !set->page->kill )
2322          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2323           bt_unlockpage(BtLockRead, set->latch);
2324           bt_lockpage(BtLockAccess, set->latch);
2325           bt_lockpage(BtLockAtomic, set->latch);
2326           bt_lockpage(BtLockRead, set->latch);
2327           bt_unlockpage(BtLockAccess, set->latch);
2328
2329           if( !set->page->kill )
2330            if( slot = bt_findslot (set->page, key, len) )
2331                 return slot;
2332
2333           bt_unlockpage(BtLockAtomic, set->latch);
2334           }
2335
2336         //  slide right into next page
2337
2338         page_no = bt_getid(set->page->right);
2339         prevlatch = set->latch;
2340   }
2341
2342   // return error on end of right chain
2343
2344   bt->err = BTERR_struct;
2345   return 0;     // return error
2346 }
2347
2348 //      determine actual page where key is located
2349 //  return slot number
2350
2351 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2352 {
2353 BtKey *key = keyptr(source,src);
2354 uint slot = locks[src].slot;
2355 uint entry;
2356
2357         if( src > 1 && locks[src].reuse )
2358           entry = locks[src-1].entry, slot = 0;
2359         else
2360           entry = locks[src].entry;
2361
2362         if( slot ) {
2363                 set->latch = bt->mgr->latchsets + entry;
2364                 set->page = bt_mappage (bt, set->latch);
2365                 return slot;
2366         }
2367
2368         //      is locks->reuse set?
2369         //      if so, find where our key
2370         //      is located on previous page or split pages
2371
2372         do {
2373                 set->latch = bt->mgr->latchsets + entry;
2374                 set->page = bt_mappage (bt, set->latch);
2375
2376                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2377                   if( locks[src].reuse )
2378                         locks[src].entry = entry;
2379                   return slot;
2380                 }
2381         } while( entry = set->latch->split );
2382
2383         bt->err = BTERR_atomic;
2384         return 0;
2385 }
2386
2387 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2388 {
2389 BtKey *key = keyptr(source, src);
2390 BtVal *val = valptr(source, src);
2391 BtLatchSet *latch;
2392 BtPageSet set[1];
2393 uint entry;
2394
2395   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2396         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2397           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2398
2399         if( entry = bt_splitpage (bt, set) )
2400           latch = bt->mgr->latchsets + entry;
2401         else
2402           return bt->err;
2403
2404         //      splice right page into split chain
2405         //      and WriteLock it.
2406
2407         latch->split = set->latch->split;
2408         set->latch->split = entry;
2409         bt_lockpage(BtLockWrite, latch);
2410   }
2411
2412   return bt->err = BTERR_atomic;
2413 }
2414
2415 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2416 {
2417 BtKey *key = keyptr(source, src);
2418 uint idx, entry, slot;
2419 BtPageSet set[1];
2420 BtKey *ptr;
2421 BtVal *val;
2422
2423         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2424                 slotptr(set->page, slot)->dead = 1;
2425         else
2426                 return bt->err = BTERR_struct;
2427
2428         ptr = keyptr(set->page, slot);
2429         val = valptr(set->page, slot);
2430
2431         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2432         set->latch->dirty = 1;
2433         set->page->act--;
2434         return 0;
2435 }
2436
2437 //      delete an empty master page for a transaction
2438
2439 //      note that the far right page never empties because
2440 //      it always contains (at least) the infinite stopper key
2441 //      and that all pages that don't contain any keys are
2442 //      deleted, or are being held under Atomic lock.
2443
2444 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2445 {
2446 BtPageSet right[1], temp[1];
2447 unsigned char value[BtId];
2448 uid right_page_no;
2449 BtKey *ptr;
2450
2451         bt_lockpage(BtLockWrite, prev->latch);
2452
2453         //      grab the right sibling
2454
2455         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2456                 right->page = bt_mappage (bt, right->latch);
2457         else
2458                 return bt->err;
2459
2460         bt_lockpage(BtLockAtomic, right->latch);
2461         bt_lockpage(BtLockWrite, right->latch);
2462
2463         //      and pull contents over empty page
2464         //      while preserving master's left link
2465
2466         memcpy (right->page->left, prev->page->left, BtId);
2467         memcpy (prev->page, right->page, bt->mgr->page_size);
2468
2469         //      forward seekers to old right sibling
2470         //      to new page location in set
2471
2472         bt_putid (right->page->right, prev->latch->page_no);
2473         right->latch->dirty = 1;
2474         right->page->kill = 1;
2475
2476         //      remove pointer to right page for searchers by
2477         //      changing right fence key to point to the master page
2478
2479         ptr = keyptr(right->page,right->page->cnt);
2480         bt_putid (value, prev->latch->page_no);
2481
2482         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2483                 return bt->err;
2484
2485         //  now that master page is in good shape we can
2486         //      remove its locks.
2487
2488         bt_unlockpage (BtLockAtomic, prev->latch);
2489         bt_unlockpage (BtLockWrite, prev->latch);
2490
2491         //  fix master's right sibling's left pointer
2492         //      to remove scanner's poiner to the right page
2493
2494         if( right_page_no = bt_getid (prev->page->right) ) {
2495           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2496                 temp->page = bt_mappage (bt, temp->latch);
2497
2498           bt_lockpage (BtLockWrite, temp->latch);
2499           bt_putid (temp->page->left, prev->latch->page_no);
2500           temp->latch->dirty = 1;
2501
2502           bt_unlockpage (BtLockWrite, temp->latch);
2503           bt_unpinlatch (temp->latch);
2504         } else {        // master is now the far right page
2505           bt_spinwritelock (bt->mgr->lock);
2506           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2507           bt_spinreleasewrite(bt->mgr->lock);
2508         }
2509
2510         //      now that there are no pointers to the right page
2511         //      we can delete it after the last read access occurs
2512
2513         bt_unlockpage (BtLockWrite, right->latch);
2514         bt_unlockpage (BtLockAtomic, right->latch);
2515         bt_lockpage (BtLockDelete, right->latch);
2516         bt_lockpage (BtLockWrite, right->latch);
2517         bt_freepage (bt, right);
2518         return 0;
2519 }
2520
2521 //      atomic modification of a batch of keys.
2522
2523 //      return -1 if BTERR is set
2524 //      otherwise return slot number
2525 //      causing the key constraint violation
2526 //      or zero on successful completion.
2527
2528 int bt_atomictxn (BtDb *bt, BtPage source)
2529 {
2530 uint src, idx, slot, samepage, entry;
2531 AtomicKey *head, *tail, *leaf;
2532 BtPageSet set[1], prev[1];
2533 unsigned char value[BtId];
2534 BtKey *key, *ptr, *key2;
2535 BtLatchSet *latch;
2536 AtomicMod *locks;
2537 int result = 0;
2538 BtSlot temp[1];
2539 BtPage page;
2540 BtVal *val;
2541 uid right;
2542 int type;
2543
2544   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2545   head = NULL;
2546   tail = NULL;
2547
2548   // stable sort the list of keys into order to
2549   //    prevent deadlocks between threads.
2550
2551   for( src = 1; src++ < source->cnt; ) {
2552         *temp = *slotptr(source,src);
2553         key = keyptr (source,src);
2554
2555         for( idx = src; --idx; ) {
2556           key2 = keyptr (source,idx);
2557           if( keycmp (key, key2->key, key2->len) < 0 ) {
2558                 *slotptr(source,idx+1) = *slotptr(source,idx);
2559                 *slotptr(source,idx) = *temp;
2560           } else
2561                 break;
2562         }
2563   }
2564
2565   // Load the leaf page for each key
2566   // group same page references with reuse bit
2567   // and determine any constraint violations
2568
2569   for( src = 0; src++ < source->cnt; ) {
2570         key = keyptr(source, src);
2571         slot = 0;
2572
2573         // first determine if this modification falls
2574         // on the same page as the previous modification
2575         //      note that the far right leaf page is a special case
2576
2577         if( samepage = src > 1 )
2578           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2579                 slot = bt_findslot(set->page, key->key, key->len);
2580           else // release read on previous page
2581                 bt_unlockpage(BtLockRead, set->latch); 
2582
2583         if( !slot )
2584           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2585                 set->latch->split = 0;
2586           else
2587                 return -1;
2588
2589         if( slotptr(set->page, slot)->type == Librarian )
2590           ptr = keyptr(set->page, ++slot);
2591         else
2592           ptr = keyptr(set->page, slot);
2593
2594         if( !samepage ) {
2595           locks[src].entry = set->latch->entry;
2596           locks[src].slot = slot;
2597           locks[src].reuse = 0;
2598         } else {
2599           locks[src].entry = 0;
2600           locks[src].slot = 0;
2601           locks[src].reuse = 1;
2602         }
2603
2604         switch( slotptr(source, src)->type ) {
2605         case Duplicate:
2606         case Unique:
2607           if( !slotptr(set->page, slot)->dead )
2608            if( slot < set->page->cnt || bt_getid (set->page->right) )
2609             if( !keycmp (ptr, key->key, key->len) ) {
2610
2611                   // return constraint violation if key already exists
2612
2613                   bt_unlockpage(BtLockRead, set->latch);
2614                   result = src;
2615
2616                   while( src ) {
2617                         if( locks[src].entry ) {
2618                           set->latch = bt->mgr->latchsets + locks[src].entry;
2619                           bt_unlockpage(BtLockAtomic, set->latch);
2620                           bt_unpinlatch (set->latch);
2621                         }
2622                         src--;
2623                   }
2624                   free (locks);
2625                   return result;
2626                 }
2627           break;
2628         }
2629   }
2630
2631   //  unlock last loadpage lock
2632
2633   if( source->cnt > 1 )
2634         bt_unlockpage(BtLockRead, set->latch);
2635
2636   //  obtain write lock for each master page
2637
2638   for( src = 0; src++ < source->cnt; )
2639         if( locks[src].reuse )
2640           continue;
2641         else
2642           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2643
2644   // insert or delete each key
2645   // process any splits or merges
2646   // release Write & Atomic latches
2647   // set ParentModifications and build
2648   // queue of keys to insert for split pages
2649   // or delete for deleted pages.
2650
2651   // run through txn list backwards
2652
2653   samepage = source->cnt + 1;
2654
2655   for( src = source->cnt; src; src-- ) {
2656         if( locks[src].reuse )
2657           continue;
2658
2659         //  perform the txn operations
2660         //      from smaller to larger on
2661         //  the same page
2662
2663         for( idx = src; idx < samepage; idx++ )
2664          switch( slotptr(source,idx)->type ) {
2665          case Delete:
2666           if( bt_atomicdelete (bt, source, locks, idx) )
2667                 return -1;
2668           break;
2669
2670         case Duplicate:
2671         case Unique:
2672           if( bt_atomicinsert (bt, source, locks, idx) )
2673                 return -1;
2674           break;
2675         }
2676
2677         //      after the same page operations have finished,
2678         //  process master page for splits or deletion.
2679
2680         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2681         prev->page = bt_mappage (bt, prev->latch);
2682         samepage = src;
2683
2684         //  pick-up all splits from master page
2685         //      each one is already WriteLocked.
2686
2687         entry = prev->latch->split;
2688
2689         while( entry ) {
2690           set->latch = bt->mgr->latchsets + entry;
2691           set->page = bt_mappage (bt, set->latch);
2692           entry = set->latch->split;
2693
2694           // delete empty master page by undoing its split
2695           //  (this is potentially another empty page)
2696           //  note that there are no new left pointers yet
2697
2698           if( !prev->page->act ) {
2699                 memcpy (set->page->left, prev->page->left, BtId);
2700                 memcpy (prev->page, set->page, bt->mgr->page_size);
2701                 bt_lockpage (BtLockDelete, set->latch);
2702                 bt_freepage (bt, set);
2703
2704                 prev->latch->dirty = 1;
2705                 continue;
2706           }
2707
2708           // remove empty page from the split chain
2709
2710           if( !set->page->act ) {
2711                 memcpy (prev->page->right, set->page->right, BtId);
2712                 prev->latch->split = set->latch->split;
2713                 bt_lockpage (BtLockDelete, set->latch);
2714                 bt_freepage (bt, set);
2715                 continue;
2716           }
2717
2718           //  schedule prev fence key update
2719
2720           ptr = keyptr(prev->page,prev->page->cnt);
2721           leaf = calloc (sizeof(AtomicKey), 1);
2722
2723           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2724           leaf->page_no = prev->latch->page_no;
2725           leaf->entry = prev->latch->entry;
2726           leaf->type = 0;
2727
2728           if( tail )
2729                 tail->next = leaf;
2730           else
2731                 head = leaf;
2732
2733           tail = leaf;
2734
2735           // splice in the left link into the split page
2736
2737           bt_putid (set->page->left, prev->latch->page_no);
2738           bt_lockpage(BtLockParent, prev->latch);
2739           bt_unlockpage(BtLockWrite, prev->latch);
2740           *prev = *set;
2741         }
2742
2743         //  update left pointer in next right page from last split page
2744         //      (if all splits were reversed, latch->split == 0)
2745
2746         if( latch->split ) {
2747           //  fix left pointer in master's original (now split)
2748           //  far right sibling or set rightmost page in page zero
2749
2750           if( right = bt_getid (prev->page->right) ) {
2751                 if( set->latch = bt_pinlatch (bt, right, 1) )
2752                   set->page = bt_mappage (bt, set->latch);
2753                 else
2754                   return -1;
2755
2756             bt_lockpage (BtLockWrite, set->latch);
2757             bt_putid (set->page->left, prev->latch->page_no);
2758                 set->latch->dirty = 1;
2759             bt_unlockpage (BtLockWrite, set->latch);
2760                 bt_unpinlatch (set->latch);
2761           } else {      // prev is rightmost page
2762             bt_spinwritelock (bt->mgr->lock);
2763                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2764             bt_spinreleasewrite(bt->mgr->lock);
2765           }
2766
2767           //  Process last page split in chain
2768
2769           ptr = keyptr(prev->page,prev->page->cnt);
2770           leaf = calloc (sizeof(AtomicKey), 1);
2771
2772           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2773           leaf->page_no = prev->latch->page_no;
2774           leaf->entry = prev->latch->entry;
2775           leaf->type = 0;
2776   
2777           if( tail )
2778                 tail->next = leaf;
2779           else
2780                 head = leaf;
2781
2782           tail = leaf;
2783
2784           bt_lockpage(BtLockParent, prev->latch);
2785           bt_unlockpage(BtLockWrite, prev->latch);
2786
2787           //  remove atomic lock on master page
2788
2789           bt_unlockpage(BtLockAtomic, latch);
2790           continue;
2791         }
2792
2793         //  finished if prev page occupied (either master or final split)
2794
2795         if( prev->page->act ) {
2796           bt_unlockpage(BtLockWrite, latch);
2797           bt_unlockpage(BtLockAtomic, latch);
2798           bt_unpinlatch(latch);
2799           continue;
2800         }
2801
2802         // any and all splits were reversed, and the
2803         // master page located in prev is empty, delete it
2804         // by pulling over master's right sibling.
2805
2806         // Remove empty master's fence key
2807
2808         ptr = keyptr(prev->page,prev->page->cnt);
2809
2810         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2811                 return -1;
2812
2813         //      perform the remainder of the delete
2814         //      from the FIFO queue
2815
2816         leaf = calloc (sizeof(AtomicKey), 1);
2817
2818         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2819         leaf->page_no = prev->latch->page_no;
2820         leaf->entry = prev->latch->entry;
2821         leaf->nounlock = 1;
2822         leaf->type = 2;
2823   
2824         if( tail )
2825           tail->next = leaf;
2826         else
2827           head = leaf;
2828
2829         tail = leaf;
2830
2831         //      leave atomic lock in place until
2832         //      deletion completes in next phase.
2833
2834         bt_unlockpage(BtLockWrite, prev->latch);
2835   }
2836   
2837   //  add & delete keys for any pages split or merged during transaction
2838
2839   if( leaf = head )
2840     do {
2841           set->latch = bt->mgr->latchsets + leaf->entry;
2842           set->page = bt_mappage (bt, set->latch);
2843
2844           bt_putid (value, leaf->page_no);
2845           ptr = (BtKey *)leaf->leafkey;
2846
2847           switch( leaf->type ) {
2848           case 0:       // insert key
2849             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2850                   return -1;
2851
2852                 break;
2853
2854           case 1:       // delete key
2855                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2856                   return -1;
2857
2858                 break;
2859
2860           case 2:       // free page
2861                 if( bt_atomicfree (bt, set) )
2862                   return -1;
2863
2864                 break;
2865           }
2866
2867           if( !leaf->nounlock )
2868             bt_unlockpage (BtLockParent, set->latch);
2869
2870           bt_unpinlatch (set->latch);
2871           tail = leaf->next;
2872           free (leaf);
2873         } while( leaf = tail );
2874
2875   // return success
2876
2877   free (locks);
2878   return 0;
2879 }
2880
2881 //      set cursor to highest slot on highest page
2882
2883 uint bt_lastkey (BtDb *bt)
2884 {
2885 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2886 BtPageSet set[1];
2887
2888         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2889                 set->page = bt_mappage (bt, set->latch);
2890         else
2891                 return 0;
2892
2893     bt_lockpage(BtLockRead, set->latch);
2894         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2895     bt_unlockpage(BtLockRead, set->latch);
2896         bt_unpinlatch (set->latch);
2897
2898         bt->cursor_page = page_no;
2899         return bt->cursor->cnt;
2900 }
2901
2902 //      return previous slot on cursor page
2903
2904 uint bt_prevkey (BtDb *bt, uint slot)
2905 {
2906 uid ourright, next, us = bt->cursor_page;
2907 BtPageSet set[1];
2908
2909         if( --slot )
2910                 return slot;
2911
2912         ourright = bt_getid(bt->cursor->right);
2913
2914 goleft:
2915         if( !(next = bt_getid(bt->cursor->left)) )
2916                 return 0;
2917
2918 findourself:
2919         bt->cursor_page = next;
2920
2921         if( set->latch = bt_pinlatch (bt, next, 1) )
2922                 set->page = bt_mappage (bt, set->latch);
2923         else
2924                 return 0;
2925
2926     bt_lockpage(BtLockRead, set->latch);
2927         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2928         bt_unlockpage(BtLockRead, set->latch);
2929         bt_unpinlatch (set->latch);
2930         
2931         next = bt_getid (bt->cursor->right);
2932
2933         if( bt->cursor->kill )
2934                 goto findourself;
2935
2936         if( next != us )
2937           if( next == ourright )
2938                 goto goleft;
2939           else
2940                 goto findourself;
2941
2942         return bt->cursor->cnt;
2943 }
2944
2945 //  return next slot on cursor page
2946 //  or slide cursor right into next page
2947
2948 uint bt_nextkey (BtDb *bt, uint slot)
2949 {
2950 BtPageSet set[1];
2951 uid right;
2952
2953   do {
2954         right = bt_getid(bt->cursor->right);
2955
2956         while( slot++ < bt->cursor->cnt )
2957           if( slotptr(bt->cursor,slot)->dead )
2958                 continue;
2959           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2960                 return slot;
2961           else
2962                 break;
2963
2964         if( !right )
2965                 break;
2966
2967         bt->cursor_page = right;
2968
2969         if( set->latch = bt_pinlatch (bt, right, 1) )
2970                 set->page = bt_mappage (bt, set->latch);
2971         else
2972                 return 0;
2973
2974     bt_lockpage(BtLockRead, set->latch);
2975
2976         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2977
2978         bt_unlockpage(BtLockRead, set->latch);
2979         bt_unpinlatch (set->latch);
2980         slot = 0;
2981
2982   } while( 1 );
2983
2984   return bt->err = 0;
2985 }
2986
2987 //  cache page of keys into cursor and return starting slot for given key
2988
2989 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2990 {
2991 BtPageSet set[1];
2992 uint slot;
2993
2994         // cache page for retrieval
2995
2996         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2997           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2998         else
2999           return 0;
3000
3001         bt->cursor_page = set->latch->page_no;
3002
3003         bt_unlockpage(BtLockRead, set->latch);
3004         bt_unpinlatch (set->latch);
3005         return slot;
3006 }
3007
3008 BtKey *bt_key(BtDb *bt, uint slot)
3009 {
3010         return keyptr(bt->cursor, slot);
3011 }
3012
3013 BtVal *bt_val(BtDb *bt, uint slot)
3014 {
3015         return valptr(bt->cursor,slot);
3016 }
3017
3018 #ifdef STANDALONE
3019
3020 #ifndef unix
3021 double getCpuTime(int type)
3022 {
3023 FILETIME crtime[1];
3024 FILETIME xittime[1];
3025 FILETIME systime[1];
3026 FILETIME usrtime[1];
3027 SYSTEMTIME timeconv[1];
3028 double ans = 0;
3029
3030         memset (timeconv, 0, sizeof(SYSTEMTIME));
3031
3032         switch( type ) {
3033         case 0:
3034                 GetSystemTimeAsFileTime (xittime);
3035                 FileTimeToSystemTime (xittime, timeconv);
3036                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3037                 break;
3038         case 1:
3039                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3040                 FileTimeToSystemTime (usrtime, timeconv);
3041                 break;
3042         case 2:
3043                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3044                 FileTimeToSystemTime (systime, timeconv);
3045                 break;
3046         }
3047
3048         ans += (double)timeconv->wHour * 3600;
3049         ans += (double)timeconv->wMinute * 60;
3050         ans += (double)timeconv->wSecond;
3051         ans += (double)timeconv->wMilliseconds / 1000;
3052         return ans;
3053 }
3054 #else
3055 #include <time.h>
3056 #include <sys/resource.h>
3057
3058 double getCpuTime(int type)
3059 {
3060 struct rusage used[1];
3061 struct timeval tv[1];
3062
3063         switch( type ) {
3064         case 0:
3065                 gettimeofday(tv, NULL);
3066                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3067
3068         case 1:
3069                 getrusage(RUSAGE_SELF, used);
3070                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3071
3072         case 2:
3073                 getrusage(RUSAGE_SELF, used);
3074                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3075         }
3076
3077         return 0;
3078 }
3079 #endif
3080
3081 void bt_poolaudit (BtMgr *mgr)
3082 {
3083 BtLatchSet *latch;
3084 uint slot = 0;
3085
3086         while( slot++ < mgr->latchdeployed ) {
3087                 latch = mgr->latchsets + slot;
3088
3089                 if( *latch->readwr->rin & MASK )
3090                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3091                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3092
3093                 if( *latch->access->rin & MASK )
3094                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3095                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3096
3097                 if( *latch->parent->ticket != *latch->parent->serving )
3098                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3099                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3100
3101                 if( latch->pin & ~CLOCK_bit ) {
3102                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3103                         latch->pin = 0;
3104                 }
3105         }
3106 }
3107
3108 uint bt_latchaudit (BtDb *bt)
3109 {
3110 ushort idx, hashidx;
3111 uid next, page_no;
3112 BtLatchSet *latch;
3113 uint cnt = 0;
3114 BtKey *ptr;
3115
3116         if( *(ushort *)(bt->mgr->lock) )
3117                 fprintf(stderr, "Alloc page locked\n");
3118         *(ushort *)(bt->mgr->lock) = 0;
3119
3120         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3121                 latch = bt->mgr->latchsets + idx;
3122                 if( *latch->readwr->rin & MASK )
3123                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3124                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3125
3126                 if( *latch->access->rin & MASK )
3127                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3128                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3129
3130                 if( *latch->parent->ticket != *latch->parent->serving )
3131                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3132                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3133
3134                 if( latch->pin ) {
3135                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3136                         latch->pin = 0;
3137                 }
3138         }
3139
3140         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3141           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3142                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3143
3144           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3145
3146           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3147                 latch = bt->mgr->latchsets + idx;
3148                 if( latch->pin )
3149                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3150           } while( idx = latch->next );
3151         }
3152
3153         page_no = LEAF_page;
3154
3155         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3156         uid off = page_no << bt->mgr->page_bits;
3157 #ifdef unix
3158           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3159 #else
3160         DWORD amt[1];
3161
3162           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3163
3164           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3165                 return bt->err = BTERR_map;
3166
3167           if( *amt <  bt->mgr->page_size )
3168                 return bt->err = BTERR_map;
3169 #endif
3170                 if( !bt->frame->free && !bt->frame->lvl )
3171                         cnt += bt->frame->act;
3172                 page_no++;
3173         }
3174                 
3175         cnt--;  // remove stopper key
3176         fprintf(stderr, " Total keys read %d\n", cnt);
3177
3178         bt_close (bt);
3179         return 0;
3180 }
3181
3182 typedef struct {
3183         char idx;
3184         char *type;
3185         char *infile;
3186         BtMgr *mgr;
3187         int num;
3188 } ThreadArg;
3189
3190 //  standalone program to index file of keys
3191 //  then list them onto std-out
3192
3193 #ifdef unix
3194 void *index_file (void *arg)
3195 #else
3196 uint __stdcall index_file (void *arg)
3197 #endif
3198 {
3199 int line = 0, found = 0, cnt = 0, idx;
3200 uid next, page_no = LEAF_page;  // start on first page of leaves
3201 int ch, len = 0, slot, type = 0;
3202 unsigned char key[BT_maxkey];
3203 unsigned char txn[65536];
3204 ThreadArg *args = arg;
3205 BtPageSet set[1];
3206 uint nxt = 65536;
3207 BtPage page;
3208 BtKey *ptr;
3209 BtVal *val;
3210 BtDb *bt;
3211 FILE *in;
3212
3213         bt = bt_open (args->mgr);
3214         page = (BtPage)txn;
3215
3216         if( args->idx < strlen (args->type) )
3217                 ch = args->type[args->idx];
3218         else
3219                 ch = args->type[strlen(args->type) - 1];
3220
3221         switch(ch | 0x20)
3222         {
3223         case 'a':
3224                 fprintf(stderr, "started latch mgr audit\n");
3225                 cnt = bt_latchaudit (bt);
3226                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3227                 break;
3228
3229         case 'd':
3230                 type = Delete;
3231
3232         case 'p':
3233                 if( !type )
3234                         type = Unique;
3235
3236                 if( args->num )
3237                  if( type == Delete )
3238                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3239                  else
3240                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3241                 else
3242                  if( type == Delete )
3243                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3244                  else
3245                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3246
3247                 if( in = fopen (args->infile, "rb") )
3248                   while( ch = getc(in), ch != EOF )
3249                         if( ch == '\n' )
3250                         {
3251                           line++;
3252
3253                           if( !args->num ) {
3254                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3255                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3256                             len = 0;
3257                                 continue;
3258                           }
3259
3260                           nxt -= len - 10;
3261                           memcpy (txn + nxt, key + 10, len - 10);
3262                           nxt -= 1;
3263                           txn[nxt] = len - 10;
3264                           nxt -= 10;
3265                           memcpy (txn + nxt, key, 10);
3266                           nxt -= 1;
3267                           txn[nxt] = 10;
3268                           slotptr(page,++cnt)->off  = nxt;
3269                           slotptr(page,cnt)->type = type;
3270                           len = 0;
3271
3272                           if( cnt < args->num )
3273                                 continue;
3274
3275                           page->cnt = cnt;
3276                           page->act = cnt;
3277                           page->min = nxt;
3278
3279                           if( bt_atomictxn (bt, page) )
3280                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3281                           nxt = sizeof(txn);
3282                           cnt = 0;
3283
3284                         }
3285                         else if( len < BT_maxkey )
3286                                 key[len++] = ch;
3287                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3288                 break;
3289
3290         case 'w':
3291                 fprintf(stderr, "started indexing for %s\n", args->infile);
3292                 if( in = fopen (args->infile, "r") )
3293                   while( ch = getc(in), ch != EOF )
3294                         if( ch == '\n' )
3295                         {
3296                           line++;
3297
3298                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3299                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3300                           len = 0;
3301                         }
3302                         else if( len < BT_maxkey )
3303                                 key[len++] = ch;
3304                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3305                 break;
3306
3307         case 'f':
3308                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3309                 if( in = fopen (args->infile, "rb") )
3310                   while( ch = getc(in), ch != EOF )
3311                         if( ch == '\n' )
3312                         {
3313                           line++;
3314                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3315                                 found++;
3316                           else if( bt->err )
3317                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3318                           len = 0;
3319                         }
3320                         else if( len < BT_maxkey )
3321                                 key[len++] = ch;
3322                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3323                 break;
3324
3325         case 's':
3326                 fprintf(stderr, "started scanning\n");
3327                 do {
3328                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3329                                 set->page = bt_mappage (bt, set->latch);
3330                         else
3331                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3332                         bt_lockpage (BtLockRead, set->latch);
3333                         next = bt_getid (set->page->right);
3334
3335                         for( slot = 0; slot++ < set->page->cnt; )
3336                          if( next || slot < set->page->cnt )
3337                           if( !slotptr(set->page, slot)->dead ) {
3338                                 ptr = keyptr(set->page, slot);
3339                                 len = ptr->len;
3340
3341                             if( slotptr(set->page, slot)->type == Duplicate )
3342                                         len -= BtId;
3343
3344                                 fwrite (ptr->key, len, 1, stdout);
3345                                 val = valptr(set->page, slot);
3346                                 fwrite (val->value, val->len, 1, stdout);
3347                                 fputc ('\n', stdout);
3348                                 cnt++;
3349                            }
3350
3351                         bt_unlockpage (BtLockRead, set->latch);
3352                         bt_unpinlatch (set->latch);
3353                 } while( page_no = next );
3354
3355                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3356                 break;
3357
3358         case 'r':
3359                 fprintf(stderr, "started reverse scan\n");
3360                 if( slot = bt_lastkey (bt) )
3361                    while( slot = bt_prevkey (bt, slot) ) {
3362                         if( slotptr(bt->cursor, slot)->dead )
3363                           continue;
3364
3365                         ptr = keyptr(bt->cursor, slot);
3366                         len = ptr->len;
3367
3368                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3369                                 len -= BtId;
3370
3371                         fwrite (ptr->key, len, 1, stdout);
3372                         val = valptr(bt->cursor, slot);
3373                         fwrite (val->value, val->len, 1, stdout);
3374                         fputc ('\n', stdout);
3375                         cnt++;
3376                   }
3377
3378                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3379                 break;
3380
3381         case 'c':
3382 #ifdef unix
3383                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3384 #endif
3385                 fprintf(stderr, "started counting\n");
3386                 page_no = LEAF_page;
3387
3388                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3389                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3390                                 break;
3391
3392                         if( !bt->frame->free && !bt->frame->lvl )
3393                                 cnt += bt->frame->act;
3394
3395                         bt->reads++;
3396                         page_no++;
3397                 }
3398                 
3399                 cnt--;  // remove stopper key
3400                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3401                 break;
3402         }
3403
3404         bt_close (bt);
3405 #ifdef unix
3406         return NULL;
3407 #else
3408         return 0;
3409 #endif
3410 }
3411
3412 typedef struct timeval timer;
3413
3414 int main (int argc, char **argv)
3415 {
3416 int idx, cnt, len, slot, err;
3417 int segsize, bits = 16;
3418 double start, stop;
3419 #ifdef unix
3420 pthread_t *threads;
3421 #else
3422 HANDLE *threads;
3423 #endif
3424 ThreadArg *args;
3425 uint poolsize = 0;
3426 float elapsed;
3427 int num = 0;
3428 char key[1];
3429 BtMgr *mgr;
3430 BtKey *ptr;
3431 BtDb *bt;
3432
3433         if( argc < 3 ) {
3434                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3435                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3436                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3437                 fprintf (stderr, "  page_bits is the page size in bits\n");
3438                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3439                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3440                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3441                 exit(0);
3442         }
3443
3444         start = getCpuTime(0);
3445
3446         if( argc > 3 )
3447                 bits = atoi(argv[3]);
3448
3449         if( argc > 4 )
3450                 poolsize = atoi(argv[4]);
3451
3452         if( !poolsize )
3453                 fprintf (stderr, "Warning: no mapped_pool\n");
3454
3455         if( argc > 5 )
3456                 num = atoi(argv[5]);
3457
3458         cnt = argc - 6;
3459 #ifdef unix
3460         threads = malloc (cnt * sizeof(pthread_t));
3461 #else
3462         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3463 #endif
3464         args = malloc (cnt * sizeof(ThreadArg));
3465
3466         mgr = bt_mgr ((argv[1]), bits, poolsize);
3467
3468         if( !mgr ) {
3469                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3470                 exit (1);
3471         }
3472
3473         //      fire off threads
3474
3475         for( idx = 0; idx < cnt; idx++ ) {
3476                 args[idx].infile = argv[idx + 6];
3477                 args[idx].type = argv[2];
3478                 args[idx].mgr = mgr;
3479                 args[idx].num = num;
3480                 args[idx].idx = idx;
3481 #ifdef unix
3482                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3483                         fprintf(stderr, "Error creating thread %d\n", err);
3484 #else
3485                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3486 #endif
3487         }
3488
3489         //      wait for termination
3490
3491 #ifdef unix
3492         for( idx = 0; idx < cnt; idx++ )
3493                 pthread_join (threads[idx], NULL);
3494 #else
3495         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3496
3497         for( idx = 0; idx < cnt; idx++ )
3498                 CloseHandle(threads[idx]);
3499
3500 #endif
3501         bt_poolaudit(mgr);
3502         bt_mgrclose (mgr);
3503
3504         elapsed = getCpuTime(0) - start;
3505         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3506         elapsed = getCpuTime(1);
3507         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3508         elapsed = getCpuTime(2);
3509         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3510 }
3511
3512 #endif  //STANDALONE