]> pd.if.org Git - btree/blob - threadskv8.c
add txn level zero page loader
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 #define PHID 0x1
116 #define PRES 0x2
117 #define MASK 0x3
118 #define RINC 0x4
119
120 //      definition for spin latch implementation
121
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
125
126 volatile typedef struct {
127         ushort exclusive:1;
128         ushort pending:1;
129         ushort share:14;
130 } BtSpinLatch;
131
132 #define XCL 1
133 #define PEND 2
134 #define BOTH 3
135 #define SHARE 4
136
137 //  hash table entries
138
139 typedef struct {
140         volatile uint slot;             // Latch table entry at head of chain
141         BtSpinLatch latch[1];
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock atomic[1];               // Atomic update in progress
152         uint split;                             // right split page atomic insert
153         uint entry;                             // entry slot in latch table
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of outstanding threads
157         ushort dirty:1;                 // page in cache is dirty
158 } BtLatchSet;
159
160 //      Define the length of the page record numbers
161
162 #define BtId 6
163
164 //      Page key slot definition.
165
166 //      Keys are marked dead, but remain on the page until
167 //      it cleanup is called. The fence key (highest key) for
168 //      a leaf page is always present, even after cleanup.
169
170 //      Slot types
171
172 //      In addition to the Unique keys that occupy slots
173 //      there are Librarian and Duplicate key
174 //      slots occupying the key slot array.
175
176 //      The Librarian slots are dead keys that
177 //      serve as filler, available to add new Unique
178 //      or Dup slots that are inserted into the B-tree.
179
180 //      The Duplicate slots have had their key bytes extended
181 //      by 6 bytes to contain a binary duplicate key uniqueifier.
182
183 typedef enum {
184         Unique,
185         Librarian,
186         Duplicate,
187         Delete
188 } BtSlotType;
189
190 typedef struct {
191         uint off:BT_maxbits;    // page offset for key start
192         uint type:3;                    // type of slot
193         uint dead:1;                    // set for deleted slot
194 } BtSlot;
195
196 //      The key structure occupies space at the upper end of
197 //      each page.  It's a length byte followed by the key
198 //      bytes.
199
200 typedef struct {
201         unsigned char len;              // this can be changed to a ushort or uint
202         unsigned char key[0];
203 } BtKey;
204
205 //      the value structure also occupies space at the upper
206 //      end of the page. Each key is immediately followed by a value.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char value[0];
211 } BtVal;
212
213 #define BT_maxkey       255             // maximum number of bytes in a key
214 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
215
216 //      The first part of an index page.
217 //      It is immediately followed
218 //      by the BtSlot array of keys.
219
220 //      note that this structure size
221 //      must be a multiple of 8 bytes
222 //      in order to place dups correctly.
223
224 typedef struct BtPage_ {
225         uint cnt;                                       // count of keys in page
226         uint act;                                       // count of active keys
227         uint min;                                       // next key offset
228         uint garbage;                           // page garbage in bytes
229         unsigned char bits:7;           // page size in bits
230         unsigned char free:1;           // page is on free chain
231         unsigned char lvl:7;            // level of page
232         unsigned char kill:1;           // page is being deleted
233         unsigned char right[BtId];      // page number to right
234         unsigned char left[BtId];       // page number to left
235         unsigned char filler[2];        // padding to multiple of 8
236 } *BtPage;
237
238 //  The loadpage interface object
239
240 typedef struct {
241         BtPage page;            // current page pointer
242         BtLatchSet *latch;      // current page latch set
243 } BtPageSet;
244
245 //      structure for latch manager on ALLOC_page
246
247 typedef struct {
248         struct BtPage_ alloc[1];        // next page_no in right ptr
249         unsigned long long dups[1];     // global duplicate key uniqueifier
250         unsigned char chain[BtId];      // head of free page_nos chain
251 } BtPageZero;
252
253 //      The object structure for Btree access
254
255 typedef struct {
256         uint page_size;                         // page size    
257         uint page_bits;                         // page size in bits    
258 #ifdef unix
259         int idx;
260 #else
261         HANDLE idx;
262 #endif
263         BtPageZero *pagezero;           // mapped allocation page
264         BtSpinLatch lock[1];            // allocation area lite latch
265         uint latchdeployed;                     // highest number of latch entries deployed
266         uint nlatchpage;                        // number of latch pages at BT_latch
267         uint latchtotal;                        // number of page latch entries
268         uint latchhash;                         // number of latch hash table slots
269         uint latchvictim;                       // next latch entry to examine
270         BtHashEntry *hashtable;         // the buffer pool hash table entries
271         BtLatchSet *latchsets;          // mapped latch set from buffer pool
272         unsigned char *pagepool;        // mapped to the buffer pool pages
273 #ifndef unix
274         HANDLE halloc;                          // allocation handle
275         HANDLE hpool;                           // buffer pool handle
276 #endif
277 } BtMgr;
278
279 typedef struct {
280         BtMgr *mgr;                             // buffer manager for thread
281         BtPage cursor;                  // cached frame for start/next (never mapped)
282         BtPage frame;                   // spare frame for the page split (never mapped)
283         uid cursor_page;                                // current cursor page number   
284         unsigned char *mem;                             // frame, cursor, page memory buffer
285         unsigned char key[BT_keyarray]; // last found complete key
286         int found;                              // last delete or insert was found
287         int err;                                // last error
288         int reads, writes;              // number of reads and writes from the btree
289 } BtDb;
290
291 typedef enum {
292         BTERR_ok = 0,
293         BTERR_struct,
294         BTERR_ovflw,
295         BTERR_lock,
296         BTERR_map,
297         BTERR_read,
298         BTERR_wrt,
299         BTERR_atomic
300 } BTERR;
301
302 #define CLOCK_bit 0x8000
303
304 // B-Tree functions
305 extern void bt_close (BtDb *bt);
306 extern BtDb *bt_open (BtMgr *mgr);
307 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
308 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
309 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
310 extern BtKey *bt_foundkey (BtDb *bt);
311 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
312 extern uint bt_nextkey   (BtDb *bt, uint slot);
313
314 //      manager functions
315 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
316 void bt_mgrclose (BtMgr *mgr);
317
318 //  Helper functions to return slot values
319 //      from the cursor page.
320
321 extern BtKey *bt_key (BtDb *bt, uint slot);
322 extern BtVal *bt_val (BtDb *bt, uint slot);
323
324 //  The page is allocated from low and hi ends.
325 //  The key slots are allocated from the bottom,
326 //      while the text and value of the key
327 //  are allocated from the top.  When the two
328 //  areas meet, the page is split into two.
329
330 //  A key consists of a length byte, two bytes of
331 //  index number (0 - 65535), and up to 253 bytes
332 //  of key value.
333
334 //  Associated with each key is a value byte string
335 //      containing any value desired.
336
337 //  The b-tree root is always located at page 1.
338 //      The first leaf page of level zero is always
339 //      located on page 2.
340
341 //      The b-tree pages are linked with next
342 //      pointers to facilitate enumerators,
343 //      and provide for concurrency.
344
345 //      When to root page fills, it is split in two and
346 //      the tree height is raised by a new root at page
347 //      one with two keys.
348
349 //      Deleted keys are marked with a dead bit until
350 //      page cleanup. The fence key for a leaf node is
351 //      always present
352
353 //  To achieve maximum concurrency one page is locked at a time
354 //  as the tree is traversed to find leaf key in question. The right
355 //  page numbers are used in cases where the page is being split,
356 //      or consolidated.
357
358 //  Page 0 is dedicated to lock for new page extensions,
359 //      and chains empty pages together for reuse. It also
360 //      contains the latch manager hash table.
361
362 //      The ParentModification lock on a node is obtained to serialize posting
363 //      or changing the fence key for a node.
364
365 //      Empty pages are chained together through the ALLOC page and reused.
366
367 //      Access macros to address slot and key values from the page
368 //      Page slots use 1 based indexing.
369
370 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
371 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
372 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
373
374 void bt_putid(unsigned char *dest, uid id)
375 {
376 int i = BtId;
377
378         while( i-- )
379                 dest[i] = (unsigned char)id, id >>= 8;
380 }
381
382 uid bt_getid(unsigned char *src)
383 {
384 uid id = 0;
385 int i;
386
387         for( i = 0; i < BtId; i++ )
388                 id <<= 8, id |= *src++; 
389
390         return id;
391 }
392
393 uid bt_newdup (BtDb *bt)
394 {
395 #ifdef unix
396         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
397 #else
398         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
399 #endif
400 }
401
402 //      Phase-Fair reader/writer lock implementation
403
404 void WriteLock (RWLock *lock)
405 {
406 ushort w, r, tix;
407
408 #ifdef unix
409         tix = __sync_fetch_and_add (lock->ticket, 1);
410 #else
411         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
412 #endif
413         // wait for our ticket to come up
414
415         while( tix != lock->serving[0] )
416 #ifdef unix
417                 sched_yield();
418 #else
419                 SwitchToThread ();
420 #endif
421
422         w = PRES | (tix & PHID);
423 #ifdef  unix
424         r = __sync_fetch_and_add (lock->rin, w);
425 #else
426         r = _InterlockedExchangeAdd16 (lock->rin, w);
427 #endif
428         while( r != *lock->rout )
429 #ifdef unix
430                 sched_yield();
431 #else
432                 SwitchToThread();
433 #endif
434 }
435
436 void WriteRelease (RWLock *lock)
437 {
438 #ifdef unix
439         __sync_fetch_and_and (lock->rin, ~MASK);
440 #else
441         _InterlockedAnd16 (lock->rin, ~MASK);
442 #endif
443         lock->serving[0]++;
444 }
445
446 void ReadLock (RWLock *lock)
447 {
448 ushort w;
449 #ifdef unix
450         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
451 #else
452         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
453 #endif
454         if( w )
455           while( w == (*lock->rin & MASK) )
456 #ifdef unix
457                 sched_yield ();
458 #else
459                 SwitchToThread ();
460 #endif
461 }
462
463 void ReadRelease (RWLock *lock)
464 {
465 #ifdef unix
466         __sync_fetch_and_add (lock->rout, RINC);
467 #else
468         _InterlockedExchangeAdd16 (lock->rout, RINC);
469 #endif
470 }
471
472 //      Spin Latch Manager
473
474 //      wait until write lock mode is clear
475 //      and add 1 to the share count
476
477 void bt_spinreadlock(BtSpinLatch *latch)
478 {
479 ushort prev;
480
481   do {
482 #ifdef unix
483         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
484 #else
485         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
486 #endif
487         //  see if exclusive request is granted or pending
488
489         if( !(prev & BOTH) )
490                 return;
491 #ifdef unix
492         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
493 #else
494         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
495 #endif
496 #ifdef  unix
497   } while( sched_yield(), 1 );
498 #else
499   } while( SwitchToThread(), 1 );
500 #endif
501 }
502
503 //      wait for other read and write latches to relinquish
504
505 void bt_spinwritelock(BtSpinLatch *latch)
506 {
507 ushort prev;
508
509   do {
510 #ifdef  unix
511         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
512 #else
513         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
514 #endif
515         if( !(prev & XCL) )
516           if( !(prev & ~BOTH) )
517                 return;
518           else
519 #ifdef unix
520                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
521 #else
522                 _InterlockedAnd16((ushort *)latch, ~XCL);
523 #endif
524 #ifdef  unix
525   } while( sched_yield(), 1 );
526 #else
527   } while( SwitchToThread(), 1 );
528 #endif
529 }
530
531 //      try to obtain write lock
532
533 //      return 1 if obtained,
534 //              0 otherwise
535
536 int bt_spinwritetry(BtSpinLatch *latch)
537 {
538 ushort prev;
539
540 #ifdef  unix
541         prev = __sync_fetch_and_or((ushort *)latch, XCL);
542 #else
543         prev = _InterlockedOr16((ushort *)latch, XCL);
544 #endif
545         //      take write access if all bits are clear
546
547         if( !(prev & XCL) )
548           if( !(prev & ~BOTH) )
549                 return 1;
550           else
551 #ifdef unix
552                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
553 #else
554                 _InterlockedAnd16((ushort *)latch, ~XCL);
555 #endif
556         return 0;
557 }
558
559 //      clear write mode
560
561 void bt_spinreleasewrite(BtSpinLatch *latch)
562 {
563 #ifdef unix
564         __sync_fetch_and_and((ushort *)latch, ~BOTH);
565 #else
566         _InterlockedAnd16((ushort *)latch, ~BOTH);
567 #endif
568 }
569
570 //      decrement reader count
571
572 void bt_spinreleaseread(BtSpinLatch *latch)
573 {
574 #ifdef unix
575         __sync_fetch_and_add((ushort *)latch, -SHARE);
576 #else
577         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
578 #endif
579 }
580
581 //      read page from permanent location in Btree file
582
583 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
584 {
585 off64_t off = page_no << mgr->page_bits;
586
587 #ifdef unix
588         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
589                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
590                 return BTERR_read;
591         }
592 #else
593 OVERLAPPED ovl[1];
594 uint amt[1];
595
596         memset (ovl, 0, sizeof(OVERLAPPED));
597         ovl->Offset = off;
598         ovl->OffsetHigh = off >> 32;
599
600         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
601                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
602                 return BTERR_read;
603         }
604         if( *amt <  mgr->page_size ) {
605                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
606                 return BTERR_read;
607         }
608 #endif
609         return 0;
610 }
611
612 //      write page to permanent location in Btree file
613 //      clear the dirty bit
614
615 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
616 {
617 off64_t off = page_no << mgr->page_bits;
618
619 #ifdef unix
620         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
621                 return BTERR_wrt;
622 #else
623 OVERLAPPED ovl[1];
624 uint amt[1];
625
626         memset (ovl, 0, sizeof(OVERLAPPED));
627         ovl->Offset = off;
628         ovl->OffsetHigh = off >> 32;
629
630         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
631                 return BTERR_wrt;
632
633         if( *amt <  mgr->page_size )
634                 return BTERR_wrt;
635 #endif
636         return 0;
637 }
638
639 //      link latch table entry into head of latch hash table
640
641 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
642 {
643 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
644 BtLatchSet *latch = bt->mgr->latchsets + slot;
645
646         if( latch->next = bt->mgr->hashtable[hashidx].slot )
647                 bt->mgr->latchsets[latch->next].prev = slot;
648
649         bt->mgr->hashtable[hashidx].slot = slot;
650         latch->page_no = page_no;
651         latch->entry = slot;
652         latch->split = 0;
653         latch->prev = 0;
654         latch->pin = 1;
655
656         if( loadit )
657           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
658                 return bt->err;
659           else
660                 bt->reads++;
661
662         return bt->err = 0;
663 }
664
665 //      set CLOCK bit in latch
666 //      decrement pin count
667
668 void bt_unpinlatch (BtLatchSet *latch)
669 {
670 #ifdef unix
671         if( ~latch->pin & CLOCK_bit )
672                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
673         __sync_fetch_and_add(&latch->pin, -1);
674 #else
675         if( ~latch->pin & CLOCK_bit )
676                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
677         _InterlockedDecrement16 (&latch->pin);
678 #endif
679 }
680
681 //  return the btree cached page address
682
683 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
684 {
685 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
686
687         return page;
688 }
689
690 //      find existing latchset or inspire new one
691 //      return with latchset pinned
692
693 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
694 {
695 uint hashidx = page_no % bt->mgr->latchhash;
696 BtLatchSet *latch;
697 uint slot, idx;
698 uint lvl, cnt;
699 off64_t off;
700 uint amt[1];
701 BtPage page;
702
703   //  try to find our entry
704
705   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
706
707   if( slot = bt->mgr->hashtable[hashidx].slot ) do
708   {
709         latch = bt->mgr->latchsets + slot;
710         if( page_no == latch->page_no )
711                 break;
712   } while( slot = latch->next );
713
714   //  found our entry
715   //  increment clock
716
717   if( slot ) {
718         latch = bt->mgr->latchsets + slot;
719 #ifdef unix
720         __sync_fetch_and_add(&latch->pin, 1);
721 #else
722         _InterlockedIncrement16 (&latch->pin);
723 #endif
724         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
725         return latch;
726   }
727
728         //  see if there are any unused pool entries
729 #ifdef unix
730         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
731 #else
732         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
733 #endif
734
735         if( slot < bt->mgr->latchtotal ) {
736                 latch = bt->mgr->latchsets + slot;
737                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
738                         return NULL;
739                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
740                 return latch;
741         }
742
743 #ifdef unix
744         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
745 #else
746         _InterlockedDecrement (&bt->mgr->latchdeployed);
747 #endif
748   //  find and reuse previous entry on victim
749
750   while( 1 ) {
751 #ifdef unix
752         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
753 #else
754         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
755 #endif
756         // try to get write lock on hash chain
757         //      skip entry if not obtained
758         //      or has outstanding pins
759
760         slot %= bt->mgr->latchtotal;
761
762         if( !slot )
763                 continue;
764
765         latch = bt->mgr->latchsets + slot;
766         idx = latch->page_no % bt->mgr->latchhash;
767
768         //      see we are on same chain as hashidx
769
770         if( idx == hashidx )
771                 continue;
772
773         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
774                 continue;
775
776         //  skip this slot if it is pinned
777         //      or the CLOCK bit is set
778
779         if( latch->pin ) {
780           if( latch->pin & CLOCK_bit ) {
781 #ifdef unix
782                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
783 #else
784                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
785 #endif
786           }
787           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
788           continue;
789         }
790
791         //  update permanent page area in btree from buffer pool
792
793         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
794
795         if( latch->dirty )
796           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
797                 return NULL;
798           else
799                 latch->dirty = 0, bt->writes++;
800
801         //  unlink our available slot from its hash chain
802
803         if( latch->prev )
804                 bt->mgr->latchsets[latch->prev].next = latch->next;
805         else
806                 bt->mgr->hashtable[idx].slot = latch->next;
807
808         if( latch->next )
809                 bt->mgr->latchsets[latch->next].prev = latch->prev;
810
811         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
812
813         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
814                 return NULL;
815
816         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
817         return latch;
818   }
819 }
820
821 void bt_mgrclose (BtMgr *mgr)
822 {
823 BtLatchSet *latch;
824 uint num = 0;
825 BtPage page;
826 uint slot;
827
828         //      flush dirty pool pages to the btree
829
830         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
831                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
832                 latch = mgr->latchsets + slot;
833
834                 if( latch->dirty ) {
835                         bt_writepage(mgr, page, latch->page_no);
836                         latch->dirty = 0, num++;
837                 }
838 //              madvise (page, mgr->page_size, MADV_DONTNEED);
839         }
840
841         fprintf(stderr, "%d buffer pool pages flushed\n", num);
842
843 #ifdef unix
844         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
845         munmap (mgr->pagezero, mgr->page_size);
846 #else
847         FlushViewOfFile(mgr->pagezero, 0);
848         UnmapViewOfFile(mgr->pagezero);
849         UnmapViewOfFile(mgr->hashtable);
850         CloseHandle(mgr->halloc);
851         CloseHandle(mgr->hpool);
852 #endif
853 #ifdef unix
854         close (mgr->idx);
855         free (mgr);
856 #else
857         FlushFileBuffers(mgr->idx);
858         CloseHandle(mgr->idx);
859         GlobalFree (mgr);
860 #endif
861 }
862
863 //      close and release memory
864
865 void bt_close (BtDb *bt)
866 {
867 #ifdef unix
868         if( bt->mem )
869                 free (bt->mem);
870 #else
871         if( bt->mem)
872                 VirtualFree (bt->mem, 0, MEM_RELEASE);
873 #endif
874         free (bt);
875 }
876
877 //  open/create new btree buffer manager
878
879 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
880 //              size of page pool (e.g. 262144)
881
882 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
883 {
884 uint lvl, attr, last, slot, idx;
885 uint nlatchpage, latchhash;
886 unsigned char value[BtId];
887 int flag, initit = 0;
888 BtPageZero *pagezero;
889 off64_t size;
890 uint amt[1];
891 BtMgr* mgr;
892 BtKey* key;
893 BtVal *val;
894
895         // determine sanity of page size and buffer pool
896
897         if( bits > BT_maxbits )
898                 bits = BT_maxbits;
899         else if( bits < BT_minbits )
900                 bits = BT_minbits;
901
902         if( nodemax < 16 ) {
903                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
904                 return NULL;
905         }
906
907 #ifdef unix
908         mgr = calloc (1, sizeof(BtMgr));
909
910         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
911
912         if( mgr->idx == -1 ) {
913                 fprintf (stderr, "Unable to open btree file\n");
914                 return free(mgr), NULL;
915         }
916 #else
917         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
918         attr = FILE_ATTRIBUTE_NORMAL;
919         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
920
921         if( mgr->idx == INVALID_HANDLE_VALUE )
922                 return GlobalFree(mgr), NULL;
923 #endif
924
925 #ifdef unix
926         pagezero = valloc (BT_maxpage);
927         *amt = 0;
928
929         // read minimum page size to get root info
930         //      to support raw disk partition files
931         //      check if bits == 0 on the disk.
932
933         if( size = lseek (mgr->idx, 0L, 2) )
934                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
935                         if( pagezero->alloc->bits )
936                                 bits = pagezero->alloc->bits;
937                         else
938                                 initit = 1;
939                 else
940                         return free(mgr), free(pagezero), NULL;
941         else
942                 initit = 1;
943 #else
944         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
945         size = GetFileSize(mgr->idx, amt);
946
947         if( size || *amt ) {
948                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
949                         return bt_mgrclose (mgr), NULL;
950                 bits = pagezero->alloc->bits;
951         } else
952                 initit = 1;
953 #endif
954
955         mgr->page_size = 1 << bits;
956         mgr->page_bits = bits;
957
958         //  calculate number of latch hash table entries
959
960         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
961         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
962
963         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
964         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
965         mgr->latchtotal = nodemax;
966
967         if( !initit )
968                 goto mgrlatch;
969
970         // initialize an empty b-tree with latch page, root page, page of leaves
971         // and page(s) of latches and page pool cache
972
973         memset (pagezero, 0, 1 << bits);
974         pagezero->alloc->bits = mgr->page_bits;
975         bt_putid(pagezero->alloc->right, MIN_lvl+1);
976
977         //  initialize left-most LEAF page in
978         //      alloc->left.
979
980         bt_putid (pagezero->alloc->left, LEAF_page);
981
982         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
983                 fprintf (stderr, "Unable to create btree page zero\n");
984                 return bt_mgrclose (mgr), NULL;
985         }
986
987         memset (pagezero, 0, 1 << bits);
988         pagezero->alloc->bits = mgr->page_bits;
989
990         for( lvl=MIN_lvl; lvl--; ) {
991                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
992                 key = keyptr(pagezero->alloc, 1);
993                 key->len = 2;           // create stopper key
994                 key->key[0] = 0xff;
995                 key->key[1] = 0xff;
996
997                 bt_putid(value, MIN_lvl - lvl + 1);
998                 val = valptr(pagezero->alloc, 1);
999                 val->len = lvl ? BtId : 0;
1000                 memcpy (val->value, value, val->len);
1001
1002                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1003                 pagezero->alloc->lvl = lvl;
1004                 pagezero->alloc->cnt = 1;
1005                 pagezero->alloc->act = 1;
1006
1007                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1008                         fprintf (stderr, "Unable to create btree page zero\n");
1009                         return bt_mgrclose (mgr), NULL;
1010                 }
1011         }
1012
1013 mgrlatch:
1014 #ifdef unix
1015         free (pagezero);
1016 #else
1017         VirtualFree (pagezero, 0, MEM_RELEASE);
1018 #endif
1019 #ifdef unix
1020         // mlock the pagezero page
1021
1022         flag = PROT_READ | PROT_WRITE;
1023         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1024         if( mgr->pagezero == MAP_FAILED ) {
1025                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1026                 return bt_mgrclose (mgr), NULL;
1027         }
1028         mlock (mgr->pagezero, mgr->page_size);
1029
1030         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1031         if( mgr->hashtable == MAP_FAILED ) {
1032                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1033                 return bt_mgrclose (mgr), NULL;
1034         }
1035 #else
1036         flag = PAGE_READWRITE;
1037         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1038         if( !mgr->halloc ) {
1039                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1040                 return bt_mgrclose (mgr), NULL;
1041         }
1042
1043         flag = FILE_MAP_WRITE;
1044         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1045         if( !mgr->pagezero ) {
1046                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1047                 return bt_mgrclose (mgr), NULL;
1048         }
1049
1050         flag = PAGE_READWRITE;
1051         size = (uid)mgr->nlatchpage << mgr->page_bits;
1052         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1053         if( !mgr->hpool ) {
1054                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1055                 return bt_mgrclose (mgr), NULL;
1056         }
1057
1058         flag = FILE_MAP_WRITE;
1059         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1060         if( !mgr->hashtable ) {
1061                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1062                 return bt_mgrclose (mgr), NULL;
1063         }
1064 #endif
1065
1066         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1067         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1068
1069         return mgr;
1070 }
1071
1072 //      open BTree access method
1073 //      based on buffer manager
1074
1075 BtDb *bt_open (BtMgr *mgr)
1076 {
1077 BtDb *bt = malloc (sizeof(*bt));
1078
1079         memset (bt, 0, sizeof(*bt));
1080         bt->mgr = mgr;
1081 #ifdef unix
1082         bt->mem = valloc (2 *mgr->page_size);
1083 #else
1084         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1085 #endif
1086         bt->frame = (BtPage)bt->mem;
1087         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1088         return bt;
1089 }
1090
1091 //  compare two keys, return > 0, = 0, or < 0
1092 //  =0: keys are same
1093 //  -1: key2 > key1
1094 //  +1: key2 < key1
1095 //  as the comparison value
1096
1097 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1098 {
1099 uint len1 = key1->len;
1100 int ans;
1101
1102         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1103                 return ans;
1104
1105         if( len1 > len2 )
1106                 return 1;
1107         if( len1 < len2 )
1108                 return -1;
1109
1110         return 0;
1111 }
1112
1113 // place write, read, or parent lock on requested page_no.
1114
1115 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1116 {
1117         switch( mode ) {
1118         case BtLockRead:
1119                 ReadLock (latch->readwr);
1120                 break;
1121         case BtLockWrite:
1122                 WriteLock (latch->readwr);
1123                 break;
1124         case BtLockAccess:
1125                 ReadLock (latch->access);
1126                 break;
1127         case BtLockDelete:
1128                 WriteLock (latch->access);
1129                 break;
1130         case BtLockParent:
1131                 WriteLock (latch->parent);
1132                 break;
1133         case BtLockAtomic:
1134                 WriteLock (latch->atomic);
1135                 break;
1136         }
1137 }
1138
1139 // remove write, read, or parent lock on requested page
1140
1141 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1142 {
1143         switch( mode ) {
1144         case BtLockRead:
1145                 ReadRelease (latch->readwr);
1146                 break;
1147         case BtLockWrite:
1148                 WriteRelease (latch->readwr);
1149                 break;
1150         case BtLockAccess:
1151                 ReadRelease (latch->access);
1152                 break;
1153         case BtLockDelete:
1154                 WriteRelease (latch->access);
1155                 break;
1156         case BtLockParent:
1157                 WriteRelease (latch->parent);
1158                 break;
1159         case BtLockAtomic:
1160                 WriteRelease (latch->atomic);
1161                 break;
1162         }
1163 }
1164
1165 //      allocate a new page
1166 //      return with page latched, but unlocked.
1167
1168 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1169 {
1170 uid page_no;
1171 int blk;
1172
1173         //      lock allocation page
1174
1175         bt_spinwritelock(bt->mgr->lock);
1176
1177         // use empty chain first
1178         // else allocate empty page
1179
1180         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1181                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1182                         set->page = bt_mappage (bt, set->latch);
1183                 else
1184                         return bt->err = BTERR_struct, -1;
1185
1186                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1187                 bt_spinreleasewrite(bt->mgr->lock);
1188
1189                 memcpy (set->page, contents, bt->mgr->page_size);
1190                 set->latch->dirty = 1;
1191                 return 0;
1192         }
1193
1194         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1195         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1196
1197         // unlock allocation latch
1198
1199         bt_spinreleasewrite(bt->mgr->lock);
1200
1201         //      don't load cache from btree page
1202
1203         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1204                 set->page = bt_mappage (bt, set->latch);
1205         else
1206                 return bt->err = BTERR_struct;
1207
1208         memcpy (set->page, contents, bt->mgr->page_size);
1209         set->latch->dirty = 1;
1210         return 0;
1211 }
1212
1213 //  find slot in page for given key at a given level
1214
1215 int bt_findslot (BtPage page, unsigned char *key, uint len)
1216 {
1217 uint diff, higher = page->cnt, low = 1, slot;
1218 uint good = 0;
1219
1220         //        make stopper key an infinite fence value
1221
1222         if( bt_getid (page->right) )
1223                 higher++;
1224         else
1225                 good++;
1226
1227         //      low is the lowest candidate.
1228         //  loop ends when they meet
1229
1230         //  higher is already
1231         //      tested as .ge. the passed key.
1232
1233         while( diff = higher - low ) {
1234                 slot = low + ( diff >> 1 );
1235                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1236                         low = slot + 1;
1237                 else
1238                         higher = slot, good++;
1239         }
1240
1241         //      return zero if key is on right link page
1242
1243         return good ? higher : 0;
1244 }
1245
1246 //  find and load page at given level for given key
1247 //      leave page rd or wr locked as requested
1248
1249 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1250 {
1251 uid page_no = ROOT_page, prevpage = 0;
1252 uint drill = 0xff, slot;
1253 BtLatchSet *prevlatch;
1254 uint mode, prevmode;
1255
1256   //  start at root of btree and drill down
1257
1258   do {
1259         // determine lock mode of drill level
1260         mode = (drill == lvl) ? lock : BtLockRead; 
1261
1262         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1263           return 0;
1264
1265         // obtain access lock using lock chaining with Access mode
1266
1267         if( page_no > ROOT_page )
1268           bt_lockpage(BtLockAccess, set->latch);
1269
1270         set->page = bt_mappage (bt, set->latch);
1271
1272         //      release & unpin parent or left sibling page
1273
1274         if( prevpage ) {
1275           bt_unlockpage(prevmode, prevlatch);
1276           bt_unpinlatch (prevlatch);
1277           prevpage = 0;
1278         }
1279
1280         // obtain mode lock using lock chaining through AccessLock
1281
1282         bt_lockpage(mode, set->latch);
1283
1284         if( set->page->free )
1285                 return bt->err = BTERR_struct, 0;
1286
1287         if( page_no > ROOT_page )
1288           bt_unlockpage(BtLockAccess, set->latch);
1289
1290         // re-read and re-lock root after determining actual level of root
1291
1292         if( set->page->lvl != drill) {
1293                 if( set->latch->page_no != ROOT_page )
1294                         return bt->err = BTERR_struct, 0;
1295                         
1296                 drill = set->page->lvl;
1297
1298                 if( lock != BtLockRead && drill == lvl ) {
1299                   bt_unlockpage(mode, set->latch);
1300                   bt_unpinlatch (set->latch);
1301                   continue;
1302                 }
1303         }
1304
1305         prevpage = set->latch->page_no;
1306         prevlatch = set->latch;
1307         prevmode = mode;
1308
1309         //  find key on page at this level
1310         //  and descend to requested level
1311
1312         if( set->page->kill )
1313           goto slideright;
1314
1315         if( slot = bt_findslot (set->page, key, len) ) {
1316           if( drill == lvl )
1317                 return slot;
1318
1319           // find next non-dead slot -- the fence key if nothing else
1320
1321           while( slotptr(set->page, slot)->dead )
1322                 if( slot++ < set->page->cnt )
1323                   continue;
1324                 else
1325                   return bt->err = BTERR_struct, 0;
1326
1327           page_no = bt_getid(valptr(set->page, slot)->value);
1328           drill--;
1329           continue;
1330         }
1331
1332         //  or slide right into next page
1333
1334 slideright:
1335         page_no = bt_getid(set->page->right);
1336
1337   } while( page_no );
1338
1339   // return error on end of right chain
1340
1341   bt->err = BTERR_struct;
1342   return 0;     // return error
1343 }
1344
1345 //      return page to free list
1346 //      page must be delete & write locked
1347
1348 void bt_freepage (BtDb *bt, BtPageSet *set)
1349 {
1350         //      lock allocation page
1351
1352         bt_spinwritelock (bt->mgr->lock);
1353
1354         //      store chain
1355
1356         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1357         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1358         set->latch->dirty = 1;
1359         set->page->free = 1;
1360
1361         // unlock released page
1362
1363         bt_unlockpage (BtLockDelete, set->latch);
1364         bt_unlockpage (BtLockWrite, set->latch);
1365
1366         // unlock allocation page
1367
1368         bt_spinreleasewrite (bt->mgr->lock);
1369 }
1370
1371 //      a fence key was deleted from a page
1372 //      push new fence value upwards
1373
1374 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1375 {
1376 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1377 unsigned char value[BtId];
1378 BtKey* ptr;
1379 uint idx;
1380
1381         //      remove the old fence value
1382
1383         ptr = keyptr(set->page, set->page->cnt);
1384         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1385         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1386         set->latch->dirty = 1;
1387
1388         //  cache new fence value
1389
1390         ptr = keyptr(set->page, set->page->cnt);
1391         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1392
1393         bt_lockpage (BtLockParent, set->latch);
1394         bt_unlockpage (BtLockWrite, set->latch);
1395
1396         //      insert new (now smaller) fence key
1397
1398         bt_putid (value, set->latch->page_no);
1399         ptr = (BtKey*)leftkey;
1400
1401         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1402           return bt->err;
1403
1404         //      now delete old fence key
1405
1406         ptr = (BtKey*)rightkey;
1407
1408         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1409                 return bt->err;
1410
1411         bt_unlockpage (BtLockParent, set->latch);
1412         bt_unpinlatch(set->latch);
1413         return 0;
1414 }
1415
1416 //      root has a single child
1417 //      collapse a level from the tree
1418
1419 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1420 {
1421 BtPageSet child[1];
1422 uid page_no;
1423 uint idx;
1424
1425   // find the child entry and promote as new root contents
1426
1427   do {
1428         for( idx = 0; idx++ < root->page->cnt; )
1429           if( !slotptr(root->page, idx)->dead )
1430                 break;
1431
1432         page_no = bt_getid (valptr(root->page, idx)->value);
1433
1434         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1435                 child->page = bt_mappage (bt, child->latch);
1436         else
1437                 return bt->err;
1438
1439         bt_lockpage (BtLockDelete, child->latch);
1440         bt_lockpage (BtLockWrite, child->latch);
1441
1442         memcpy (root->page, child->page, bt->mgr->page_size);
1443         root->latch->dirty = 1;
1444
1445         bt_freepage (bt, child);
1446         bt_unpinlatch (child->latch);
1447
1448   } while( root->page->lvl > 1 && root->page->act == 1 );
1449
1450   bt_unlockpage (BtLockWrite, root->latch);
1451   bt_unpinlatch (root->latch);
1452   return 0;
1453 }
1454
1455 //  delete a page and manage keys
1456 //  call with page writelocked
1457 //      returns with page unpinned
1458
1459 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1460 {
1461 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1462 unsigned char value[BtId];
1463 uint lvl = set->page->lvl;
1464 BtPageSet right[1];
1465 uid page_no;
1466 BtKey *ptr;
1467
1468         //      cache copy of fence key
1469         //      to post in parent
1470
1471         ptr = keyptr(set->page, set->page->cnt);
1472         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1473
1474         //      obtain lock on right page
1475
1476         page_no = bt_getid(set->page->right);
1477
1478         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1479                 right->page = bt_mappage (bt, right->latch);
1480         else
1481                 return 0;
1482
1483         bt_lockpage (BtLockWrite, right->latch);
1484
1485         // cache copy of key to update
1486
1487         ptr = keyptr(right->page, right->page->cnt);
1488         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1489
1490         if( right->page->kill )
1491                 return bt->err = BTERR_struct;
1492
1493         // pull contents of right peer into our empty page
1494
1495         memcpy (set->page, right->page, bt->mgr->page_size);
1496         set->latch->dirty = 1;
1497
1498         // mark right page deleted and point it to left page
1499         //      until we can post parent updates that remove access
1500         //      to the deleted page.
1501
1502         bt_putid (right->page->right, set->latch->page_no);
1503         right->latch->dirty = 1;
1504         right->page->kill = 1;
1505
1506         bt_lockpage (BtLockParent, right->latch);
1507         bt_unlockpage (BtLockWrite, right->latch);
1508
1509         bt_lockpage (BtLockParent, set->latch);
1510         bt_unlockpage (BtLockWrite, set->latch);
1511
1512         // redirect higher key directly to our new node contents
1513
1514         bt_putid (value, set->latch->page_no);
1515         ptr = (BtKey*)higherfence;
1516
1517         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1518           return bt->err;
1519
1520         //      delete old lower key to our node
1521
1522         ptr = (BtKey*)lowerfence;
1523
1524         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1525           return bt->err;
1526
1527         //      obtain delete and write locks to right node
1528
1529         bt_unlockpage (BtLockParent, right->latch);
1530         bt_lockpage (BtLockDelete, right->latch);
1531         bt_lockpage (BtLockWrite, right->latch);
1532         bt_freepage (bt, right);
1533         bt_unpinlatch (right->latch);
1534
1535         bt_unlockpage (BtLockParent, set->latch);
1536         bt_unpinlatch (set->latch);
1537         bt->found = 1;
1538         return 0;
1539 }
1540
1541 //  find and delete key on page by marking delete flag bit
1542 //  if page becomes empty, delete it from the btree
1543
1544 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1545 {
1546 uint slot, idx, found, fence;
1547 BtPageSet set[1];
1548 BtKey *ptr;
1549 BtVal *val;
1550
1551         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1552                 ptr = keyptr(set->page, slot);
1553         else
1554                 return bt->err;
1555
1556         // if librarian slot, advance to real slot
1557
1558         if( slotptr(set->page, slot)->type == Librarian )
1559                 ptr = keyptr(set->page, ++slot);
1560
1561         fence = slot == set->page->cnt;
1562
1563         // if key is found delete it, otherwise ignore request
1564
1565         if( found = !keycmp (ptr, key, len) )
1566           if( found = slotptr(set->page, slot)->dead == 0 ) {
1567                 val = valptr(set->page,slot);
1568                 slotptr(set->page, slot)->dead = 1;
1569                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1570                 set->page->act--;
1571
1572                 // collapse empty slots beneath the fence
1573
1574                 while( idx = set->page->cnt - 1 )
1575                   if( slotptr(set->page, idx)->dead ) {
1576                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1577                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1578                   } else
1579                         break;
1580           }
1581
1582         //      did we delete a fence key in an upper level?
1583
1584         if( found && lvl && set->page->act && fence )
1585           if( bt_fixfence (bt, set, lvl) )
1586                 return bt->err;
1587           else
1588                 return bt->found = found, 0;
1589
1590         //      do we need to collapse root?
1591
1592         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1593           if( bt_collapseroot (bt, set) )
1594                 return bt->err;
1595           else
1596                 return bt->found = found, 0;
1597
1598         //      delete empty page
1599
1600         if( !set->page->act )
1601                 return bt_deletepage (bt, set);
1602
1603         set->latch->dirty = 1;
1604         bt_unlockpage(BtLockWrite, set->latch);
1605         bt_unpinlatch (set->latch);
1606         return bt->found = found, 0;
1607 }
1608
1609 BtKey *bt_foundkey (BtDb *bt)
1610 {
1611         return (BtKey*)(bt->key);
1612 }
1613
1614 //      advance to next slot
1615
1616 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1617 {
1618 BtLatchSet *prevlatch;
1619 uid page_no;
1620
1621         if( slot < set->page->cnt )
1622                 return slot + 1;
1623
1624         prevlatch = set->latch;
1625
1626         if( page_no = bt_getid(set->page->right) )
1627           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1628                 set->page = bt_mappage (bt, set->latch);
1629           else
1630                 return 0;
1631         else
1632           return bt->err = BTERR_struct, 0;
1633
1634         // obtain access lock using lock chaining with Access mode
1635
1636         bt_lockpage(BtLockAccess, set->latch);
1637
1638         bt_unlockpage(BtLockRead, prevlatch);
1639         bt_unpinlatch (prevlatch);
1640
1641         bt_lockpage(BtLockRead, set->latch);
1642         bt_unlockpage(BtLockAccess, set->latch);
1643         return 1;
1644 }
1645
1646 //      find unique key or first duplicate key in
1647 //      leaf level and return number of value bytes
1648 //      or (-1) if not found.  Setup key for bt_foundkey
1649
1650 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1651 {
1652 BtPageSet set[1];
1653 uint len, slot;
1654 int ret = -1;
1655 BtKey *ptr;
1656 BtVal *val;
1657
1658   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1659    do {
1660         ptr = keyptr(set->page, slot);
1661
1662         //      skip librarian slot place holder
1663
1664         if( slotptr(set->page, slot)->type == Librarian )
1665                 ptr = keyptr(set->page, ++slot);
1666
1667         //      return actual key found
1668
1669         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1670         len = ptr->len;
1671
1672         if( slotptr(set->page, slot)->type == Duplicate )
1673                 len -= BtId;
1674
1675         //      not there if we reach the stopper key
1676
1677         if( slot == set->page->cnt )
1678           if( !bt_getid (set->page->right) )
1679                 break;
1680
1681         // if key exists, return >= 0 value bytes copied
1682         //      otherwise return (-1)
1683
1684         if( slotptr(set->page, slot)->dead )
1685                 continue;
1686
1687         if( keylen == len )
1688           if( !memcmp (ptr->key, key, len) ) {
1689                 val = valptr (set->page,slot);
1690                 if( valmax > val->len )
1691                         valmax = val->len;
1692                 memcpy (value, val->value, valmax);
1693                 ret = valmax;
1694           }
1695
1696         break;
1697
1698    } while( slot = bt_findnext (bt, set, slot) );
1699
1700   bt_unlockpage (BtLockRead, set->latch);
1701   bt_unpinlatch (set->latch);
1702   return ret;
1703 }
1704
1705 //      check page for space available,
1706 //      clean if necessary and return
1707 //      0 - page needs splitting
1708 //      >0  new slot value
1709
1710 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1711 {
1712 uint nxt = bt->mgr->page_size;
1713 BtPage page = set->page;
1714 uint cnt = 0, idx = 0;
1715 uint max = page->cnt;
1716 uint newslot = max;
1717 BtKey *key;
1718 BtVal *val;
1719
1720         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1721                 return slot;
1722
1723         //      skip cleanup and proceed to split
1724         //      if there's not enough garbage
1725         //      to bother with.
1726
1727         if( page->garbage < nxt / 5 )
1728                 return 0;
1729
1730         memcpy (bt->frame, page, bt->mgr->page_size);
1731
1732         // skip page info and set rest of page to zero
1733
1734         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1735         set->latch->dirty = 1;
1736         page->garbage = 0;
1737         page->act = 0;
1738
1739         // clean up page first by
1740         // removing deleted keys
1741
1742         while( cnt++ < max ) {
1743                 if( cnt == slot )
1744                         newslot = idx + 2;
1745                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1746                         continue;
1747
1748                 // copy the value across
1749
1750                 val = valptr(bt->frame, cnt);
1751                 nxt -= val->len + sizeof(BtVal);
1752                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1753
1754                 // copy the key across
1755
1756                 key = keyptr(bt->frame, cnt);
1757                 nxt -= key->len + sizeof(BtKey);
1758                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1759
1760                 // make a librarian slot
1761
1762                 if( idx ) {
1763                         slotptr(page, ++idx)->off = nxt;
1764                         slotptr(page, idx)->type = Librarian;
1765                         slotptr(page, idx)->dead = 1;
1766                 }
1767
1768                 // set up the slot
1769
1770                 slotptr(page, ++idx)->off = nxt;
1771                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1772
1773                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1774                         page->act++;
1775         }
1776
1777         page->min = nxt;
1778         page->cnt = idx;
1779
1780         //      see if page has enough space now, or does it need splitting?
1781
1782         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1783                 return newslot;
1784
1785         return 0;
1786 }
1787
1788 // split the root and raise the height of the btree
1789
1790 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1791 {  
1792 unsigned char leftkey[BT_keyarray];
1793 uint nxt = bt->mgr->page_size;
1794 unsigned char value[BtId];
1795 BtPageSet left[1];
1796 uid left_page_no;
1797 BtKey *ptr;
1798 BtVal *val;
1799
1800         //      save left page fence key for new root
1801
1802         ptr = keyptr(root->page, root->page->cnt);
1803         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1804
1805         //  Obtain an empty page to use, and copy the current
1806         //  root contents into it, e.g. lower keys
1807
1808         if( bt_newpage(bt, left, root->page) )
1809                 return bt->err;
1810
1811         left_page_no = left->latch->page_no;
1812         bt_unpinlatch (left->latch);
1813
1814         // preserve the page info at the bottom
1815         // of higher keys and set rest to zero
1816
1817         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1818
1819         // insert stopper key at top of newroot page
1820         // and increase the root height
1821
1822         nxt -= BtId + sizeof(BtVal);
1823         bt_putid (value, right->page_no);
1824         val = (BtVal *)((unsigned char *)root->page + nxt);
1825         memcpy (val->value, value, BtId);
1826         val->len = BtId;
1827
1828         nxt -= 2 + sizeof(BtKey);
1829         slotptr(root->page, 2)->off = nxt;
1830         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1831         ptr->len = 2;
1832         ptr->key[0] = 0xff;
1833         ptr->key[1] = 0xff;
1834
1835         // insert lower keys page fence key on newroot page as first key
1836
1837         nxt -= BtId + sizeof(BtVal);
1838         bt_putid (value, left_page_no);
1839         val = (BtVal *)((unsigned char *)root->page + nxt);
1840         memcpy (val->value, value, BtId);
1841         val->len = BtId;
1842
1843         ptr = (BtKey *)leftkey;
1844         nxt -= ptr->len + sizeof(BtKey);
1845         slotptr(root->page, 1)->off = nxt;
1846         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1847         
1848         bt_putid(root->page->right, 0);
1849         root->page->min = nxt;          // reset lowest used offset and key count
1850         root->page->cnt = 2;
1851         root->page->act = 2;
1852         root->page->lvl++;
1853
1854         // release and unpin root pages
1855
1856         bt_unlockpage(BtLockWrite, root->latch);
1857         bt_unpinlatch (root->latch);
1858
1859         bt_unpinlatch (right);
1860         return 0;
1861 }
1862
1863 //  split already locked full node
1864 //      leave it locked.
1865 //      return pool entry for new right
1866 //      page, unlocked
1867
1868 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1869 {
1870 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1871 uint lvl = set->page->lvl;
1872 BtPageSet right[1];
1873 BtKey *key, *ptr;
1874 BtVal *val, *src;
1875 uid right2;
1876 uint prev;
1877
1878         //  split higher half of keys to bt->frame
1879
1880         memset (bt->frame, 0, bt->mgr->page_size);
1881         max = set->page->cnt;
1882         cnt = max / 2;
1883         idx = 0;
1884
1885         while( cnt++ < max ) {
1886                 if( slotptr(set->page, cnt)->dead && cnt < max )
1887                         continue;
1888                 src = valptr(set->page, cnt);
1889                 nxt -= src->len + sizeof(BtVal);
1890                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1891
1892                 key = keyptr(set->page, cnt);
1893                 nxt -= key->len + sizeof(BtKey);
1894                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1895                 memcpy (ptr, key, key->len + sizeof(BtKey));
1896
1897                 //      add librarian slot
1898
1899                 if( idx ) {
1900                         slotptr(bt->frame, ++idx)->off = nxt;
1901                         slotptr(bt->frame, idx)->type = Librarian;
1902                         slotptr(bt->frame, idx)->dead = 1;
1903                 }
1904
1905                 //  add actual slot
1906
1907                 slotptr(bt->frame, ++idx)->off = nxt;
1908                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1909
1910                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1911                         bt->frame->act++;
1912         }
1913
1914         bt->frame->bits = bt->mgr->page_bits;
1915         bt->frame->min = nxt;
1916         bt->frame->cnt = idx;
1917         bt->frame->lvl = lvl;
1918
1919         // link right node
1920
1921         if( set->latch->page_no > ROOT_page )
1922                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1923
1924         //      get new free page and write higher keys to it.
1925
1926         if( bt_newpage(bt, right, bt->frame) )
1927                 return 0;
1928
1929         memcpy (bt->frame, set->page, bt->mgr->page_size);
1930         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1931         set->latch->dirty = 1;
1932
1933         nxt = bt->mgr->page_size;
1934         set->page->garbage = 0;
1935         set->page->act = 0;
1936         max /= 2;
1937         cnt = 0;
1938         idx = 0;
1939
1940         if( slotptr(bt->frame, max)->type == Librarian )
1941                 max--;
1942
1943         //  assemble page of smaller keys
1944
1945         while( cnt++ < max ) {
1946                 if( slotptr(bt->frame, cnt)->dead )
1947                         continue;
1948                 val = valptr(bt->frame, cnt);
1949                 nxt -= val->len + sizeof(BtVal);
1950                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1951
1952                 key = keyptr(bt->frame, cnt);
1953                 nxt -= key->len + sizeof(BtKey);
1954                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1955
1956                 //      add librarian slot
1957
1958                 if( idx ) {
1959                         slotptr(set->page, ++idx)->off = nxt;
1960                         slotptr(set->page, idx)->type = Librarian;
1961                         slotptr(set->page, idx)->dead = 1;
1962                 }
1963
1964                 //      add actual slot
1965
1966                 slotptr(set->page, ++idx)->off = nxt;
1967                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1968                 set->page->act++;
1969         }
1970
1971         bt_putid(set->page->right, right->latch->page_no);
1972         set->page->min = nxt;
1973         set->page->cnt = idx;
1974
1975         return right->latch->entry;
1976 }
1977
1978 //      fix keys for newly split page
1979 //      call with page locked, return
1980 //      unlocked
1981
1982 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1983 {
1984 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1985 unsigned char value[BtId];
1986 uint lvl = set->page->lvl;
1987 BtPage page;
1988 BtKey *ptr;
1989
1990         // if current page is the root page, split it
1991
1992         if( set->latch->page_no == ROOT_page )
1993                 return bt_splitroot (bt, set, right);
1994
1995         ptr = keyptr(set->page, set->page->cnt);
1996         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1997
1998         page = bt_mappage (bt, right);
1999
2000         ptr = keyptr(page, page->cnt);
2001         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2002
2003         // insert new fences in their parent pages
2004
2005         bt_lockpage (BtLockParent, right);
2006
2007         bt_lockpage (BtLockParent, set->latch);
2008         bt_unlockpage (BtLockWrite, set->latch);
2009
2010         // insert new fence for reformulated left block of smaller keys
2011
2012         bt_putid (value, set->latch->page_no);
2013         ptr = (BtKey *)leftkey;
2014
2015         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2016                 return bt->err;
2017
2018         // switch fence for right block of larger keys to new right page
2019
2020         bt_putid (value, right->page_no);
2021         ptr = (BtKey *)rightkey;
2022
2023         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2024                 return bt->err;
2025
2026         bt_unlockpage (BtLockParent, set->latch);
2027         bt_unpinlatch (set->latch);
2028
2029         bt_unlockpage (BtLockParent, right);
2030         bt_unpinlatch (right);
2031         return 0;
2032 }
2033
2034 //      install new key and value onto page
2035 //      page must already be checked for
2036 //      adequate space
2037
2038 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2039 {
2040 uint idx, librarian;
2041 BtSlot *node;
2042 BtKey *ptr;
2043 BtVal *val;
2044
2045         //      if found slot > desired slot and previous slot
2046         //      is a librarian slot, use it
2047
2048         if( slot > 1 )
2049           if( slotptr(set->page, slot-1)->type == Librarian )
2050                 slot--;
2051
2052         // copy value onto page
2053
2054         set->page->min -= vallen + sizeof(BtVal);
2055         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2056         memcpy (val->value, value, vallen);
2057         val->len = vallen;
2058
2059         // copy key onto page
2060
2061         set->page->min -= keylen + sizeof(BtKey);
2062         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2063         memcpy (ptr->key, key, keylen);
2064         ptr->len = keylen;
2065         
2066         //      find first empty slot
2067
2068         for( idx = slot; idx < set->page->cnt; idx++ )
2069           if( slotptr(set->page, idx)->dead )
2070                 break;
2071
2072         // now insert key into array before slot
2073
2074         if( idx == set->page->cnt )
2075                 idx += 2, set->page->cnt += 2, librarian = 2;
2076         else
2077                 librarian = 1;
2078
2079         set->latch->dirty = 1;
2080         set->page->act++;
2081
2082         while( idx > slot + librarian - 1 )
2083                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2084
2085         //      add librarian slot
2086
2087         if( librarian > 1 ) {
2088                 node = slotptr(set->page, slot++);
2089                 node->off = set->page->min;
2090                 node->type = Librarian;
2091                 node->dead = 1;
2092         }
2093
2094         //      fill in new slot
2095
2096         node = slotptr(set->page, slot);
2097         node->off = set->page->min;
2098         node->type = type;
2099         node->dead = 0;
2100
2101         if( release ) {
2102                 bt_unlockpage (BtLockWrite, set->latch);
2103                 bt_unpinlatch (set->latch);
2104         }
2105
2106         return 0;
2107 }
2108
2109 //  Insert new key into the btree at given level.
2110 //      either add a new key or update/add an existing one
2111
2112 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2113 {
2114 unsigned char newkey[BT_keyarray];
2115 uint slot, idx, len, entry;
2116 BtPageSet set[1];
2117 BtKey *ptr, *ins;
2118 uid sequence;
2119 BtVal *val;
2120 uint type;
2121
2122   // set up the key we're working on
2123
2124   ins = (BtKey*)newkey;
2125   memcpy (ins->key, key, keylen);
2126   ins->len = keylen;
2127
2128   // is this a non-unique index value?
2129
2130   if( unique )
2131         type = Unique;
2132   else {
2133         type = Duplicate;
2134         sequence = bt_newdup (bt);
2135         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2136         ins->len += BtId;
2137   }
2138
2139   while( 1 ) { // find the page and slot for the current key
2140         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2141                 ptr = keyptr(set->page, slot);
2142         else {
2143                 if( !bt->err )
2144                         bt->err = BTERR_ovflw;
2145                 return bt->err;
2146         }
2147
2148         // if librarian slot == found slot, advance to real slot
2149
2150         if( slotptr(set->page, slot)->type == Librarian )
2151           if( !keycmp (ptr, key, keylen) )
2152                 ptr = keyptr(set->page, ++slot);
2153
2154         len = ptr->len;
2155
2156         if( slotptr(set->page, slot)->type == Duplicate )
2157                 len -= BtId;
2158
2159         //  if inserting a duplicate key or unique key
2160         //      check for adequate space on the page
2161         //      and insert the new key before slot.
2162
2163         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2164           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2165                 if( !(entry = bt_splitpage (bt, set)) )
2166                   return bt->err;
2167                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2168                   return bt->err;
2169                 else
2170                   continue;
2171
2172           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2173         }
2174
2175         // if key already exists, update value and return
2176
2177         val = valptr(set->page, slot);
2178
2179         if( val->len >= vallen ) {
2180                 if( slotptr(set->page, slot)->dead )
2181                         set->page->act++;
2182                 set->page->garbage += val->len - vallen;
2183                 set->latch->dirty = 1;
2184                 slotptr(set->page, slot)->dead = 0;
2185                 val->len = vallen;
2186                 memcpy (val->value, value, vallen);
2187                 bt_unlockpage(BtLockWrite, set->latch);
2188                 bt_unpinlatch (set->latch);
2189                 return 0;
2190         }
2191
2192         //      new update value doesn't fit in existing value area
2193
2194         if( !slotptr(set->page, slot)->dead )
2195                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2196         else {
2197                 slotptr(set->page, slot)->dead = 0;
2198                 set->page->act++;
2199         }
2200
2201         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2202           if( !(entry = bt_splitpage (bt, set)) )
2203                 return bt->err;
2204           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2205                 return bt->err;
2206           else
2207                 continue;
2208
2209         set->page->min -= vallen + sizeof(BtVal);
2210         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2211         memcpy (val->value, value, vallen);
2212         val->len = vallen;
2213
2214         set->latch->dirty = 1;
2215         set->page->min -= keylen + sizeof(BtKey);
2216         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2217         memcpy (ptr->key, key, keylen);
2218         ptr->len = keylen;
2219         
2220         slotptr(set->page, slot)->off = set->page->min;
2221         bt_unlockpage(BtLockWrite, set->latch);
2222         bt_unpinlatch (set->latch);
2223         return 0;
2224   }
2225   return 0;
2226 }
2227
2228 typedef struct {
2229         uint entry;                     // latch table entry number
2230         uint slot:31;           // page slot number
2231         uint reuse:1;           // reused previous page
2232 } AtomicMod;
2233
2234 typedef struct {
2235         uid page_no;            // page number for split leaf
2236         void *next;                     // next key to insert
2237         uint entry:30;          // latch table entry number
2238         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2239         unsigned char leafkey[BT_keyarray];
2240 } AtomicKey;
2241
2242 //  find and load leaf page for given key
2243 //      leave page Atomic locked and Read locked.
2244
2245 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2246 {
2247 BtLatchSet *prevlatch;
2248 uid page_no;
2249 uint slot;
2250
2251   //  find level zero page
2252
2253   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2254         return 0;
2255
2256   // find next non-dead entry on this page
2257   //    it will be the fence key if nothing else
2258
2259   while( slotptr(set->page, slot)->dead )
2260         if( slot++ < set->page->cnt )
2261           continue;
2262         else
2263           return bt->err = BTERR_struct, 0;
2264
2265   page_no = bt_getid(valptr(set->page, slot)->value);
2266   prevlatch = set->latch;
2267
2268   while( page_no ) {
2269         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2270           set->page = bt_mappage (bt, set->latch);
2271         else
2272           return 0;
2273
2274         if( set->page->free || set->page->lvl )
2275                 return bt->err = BTERR_struct, 0;
2276
2277         // obtain read lock using lock chaining with Access mode
2278         //      release & unpin parent/left sibling page
2279
2280         bt_lockpage(BtLockAccess, set->latch);
2281
2282         bt_unlockpage(BtLockRead, prevlatch);
2283         bt_unpinlatch (prevlatch);
2284
2285         bt_lockpage(BtLockRead, set->latch);
2286         bt_unlockpage(BtLockAccess, set->latch);
2287
2288         //  find key on page at this level
2289         //  and descend to requested level
2290
2291         if( !set->page->kill )
2292          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2293           bt_unlockpage(BtLockRead, set->latch);
2294           bt_lockpage(BtLockAtomic, set->latch);
2295           bt_lockpage(BtLockAccess, set->latch);
2296           bt_lockpage(BtLockRead, set->latch);
2297           bt_unlockpage(BtLockAccess, set->latch);
2298
2299           if( slot = bt_findslot (set->page, key, len) )
2300                 return slot;
2301
2302           bt_unlockpage(BtLockAtomic, set->latch);
2303           }
2304
2305         //  slide right into next page
2306
2307         page_no = bt_getid(set->page->right);
2308         prevlatch = set->latch;
2309   }
2310
2311   // return error on end of right chain
2312
2313   bt->err = BTERR_struct;
2314   return 0;     // return error
2315 }
2316
2317 //      determine actual page where key is located
2318 //  return slot number
2319
2320 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2321 {
2322 BtKey *key = keyptr(source,src);
2323 uint slot = locks[src].slot;
2324 uint entry;
2325
2326         if( src > 1 && locks[src].reuse )
2327           entry = locks[src-1].entry, slot = 0;
2328         else
2329           entry = locks[src].entry;
2330
2331         if( slot ) {
2332                 set->latch = bt->mgr->latchsets + entry;
2333                 set->page = bt_mappage (bt, set->latch);
2334                 return slot;
2335         }
2336
2337         //      is locks->reuse set?
2338         //      if so, find where our key
2339         //      is located on previous page or split pages
2340
2341         do {
2342                 set->latch = bt->mgr->latchsets + entry;
2343                 set->page = bt_mappage (bt, set->latch);
2344
2345                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2346                   if( locks[src].reuse )
2347                         locks[src].entry = entry;
2348                   return slot;
2349                 }
2350         } while( entry = set->latch->split );
2351
2352         bt->err = BTERR_atomic;
2353         return 0;
2354 }
2355
2356 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2357 {
2358 BtKey *key = keyptr(source, src);
2359 BtVal *val = valptr(source, src);
2360 BtLatchSet *latch;
2361 BtPageSet set[1];
2362 uint entry;
2363
2364   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2365         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2366           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2367
2368         if( entry = bt_splitpage (bt, set) )
2369           latch = bt->mgr->latchsets + entry;
2370         else
2371           return bt->err;
2372
2373         //      splice right page into split chain
2374         //      and WriteLock it.
2375
2376         latch->split = set->latch->split;
2377         set->latch->split = entry;
2378         bt_lockpage(BtLockWrite, latch);
2379   }
2380
2381   return bt->err = BTERR_atomic;
2382 }
2383
2384 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2385 {
2386 BtKey *key = keyptr(source, src);
2387 uint idx, entry, slot;
2388 BtPageSet set[1];
2389 BtKey *ptr;
2390 BtVal *val;
2391
2392         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2393                 slotptr(set->page, slot)->dead = 1;
2394         else
2395                 return bt->err = BTERR_struct;
2396
2397         ptr = keyptr(set->page, slot);
2398         val = valptr(set->page, slot);
2399
2400         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2401         set->latch->dirty = 1;
2402         set->page->act--;
2403         return 0;
2404 }
2405
2406 //      atomic modification of a batch of keys.
2407
2408 //      return -1 if BTERR is set
2409 //      otherwise return slot number
2410 //      causing the key constraint violation
2411 //      or zero on successful completion.
2412
2413 int bt_atomictxn (BtDb *bt, BtPage source)
2414 {
2415 uint src, idx, slot, samepage, entry;
2416 AtomicKey *head, *tail, *leaf;
2417 BtPageSet set[1], prev[1];
2418 unsigned char value[BtId];
2419 BtKey *key, *ptr, *key2;
2420 BtLatchSet *latch;
2421 AtomicMod *locks;
2422 int result = 0;
2423 BtSlot temp[1];
2424 BtPage page;
2425 BtVal *val;
2426 uid right;
2427 int type;
2428
2429   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2430   head = NULL;
2431   tail = NULL;
2432
2433   // stable sort the list of keys into order to
2434   //    prevent deadlocks between threads.
2435
2436   for( src = 1; src++ < source->cnt; ) {
2437         *temp = *slotptr(source,src);
2438         key = keyptr (source,src);
2439
2440         for( idx = src; --idx; ) {
2441           key2 = keyptr (source,idx);
2442           if( keycmp (key, key2->key, key2->len) < 0 ) {
2443                 *slotptr(source,idx+1) = *slotptr(source,idx);
2444                 *slotptr(source,idx) = *temp;
2445           } else
2446                 break;
2447         }
2448   }
2449
2450   // Load the leaf page for each key
2451   // group same page references with reuse bit
2452   // and determine any constraint violations
2453
2454   for( src = 0; src++ < source->cnt; ) {
2455         key = keyptr(source, src);
2456         slot = 0;
2457
2458         // first determine if this modification falls
2459         // on the same page as the previous modification
2460         //      note that the far right leaf page is a special case
2461
2462         if( samepage = src > 1 )
2463           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2464                 slot = bt_findslot(set->page, key->key, key->len);
2465           else // release read on previous page
2466                 bt_unlockpage(BtLockRead, set->latch); 
2467
2468         if( !slot )
2469           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2470                 set->latch->split = 0;
2471           else
2472                 return -1;
2473
2474         if( slotptr(set->page, slot)->type == Librarian )
2475           ptr = keyptr(set->page, ++slot);
2476         else
2477           ptr = keyptr(set->page, slot);
2478
2479         if( !samepage ) {
2480           locks[src].entry = set->latch->entry;
2481           locks[src].slot = slot;
2482           locks[src].reuse = 0;
2483         } else {
2484           locks[src].entry = 0;
2485           locks[src].slot = 0;
2486           locks[src].reuse = 1;
2487         }
2488
2489         switch( slotptr(source, src)->type ) {
2490         case Duplicate:
2491         case Unique:
2492           if( !slotptr(set->page, slot)->dead )
2493            if( slot < set->page->cnt || bt_getid (set->page->right) )
2494             if( !keycmp (ptr, key->key, key->len) ) {
2495
2496                   // return constraint violation if key already exists
2497
2498                   bt_unlockpage(BtLockRead, set->latch);
2499                   result = src;
2500
2501                   while( src ) {
2502                         if( locks[src].entry ) {
2503                           set->latch = bt->mgr->latchsets + locks[src].entry;
2504                           bt_unlockpage(BtLockAtomic, set->latch);
2505                           bt_unpinlatch (set->latch);
2506                         }
2507                         src--;
2508                   }
2509                   free (locks);
2510                   return result;
2511                 }
2512           break;
2513         }
2514   }
2515
2516   //  unlock last loadpage lock
2517
2518   if( source->cnt > 1 )
2519         bt_unlockpage(BtLockRead, set->latch);
2520
2521   //  obtain write lock for each master page
2522
2523   for( src = 0; src++ < source->cnt; )
2524         if( locks[src].reuse )
2525           continue;
2526         else
2527           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2528
2529   // insert or delete each key
2530   // process any splits or merges
2531   // release Write & Atomic latches
2532   // set ParentModifications and build
2533   // queue of keys to insert for split pages
2534   // or delete for deleted pages.
2535
2536   // run through txn list backwards
2537
2538   samepage = source->cnt + 1;
2539
2540   for( src = source->cnt; src; src-- ) {
2541         if( locks[src].reuse )
2542           continue;
2543
2544         //  perform the txn operations
2545         //      from smaller to larger on
2546         //  the same page
2547
2548         for( idx = src; idx < samepage; idx++ )
2549          switch( slotptr(source,idx)->type ) {
2550          case Delete:
2551           if( bt_atomicdelete (bt, source, locks, idx) )
2552                 return -1;
2553           break;
2554
2555         case Duplicate:
2556         case Unique:
2557           if( bt_atomicinsert (bt, source, locks, idx) )
2558                 return -1;
2559           break;
2560         }
2561
2562         //      after the same page operations have finished,
2563         //  process master page for splits or deletion.
2564
2565         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2566         prev->page = bt_mappage (bt, prev->latch);
2567         samepage = src;
2568
2569         //  pick-up all splits from master page
2570
2571         entry = prev->latch->split;
2572
2573         while( entry ) {
2574           set->latch = bt->mgr->latchsets + entry;
2575           set->page = bt_mappage (bt, set->latch);
2576           entry = set->latch->split;
2577
2578           // delete empty master page
2579
2580           if( !prev->page->act ) {
2581                 memcpy (set->page->left, prev->page->left, BtId);
2582                 memcpy (prev->page, set->page, bt->mgr->page_size);
2583                 bt_lockpage (BtLockDelete, set->latch);
2584                 bt_freepage (bt, set);
2585                 bt_unpinlatch (set->latch);
2586
2587                 prev->latch->dirty = 1;
2588                 continue;
2589           }
2590
2591           // remove empty page from the split chain
2592
2593           if( !set->page->act ) {
2594                 memcpy (prev->page->right, set->page->right, BtId);
2595                 prev->latch->split = set->latch->split;
2596                 bt_lockpage (BtLockDelete, set->latch);
2597                 bt_freepage (bt, set);
2598                 bt_unpinlatch (set->latch);
2599                 continue;
2600           }
2601
2602           //  schedule prev fence key update
2603
2604           ptr = keyptr(prev->page,prev->page->cnt);
2605           leaf = malloc (sizeof(AtomicKey));
2606
2607           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2608           leaf->page_no = prev->latch->page_no;
2609           leaf->entry = prev->latch->entry;
2610           leaf->next = NULL;
2611           leaf->type = 0;
2612
2613           if( tail )
2614                 tail->next = leaf;
2615           else
2616                 head = leaf;
2617
2618           tail = leaf;
2619
2620           bt_putid (set->page->left, prev->latch->page_no);
2621           bt_lockpage(BtLockParent, prev->latch);
2622           bt_unlockpage(BtLockWrite, prev->latch);
2623           *prev = *set;
2624         }
2625
2626         //  update left pointer in next right page
2627         //      if we did any now non-empty splits
2628
2629         if( latch->split ) {
2630           //  fix left pointer in master's original right sibling
2631           //    or set rightmost page in page zero
2632
2633           if( right = bt_getid (prev->page->right) ) {
2634                 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2635                   set->page = bt_mappage (bt, set->latch);
2636                 else
2637                   return -1;
2638
2639             bt_lockpage (BtLockWrite, set->latch);
2640             bt_putid (set->page->left, prev->latch->page_no);
2641                 set->latch->dirty = 1;
2642             bt_unlockpage (BtLockWrite, set->latch);
2643                 bt_unpinlatch (set->latch);
2644           } else
2645                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2646
2647           //  Process last page split in chain
2648
2649           ptr = keyptr(prev->page,prev->page->cnt);
2650           leaf = malloc (sizeof(AtomicKey));
2651
2652           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2653           leaf->page_no = prev->latch->page_no;
2654           leaf->entry = prev->latch->entry;
2655           leaf->next = NULL;
2656           leaf->type = 0;
2657   
2658           if( tail )
2659                 tail->next = leaf;
2660           else
2661                 head = leaf;
2662
2663           tail = leaf;
2664
2665           bt_lockpage(BtLockParent, prev->latch);
2666           bt_unlockpage(BtLockWrite, prev->latch);
2667           bt_unlockpage(BtLockAtomic, latch);
2668           continue;
2669         }
2670
2671         //  finished if prev page occupied (either master or final split)
2672
2673         if( prev->page->act ) {
2674           bt_unlockpage(BtLockWrite, latch);
2675           bt_unlockpage(BtLockAtomic, latch);
2676           bt_unpinlatch(latch);
2677           continue;
2678         }
2679
2680         // any splits were reversed, and the
2681         // master page located in prev is empty, delete it
2682         // by pulling over master's right sibling.
2683         // Delete empty master's fence
2684
2685         ptr = keyptr(prev->page,prev->page->cnt);
2686         leaf = malloc (sizeof(AtomicKey));
2687
2688         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2689         leaf->page_no = prev->latch->page_no;
2690         leaf->entry = prev->latch->entry;
2691         leaf->next = NULL;
2692         leaf->type = 1;
2693   
2694         if( tail )
2695           tail->next = leaf;
2696         else
2697           head = leaf;
2698
2699         tail = leaf;
2700
2701         bt_lockpage(BtLockParent, prev->latch);
2702         bt_unlockpage(BtLockWrite, prev->latch);
2703
2704         if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2705                 set->page = bt_mappage (bt, set->latch);
2706         else
2707                 return -1;
2708
2709         //      add page to our transaction
2710
2711         bt_lockpage(BtLockAtomic, set->latch);
2712         bt_lockpage(BtLockWrite, set->latch);
2713
2714         //      pull contents over empty page
2715
2716         memcpy (set->page->left, prev->page->left, BtId);
2717         memcpy (prev->page, set->page, bt->mgr->page_size);
2718
2719         bt_putid (set->page->right, prev->latch->page_no);
2720         set->latch->dirty = 1;
2721         set->page->kill = 1;
2722
2723         //      add new parent key for new master page contents
2724         //      delete it after parent posts the new master fence.
2725
2726         ptr = keyptr(set->page,set->page->cnt);
2727         leaf = malloc (sizeof(AtomicKey));
2728
2729         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2730         leaf->page_no = prev->latch->page_no;
2731         leaf->entry = set->latch->entry;
2732         leaf->next = NULL;
2733         leaf->type = 2;
2734   
2735         if( tail )
2736           tail->next = leaf;
2737         else
2738           head = leaf;
2739
2740         tail = leaf;
2741
2742 //      bt_lockpage(BtLockParent, set->latch);
2743         bt_unlockpage(BtLockWrite, set->latch);
2744
2745         //  fix master's far right sibling's left pointer
2746
2747         if( right = bt_getid (set->page->right) ) {
2748           if( set->latch = bt_pinlatch (bt, right, 1) )
2749                 set->page = bt_mappage (bt, set->latch);
2750
2751           bt_lockpage (BtLockWrite, set->latch);
2752           bt_putid (set->page->left, prev->latch->page_no);
2753           set->latch->dirty = 1;
2754
2755           bt_unlockpage (BtLockWrite, set->latch);
2756           bt_unpinlatch (set->latch);
2757         }
2758
2759         bt_unlockpage(BtLockAtomic, latch);
2760   }
2761   
2762   //  add & delete keys for any pages split or merged during transaction
2763
2764   if( leaf = head )
2765     do {
2766           set->latch = bt->mgr->latchsets + leaf->entry;
2767           set->page = bt_mappage (bt, set->latch);
2768
2769           bt_putid (value, leaf->page_no);
2770           ptr = (BtKey *)leaf->leafkey;
2771
2772           switch( leaf->type ) {
2773           case 0:       // insert key
2774             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2775                   return -1;
2776
2777             bt_unlockpage (BtLockParent, set->latch);
2778                 break;
2779
2780           case 1:       // delete key
2781                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2782                   return -1;
2783
2784             bt_unlockpage (BtLockParent, set->latch);
2785                 break;
2786
2787           case 2:       // insert key & free
2788             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2789                   return -1;
2790
2791                 bt_unlockpage (BtLockAtomic, set->latch);
2792                 bt_lockpage (BtLockDelete, set->latch);
2793                 bt_lockpage (BtLockWrite, set->latch);
2794                 bt_freepage (bt, set);
2795                 break;
2796           }
2797
2798           bt_unpinlatch (set->latch);
2799           tail = leaf->next;
2800           free (leaf);
2801         } while( leaf = tail );
2802
2803   // return success
2804
2805   free (locks);
2806   return 0;
2807 }
2808
2809 //      set cursor to highest slot on highest page
2810
2811 uint bt_lastkey (BtDb *bt)
2812 {
2813 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2814 BtPageSet set[1];
2815 uint slot;
2816
2817         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2818                 set->page = bt_mappage (bt, set->latch);
2819         else
2820                 return 0;
2821
2822     bt_lockpage(BtLockRead, set->latch);
2823
2824         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2825         slot = set->page->cnt;
2826
2827     bt_unlockpage(BtLockRead, set->latch);
2828         bt_unpinlatch (set->latch);
2829         return slot;
2830 }
2831
2832 //      return previous slot on cursor page
2833
2834 uint bt_prevkey (BtDb *bt, uint slot)
2835 {
2836 uid ourright, next, us = bt->cursor_page;
2837 BtPageSet set[1];
2838
2839         if( --slot )
2840                 return slot;
2841
2842         ourright = bt_getid(bt->cursor->right);
2843
2844 goleft:
2845         if( !(next = bt_getid(bt->cursor->left)) )
2846                 return 0;
2847
2848 findourself:
2849         bt->cursor_page = next;
2850
2851         if( set->latch = bt_pinlatch (bt, next, 1) )
2852                 set->page = bt_mappage (bt, set->latch);
2853         else
2854                 return 0;
2855
2856     bt_lockpage(BtLockRead, set->latch);
2857         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2858         bt_unlockpage(BtLockRead, set->latch);
2859         bt_unpinlatch (set->latch);
2860         
2861         next = bt_getid (bt->cursor->right);
2862
2863         if( next != us )
2864           if( next == ourright )
2865                 goto goleft;
2866           else
2867                 goto findourself;
2868
2869         return bt->cursor->cnt;
2870 }
2871
2872 //  return next slot on cursor page
2873 //  or slide cursor right into next page
2874
2875 uint bt_nextkey (BtDb *bt, uint slot)
2876 {
2877 BtPageSet set[1];
2878 uid right;
2879
2880   do {
2881         right = bt_getid(bt->cursor->right);
2882
2883         while( slot++ < bt->cursor->cnt )
2884           if( slotptr(bt->cursor,slot)->dead )
2885                 continue;
2886           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2887                 return slot;
2888           else
2889                 break;
2890
2891         if( !right )
2892                 break;
2893
2894         bt->cursor_page = right;
2895
2896         if( set->latch = bt_pinlatch (bt, right, 1) )
2897                 set->page = bt_mappage (bt, set->latch);
2898         else
2899                 return 0;
2900
2901     bt_lockpage(BtLockRead, set->latch);
2902
2903         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2904
2905         bt_unlockpage(BtLockRead, set->latch);
2906         bt_unpinlatch (set->latch);
2907         slot = 0;
2908
2909   } while( 1 );
2910
2911   return bt->err = 0;
2912 }
2913
2914 //  cache page of keys into cursor and return starting slot for given key
2915
2916 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2917 {
2918 BtPageSet set[1];
2919 uint slot;
2920
2921         // cache page for retrieval
2922
2923         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2924           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2925         else
2926           return 0;
2927
2928         bt->cursor_page = set->latch->page_no;
2929
2930         bt_unlockpage(BtLockRead, set->latch);
2931         bt_unpinlatch (set->latch);
2932         return slot;
2933 }
2934
2935 BtKey *bt_key(BtDb *bt, uint slot)
2936 {
2937         return keyptr(bt->cursor, slot);
2938 }
2939
2940 BtVal *bt_val(BtDb *bt, uint slot)
2941 {
2942         return valptr(bt->cursor,slot);
2943 }
2944
2945 #ifdef STANDALONE
2946
2947 #ifndef unix
2948 double getCpuTime(int type)
2949 {
2950 FILETIME crtime[1];
2951 FILETIME xittime[1];
2952 FILETIME systime[1];
2953 FILETIME usrtime[1];
2954 SYSTEMTIME timeconv[1];
2955 double ans = 0;
2956
2957         memset (timeconv, 0, sizeof(SYSTEMTIME));
2958
2959         switch( type ) {
2960         case 0:
2961                 GetSystemTimeAsFileTime (xittime);
2962                 FileTimeToSystemTime (xittime, timeconv);
2963                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2964                 break;
2965         case 1:
2966                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2967                 FileTimeToSystemTime (usrtime, timeconv);
2968                 break;
2969         case 2:
2970                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2971                 FileTimeToSystemTime (systime, timeconv);
2972                 break;
2973         }
2974
2975         ans += (double)timeconv->wHour * 3600;
2976         ans += (double)timeconv->wMinute * 60;
2977         ans += (double)timeconv->wSecond;
2978         ans += (double)timeconv->wMilliseconds / 1000;
2979         return ans;
2980 }
2981 #else
2982 #include <time.h>
2983 #include <sys/resource.h>
2984
2985 double getCpuTime(int type)
2986 {
2987 struct rusage used[1];
2988 struct timeval tv[1];
2989
2990         switch( type ) {
2991         case 0:
2992                 gettimeofday(tv, NULL);
2993                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2994
2995         case 1:
2996                 getrusage(RUSAGE_SELF, used);
2997                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2998
2999         case 2:
3000                 getrusage(RUSAGE_SELF, used);
3001                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3002         }
3003
3004         return 0;
3005 }
3006 #endif
3007
3008 void bt_poolaudit (BtMgr *mgr)
3009 {
3010 BtLatchSet *latch;
3011 uint slot = 0;
3012
3013         while( slot++ < mgr->latchdeployed ) {
3014                 latch = mgr->latchsets + slot;
3015
3016                 if( *latch->readwr->rin & MASK )
3017                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3018                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3019
3020                 if( *latch->access->rin & MASK )
3021                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3022                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3023
3024                 if( *latch->parent->rin & MASK )
3025                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3026                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3027
3028                 if( latch->pin & ~CLOCK_bit ) {
3029                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3030                         latch->pin = 0;
3031                 }
3032         }
3033 }
3034
3035 uint bt_latchaudit (BtDb *bt)
3036 {
3037 ushort idx, hashidx;
3038 uid next, page_no;
3039 BtLatchSet *latch;
3040 uint cnt = 0;
3041 BtKey *ptr;
3042
3043         if( *(ushort *)(bt->mgr->lock) )
3044                 fprintf(stderr, "Alloc page locked\n");
3045         *(ushort *)(bt->mgr->lock) = 0;
3046
3047         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3048                 latch = bt->mgr->latchsets + idx;
3049                 if( *latch->readwr->rin & MASK )
3050                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3051                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3052
3053                 if( *latch->access->rin & MASK )
3054                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3055                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3056
3057                 if( *latch->parent->rin & MASK )
3058                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3059                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3060
3061                 if( latch->pin ) {
3062                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3063                         latch->pin = 0;
3064                 }
3065         }
3066
3067         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3068           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3069                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3070
3071           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3072
3073           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3074                 latch = bt->mgr->latchsets + idx;
3075                 if( latch->pin )
3076                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3077           } while( idx = latch->next );
3078         }
3079
3080         page_no = LEAF_page;
3081
3082         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3083         uid off = page_no << bt->mgr->page_bits;
3084 #ifdef unix
3085           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3086 #else
3087         DWORD amt[1];
3088
3089           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3090
3091           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3092                 return bt->err = BTERR_map;
3093
3094           if( *amt <  bt->mgr->page_size )
3095                 return bt->err = BTERR_map;
3096 #endif
3097                 if( !bt->frame->free && !bt->frame->lvl )
3098                         cnt += bt->frame->act;
3099                 page_no++;
3100         }
3101                 
3102         cnt--;  // remove stopper key
3103         fprintf(stderr, " Total keys read %d\n", cnt);
3104
3105         bt_close (bt);
3106         return 0;
3107 }
3108
3109 typedef struct {
3110         char idx;
3111         char *type;
3112         char *infile;
3113         BtMgr *mgr;
3114         int num;
3115 } ThreadArg;
3116
3117 //  standalone program to index file of keys
3118 //  then list them onto std-out
3119
3120 #ifdef unix
3121 void *index_file (void *arg)
3122 #else
3123 uint __stdcall index_file (void *arg)
3124 #endif
3125 {
3126 int line = 0, found = 0, cnt = 0, unique;
3127 uid next, page_no = LEAF_page;  // start on first page of leaves
3128 unsigned char key[BT_maxkey];
3129 unsigned char txn[65536];
3130 ThreadArg *args = arg;
3131 int ch, len = 0, slot;
3132 BtPageSet set[1];
3133 uint nxt = 65536;
3134 BtPage page;
3135 BtKey *ptr;
3136 BtVal *val;
3137 BtDb *bt;
3138 FILE *in;
3139
3140         bt = bt_open (args->mgr);
3141         page = (BtPage)txn;
3142
3143         unique = (args->type[1] | 0x20) != 'd';
3144
3145         switch(args->type[0] | 0x20)
3146         {
3147         case 'a':
3148                 fprintf(stderr, "started latch mgr audit\n");
3149                 cnt = bt_latchaudit (bt);
3150                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3151                 break;
3152
3153         case 't':
3154                 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
3155                 if( in = fopen (args->infile, "rb") )
3156                   while( ch = getc(in), ch != EOF )
3157                         if( ch == '\n' )
3158                         {
3159                           line++;
3160                           nxt -= len - 10;
3161                           memcpy (txn + nxt, key + 10, len - 10);
3162                           nxt -= 1;
3163                           txn[nxt] = len - 10;
3164                           nxt -= 10;
3165                           memcpy (txn + nxt, key, 10);
3166                           nxt -= 1;
3167                           txn[nxt] = 10;
3168                           slotptr(page,++cnt)->off  = nxt;
3169                           slotptr(page,cnt)->type = Unique;
3170                           len = 0;
3171
3172                           if( cnt < args->num )
3173                                 continue;
3174
3175                           page->cnt = cnt;
3176                           page->act = cnt;
3177                           page->min = nxt;
3178
3179                           if( bt_atomictxn (bt, page) )
3180                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3181                           nxt = 65536;
3182                           cnt = 0;
3183
3184                         }
3185                         else if( len < BT_maxkey )
3186                                 key[len++] = ch;
3187                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3188                 break;
3189
3190         case 'p':
3191                 fprintf(stderr, "started pennysort for %s\n", args->infile);
3192                 if( in = fopen (args->infile, "rb") )
3193                   while( ch = getc(in), ch != EOF )
3194                         if( ch == '\n' )
3195                         {
3196                           line++;
3197
3198                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3199                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3200                           len = 0;
3201                         }
3202                         else if( len < BT_maxkey )
3203                                 key[len++] = ch;
3204                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3205                 break;
3206
3207         case 'w':
3208                 fprintf(stderr, "started indexing for %s\n", args->infile);
3209                 if( in = fopen (args->infile, "r") )
3210                   while( ch = getc(in), ch != EOF )
3211                         if( ch == '\n' )
3212                         {
3213                           line++;
3214
3215                           if( args->num == 1 )
3216                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3217
3218                           else if( args->num )
3219                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3220
3221                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3222                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3223                           len = 0;
3224                         }
3225                         else if( len < BT_maxkey )
3226                                 key[len++] = ch;
3227                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3228                 break;
3229
3230         case 'd':
3231                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3232                 if( in = fopen (args->infile, "rb") )
3233                   while( ch = getc(in), ch != EOF )
3234                         if( ch == '\n' )
3235                         {
3236                           line++;
3237                           if( args->num == 1 )
3238                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3239
3240                           else if( args->num )
3241                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3242
3243                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3244                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3245                           ptr = (BtKey*)(bt->key);
3246                           found++;
3247
3248                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3249                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3250                           len = 0;
3251                         }
3252                         else if( len < BT_maxkey )
3253                                 key[len++] = ch;
3254                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3255                 break;
3256
3257         case 'f':
3258                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3259                 if( in = fopen (args->infile, "rb") )
3260                   while( ch = getc(in), ch != EOF )
3261                         if( ch == '\n' )
3262                         {
3263                           line++;
3264                           if( args->num == 1 )
3265                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3266
3267                           else if( args->num )
3268                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3269
3270                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3271                                 found++;
3272                           else if( bt->err )
3273                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3274                           len = 0;
3275                         }
3276                         else if( len < BT_maxkey )
3277                                 key[len++] = ch;
3278                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3279                 break;
3280
3281         case 's':
3282                 fprintf(stderr, "started scanning\n");
3283                 do {
3284                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3285                                 set->page = bt_mappage (bt, set->latch);
3286                         else
3287                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3288                         bt_lockpage (BtLockRead, set->latch);
3289                         next = bt_getid (set->page->right);
3290
3291                         for( slot = 0; slot++ < set->page->cnt; )
3292                          if( next || slot < set->page->cnt )
3293                           if( !slotptr(set->page, slot)->dead ) {
3294                                 ptr = keyptr(set->page, slot);
3295                                 len = ptr->len;
3296
3297                             if( slotptr(set->page, slot)->type == Duplicate )
3298                                         len -= BtId;
3299
3300                                 fwrite (ptr->key, len, 1, stdout);
3301                                 val = valptr(set->page, slot);
3302                                 fwrite (val->value, val->len, 1, stdout);
3303                                 fputc ('\n', stdout);
3304                                 cnt++;
3305                            }
3306
3307                         bt_unlockpage (BtLockRead, set->latch);
3308                         bt_unpinlatch (set->latch);
3309                 } while( page_no = next );
3310
3311                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3312                 break;
3313
3314         case 'c':
3315 #ifdef unix
3316                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3317 #endif
3318                 fprintf(stderr, "started counting\n");
3319                 page_no = LEAF_page;
3320
3321                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3322                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3323                                 break;
3324
3325                         if( !bt->frame->free && !bt->frame->lvl )
3326                                 cnt += bt->frame->act;
3327
3328                         bt->reads++;
3329                         page_no++;
3330                 }
3331                 
3332                 cnt--;  // remove stopper key
3333                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3334                 break;
3335         }
3336
3337         bt_close (bt);
3338 #ifdef unix
3339         return NULL;
3340 #else
3341         return 0;
3342 #endif
3343 }
3344
3345 typedef struct timeval timer;
3346
3347 int main (int argc, char **argv)
3348 {
3349 int idx, cnt, len, slot, err;
3350 int segsize, bits = 16;
3351 double start, stop;
3352 #ifdef unix
3353 pthread_t *threads;
3354 #else
3355 HANDLE *threads;
3356 #endif
3357 ThreadArg *args;
3358 uint poolsize = 0;
3359 float elapsed;
3360 int num = 0;
3361 char key[1];
3362 BtMgr *mgr;
3363 BtKey *ptr;
3364 BtDb *bt;
3365
3366         if( argc < 3 ) {
3367                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3368                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3369                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3370                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3371                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3372                 exit(0);
3373         }
3374
3375         start = getCpuTime(0);
3376
3377         if( argc > 3 )
3378                 bits = atoi(argv[3]);
3379
3380         if( argc > 4 )
3381                 poolsize = atoi(argv[4]);
3382
3383         if( !poolsize )
3384                 fprintf (stderr, "Warning: no mapped_pool\n");
3385
3386         if( argc > 5 )
3387                 num = atoi(argv[5]);
3388
3389         cnt = argc - 6;
3390 #ifdef unix
3391         threads = malloc (cnt * sizeof(pthread_t));
3392 #else
3393         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3394 #endif
3395         args = malloc (cnt * sizeof(ThreadArg));
3396
3397         mgr = bt_mgr ((argv[1]), bits, poolsize);
3398
3399         if( !mgr ) {
3400                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3401                 exit (1);
3402         }
3403
3404         //      fire off threads
3405
3406         for( idx = 0; idx < cnt; idx++ ) {
3407                 args[idx].infile = argv[idx + 6];
3408                 args[idx].type = argv[2];
3409                 args[idx].mgr = mgr;
3410                 args[idx].num = num;
3411                 args[idx].idx = idx;
3412 #ifdef unix
3413                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3414                         fprintf(stderr, "Error creating thread %d\n", err);
3415 #else
3416                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3417 #endif
3418         }
3419
3420         //      wait for termination
3421
3422 #ifdef unix
3423         for( idx = 0; idx < cnt; idx++ )
3424                 pthread_join (threads[idx], NULL);
3425 #else
3426         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3427
3428         for( idx = 0; idx < cnt; idx++ )
3429                 CloseHandle(threads[idx]);
3430
3431 #endif
3432         bt_poolaudit(mgr);
3433         bt_mgrclose (mgr);
3434
3435         elapsed = getCpuTime(0) - start;
3436         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3437         elapsed = getCpuTime(1);
3438         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3439         elapsed = getCpuTime(2);
3440         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3441 }
3442
3443 #endif  //STANDALONE