]> pd.if.org Git - btree/blob - threadskv8.c
Fix bug in reverse scan. New command line interface with a command per input file
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         BtHashEntry *hashtable;         // the buffer pool hash table entries
279         BtLatchSet *latchsets;          // mapped latch set from buffer pool
280         unsigned char *pagepool;        // mapped to the buffer pool pages
281 #ifndef unix
282         HANDLE halloc;                          // allocation handle
283         HANDLE hpool;                           // buffer pool handle
284 #endif
285 } BtMgr;
286
287 typedef struct {
288         BtMgr *mgr;                             // buffer manager for thread
289         BtPage cursor;                  // cached frame for start/next (never mapped)
290         BtPage frame;                   // spare frame for the page split (never mapped)
291         uid cursor_page;                                // current cursor page number   
292         unsigned char *mem;                             // frame, cursor, page memory buffer
293         unsigned char key[BT_keyarray]; // last found complete key
294         int found;                              // last delete or insert was found
295         int err;                                // last error
296         int reads, writes;              // number of reads and writes from the btree
297 } BtDb;
298
299 typedef enum {
300         BTERR_ok = 0,
301         BTERR_struct,
302         BTERR_ovflw,
303         BTERR_lock,
304         BTERR_map,
305         BTERR_read,
306         BTERR_wrt,
307         BTERR_atomic
308 } BTERR;
309
310 #define CLOCK_bit 0x8000
311
312 // B-Tree functions
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey   (BtDb *bt, uint slot);
321
322 //      manager functions
323 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
324 void bt_mgrclose (BtMgr *mgr);
325
326 //  Helper functions to return slot values
327 //      from the cursor page.
328
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
331
332 //  The page is allocated from low and hi ends.
333 //  The key slots are allocated from the bottom,
334 //      while the text and value of the key
335 //  are allocated from the top.  When the two
336 //  areas meet, the page is split into two.
337
338 //  A key consists of a length byte, two bytes of
339 //  index number (0 - 65535), and up to 253 bytes
340 //  of key value.
341
342 //  Associated with each key is a value byte string
343 //      containing any value desired.
344
345 //  The b-tree root is always located at page 1.
346 //      The first leaf page of level zero is always
347 //      located on page 2.
348
349 //      The b-tree pages are linked with next
350 //      pointers to facilitate enumerators,
351 //      and provide for concurrency.
352
353 //      When to root page fills, it is split in two and
354 //      the tree height is raised by a new root at page
355 //      one with two keys.
356
357 //      Deleted keys are marked with a dead bit until
358 //      page cleanup. The fence key for a leaf node is
359 //      always present
360
361 //  To achieve maximum concurrency one page is locked at a time
362 //  as the tree is traversed to find leaf key in question. The right
363 //  page numbers are used in cases where the page is being split,
364 //      or consolidated.
365
366 //  Page 0 is dedicated to lock for new page extensions,
367 //      and chains empty pages together for reuse. It also
368 //      contains the latch manager hash table.
369
370 //      The ParentModification lock on a node is obtained to serialize posting
371 //      or changing the fence key for a node.
372
373 //      Empty pages are chained together through the ALLOC page and reused.
374
375 //      Access macros to address slot and key values from the page
376 //      Page slots use 1 based indexing.
377
378 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
379 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
380 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
381
382 void bt_putid(unsigned char *dest, uid id)
383 {
384 int i = BtId;
385
386         while( i-- )
387                 dest[i] = (unsigned char)id, id >>= 8;
388 }
389
390 uid bt_getid(unsigned char *src)
391 {
392 uid id = 0;
393 int i;
394
395         for( i = 0; i < BtId; i++ )
396                 id <<= 8, id |= *src++; 
397
398         return id;
399 }
400
401 uid bt_newdup (BtDb *bt)
402 {
403 #ifdef unix
404         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
405 #else
406         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
407 #endif
408 }
409
410 //      Write-Only Queue Lock
411
412 void WriteOLock (WOLock *lock)
413 {
414 ushort tix;
415 #ifdef unix
416         tix = __sync_fetch_and_add (lock->ticket, 1);
417 #else
418         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
419 #endif
420         // wait for our ticket to come up
421
422         while( tix != lock->serving[0] )
423 #ifdef unix
424                 sched_yield();
425 #else
426                 SwitchToThread ();
427 #endif
428 }
429
430 void WriteORelease (WOLock *lock)
431 {
432         lock->serving[0]++;
433 }
434
435 //      Phase-Fair reader/writer lock implementation
436
437 void WriteLock (RWLock *lock)
438 {
439 ushort w, r, tix;
440
441 #ifdef unix
442         tix = __sync_fetch_and_add (lock->ticket, 1);
443 #else
444         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
445 #endif
446         // wait for our ticket to come up
447
448         while( tix != lock->serving[0] )
449 #ifdef unix
450                 sched_yield();
451 #else
452                 SwitchToThread ();
453 #endif
454
455         w = PRES | (tix & PHID);
456 #ifdef  unix
457         r = __sync_fetch_and_add (lock->rin, w);
458 #else
459         r = _InterlockedExchangeAdd16 (lock->rin, w);
460 #endif
461         while( r != *lock->rout )
462 #ifdef unix
463                 sched_yield();
464 #else
465                 SwitchToThread();
466 #endif
467 }
468
469 void WriteRelease (RWLock *lock)
470 {
471 #ifdef unix
472         __sync_fetch_and_and (lock->rin, ~MASK);
473 #else
474         _InterlockedAnd16 (lock->rin, ~MASK);
475 #endif
476         lock->serving[0]++;
477 }
478
479 void ReadLock (RWLock *lock)
480 {
481 ushort w;
482 #ifdef unix
483         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
484 #else
485         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
486 #endif
487         if( w )
488           while( w == (*lock->rin & MASK) )
489 #ifdef unix
490                 sched_yield ();
491 #else
492                 SwitchToThread ();
493 #endif
494 }
495
496 void ReadRelease (RWLock *lock)
497 {
498 #ifdef unix
499         __sync_fetch_and_add (lock->rout, RINC);
500 #else
501         _InterlockedExchangeAdd16 (lock->rout, RINC);
502 #endif
503 }
504
505 //      Spin Latch Manager
506
507 //      wait until write lock mode is clear
508 //      and add 1 to the share count
509
510 void bt_spinreadlock(BtSpinLatch *latch)
511 {
512 ushort prev;
513
514   do {
515 #ifdef unix
516         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
517 #else
518         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
519 #endif
520         //  see if exclusive request is granted or pending
521
522         if( !(prev & BOTH) )
523                 return;
524 #ifdef unix
525         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
526 #else
527         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
528 #endif
529 #ifdef  unix
530   } while( sched_yield(), 1 );
531 #else
532   } while( SwitchToThread(), 1 );
533 #endif
534 }
535
536 //      wait for other read and write latches to relinquish
537
538 void bt_spinwritelock(BtSpinLatch *latch)
539 {
540 ushort prev;
541
542   do {
543 #ifdef  unix
544         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
545 #else
546         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
547 #endif
548         if( !(prev & XCL) )
549           if( !(prev & ~BOTH) )
550                 return;
551           else
552 #ifdef unix
553                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
554 #else
555                 _InterlockedAnd16((ushort *)latch, ~XCL);
556 #endif
557 #ifdef  unix
558   } while( sched_yield(), 1 );
559 #else
560   } while( SwitchToThread(), 1 );
561 #endif
562 }
563
564 //      try to obtain write lock
565
566 //      return 1 if obtained,
567 //              0 otherwise
568
569 int bt_spinwritetry(BtSpinLatch *latch)
570 {
571 ushort prev;
572
573 #ifdef  unix
574         prev = __sync_fetch_and_or((ushort *)latch, XCL);
575 #else
576         prev = _InterlockedOr16((ushort *)latch, XCL);
577 #endif
578         //      take write access if all bits are clear
579
580         if( !(prev & XCL) )
581           if( !(prev & ~BOTH) )
582                 return 1;
583           else
584 #ifdef unix
585                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
586 #else
587                 _InterlockedAnd16((ushort *)latch, ~XCL);
588 #endif
589         return 0;
590 }
591
592 //      clear write mode
593
594 void bt_spinreleasewrite(BtSpinLatch *latch)
595 {
596 #ifdef unix
597         __sync_fetch_and_and((ushort *)latch, ~BOTH);
598 #else
599         _InterlockedAnd16((ushort *)latch, ~BOTH);
600 #endif
601 }
602
603 //      decrement reader count
604
605 void bt_spinreleaseread(BtSpinLatch *latch)
606 {
607 #ifdef unix
608         __sync_fetch_and_add((ushort *)latch, -SHARE);
609 #else
610         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
611 #endif
612 }
613
614 //      read page from permanent location in Btree file
615
616 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
617 {
618 off64_t off = page_no << mgr->page_bits;
619
620 #ifdef unix
621         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
622                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
623                 return BTERR_read;
624         }
625 #else
626 OVERLAPPED ovl[1];
627 uint amt[1];
628
629         memset (ovl, 0, sizeof(OVERLAPPED));
630         ovl->Offset = off;
631         ovl->OffsetHigh = off >> 32;
632
633         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
634                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
635                 return BTERR_read;
636         }
637         if( *amt <  mgr->page_size ) {
638                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
639                 return BTERR_read;
640         }
641 #endif
642         return 0;
643 }
644
645 //      write page to permanent location in Btree file
646 //      clear the dirty bit
647
648 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
649 {
650 off64_t off = page_no << mgr->page_bits;
651
652 #ifdef unix
653         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
654                 return BTERR_wrt;
655 #else
656 OVERLAPPED ovl[1];
657 uint amt[1];
658
659         memset (ovl, 0, sizeof(OVERLAPPED));
660         ovl->Offset = off;
661         ovl->OffsetHigh = off >> 32;
662
663         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
664                 return BTERR_wrt;
665
666         if( *amt <  mgr->page_size )
667                 return BTERR_wrt;
668 #endif
669         return 0;
670 }
671
672 //      link latch table entry into head of latch hash table
673
674 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
675 {
676 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
677 BtLatchSet *latch = bt->mgr->latchsets + slot;
678
679         if( latch->next = bt->mgr->hashtable[hashidx].slot )
680                 bt->mgr->latchsets[latch->next].prev = slot;
681
682         bt->mgr->hashtable[hashidx].slot = slot;
683         latch->page_no = page_no;
684         latch->entry = slot;
685         latch->split = 0;
686         latch->prev = 0;
687         latch->pin = 1;
688
689         if( loadit )
690           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
691                 return bt->err;
692           else
693                 bt->reads++;
694
695         return bt->err = 0;
696 }
697
698 //      set CLOCK bit in latch
699 //      decrement pin count
700
701 void bt_unpinlatch (BtLatchSet *latch)
702 {
703 #ifdef unix
704         if( ~latch->pin & CLOCK_bit )
705                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
706         __sync_fetch_and_add(&latch->pin, -1);
707 #else
708         if( ~latch->pin & CLOCK_bit )
709                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
710         _InterlockedDecrement16 (&latch->pin);
711 #endif
712 }
713
714 //  return the btree cached page address
715
716 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
717 {
718 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
719
720         return page;
721 }
722
723 //      find existing latchset or inspire new one
724 //      return with latchset pinned
725
726 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
727 {
728 uint hashidx = page_no % bt->mgr->latchhash;
729 BtLatchSet *latch;
730 uint slot, idx;
731 uint lvl, cnt;
732 off64_t off;
733 uint amt[1];
734 BtPage page;
735
736   //  try to find our entry
737
738   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
739
740   if( slot = bt->mgr->hashtable[hashidx].slot ) do
741   {
742         latch = bt->mgr->latchsets + slot;
743         if( page_no == latch->page_no )
744                 break;
745   } while( slot = latch->next );
746
747   //  found our entry
748   //  increment clock
749
750   if( slot ) {
751         latch = bt->mgr->latchsets + slot;
752 #ifdef unix
753         __sync_fetch_and_add(&latch->pin, 1);
754 #else
755         _InterlockedIncrement16 (&latch->pin);
756 #endif
757         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
758         return latch;
759   }
760
761         //  see if there are any unused pool entries
762 #ifdef unix
763         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
764 #else
765         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
766 #endif
767
768         if( slot < bt->mgr->latchtotal ) {
769                 latch = bt->mgr->latchsets + slot;
770                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
771                         return NULL;
772                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
773                 return latch;
774         }
775
776 #ifdef unix
777         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
778 #else
779         _InterlockedDecrement (&bt->mgr->latchdeployed);
780 #endif
781   //  find and reuse previous entry on victim
782
783   while( 1 ) {
784 #ifdef unix
785         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
786 #else
787         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
788 #endif
789         // try to get write lock on hash chain
790         //      skip entry if not obtained
791         //      or has outstanding pins
792
793         slot %= bt->mgr->latchtotal;
794
795         if( !slot )
796                 continue;
797
798         latch = bt->mgr->latchsets + slot;
799         idx = latch->page_no % bt->mgr->latchhash;
800
801         //      see we are on same chain as hashidx
802
803         if( idx == hashidx )
804                 continue;
805
806         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
807                 continue;
808
809         //  skip this slot if it is pinned
810         //      or the CLOCK bit is set
811
812         if( latch->pin ) {
813           if( latch->pin & CLOCK_bit ) {
814 #ifdef unix
815                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
816 #else
817                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
818 #endif
819           }
820           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
821           continue;
822         }
823
824         //  update permanent page area in btree from buffer pool
825
826         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
827
828         if( latch->dirty )
829           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
830                 return NULL;
831           else
832                 latch->dirty = 0, bt->writes++;
833
834         //  unlink our available slot from its hash chain
835
836         if( latch->prev )
837                 bt->mgr->latchsets[latch->prev].next = latch->next;
838         else
839                 bt->mgr->hashtable[idx].slot = latch->next;
840
841         if( latch->next )
842                 bt->mgr->latchsets[latch->next].prev = latch->prev;
843
844         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
845
846         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
847                 return NULL;
848
849         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
850         return latch;
851   }
852 }
853
854 void bt_mgrclose (BtMgr *mgr)
855 {
856 BtLatchSet *latch;
857 uint num = 0;
858 BtPage page;
859 uint slot;
860
861         //      flush dirty pool pages to the btree
862
863         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
864                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
865                 latch = mgr->latchsets + slot;
866
867                 if( latch->dirty ) {
868                         bt_writepage(mgr, page, latch->page_no);
869                         latch->dirty = 0, num++;
870                 }
871 //              madvise (page, mgr->page_size, MADV_DONTNEED);
872         }
873
874         fprintf(stderr, "%d buffer pool pages flushed\n", num);
875
876 #ifdef unix
877         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
878         munmap (mgr->pagezero, mgr->page_size);
879 #else
880         FlushViewOfFile(mgr->pagezero, 0);
881         UnmapViewOfFile(mgr->pagezero);
882         UnmapViewOfFile(mgr->hashtable);
883         CloseHandle(mgr->halloc);
884         CloseHandle(mgr->hpool);
885 #endif
886 #ifdef unix
887         close (mgr->idx);
888         free (mgr);
889 #else
890         FlushFileBuffers(mgr->idx);
891         CloseHandle(mgr->idx);
892         GlobalFree (mgr);
893 #endif
894 }
895
896 //      close and release memory
897
898 void bt_close (BtDb *bt)
899 {
900 #ifdef unix
901         if( bt->mem )
902                 free (bt->mem);
903 #else
904         if( bt->mem)
905                 VirtualFree (bt->mem, 0, MEM_RELEASE);
906 #endif
907         free (bt);
908 }
909
910 //  open/create new btree buffer manager
911
912 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
913 //              size of page pool (e.g. 262144)
914
915 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
916 {
917 uint lvl, attr, last, slot, idx;
918 uint nlatchpage, latchhash;
919 unsigned char value[BtId];
920 int flag, initit = 0;
921 BtPageZero *pagezero;
922 off64_t size;
923 uint amt[1];
924 BtMgr* mgr;
925 BtKey* key;
926 BtVal *val;
927
928         // determine sanity of page size and buffer pool
929
930         if( bits > BT_maxbits )
931                 bits = BT_maxbits;
932         else if( bits < BT_minbits )
933                 bits = BT_minbits;
934
935         if( nodemax < 16 ) {
936                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
937                 return NULL;
938         }
939
940 #ifdef unix
941         mgr = calloc (1, sizeof(BtMgr));
942
943         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
944
945         if( mgr->idx == -1 ) {
946                 fprintf (stderr, "Unable to open btree file\n");
947                 return free(mgr), NULL;
948         }
949 #else
950         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
951         attr = FILE_ATTRIBUTE_NORMAL;
952         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
953
954         if( mgr->idx == INVALID_HANDLE_VALUE )
955                 return GlobalFree(mgr), NULL;
956 #endif
957
958 #ifdef unix
959         pagezero = valloc (BT_maxpage);
960         *amt = 0;
961
962         // read minimum page size to get root info
963         //      to support raw disk partition files
964         //      check if bits == 0 on the disk.
965
966         if( size = lseek (mgr->idx, 0L, 2) )
967                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
968                         if( pagezero->alloc->bits )
969                                 bits = pagezero->alloc->bits;
970                         else
971                                 initit = 1;
972                 else
973                         return free(mgr), free(pagezero), NULL;
974         else
975                 initit = 1;
976 #else
977         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
978         size = GetFileSize(mgr->idx, amt);
979
980         if( size || *amt ) {
981                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
982                         return bt_mgrclose (mgr), NULL;
983                 bits = pagezero->alloc->bits;
984         } else
985                 initit = 1;
986 #endif
987
988         mgr->page_size = 1 << bits;
989         mgr->page_bits = bits;
990
991         //  calculate number of latch hash table entries
992
993         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
994         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
995
996         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
997         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
998         mgr->latchtotal = nodemax;
999
1000         if( !initit )
1001                 goto mgrlatch;
1002
1003         // initialize an empty b-tree with latch page, root page, page of leaves
1004         // and page(s) of latches and page pool cache
1005
1006         memset (pagezero, 0, 1 << bits);
1007         pagezero->alloc->bits = mgr->page_bits;
1008         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1009
1010         //  initialize left-most LEAF page in
1011         //      alloc->left.
1012
1013         bt_putid (pagezero->alloc->left, LEAF_page);
1014
1015         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1016                 fprintf (stderr, "Unable to create btree page zero\n");
1017                 return bt_mgrclose (mgr), NULL;
1018         }
1019
1020         memset (pagezero, 0, 1 << bits);
1021         pagezero->alloc->bits = mgr->page_bits;
1022
1023         for( lvl=MIN_lvl; lvl--; ) {
1024                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1025                 key = keyptr(pagezero->alloc, 1);
1026                 key->len = 2;           // create stopper key
1027                 key->key[0] = 0xff;
1028                 key->key[1] = 0xff;
1029
1030                 bt_putid(value, MIN_lvl - lvl + 1);
1031                 val = valptr(pagezero->alloc, 1);
1032                 val->len = lvl ? BtId : 0;
1033                 memcpy (val->value, value, val->len);
1034
1035                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1036                 pagezero->alloc->lvl = lvl;
1037                 pagezero->alloc->cnt = 1;
1038                 pagezero->alloc->act = 1;
1039
1040                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1041                         fprintf (stderr, "Unable to create btree page zero\n");
1042                         return bt_mgrclose (mgr), NULL;
1043                 }
1044         }
1045
1046 mgrlatch:
1047 #ifdef unix
1048         free (pagezero);
1049 #else
1050         VirtualFree (pagezero, 0, MEM_RELEASE);
1051 #endif
1052 #ifdef unix
1053         // mlock the pagezero page
1054
1055         flag = PROT_READ | PROT_WRITE;
1056         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1057         if( mgr->pagezero == MAP_FAILED ) {
1058                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1059                 return bt_mgrclose (mgr), NULL;
1060         }
1061         mlock (mgr->pagezero, mgr->page_size);
1062
1063         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1064         if( mgr->hashtable == MAP_FAILED ) {
1065                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1066                 return bt_mgrclose (mgr), NULL;
1067         }
1068 #else
1069         flag = PAGE_READWRITE;
1070         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1071         if( !mgr->halloc ) {
1072                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1073                 return bt_mgrclose (mgr), NULL;
1074         }
1075
1076         flag = FILE_MAP_WRITE;
1077         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1078         if( !mgr->pagezero ) {
1079                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1080                 return bt_mgrclose (mgr), NULL;
1081         }
1082
1083         flag = PAGE_READWRITE;
1084         size = (uid)mgr->nlatchpage << mgr->page_bits;
1085         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1086         if( !mgr->hpool ) {
1087                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1088                 return bt_mgrclose (mgr), NULL;
1089         }
1090
1091         flag = FILE_MAP_WRITE;
1092         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1093         if( !mgr->hashtable ) {
1094                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1095                 return bt_mgrclose (mgr), NULL;
1096         }
1097 #endif
1098
1099         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1100         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1101
1102         return mgr;
1103 }
1104
1105 //      open BTree access method
1106 //      based on buffer manager
1107
1108 BtDb *bt_open (BtMgr *mgr)
1109 {
1110 BtDb *bt = malloc (sizeof(*bt));
1111
1112         memset (bt, 0, sizeof(*bt));
1113         bt->mgr = mgr;
1114 #ifdef unix
1115         bt->mem = valloc (2 *mgr->page_size);
1116 #else
1117         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1118 #endif
1119         bt->frame = (BtPage)bt->mem;
1120         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1121         return bt;
1122 }
1123
1124 //  compare two keys, return > 0, = 0, or < 0
1125 //  =0: keys are same
1126 //  -1: key2 > key1
1127 //  +1: key2 < key1
1128 //  as the comparison value
1129
1130 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1131 {
1132 uint len1 = key1->len;
1133 int ans;
1134
1135         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1136                 return ans;
1137
1138         if( len1 > len2 )
1139                 return 1;
1140         if( len1 < len2 )
1141                 return -1;
1142
1143         return 0;
1144 }
1145
1146 // place write, read, or parent lock on requested page_no.
1147
1148 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1149 {
1150         switch( mode ) {
1151         case BtLockRead:
1152                 ReadLock (latch->readwr);
1153                 break;
1154         case BtLockWrite:
1155                 WriteLock (latch->readwr);
1156                 break;
1157         case BtLockAccess:
1158                 ReadLock (latch->access);
1159                 break;
1160         case BtLockDelete:
1161                 WriteLock (latch->access);
1162                 break;
1163         case BtLockParent:
1164                 WriteOLock (latch->parent);
1165                 break;
1166         case BtLockAtomic:
1167                 WriteOLock (latch->atomic);
1168                 break;
1169         }
1170 }
1171
1172 // remove write, read, or parent lock on requested page
1173
1174 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1175 {
1176         switch( mode ) {
1177         case BtLockRead:
1178                 ReadRelease (latch->readwr);
1179                 break;
1180         case BtLockWrite:
1181                 WriteRelease (latch->readwr);
1182                 break;
1183         case BtLockAccess:
1184                 ReadRelease (latch->access);
1185                 break;
1186         case BtLockDelete:
1187                 WriteRelease (latch->access);
1188                 break;
1189         case BtLockParent:
1190                 WriteORelease (latch->parent);
1191                 break;
1192         case BtLockAtomic:
1193                 WriteORelease (latch->atomic);
1194                 break;
1195         }
1196 }
1197
1198 //      allocate a new page
1199 //      return with page latched, but unlocked.
1200
1201 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1202 {
1203 uid page_no;
1204 int blk;
1205
1206         //      lock allocation page
1207
1208         bt_spinwritelock(bt->mgr->lock);
1209
1210         // use empty chain first
1211         // else allocate empty page
1212
1213         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1214                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1215                         set->page = bt_mappage (bt, set->latch);
1216                 else
1217                         return bt->err = BTERR_struct, -1;
1218
1219                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1220                 bt_spinreleasewrite(bt->mgr->lock);
1221
1222                 memcpy (set->page, contents, bt->mgr->page_size);
1223                 set->latch->dirty = 1;
1224                 return 0;
1225         }
1226
1227         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1228         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1229
1230         // unlock allocation latch
1231
1232         bt_spinreleasewrite(bt->mgr->lock);
1233
1234         //      don't load cache from btree page
1235
1236         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1237                 set->page = bt_mappage (bt, set->latch);
1238         else
1239                 return bt->err = BTERR_struct;
1240
1241         memcpy (set->page, contents, bt->mgr->page_size);
1242         set->latch->dirty = 1;
1243         return 0;
1244 }
1245
1246 //  find slot in page for given key at a given level
1247
1248 int bt_findslot (BtPage page, unsigned char *key, uint len)
1249 {
1250 uint diff, higher = page->cnt, low = 1, slot;
1251 uint good = 0;
1252
1253         //        make stopper key an infinite fence value
1254
1255         if( bt_getid (page->right) )
1256                 higher++;
1257         else
1258                 good++;
1259
1260         //      low is the lowest candidate.
1261         //  loop ends when they meet
1262
1263         //  higher is already
1264         //      tested as .ge. the passed key.
1265
1266         while( diff = higher - low ) {
1267                 slot = low + ( diff >> 1 );
1268                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1269                         low = slot + 1;
1270                 else
1271                         higher = slot, good++;
1272         }
1273
1274         //      return zero if key is on right link page
1275
1276         return good ? higher : 0;
1277 }
1278
1279 //  find and load page at given level for given key
1280 //      leave page rd or wr locked as requested
1281
1282 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1283 {
1284 uid page_no = ROOT_page, prevpage = 0;
1285 uint drill = 0xff, slot;
1286 BtLatchSet *prevlatch;
1287 uint mode, prevmode;
1288
1289   //  start at root of btree and drill down
1290
1291   do {
1292         // determine lock mode of drill level
1293         mode = (drill == lvl) ? lock : BtLockRead; 
1294
1295         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1296           return 0;
1297
1298         // obtain access lock using lock chaining with Access mode
1299
1300         if( page_no > ROOT_page )
1301           bt_lockpage(BtLockAccess, set->latch);
1302
1303         set->page = bt_mappage (bt, set->latch);
1304
1305         //      release & unpin parent or left sibling page
1306
1307         if( prevpage ) {
1308           bt_unlockpage(prevmode, prevlatch);
1309           bt_unpinlatch (prevlatch);
1310           prevpage = 0;
1311         }
1312
1313         // obtain mode lock using lock chaining through AccessLock
1314
1315         bt_lockpage(mode, set->latch);
1316
1317         if( set->page->free )
1318                 return bt->err = BTERR_struct, 0;
1319
1320         if( page_no > ROOT_page )
1321           bt_unlockpage(BtLockAccess, set->latch);
1322
1323         // re-read and re-lock root after determining actual level of root
1324
1325         if( set->page->lvl != drill) {
1326                 if( set->latch->page_no != ROOT_page )
1327                         return bt->err = BTERR_struct, 0;
1328                         
1329                 drill = set->page->lvl;
1330
1331                 if( lock != BtLockRead && drill == lvl ) {
1332                   bt_unlockpage(mode, set->latch);
1333                   bt_unpinlatch (set->latch);
1334                   continue;
1335                 }
1336         }
1337
1338         prevpage = set->latch->page_no;
1339         prevlatch = set->latch;
1340         prevmode = mode;
1341
1342         //  find key on page at this level
1343         //  and descend to requested level
1344
1345         if( set->page->kill )
1346           goto slideright;
1347
1348         if( slot = bt_findslot (set->page, key, len) ) {
1349           if( drill == lvl )
1350                 return slot;
1351
1352           // find next non-dead slot -- the fence key if nothing else
1353
1354           while( slotptr(set->page, slot)->dead )
1355                 if( slot++ < set->page->cnt )
1356                   continue;
1357                 else
1358                   return bt->err = BTERR_struct, 0;
1359
1360           page_no = bt_getid(valptr(set->page, slot)->value);
1361           drill--;
1362           continue;
1363         }
1364
1365         //  or slide right into next page
1366
1367 slideright:
1368         page_no = bt_getid(set->page->right);
1369
1370   } while( page_no );
1371
1372   // return error on end of right chain
1373
1374   bt->err = BTERR_struct;
1375   return 0;     // return error
1376 }
1377
1378 //      return page to free list
1379 //      page must be delete & write locked
1380
1381 void bt_freepage (BtDb *bt, BtPageSet *set)
1382 {
1383         //      lock allocation page
1384
1385         bt_spinwritelock (bt->mgr->lock);
1386
1387         //      store chain
1388
1389         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1390         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1391         set->latch->dirty = 1;
1392         set->page->free = 1;
1393
1394         // unlock released page
1395
1396         bt_unlockpage (BtLockDelete, set->latch);
1397         bt_unlockpage (BtLockWrite, set->latch);
1398
1399         // unlock allocation page
1400
1401         bt_spinreleasewrite (bt->mgr->lock);
1402 }
1403
1404 //      a fence key was deleted from a page
1405 //      push new fence value upwards
1406
1407 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1408 {
1409 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1410 unsigned char value[BtId];
1411 BtKey* ptr;
1412 uint idx;
1413
1414         //      remove the old fence value
1415
1416         ptr = keyptr(set->page, set->page->cnt);
1417         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1418         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1419         set->latch->dirty = 1;
1420
1421         //  cache new fence value
1422
1423         ptr = keyptr(set->page, set->page->cnt);
1424         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1425
1426         bt_lockpage (BtLockParent, set->latch);
1427         bt_unlockpage (BtLockWrite, set->latch);
1428
1429         //      insert new (now smaller) fence key
1430
1431         bt_putid (value, set->latch->page_no);
1432         ptr = (BtKey*)leftkey;
1433
1434         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1435           return bt->err;
1436
1437         //      now delete old fence key
1438
1439         ptr = (BtKey*)rightkey;
1440
1441         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1442                 return bt->err;
1443
1444         bt_unlockpage (BtLockParent, set->latch);
1445         bt_unpinlatch(set->latch);
1446         return 0;
1447 }
1448
1449 //      root has a single child
1450 //      collapse a level from the tree
1451
1452 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1453 {
1454 BtPageSet child[1];
1455 uid page_no;
1456 uint idx;
1457
1458   // find the child entry and promote as new root contents
1459
1460   do {
1461         for( idx = 0; idx++ < root->page->cnt; )
1462           if( !slotptr(root->page, idx)->dead )
1463                 break;
1464
1465         page_no = bt_getid (valptr(root->page, idx)->value);
1466
1467         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1468                 child->page = bt_mappage (bt, child->latch);
1469         else
1470                 return bt->err;
1471
1472         bt_lockpage (BtLockDelete, child->latch);
1473         bt_lockpage (BtLockWrite, child->latch);
1474
1475         memcpy (root->page, child->page, bt->mgr->page_size);
1476         root->latch->dirty = 1;
1477
1478         bt_freepage (bt, child);
1479         bt_unpinlatch (child->latch);
1480
1481   } while( root->page->lvl > 1 && root->page->act == 1 );
1482
1483   bt_unlockpage (BtLockWrite, root->latch);
1484   bt_unpinlatch (root->latch);
1485   return 0;
1486 }
1487
1488 //  delete a page and manage keys
1489 //  call with page writelocked
1490 //      returns with page unpinned
1491
1492 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1493 {
1494 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1495 unsigned char value[BtId];
1496 uint lvl = set->page->lvl;
1497 BtPageSet right[1];
1498 uid page_no;
1499 BtKey *ptr;
1500
1501         //      cache copy of fence key
1502         //      to post in parent
1503
1504         ptr = keyptr(set->page, set->page->cnt);
1505         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1506
1507         //      obtain lock on right page
1508
1509         page_no = bt_getid(set->page->right);
1510
1511         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1512                 right->page = bt_mappage (bt, right->latch);
1513         else
1514                 return 0;
1515
1516         bt_lockpage (BtLockWrite, right->latch);
1517
1518         // cache copy of key to update
1519
1520         ptr = keyptr(right->page, right->page->cnt);
1521         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1522
1523         if( right->page->kill )
1524                 return bt->err = BTERR_struct;
1525
1526         // pull contents of right peer into our empty page
1527
1528         memcpy (set->page, right->page, bt->mgr->page_size);
1529         set->latch->dirty = 1;
1530
1531         // mark right page deleted and point it to left page
1532         //      until we can post parent updates that remove access
1533         //      to the deleted page.
1534
1535         bt_putid (right->page->right, set->latch->page_no);
1536         right->latch->dirty = 1;
1537         right->page->kill = 1;
1538
1539         bt_lockpage (BtLockParent, right->latch);
1540         bt_unlockpage (BtLockWrite, right->latch);
1541
1542         bt_lockpage (BtLockParent, set->latch);
1543         bt_unlockpage (BtLockWrite, set->latch);
1544
1545         // redirect higher key directly to our new node contents
1546
1547         bt_putid (value, set->latch->page_no);
1548         ptr = (BtKey*)higherfence;
1549
1550         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1551           return bt->err;
1552
1553         //      delete old lower key to our node
1554
1555         ptr = (BtKey*)lowerfence;
1556
1557         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1558           return bt->err;
1559
1560         //      obtain delete and write locks to right node
1561
1562         bt_unlockpage (BtLockParent, right->latch);
1563         bt_lockpage (BtLockDelete, right->latch);
1564         bt_lockpage (BtLockWrite, right->latch);
1565         bt_freepage (bt, right);
1566         bt_unpinlatch (right->latch);
1567
1568         bt_unlockpage (BtLockParent, set->latch);
1569         bt_unpinlatch (set->latch);
1570         bt->found = 1;
1571         return 0;
1572 }
1573
1574 //  find and delete key on page by marking delete flag bit
1575 //  if page becomes empty, delete it from the btree
1576
1577 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1578 {
1579 uint slot, idx, found, fence;
1580 BtPageSet set[1];
1581 BtKey *ptr;
1582 BtVal *val;
1583
1584         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1585                 ptr = keyptr(set->page, slot);
1586         else
1587                 return bt->err;
1588
1589         // if librarian slot, advance to real slot
1590
1591         if( slotptr(set->page, slot)->type == Librarian )
1592                 ptr = keyptr(set->page, ++slot);
1593
1594         fence = slot == set->page->cnt;
1595
1596         // if key is found delete it, otherwise ignore request
1597
1598         if( found = !keycmp (ptr, key, len) )
1599           if( found = slotptr(set->page, slot)->dead == 0 ) {
1600                 val = valptr(set->page,slot);
1601                 slotptr(set->page, slot)->dead = 1;
1602                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1603                 set->page->act--;
1604
1605                 // collapse empty slots beneath the fence
1606
1607                 while( idx = set->page->cnt - 1 )
1608                   if( slotptr(set->page, idx)->dead ) {
1609                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1610                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1611                   } else
1612                         break;
1613           }
1614
1615         //      did we delete a fence key in an upper level?
1616
1617         if( found && lvl && set->page->act && fence )
1618           if( bt_fixfence (bt, set, lvl) )
1619                 return bt->err;
1620           else
1621                 return bt->found = found, 0;
1622
1623         //      do we need to collapse root?
1624
1625         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1626           if( bt_collapseroot (bt, set) )
1627                 return bt->err;
1628           else
1629                 return bt->found = found, 0;
1630
1631         //      delete empty page
1632
1633         if( !set->page->act )
1634                 return bt_deletepage (bt, set);
1635
1636         set->latch->dirty = 1;
1637         bt_unlockpage(BtLockWrite, set->latch);
1638         bt_unpinlatch (set->latch);
1639         return bt->found = found, 0;
1640 }
1641
1642 BtKey *bt_foundkey (BtDb *bt)
1643 {
1644         return (BtKey*)(bt->key);
1645 }
1646
1647 //      advance to next slot
1648
1649 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1650 {
1651 BtLatchSet *prevlatch;
1652 uid page_no;
1653
1654         if( slot < set->page->cnt )
1655                 return slot + 1;
1656
1657         prevlatch = set->latch;
1658
1659         if( page_no = bt_getid(set->page->right) )
1660           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1661                 set->page = bt_mappage (bt, set->latch);
1662           else
1663                 return 0;
1664         else
1665           return bt->err = BTERR_struct, 0;
1666
1667         // obtain access lock using lock chaining with Access mode
1668
1669         bt_lockpage(BtLockAccess, set->latch);
1670
1671         bt_unlockpage(BtLockRead, prevlatch);
1672         bt_unpinlatch (prevlatch);
1673
1674         bt_lockpage(BtLockRead, set->latch);
1675         bt_unlockpage(BtLockAccess, set->latch);
1676         return 1;
1677 }
1678
1679 //      find unique key or first duplicate key in
1680 //      leaf level and return number of value bytes
1681 //      or (-1) if not found.  Setup key for bt_foundkey
1682
1683 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1684 {
1685 BtPageSet set[1];
1686 uint len, slot;
1687 int ret = -1;
1688 BtKey *ptr;
1689 BtVal *val;
1690
1691   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1692    do {
1693         ptr = keyptr(set->page, slot);
1694
1695         //      skip librarian slot place holder
1696
1697         if( slotptr(set->page, slot)->type == Librarian )
1698                 ptr = keyptr(set->page, ++slot);
1699
1700         //      return actual key found
1701
1702         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1703         len = ptr->len;
1704
1705         if( slotptr(set->page, slot)->type == Duplicate )
1706                 len -= BtId;
1707
1708         //      not there if we reach the stopper key
1709
1710         if( slot == set->page->cnt )
1711           if( !bt_getid (set->page->right) )
1712                 break;
1713
1714         // if key exists, return >= 0 value bytes copied
1715         //      otherwise return (-1)
1716
1717         if( slotptr(set->page, slot)->dead )
1718                 continue;
1719
1720         if( keylen == len )
1721           if( !memcmp (ptr->key, key, len) ) {
1722                 val = valptr (set->page,slot);
1723                 if( valmax > val->len )
1724                         valmax = val->len;
1725                 memcpy (value, val->value, valmax);
1726                 ret = valmax;
1727           }
1728
1729         break;
1730
1731    } while( slot = bt_findnext (bt, set, slot) );
1732
1733   bt_unlockpage (BtLockRead, set->latch);
1734   bt_unpinlatch (set->latch);
1735   return ret;
1736 }
1737
1738 //      check page for space available,
1739 //      clean if necessary and return
1740 //      0 - page needs splitting
1741 //      >0  new slot value
1742
1743 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1744 {
1745 uint nxt = bt->mgr->page_size;
1746 BtPage page = set->page;
1747 uint cnt = 0, idx = 0;
1748 uint max = page->cnt;
1749 uint newslot = max;
1750 BtKey *key;
1751 BtVal *val;
1752
1753         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1754                 return slot;
1755
1756         //      skip cleanup and proceed to split
1757         //      if there's not enough garbage
1758         //      to bother with.
1759
1760         if( page->garbage < nxt / 5 )
1761                 return 0;
1762
1763         memcpy (bt->frame, page, bt->mgr->page_size);
1764
1765         // skip page info and set rest of page to zero
1766
1767         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1768         set->latch->dirty = 1;
1769         page->garbage = 0;
1770         page->act = 0;
1771
1772         // clean up page first by
1773         // removing deleted keys
1774
1775         while( cnt++ < max ) {
1776                 if( cnt == slot )
1777                         newslot = idx + 2;
1778                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1779                         continue;
1780
1781                 // copy the value across
1782
1783                 val = valptr(bt->frame, cnt);
1784                 nxt -= val->len + sizeof(BtVal);
1785                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1786
1787                 // copy the key across
1788
1789                 key = keyptr(bt->frame, cnt);
1790                 nxt -= key->len + sizeof(BtKey);
1791                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1792
1793                 // make a librarian slot
1794
1795                 if( idx ) {
1796                         slotptr(page, ++idx)->off = nxt;
1797                         slotptr(page, idx)->type = Librarian;
1798                         slotptr(page, idx)->dead = 1;
1799                 }
1800
1801                 // set up the slot
1802
1803                 slotptr(page, ++idx)->off = nxt;
1804                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1805
1806                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1807                         page->act++;
1808         }
1809
1810         page->min = nxt;
1811         page->cnt = idx;
1812
1813         //      see if page has enough space now, or does it need splitting?
1814
1815         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1816                 return newslot;
1817
1818         return 0;
1819 }
1820
1821 // split the root and raise the height of the btree
1822
1823 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1824 {  
1825 unsigned char leftkey[BT_keyarray];
1826 uint nxt = bt->mgr->page_size;
1827 unsigned char value[BtId];
1828 BtPageSet left[1];
1829 uid left_page_no;
1830 BtKey *ptr;
1831 BtVal *val;
1832
1833         //      save left page fence key for new root
1834
1835         ptr = keyptr(root->page, root->page->cnt);
1836         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1837
1838         //  Obtain an empty page to use, and copy the current
1839         //  root contents into it, e.g. lower keys
1840
1841         if( bt_newpage(bt, left, root->page) )
1842                 return bt->err;
1843
1844         left_page_no = left->latch->page_no;
1845         bt_unpinlatch (left->latch);
1846
1847         // preserve the page info at the bottom
1848         // of higher keys and set rest to zero
1849
1850         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1851
1852         // insert stopper key at top of newroot page
1853         // and increase the root height
1854
1855         nxt -= BtId + sizeof(BtVal);
1856         bt_putid (value, right->page_no);
1857         val = (BtVal *)((unsigned char *)root->page + nxt);
1858         memcpy (val->value, value, BtId);
1859         val->len = BtId;
1860
1861         nxt -= 2 + sizeof(BtKey);
1862         slotptr(root->page, 2)->off = nxt;
1863         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1864         ptr->len = 2;
1865         ptr->key[0] = 0xff;
1866         ptr->key[1] = 0xff;
1867
1868         // insert lower keys page fence key on newroot page as first key
1869
1870         nxt -= BtId + sizeof(BtVal);
1871         bt_putid (value, left_page_no);
1872         val = (BtVal *)((unsigned char *)root->page + nxt);
1873         memcpy (val->value, value, BtId);
1874         val->len = BtId;
1875
1876         ptr = (BtKey *)leftkey;
1877         nxt -= ptr->len + sizeof(BtKey);
1878         slotptr(root->page, 1)->off = nxt;
1879         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1880         
1881         bt_putid(root->page->right, 0);
1882         root->page->min = nxt;          // reset lowest used offset and key count
1883         root->page->cnt = 2;
1884         root->page->act = 2;
1885         root->page->lvl++;
1886
1887         // release and unpin root pages
1888
1889         bt_unlockpage(BtLockWrite, root->latch);
1890         bt_unpinlatch (root->latch);
1891
1892         bt_unpinlatch (right);
1893         return 0;
1894 }
1895
1896 //  split already locked full node
1897 //      leave it locked.
1898 //      return pool entry for new right
1899 //      page, unlocked
1900
1901 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1902 {
1903 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1904 uint lvl = set->page->lvl;
1905 BtPageSet right[1];
1906 BtKey *key, *ptr;
1907 BtVal *val, *src;
1908 uid right2;
1909 uint prev;
1910
1911         //  split higher half of keys to bt->frame
1912
1913         memset (bt->frame, 0, bt->mgr->page_size);
1914         max = set->page->cnt;
1915         cnt = max / 2;
1916         idx = 0;
1917
1918         while( cnt++ < max ) {
1919                 if( slotptr(set->page, cnt)->dead && cnt < max )
1920                         continue;
1921                 src = valptr(set->page, cnt);
1922                 nxt -= src->len + sizeof(BtVal);
1923                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1924
1925                 key = keyptr(set->page, cnt);
1926                 nxt -= key->len + sizeof(BtKey);
1927                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1928                 memcpy (ptr, key, key->len + sizeof(BtKey));
1929
1930                 //      add librarian slot
1931
1932                 if( idx ) {
1933                         slotptr(bt->frame, ++idx)->off = nxt;
1934                         slotptr(bt->frame, idx)->type = Librarian;
1935                         slotptr(bt->frame, idx)->dead = 1;
1936                 }
1937
1938                 //  add actual slot
1939
1940                 slotptr(bt->frame, ++idx)->off = nxt;
1941                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1942
1943                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1944                         bt->frame->act++;
1945         }
1946
1947         bt->frame->bits = bt->mgr->page_bits;
1948         bt->frame->min = nxt;
1949         bt->frame->cnt = idx;
1950         bt->frame->lvl = lvl;
1951
1952         // link right node
1953
1954         if( set->latch->page_no > ROOT_page )
1955                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1956
1957         //      get new free page and write higher keys to it.
1958
1959         if( bt_newpage(bt, right, bt->frame) )
1960                 return 0;
1961
1962         memcpy (bt->frame, set->page, bt->mgr->page_size);
1963         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1964         set->latch->dirty = 1;
1965
1966         nxt = bt->mgr->page_size;
1967         set->page->garbage = 0;
1968         set->page->act = 0;
1969         max /= 2;
1970         cnt = 0;
1971         idx = 0;
1972
1973         if( slotptr(bt->frame, max)->type == Librarian )
1974                 max--;
1975
1976         //  assemble page of smaller keys
1977
1978         while( cnt++ < max ) {
1979                 if( slotptr(bt->frame, cnt)->dead )
1980                         continue;
1981                 val = valptr(bt->frame, cnt);
1982                 nxt -= val->len + sizeof(BtVal);
1983                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1984
1985                 key = keyptr(bt->frame, cnt);
1986                 nxt -= key->len + sizeof(BtKey);
1987                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1988
1989                 //      add librarian slot
1990
1991                 if( idx ) {
1992                         slotptr(set->page, ++idx)->off = nxt;
1993                         slotptr(set->page, idx)->type = Librarian;
1994                         slotptr(set->page, idx)->dead = 1;
1995                 }
1996
1997                 //      add actual slot
1998
1999                 slotptr(set->page, ++idx)->off = nxt;
2000                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2001                 set->page->act++;
2002         }
2003
2004         bt_putid(set->page->right, right->latch->page_no);
2005         set->page->min = nxt;
2006         set->page->cnt = idx;
2007
2008         return right->latch->entry;
2009 }
2010
2011 //      fix keys for newly split page
2012 //      call with page locked, return
2013 //      unlocked
2014
2015 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2016 {
2017 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2018 unsigned char value[BtId];
2019 uint lvl = set->page->lvl;
2020 BtPage page;
2021 BtKey *ptr;
2022
2023         // if current page is the root page, split it
2024
2025         if( set->latch->page_no == ROOT_page )
2026                 return bt_splitroot (bt, set, right);
2027
2028         ptr = keyptr(set->page, set->page->cnt);
2029         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2030
2031         page = bt_mappage (bt, right);
2032
2033         ptr = keyptr(page, page->cnt);
2034         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2035
2036         // insert new fences in their parent pages
2037
2038         bt_lockpage (BtLockParent, right);
2039
2040         bt_lockpage (BtLockParent, set->latch);
2041         bt_unlockpage (BtLockWrite, set->latch);
2042
2043         // insert new fence for reformulated left block of smaller keys
2044
2045         bt_putid (value, set->latch->page_no);
2046         ptr = (BtKey *)leftkey;
2047
2048         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2049                 return bt->err;
2050
2051         // switch fence for right block of larger keys to new right page
2052
2053         bt_putid (value, right->page_no);
2054         ptr = (BtKey *)rightkey;
2055
2056         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2057                 return bt->err;
2058
2059         bt_unlockpage (BtLockParent, set->latch);
2060         bt_unpinlatch (set->latch);
2061
2062         bt_unlockpage (BtLockParent, right);
2063         bt_unpinlatch (right);
2064         return 0;
2065 }
2066
2067 //      install new key and value onto page
2068 //      page must already be checked for
2069 //      adequate space
2070
2071 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2072 {
2073 uint idx, librarian;
2074 BtSlot *node;
2075 BtKey *ptr;
2076 BtVal *val;
2077
2078         //      if found slot > desired slot and previous slot
2079         //      is a librarian slot, use it
2080
2081         if( slot > 1 )
2082           if( slotptr(set->page, slot-1)->type == Librarian )
2083                 slot--;
2084
2085         // copy value onto page
2086
2087         set->page->min -= vallen + sizeof(BtVal);
2088         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2089         memcpy (val->value, value, vallen);
2090         val->len = vallen;
2091
2092         // copy key onto page
2093
2094         set->page->min -= keylen + sizeof(BtKey);
2095         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2096         memcpy (ptr->key, key, keylen);
2097         ptr->len = keylen;
2098         
2099         //      find first empty slot
2100
2101         for( idx = slot; idx < set->page->cnt; idx++ )
2102           if( slotptr(set->page, idx)->dead )
2103                 break;
2104
2105         // now insert key into array before slot
2106
2107         if( idx == set->page->cnt )
2108                 idx += 2, set->page->cnt += 2, librarian = 2;
2109         else
2110                 librarian = 1;
2111
2112         set->latch->dirty = 1;
2113         set->page->act++;
2114
2115         while( idx > slot + librarian - 1 )
2116                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2117
2118         //      add librarian slot
2119
2120         if( librarian > 1 ) {
2121                 node = slotptr(set->page, slot++);
2122                 node->off = set->page->min;
2123                 node->type = Librarian;
2124                 node->dead = 1;
2125         }
2126
2127         //      fill in new slot
2128
2129         node = slotptr(set->page, slot);
2130         node->off = set->page->min;
2131         node->type = type;
2132         node->dead = 0;
2133
2134         if( release ) {
2135                 bt_unlockpage (BtLockWrite, set->latch);
2136                 bt_unpinlatch (set->latch);
2137         }
2138
2139         return 0;
2140 }
2141
2142 //  Insert new key into the btree at given level.
2143 //      either add a new key or update/add an existing one
2144
2145 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2146 {
2147 unsigned char newkey[BT_keyarray];
2148 uint slot, idx, len, entry;
2149 BtPageSet set[1];
2150 BtKey *ptr, *ins;
2151 uid sequence;
2152 BtVal *val;
2153 uint type;
2154
2155   // set up the key we're working on
2156
2157   ins = (BtKey*)newkey;
2158   memcpy (ins->key, key, keylen);
2159   ins->len = keylen;
2160
2161   // is this a non-unique index value?
2162
2163   if( unique )
2164         type = Unique;
2165   else {
2166         type = Duplicate;
2167         sequence = bt_newdup (bt);
2168         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2169         ins->len += BtId;
2170   }
2171
2172   while( 1 ) { // find the page and slot for the current key
2173         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2174                 ptr = keyptr(set->page, slot);
2175         else {
2176                 if( !bt->err )
2177                         bt->err = BTERR_ovflw;
2178                 return bt->err;
2179         }
2180
2181         // if librarian slot == found slot, advance to real slot
2182
2183         if( slotptr(set->page, slot)->type == Librarian )
2184           if( !keycmp (ptr, key, keylen) )
2185                 ptr = keyptr(set->page, ++slot);
2186
2187         len = ptr->len;
2188
2189         if( slotptr(set->page, slot)->type == Duplicate )
2190                 len -= BtId;
2191
2192         //  if inserting a duplicate key or unique key
2193         //      check for adequate space on the page
2194         //      and insert the new key before slot.
2195
2196         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2197           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2198                 if( !(entry = bt_splitpage (bt, set)) )
2199                   return bt->err;
2200                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2201                   return bt->err;
2202                 else
2203                   continue;
2204
2205           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2206         }
2207
2208         // if key already exists, update value and return
2209
2210         val = valptr(set->page, slot);
2211
2212         if( val->len >= vallen ) {
2213                 if( slotptr(set->page, slot)->dead )
2214                         set->page->act++;
2215                 set->page->garbage += val->len - vallen;
2216                 set->latch->dirty = 1;
2217                 slotptr(set->page, slot)->dead = 0;
2218                 val->len = vallen;
2219                 memcpy (val->value, value, vallen);
2220                 bt_unlockpage(BtLockWrite, set->latch);
2221                 bt_unpinlatch (set->latch);
2222                 return 0;
2223         }
2224
2225         //      new update value doesn't fit in existing value area
2226
2227         if( !slotptr(set->page, slot)->dead )
2228                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2229         else {
2230                 slotptr(set->page, slot)->dead = 0;
2231                 set->page->act++;
2232         }
2233
2234         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2235           if( !(entry = bt_splitpage (bt, set)) )
2236                 return bt->err;
2237           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2238                 return bt->err;
2239           else
2240                 continue;
2241
2242         set->page->min -= vallen + sizeof(BtVal);
2243         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2244         memcpy (val->value, value, vallen);
2245         val->len = vallen;
2246
2247         set->latch->dirty = 1;
2248         set->page->min -= keylen + sizeof(BtKey);
2249         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2250         memcpy (ptr->key, key, keylen);
2251         ptr->len = keylen;
2252         
2253         slotptr(set->page, slot)->off = set->page->min;
2254         bt_unlockpage(BtLockWrite, set->latch);
2255         bt_unpinlatch (set->latch);
2256         return 0;
2257   }
2258   return 0;
2259 }
2260
2261 typedef struct {
2262         uint entry;                     // latch table entry number
2263         uint slot:31;           // page slot number
2264         uint reuse:1;           // reused previous page
2265 } AtomicMod;
2266
2267 typedef struct {
2268         uid page_no;            // page number for split leaf
2269         void *next;                     // next key to insert
2270         uint entry:29;          // latch table entry number
2271         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2272         uint nounlock:1;        // don't unlock ParentModification
2273         unsigned char leafkey[BT_keyarray];
2274 } AtomicKey;
2275
2276 //  find and load leaf page for given key
2277 //      leave page Atomic locked and Read locked.
2278
2279 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2280 {
2281 BtLatchSet *prevlatch;
2282 uid page_no;
2283 uint slot;
2284
2285   //  find level zero page
2286
2287   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2288         return 0;
2289
2290   // find next non-dead entry on this page
2291   //    it will be the fence key if nothing else
2292
2293   while( slotptr(set->page, slot)->dead )
2294         if( slot++ < set->page->cnt )
2295           continue;
2296         else
2297           return bt->err = BTERR_struct, 0;
2298
2299   page_no = bt_getid(valptr(set->page, slot)->value);
2300   prevlatch = set->latch;
2301
2302   while( page_no ) {
2303         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2304           set->page = bt_mappage (bt, set->latch);
2305         else
2306           return 0;
2307
2308         if( set->page->free || set->page->lvl )
2309                 return bt->err = BTERR_struct, 0;
2310
2311         // obtain read lock using lock chaining with Access mode
2312         //      release & unpin parent/left sibling page
2313
2314         bt_lockpage(BtLockAccess, set->latch);
2315
2316         bt_unlockpage(BtLockRead, prevlatch);
2317         bt_unpinlatch (prevlatch);
2318
2319         bt_lockpage(BtLockRead, set->latch);
2320         bt_unlockpage(BtLockAccess, set->latch);
2321
2322         //  find key on page at this level
2323         //  and descend to requested level
2324
2325         if( !set->page->kill )
2326          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2327           bt_unlockpage(BtLockRead, set->latch);
2328           bt_lockpage(BtLockAtomic, set->latch);
2329           bt_lockpage(BtLockAccess, set->latch);
2330           bt_lockpage(BtLockRead, set->latch);
2331           bt_unlockpage(BtLockAccess, set->latch);
2332
2333           if( slot = bt_findslot (set->page, key, len) )
2334                 return slot;
2335
2336           bt_unlockpage(BtLockAtomic, set->latch);
2337           }
2338
2339         //  slide right into next page
2340
2341         page_no = bt_getid(set->page->right);
2342         prevlatch = set->latch;
2343   }
2344
2345   // return error on end of right chain
2346
2347   bt->err = BTERR_struct;
2348   return 0;     // return error
2349 }
2350
2351 //      determine actual page where key is located
2352 //  return slot number
2353
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2355 {
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
2358 uint entry;
2359
2360         if( src > 1 && locks[src].reuse )
2361           entry = locks[src-1].entry, slot = 0;
2362         else
2363           entry = locks[src].entry;
2364
2365         if( slot ) {
2366                 set->latch = bt->mgr->latchsets + entry;
2367                 set->page = bt_mappage (bt, set->latch);
2368                 return slot;
2369         }
2370
2371         //      is locks->reuse set?
2372         //      if so, find where our key
2373         //      is located on previous page or split pages
2374
2375         do {
2376                 set->latch = bt->mgr->latchsets + entry;
2377                 set->page = bt_mappage (bt, set->latch);
2378
2379                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2380                   if( locks[src].reuse )
2381                         locks[src].entry = entry;
2382                   return slot;
2383                 }
2384         } while( entry = set->latch->split );
2385
2386         bt->err = BTERR_atomic;
2387         return 0;
2388 }
2389
2390 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2391 {
2392 BtKey *key = keyptr(source, src);
2393 BtVal *val = valptr(source, src);
2394 BtLatchSet *latch;
2395 BtPageSet set[1];
2396 uint entry;
2397
2398   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2399         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2400           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2401
2402         if( entry = bt_splitpage (bt, set) )
2403           latch = bt->mgr->latchsets + entry;
2404         else
2405           return bt->err;
2406
2407         //      splice right page into split chain
2408         //      and WriteLock it.
2409
2410         latch->split = set->latch->split;
2411         set->latch->split = entry;
2412         bt_lockpage(BtLockWrite, latch);
2413   }
2414
2415   return bt->err = BTERR_atomic;
2416 }
2417
2418 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2419 {
2420 BtKey *key = keyptr(source, src);
2421 uint idx, entry, slot;
2422 BtPageSet set[1];
2423 BtKey *ptr;
2424 BtVal *val;
2425
2426         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2427                 slotptr(set->page, slot)->dead = 1;
2428         else
2429                 return bt->err = BTERR_struct;
2430
2431         ptr = keyptr(set->page, slot);
2432         val = valptr(set->page, slot);
2433
2434         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2435         set->latch->dirty = 1;
2436         set->page->act--;
2437         return 0;
2438 }
2439
2440 uint bt_parentmatch (AtomicKey *head, uint entry)
2441 {
2442 uint cnt = 0;
2443
2444         if( head )
2445           do if( head->entry == entry )
2446                 head->nounlock = 1, cnt++;
2447           while( head = head->next );
2448
2449         return cnt;
2450 }
2451
2452 //      atomic modification of a batch of keys.
2453
2454 //      return -1 if BTERR is set
2455 //      otherwise return slot number
2456 //      causing the key constraint violation
2457 //      or zero on successful completion.
2458
2459 int bt_atomictxn (BtDb *bt, BtPage source)
2460 {
2461 uint src, idx, slot, samepage, entry;
2462 AtomicKey *head, *tail, *leaf;
2463 BtPageSet set[1], prev[1];
2464 unsigned char value[BtId];
2465 BtKey *key, *ptr, *key2;
2466 BtLatchSet *latch;
2467 AtomicMod *locks;
2468 int result = 0;
2469 BtSlot temp[1];
2470 BtPage page;
2471 BtVal *val;
2472 uid right;
2473 int type;
2474
2475   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2476   head = NULL;
2477   tail = NULL;
2478
2479   // stable sort the list of keys into order to
2480   //    prevent deadlocks between threads.
2481
2482   for( src = 1; src++ < source->cnt; ) {
2483         *temp = *slotptr(source,src);
2484         key = keyptr (source,src);
2485
2486         for( idx = src; --idx; ) {
2487           key2 = keyptr (source,idx);
2488           if( keycmp (key, key2->key, key2->len) < 0 ) {
2489                 *slotptr(source,idx+1) = *slotptr(source,idx);
2490                 *slotptr(source,idx) = *temp;
2491           } else
2492                 break;
2493         }
2494   }
2495
2496   // Load the leaf page for each key
2497   // group same page references with reuse bit
2498   // and determine any constraint violations
2499
2500   for( src = 0; src++ < source->cnt; ) {
2501         key = keyptr(source, src);
2502         slot = 0;
2503
2504         // first determine if this modification falls
2505         // on the same page as the previous modification
2506         //      note that the far right leaf page is a special case
2507
2508         if( samepage = src > 1 )
2509           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2510                 slot = bt_findslot(set->page, key->key, key->len);
2511           else // release read on previous page
2512                 bt_unlockpage(BtLockRead, set->latch); 
2513
2514         if( !slot )
2515           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2516                 set->latch->split = 0;
2517           else
2518                 return -1;
2519
2520         if( slotptr(set->page, slot)->type == Librarian )
2521           ptr = keyptr(set->page, ++slot);
2522         else
2523           ptr = keyptr(set->page, slot);
2524
2525         if( !samepage ) {
2526           locks[src].entry = set->latch->entry;
2527           locks[src].slot = slot;
2528           locks[src].reuse = 0;
2529         } else {
2530           locks[src].entry = 0;
2531           locks[src].slot = 0;
2532           locks[src].reuse = 1;
2533         }
2534
2535         switch( slotptr(source, src)->type ) {
2536         case Duplicate:
2537         case Unique:
2538           if( !slotptr(set->page, slot)->dead )
2539            if( slot < set->page->cnt || bt_getid (set->page->right) )
2540             if( !keycmp (ptr, key->key, key->len) ) {
2541
2542                   // return constraint violation if key already exists
2543
2544                   bt_unlockpage(BtLockRead, set->latch);
2545                   result = src;
2546
2547                   while( src ) {
2548                         if( locks[src].entry ) {
2549                           set->latch = bt->mgr->latchsets + locks[src].entry;
2550                           bt_unlockpage(BtLockAtomic, set->latch);
2551                           bt_unpinlatch (set->latch);
2552                         }
2553                         src--;
2554                   }
2555                   free (locks);
2556                   return result;
2557                 }
2558           break;
2559         }
2560   }
2561
2562   //  unlock last loadpage lock
2563
2564   if( source->cnt > 1 )
2565         bt_unlockpage(BtLockRead, set->latch);
2566
2567   //  obtain write lock for each master page
2568
2569   for( src = 0; src++ < source->cnt; )
2570         if( locks[src].reuse )
2571           continue;
2572         else
2573           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2574
2575   // insert or delete each key
2576   // process any splits or merges
2577   // release Write & Atomic latches
2578   // set ParentModifications and build
2579   // queue of keys to insert for split pages
2580   // or delete for deleted pages.
2581
2582   // run through txn list backwards
2583
2584   samepage = source->cnt + 1;
2585
2586   for( src = source->cnt; src; src-- ) {
2587         if( locks[src].reuse )
2588           continue;
2589
2590         //  perform the txn operations
2591         //      from smaller to larger on
2592         //  the same page
2593
2594         for( idx = src; idx < samepage; idx++ )
2595          switch( slotptr(source,idx)->type ) {
2596          case Delete:
2597           if( bt_atomicdelete (bt, source, locks, idx) )
2598                 return -1;
2599           break;
2600
2601         case Duplicate:
2602         case Unique:
2603           if( bt_atomicinsert (bt, source, locks, idx) )
2604                 return -1;
2605           break;
2606         }
2607
2608         //      after the same page operations have finished,
2609         //  process master page for splits or deletion.
2610
2611         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2612         prev->page = bt_mappage (bt, prev->latch);
2613         samepage = src;
2614
2615         //  pick-up all splits from master page
2616
2617         entry = prev->latch->split;
2618
2619         while( entry ) {
2620           set->latch = bt->mgr->latchsets + entry;
2621           set->page = bt_mappage (bt, set->latch);
2622           entry = set->latch->split;
2623
2624           // delete empty master page
2625
2626           if( !prev->page->act ) {
2627                 memcpy (set->page->left, prev->page->left, BtId);
2628                 memcpy (prev->page, set->page, bt->mgr->page_size);
2629                 bt_lockpage (BtLockDelete, set->latch);
2630                 bt_freepage (bt, set);
2631                 bt_unpinlatch (set->latch);
2632
2633                 prev->latch->dirty = 1;
2634                 continue;
2635           }
2636
2637           // remove empty page from the split chain
2638
2639           if( !set->page->act ) {
2640                 memcpy (prev->page->right, set->page->right, BtId);
2641                 prev->latch->split = set->latch->split;
2642                 bt_lockpage (BtLockDelete, set->latch);
2643                 bt_freepage (bt, set);
2644                 bt_unpinlatch (set->latch);
2645                 continue;
2646           }
2647
2648           //  schedule prev fence key update
2649
2650           ptr = keyptr(prev->page,prev->page->cnt);
2651           leaf = calloc (sizeof(AtomicKey), 1);
2652
2653           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2654           leaf->page_no = prev->latch->page_no;
2655           leaf->entry = prev->latch->entry;
2656           leaf->type = 0;
2657
2658           if( tail )
2659                 tail->next = leaf;
2660           else
2661                 head = leaf;
2662
2663           tail = leaf;
2664
2665           bt_putid (set->page->left, prev->latch->page_no);
2666           bt_lockpage(BtLockParent, prev->latch);
2667           bt_unlockpage(BtLockWrite, prev->latch);
2668           *prev = *set;
2669         }
2670
2671         //  update left pointer in next right page from last split page
2672         //      (if all splits were reversed, latch->split == 0)
2673
2674         if( latch->split ) {
2675           //  fix left pointer in master's original (now split) right sibling
2676           //    or set rightmost page in page zero
2677
2678           if( right = bt_getid (prev->page->right) ) {
2679                 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2680                   set->page = bt_mappage (bt, set->latch);
2681                 else
2682                   return -1;
2683
2684             bt_lockpage (BtLockWrite, set->latch);
2685             bt_putid (set->page->left, prev->latch->page_no);
2686                 set->latch->dirty = 1;
2687             bt_unlockpage (BtLockWrite, set->latch);
2688                 bt_unpinlatch (set->latch);
2689           } else {      // prev is rightmost page
2690             bt_spinwritelock (bt->mgr->lock);
2691                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2692             bt_spinreleasewrite(bt->mgr->lock);
2693           }
2694
2695           //  Process last page split in chain
2696
2697           ptr = keyptr(prev->page,prev->page->cnt);
2698           leaf = calloc (sizeof(AtomicKey), 1);
2699
2700           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2701           leaf->page_no = prev->latch->page_no;
2702           leaf->entry = prev->latch->entry;
2703           leaf->type = 0;
2704   
2705           if( tail )
2706                 tail->next = leaf;
2707           else
2708                 head = leaf;
2709
2710           tail = leaf;
2711
2712           bt_lockpage(BtLockParent, prev->latch);
2713           bt_unlockpage(BtLockWrite, prev->latch);
2714
2715           //  remove atomic lock on master page
2716
2717           bt_unlockpage(BtLockAtomic, latch);
2718           continue;
2719         }
2720
2721         //  finished if prev page occupied (either master or final split)
2722
2723         if( prev->page->act ) {
2724           bt_unlockpage(BtLockWrite, latch);
2725           bt_unlockpage(BtLockAtomic, latch);
2726           bt_unpinlatch(latch);
2727           continue;
2728         }
2729
2730         // any and all splits were reversed, and the
2731         // master page located in prev is empty, delete it
2732         // by pulling over master's right sibling.
2733
2734         // Delete empty master's fence key
2735
2736         ptr = keyptr(prev->page,prev->page->cnt);
2737         leaf = calloc (sizeof(AtomicKey), 1);
2738
2739         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2740         leaf->page_no = prev->latch->page_no;
2741         leaf->entry = prev->latch->entry;
2742         leaf->type = 1;
2743   
2744         if( tail )
2745           tail->next = leaf;
2746         else
2747           head = leaf;
2748
2749         tail = leaf;
2750
2751         bt_lockpage(BtLockParent, prev->latch);
2752         bt_unlockpage(BtLockWrite, prev->latch);
2753
2754         // grab master's right sibling
2755         //      note that the far right page never empties because
2756         //      it always contains (at least) the infinite stopper key
2757         //      and that all pages that don't contain any keys have
2758         //      been deleted, or are being deleted under write lock.
2759
2760         if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2761                 set->page = bt_mappage (bt, set->latch);
2762         else
2763                 return -1;
2764
2765         bt_lockpage(BtLockWrite, set->latch);
2766
2767         //      and pull contents over empty page
2768         //      while preserving master's left link
2769
2770         memcpy (set->page->left, prev->page->left, BtId);
2771         memcpy (prev->page, set->page, bt->mgr->page_size);
2772
2773         //      forward seekers to old right sibling
2774         //      to new page location in prev
2775
2776         bt_putid (set->page->right, prev->latch->page_no);
2777         set->latch->dirty = 1;
2778         set->page->kill = 1;
2779
2780         //      add new fence key for new master page contents, delete
2781         //      right sibling after parent posts the new master fence.
2782
2783         ptr = keyptr(set->page,set->page->cnt);
2784         leaf = calloc (sizeof(AtomicKey), 1);
2785
2786         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2787         leaf->page_no = prev->latch->page_no;
2788         leaf->entry = set->latch->entry;
2789         leaf->type = 2;
2790   
2791         //  see if right sibling key update is in the FIFO,
2792         //      and ParentLock if not there.
2793
2794         if( !bt_parentmatch (head, leaf->entry) )
2795           bt_lockpage(BtLockParent, set->latch);
2796
2797         bt_unlockpage(BtLockWrite, set->latch);
2798
2799         if( tail )
2800           tail->next = leaf;
2801         else
2802           head = leaf;
2803
2804         tail = leaf;
2805
2806         //  fix new master's right sibling's left pointer
2807
2808         if( right = bt_getid (set->page->right) ) {
2809           if( set->latch = bt_pinlatch (bt, right, 1) )
2810                 set->page = bt_mappage (bt, set->latch);
2811
2812           bt_lockpage (BtLockWrite, set->latch);
2813           bt_putid (set->page->left, prev->latch->page_no);
2814           set->latch->dirty = 1;
2815
2816           bt_unlockpage (BtLockWrite, set->latch);
2817           bt_unpinlatch (set->latch);
2818         } else {        // master is now the far right page
2819           bt_spinwritelock (bt->mgr->lock);
2820           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2821           bt_spinreleasewrite(bt->mgr->lock);
2822         }
2823
2824         bt_unlockpage(BtLockAtomic, latch);
2825   }
2826   
2827   //  add & delete keys for any pages split or merged during transaction
2828
2829   if( leaf = head )
2830     do {
2831           set->latch = bt->mgr->latchsets + leaf->entry;
2832           set->page = bt_mappage (bt, set->latch);
2833
2834           bt_putid (value, leaf->page_no);
2835           ptr = (BtKey *)leaf->leafkey;
2836
2837           switch( leaf->type ) {
2838           case 0:       // insert key
2839             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2840                   return -1;
2841
2842                 break;
2843
2844           case 1:       // delete key
2845                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2846                   return -1;
2847
2848                 break;
2849
2850           case 2:       // insert key & free
2851             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2852                   return -1;
2853
2854                 bt_lockpage (BtLockDelete, set->latch);
2855                 bt_lockpage (BtLockWrite, set->latch);
2856                 bt_freepage (bt, set);
2857                 break;
2858           }
2859
2860           if( !leaf->nounlock )
2861             bt_unlockpage (BtLockParent, set->latch);
2862
2863           bt_unpinlatch (set->latch);
2864           tail = leaf->next;
2865           free (leaf);
2866         } while( leaf = tail );
2867
2868   // return success
2869
2870   free (locks);
2871   return 0;
2872 }
2873
2874 //      set cursor to highest slot on highest page
2875
2876 uint bt_lastkey (BtDb *bt)
2877 {
2878 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2879 BtPageSet set[1];
2880
2881         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2882                 set->page = bt_mappage (bt, set->latch);
2883         else
2884                 return 0;
2885
2886     bt_lockpage(BtLockRead, set->latch);
2887         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2888     bt_unlockpage(BtLockRead, set->latch);
2889         bt_unpinlatch (set->latch);
2890
2891         bt->cursor_page = page_no;
2892         return bt->cursor->cnt;
2893 }
2894
2895 //      return previous slot on cursor page
2896
2897 uint bt_prevkey (BtDb *bt, uint slot)
2898 {
2899 uid ourright, next, us = bt->cursor_page;
2900 BtPageSet set[1];
2901
2902         if( --slot )
2903                 return slot;
2904
2905         ourright = bt_getid(bt->cursor->right);
2906
2907 goleft:
2908         if( !(next = bt_getid(bt->cursor->left)) )
2909                 return 0;
2910
2911 findourself:
2912         bt->cursor_page = next;
2913
2914         if( set->latch = bt_pinlatch (bt, next, 1) )
2915                 set->page = bt_mappage (bt, set->latch);
2916         else
2917                 return 0;
2918
2919     bt_lockpage(BtLockRead, set->latch);
2920         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2921         bt_unlockpage(BtLockRead, set->latch);
2922         bt_unpinlatch (set->latch);
2923         
2924         next = bt_getid (bt->cursor->right);
2925
2926         if( bt->cursor->kill )
2927                 goto findourself;
2928
2929         if( next != us )
2930           if( next == ourright )
2931                 goto goleft;
2932           else
2933                 goto findourself;
2934
2935         return bt->cursor->cnt;
2936 }
2937
2938 //  return next slot on cursor page
2939 //  or slide cursor right into next page
2940
2941 uint bt_nextkey (BtDb *bt, uint slot)
2942 {
2943 BtPageSet set[1];
2944 uid right;
2945
2946   do {
2947         right = bt_getid(bt->cursor->right);
2948
2949         while( slot++ < bt->cursor->cnt )
2950           if( slotptr(bt->cursor,slot)->dead )
2951                 continue;
2952           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2953                 return slot;
2954           else
2955                 break;
2956
2957         if( !right )
2958                 break;
2959
2960         bt->cursor_page = right;
2961
2962         if( set->latch = bt_pinlatch (bt, right, 1) )
2963                 set->page = bt_mappage (bt, set->latch);
2964         else
2965                 return 0;
2966
2967     bt_lockpage(BtLockRead, set->latch);
2968
2969         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2970
2971         bt_unlockpage(BtLockRead, set->latch);
2972         bt_unpinlatch (set->latch);
2973         slot = 0;
2974
2975   } while( 1 );
2976
2977   return bt->err = 0;
2978 }
2979
2980 //  cache page of keys into cursor and return starting slot for given key
2981
2982 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2983 {
2984 BtPageSet set[1];
2985 uint slot;
2986
2987         // cache page for retrieval
2988
2989         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2990           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2991         else
2992           return 0;
2993
2994         bt->cursor_page = set->latch->page_no;
2995
2996         bt_unlockpage(BtLockRead, set->latch);
2997         bt_unpinlatch (set->latch);
2998         return slot;
2999 }
3000
3001 BtKey *bt_key(BtDb *bt, uint slot)
3002 {
3003         return keyptr(bt->cursor, slot);
3004 }
3005
3006 BtVal *bt_val(BtDb *bt, uint slot)
3007 {
3008         return valptr(bt->cursor,slot);
3009 }
3010
3011 #ifdef STANDALONE
3012
3013 #ifndef unix
3014 double getCpuTime(int type)
3015 {
3016 FILETIME crtime[1];
3017 FILETIME xittime[1];
3018 FILETIME systime[1];
3019 FILETIME usrtime[1];
3020 SYSTEMTIME timeconv[1];
3021 double ans = 0;
3022
3023         memset (timeconv, 0, sizeof(SYSTEMTIME));
3024
3025         switch( type ) {
3026         case 0:
3027                 GetSystemTimeAsFileTime (xittime);
3028                 FileTimeToSystemTime (xittime, timeconv);
3029                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3030                 break;
3031         case 1:
3032                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3033                 FileTimeToSystemTime (usrtime, timeconv);
3034                 break;
3035         case 2:
3036                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3037                 FileTimeToSystemTime (systime, timeconv);
3038                 break;
3039         }
3040
3041         ans += (double)timeconv->wHour * 3600;
3042         ans += (double)timeconv->wMinute * 60;
3043         ans += (double)timeconv->wSecond;
3044         ans += (double)timeconv->wMilliseconds / 1000;
3045         return ans;
3046 }
3047 #else
3048 #include <time.h>
3049 #include <sys/resource.h>
3050
3051 double getCpuTime(int type)
3052 {
3053 struct rusage used[1];
3054 struct timeval tv[1];
3055
3056         switch( type ) {
3057         case 0:
3058                 gettimeofday(tv, NULL);
3059                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3060
3061         case 1:
3062                 getrusage(RUSAGE_SELF, used);
3063                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3064
3065         case 2:
3066                 getrusage(RUSAGE_SELF, used);
3067                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3068         }
3069
3070         return 0;
3071 }
3072 #endif
3073
3074 void bt_poolaudit (BtMgr *mgr)
3075 {
3076 BtLatchSet *latch;
3077 uint slot = 0;
3078
3079         while( slot++ < mgr->latchdeployed ) {
3080                 latch = mgr->latchsets + slot;
3081
3082                 if( *latch->readwr->rin & MASK )
3083                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3084                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3085
3086                 if( *latch->access->rin & MASK )
3087                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3088                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3089
3090                 if( *latch->parent->ticket != *latch->parent->serving )
3091                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3092                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3093
3094                 if( latch->pin & ~CLOCK_bit ) {
3095                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3096                         latch->pin = 0;
3097                 }
3098         }
3099 }
3100
3101 uint bt_latchaudit (BtDb *bt)
3102 {
3103 ushort idx, hashidx;
3104 uid next, page_no;
3105 BtLatchSet *latch;
3106 uint cnt = 0;
3107 BtKey *ptr;
3108
3109         if( *(ushort *)(bt->mgr->lock) )
3110                 fprintf(stderr, "Alloc page locked\n");
3111         *(ushort *)(bt->mgr->lock) = 0;
3112
3113         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3114                 latch = bt->mgr->latchsets + idx;
3115                 if( *latch->readwr->rin & MASK )
3116                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3117                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3118
3119                 if( *latch->access->rin & MASK )
3120                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3121                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3122
3123                 if( *latch->parent->ticket != *latch->parent->serving )
3124                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3125                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3126
3127                 if( latch->pin ) {
3128                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3129                         latch->pin = 0;
3130                 }
3131         }
3132
3133         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3134           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3135                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3136
3137           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3138
3139           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3140                 latch = bt->mgr->latchsets + idx;
3141                 if( latch->pin )
3142                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3143           } while( idx = latch->next );
3144         }
3145
3146         page_no = LEAF_page;
3147
3148         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3149         uid off = page_no << bt->mgr->page_bits;
3150 #ifdef unix
3151           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3152 #else
3153         DWORD amt[1];
3154
3155           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3156
3157           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3158                 return bt->err = BTERR_map;
3159
3160           if( *amt <  bt->mgr->page_size )
3161                 return bt->err = BTERR_map;
3162 #endif
3163                 if( !bt->frame->free && !bt->frame->lvl )
3164                         cnt += bt->frame->act;
3165                 page_no++;
3166         }
3167                 
3168         cnt--;  // remove stopper key
3169         fprintf(stderr, " Total keys read %d\n", cnt);
3170
3171         bt_close (bt);
3172         return 0;
3173 }
3174
3175 typedef struct {
3176         char idx;
3177         char *type;
3178         char *infile;
3179         BtMgr *mgr;
3180         int num;
3181 } ThreadArg;
3182
3183 //  standalone program to index file of keys
3184 //  then list them onto std-out
3185
3186 #ifdef unix
3187 void *index_file (void *arg)
3188 #else
3189 uint __stdcall index_file (void *arg)
3190 #endif
3191 {
3192 int line = 0, found = 0, cnt = 0, idx;
3193 uid next, page_no = LEAF_page;  // start on first page of leaves
3194 int ch, len = 0, slot, type = 0;
3195 unsigned char key[BT_maxkey];
3196 unsigned char txn[65536];
3197 ThreadArg *args = arg;
3198 BtPageSet set[1];
3199 uint nxt = 65536;
3200 BtPage page;
3201 BtKey *ptr;
3202 BtVal *val;
3203 BtDb *bt;
3204 FILE *in;
3205
3206         bt = bt_open (args->mgr);
3207         page = (BtPage)txn;
3208
3209         if( args->idx < strlen (args->type) )
3210                 ch = args->type[args->idx];
3211         else
3212                 ch = args->type[strlen(args->type) - 1];
3213
3214         switch(ch | 0x20)
3215         {
3216         case 'a':
3217                 fprintf(stderr, "started latch mgr audit\n");
3218                 cnt = bt_latchaudit (bt);
3219                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3220                 break;
3221
3222         case 'd':
3223                 type = Delete;
3224
3225         case 'p':
3226                 if( !type )
3227                         type = Unique;
3228
3229                 if( args->num )
3230                  if( type == Delete )
3231                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3232                  else
3233                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3234                 else
3235                  if( type == Delete )
3236                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3237                  else
3238                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3239
3240                 if( in = fopen (args->infile, "rb") )
3241                   while( ch = getc(in), ch != EOF )
3242                         if( ch == '\n' )
3243                         {
3244                           line++;
3245
3246                           if( !args->num ) {
3247                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3248                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3249                             len = 0;
3250                                 continue;
3251                           }
3252
3253                           nxt -= len - 10;
3254                           memcpy (txn + nxt, key + 10, len - 10);
3255                           nxt -= 1;
3256                           txn[nxt] = len - 10;
3257                           nxt -= 10;
3258                           memcpy (txn + nxt, key, 10);
3259                           nxt -= 1;
3260                           txn[nxt] = 10;
3261                           slotptr(page,++cnt)->off  = nxt;
3262                           slotptr(page,cnt)->type = type;
3263                           len = 0;
3264
3265                           if( cnt < args->num )
3266                                 continue;
3267
3268                           page->cnt = cnt;
3269                           page->act = cnt;
3270                           page->min = nxt;
3271
3272                           if( bt_atomictxn (bt, page) )
3273                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3274                           nxt = sizeof(txn);
3275                           cnt = 0;
3276
3277                         }
3278                         else if( len < BT_maxkey )
3279                                 key[len++] = ch;
3280                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3281                 break;
3282
3283         case 'w':
3284                 fprintf(stderr, "started indexing for %s\n", args->infile);
3285                 if( in = fopen (args->infile, "r") )
3286                   while( ch = getc(in), ch != EOF )
3287                         if( ch == '\n' )
3288                         {
3289                           line++;
3290
3291                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3292                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3293                           len = 0;
3294                         }
3295                         else if( len < BT_maxkey )
3296                                 key[len++] = ch;
3297                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3298                 break;
3299
3300         case 'f':
3301                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3302                 if( in = fopen (args->infile, "rb") )
3303                   while( ch = getc(in), ch != EOF )
3304                         if( ch == '\n' )
3305                         {
3306                           line++;
3307                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3308                                 found++;
3309                           else if( bt->err )
3310                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3311                           len = 0;
3312                         }
3313                         else if( len < BT_maxkey )
3314                                 key[len++] = ch;
3315                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3316                 break;
3317
3318         case 's':
3319                 fprintf(stderr, "started scanning\n");
3320                 do {
3321                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3322                                 set->page = bt_mappage (bt, set->latch);
3323                         else
3324                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3325                         bt_lockpage (BtLockRead, set->latch);
3326                         next = bt_getid (set->page->right);
3327
3328                         for( slot = 0; slot++ < set->page->cnt; )
3329                          if( next || slot < set->page->cnt )
3330                           if( !slotptr(set->page, slot)->dead ) {
3331                                 ptr = keyptr(set->page, slot);
3332                                 len = ptr->len;
3333
3334                             if( slotptr(set->page, slot)->type == Duplicate )
3335                                         len -= BtId;
3336
3337                                 fwrite (ptr->key, len, 1, stdout);
3338                                 val = valptr(set->page, slot);
3339                                 fwrite (val->value, val->len, 1, stdout);
3340                                 fputc ('\n', stdout);
3341                                 cnt++;
3342                            }
3343
3344                         bt_unlockpage (BtLockRead, set->latch);
3345                         bt_unpinlatch (set->latch);
3346                 } while( page_no = next );
3347
3348                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3349                 break;
3350
3351         case 'r':
3352                 fprintf(stderr, "started reverse scan\n");
3353                 if( slot = bt_lastkey (bt) )
3354                    while( slot = bt_prevkey (bt, slot) ) {
3355                         if( slotptr(bt->cursor, slot)->dead )
3356                           continue;
3357
3358                         ptr = keyptr(bt->cursor, slot);
3359                         len = ptr->len;
3360
3361                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3362                                 len -= BtId;
3363
3364                         fwrite (ptr->key, len, 1, stdout);
3365                         val = valptr(bt->cursor, slot);
3366                         fwrite (val->value, val->len, 1, stdout);
3367                         fputc ('\n', stdout);
3368                         cnt++;
3369                   }
3370
3371                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3372                 break;
3373
3374         case 'c':
3375 #ifdef unix
3376                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3377 #endif
3378                 fprintf(stderr, "started counting\n");
3379                 page_no = LEAF_page;
3380
3381                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3382                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3383                                 break;
3384
3385                         if( !bt->frame->free && !bt->frame->lvl )
3386                                 cnt += bt->frame->act;
3387
3388                         bt->reads++;
3389                         page_no++;
3390                 }
3391                 
3392                 cnt--;  // remove stopper key
3393                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3394                 break;
3395         }
3396
3397         bt_close (bt);
3398 #ifdef unix
3399         return NULL;
3400 #else
3401         return 0;
3402 #endif
3403 }
3404
3405 typedef struct timeval timer;
3406
3407 int main (int argc, char **argv)
3408 {
3409 int idx, cnt, len, slot, err;
3410 int segsize, bits = 16;
3411 double start, stop;
3412 #ifdef unix
3413 pthread_t *threads;
3414 #else
3415 HANDLE *threads;
3416 #endif
3417 ThreadArg *args;
3418 uint poolsize = 0;
3419 float elapsed;
3420 int num = 0;
3421 char key[1];
3422 BtMgr *mgr;
3423 BtKey *ptr;
3424 BtDb *bt;
3425
3426         if( argc < 3 ) {
3427                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3428                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3429                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3430                 fprintf (stderr, "  page_bits is the page size in bits\n");
3431                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3432                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3433                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3434                 exit(0);
3435         }
3436
3437         start = getCpuTime(0);
3438
3439         if( argc > 3 )
3440                 bits = atoi(argv[3]);
3441
3442         if( argc > 4 )
3443                 poolsize = atoi(argv[4]);
3444
3445         if( !poolsize )
3446                 fprintf (stderr, "Warning: no mapped_pool\n");
3447
3448         if( argc > 5 )
3449                 num = atoi(argv[5]);
3450
3451         cnt = argc - 6;
3452 #ifdef unix
3453         threads = malloc (cnt * sizeof(pthread_t));
3454 #else
3455         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3456 #endif
3457         args = malloc (cnt * sizeof(ThreadArg));
3458
3459         mgr = bt_mgr ((argv[1]), bits, poolsize);
3460
3461         if( !mgr ) {
3462                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3463                 exit (1);
3464         }
3465
3466         //      fire off threads
3467
3468         for( idx = 0; idx < cnt; idx++ ) {
3469                 args[idx].infile = argv[idx + 6];
3470                 args[idx].type = argv[2];
3471                 args[idx].mgr = mgr;
3472                 args[idx].num = num;
3473                 args[idx].idx = idx;
3474 #ifdef unix
3475                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3476                         fprintf(stderr, "Error creating thread %d\n", err);
3477 #else
3478                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3479 #endif
3480         }
3481
3482         //      wait for termination
3483
3484 #ifdef unix
3485         for( idx = 0; idx < cnt; idx++ )
3486                 pthread_join (threads[idx], NULL);
3487 #else
3488         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3489
3490         for( idx = 0; idx < cnt; idx++ )
3491                 CloseHandle(threads[idx]);
3492
3493 #endif
3494         bt_poolaudit(mgr);
3495         bt_mgrclose (mgr);
3496
3497         elapsed = getCpuTime(0) - start;
3498         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3499         elapsed = getCpuTime(1);
3500         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3501         elapsed = getCpuTime(2);
3502         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3503 }
3504
3505 #endif  //STANDALONE