]> pd.if.org Git - btree/blob - threadskv8.c
44411881f67455309eede48bb81ef3ae46e81d77
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         ushort thread_no[1];            // next thread number
279         BtHashEntry *hashtable;         // the buffer pool hash table entries
280         BtLatchSet *latchsets;          // mapped latch set from buffer pool
281         unsigned char *pagepool;        // mapped to the buffer pool pages
282 #ifndef unix
283         HANDLE halloc;                          // allocation handle
284         HANDLE hpool;                           // buffer pool handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                                // current cursor page number   
293         unsigned char *mem;                             // frame, cursor, page memory buffer
294         unsigned char key[BT_keyarray]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297         int reads, writes;              // number of reads and writes from the btree
298         ushort thread_no;               // thread number
299 } BtDb;
300
301 typedef enum {
302         BTERR_ok = 0,
303         BTERR_struct,
304         BTERR_ovflw,
305         BTERR_lock,
306         BTERR_map,
307         BTERR_read,
308         BTERR_wrt,
309         BTERR_atomic
310 } BTERR;
311
312 #define CLOCK_bit 0x8000
313
314 // B-Tree functions
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey   (BtDb *bt, uint slot);
323
324 //      manager functions
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
327
328 //  Helper functions to return slot values
329 //      from the cursor page.
330
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
333
334 //  The page is allocated from low and hi ends.
335 //  The key slots are allocated from the bottom,
336 //      while the text and value of the key
337 //  are allocated from the top.  When the two
338 //  areas meet, the page is split into two.
339
340 //  A key consists of a length byte, two bytes of
341 //  index number (0 - 65535), and up to 253 bytes
342 //  of key value.
343
344 //  Associated with each key is a value byte string
345 //      containing any value desired.
346
347 //  The b-tree root is always located at page 1.
348 //      The first leaf page of level zero is always
349 //      located on page 2.
350
351 //      The b-tree pages are linked with next
352 //      pointers to facilitate enumerators,
353 //      and provide for concurrency.
354
355 //      When to root page fills, it is split in two and
356 //      the tree height is raised by a new root at page
357 //      one with two keys.
358
359 //      Deleted keys are marked with a dead bit until
360 //      page cleanup. The fence key for a leaf node is
361 //      always present
362
363 //  To achieve maximum concurrency one page is locked at a time
364 //  as the tree is traversed to find leaf key in question. The right
365 //  page numbers are used in cases where the page is being split,
366 //      or consolidated.
367
368 //  Page 0 is dedicated to lock for new page extensions,
369 //      and chains empty pages together for reuse. It also
370 //      contains the latch manager hash table.
371
372 //      The ParentModification lock on a node is obtained to serialize posting
373 //      or changing the fence key for a node.
374
375 //      Empty pages are chained together through the ALLOC page and reused.
376
377 //      Access macros to address slot and key values from the page
378 //      Page slots use 1 based indexing.
379
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
383
384 void bt_putid(unsigned char *dest, uid id)
385 {
386 int i = BtId;
387
388         while( i-- )
389                 dest[i] = (unsigned char)id, id >>= 8;
390 }
391
392 uid bt_getid(unsigned char *src)
393 {
394 uid id = 0;
395 int i;
396
397         for( i = 0; i < BtId; i++ )
398                 id <<= 8, id |= *src++; 
399
400         return id;
401 }
402
403 uid bt_newdup (BtDb *bt)
404 {
405 #ifdef unix
406         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
407 #else
408         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
409 #endif
410 }
411
412 //      Write-Only Queue Lock
413
414 void WriteOLock (WOLock *lock)
415 {
416 ushort tix;
417 #ifdef unix
418         tix = __sync_fetch_and_add (lock->ticket, 1);
419 #else
420         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
421 #endif
422         // wait for our ticket to come up
423
424         while( tix != lock->serving[0] )
425 #ifdef unix
426                 sched_yield();
427 #else
428                 SwitchToThread ();
429 #endif
430 }
431
432 void WriteORelease (WOLock *lock)
433 {
434         lock->serving[0]++;
435 }
436
437 //      Phase-Fair reader/writer lock implementation
438
439 void WriteLock (RWLock *lock)
440 {
441 ushort w, r, tix;
442
443 #ifdef unix
444         tix = __sync_fetch_and_add (lock->ticket, 1);
445 #else
446         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
447 #endif
448         // wait for our ticket to come up
449
450         while( tix != lock->serving[0] )
451 #ifdef unix
452                 sched_yield();
453 #else
454                 SwitchToThread ();
455 #endif
456
457         w = PRES | (tix & PHID);
458 #ifdef  unix
459         r = __sync_fetch_and_add (lock->rin, w);
460 #else
461         r = _InterlockedExchangeAdd16 (lock->rin, w);
462 #endif
463         while( r != *lock->rout )
464 #ifdef unix
465                 sched_yield();
466 #else
467                 SwitchToThread();
468 #endif
469 }
470
471 void WriteRelease (RWLock *lock)
472 {
473 #ifdef unix
474         __sync_fetch_and_and (lock->rin, ~MASK);
475 #else
476         _InterlockedAnd16 (lock->rin, ~MASK);
477 #endif
478         lock->serving[0]++;
479 }
480
481 void ReadLock (RWLock *lock)
482 {
483 ushort w;
484 #ifdef unix
485         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
486 #else
487         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 #endif
489         if( w )
490           while( w == (*lock->rin & MASK) )
491 #ifdef unix
492                 sched_yield ();
493 #else
494                 SwitchToThread ();
495 #endif
496 }
497
498 void ReadRelease (RWLock *lock)
499 {
500 #ifdef unix
501         __sync_fetch_and_add (lock->rout, RINC);
502 #else
503         _InterlockedExchangeAdd16 (lock->rout, RINC);
504 #endif
505 }
506
507 //      Spin Latch Manager
508
509 //      wait until write lock mode is clear
510 //      and add 1 to the share count
511
512 void bt_spinreadlock(BtSpinLatch *latch)
513 {
514 ushort prev;
515
516   do {
517 #ifdef unix
518         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
519 #else
520         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
521 #endif
522         //  see if exclusive request is granted or pending
523
524         if( !(prev & BOTH) )
525                 return;
526 #ifdef unix
527         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
528 #else
529         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 #endif
531 #ifdef  unix
532   } while( sched_yield(), 1 );
533 #else
534   } while( SwitchToThread(), 1 );
535 #endif
536 }
537
538 //      wait for other read and write latches to relinquish
539
540 void bt_spinwritelock(BtSpinLatch *latch)
541 {
542 ushort prev;
543
544   do {
545 #ifdef  unix
546         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
547 #else
548         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 #endif
550         if( !(prev & XCL) )
551           if( !(prev & ~BOTH) )
552                 return;
553           else
554 #ifdef unix
555                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 #else
557                 _InterlockedAnd16((ushort *)latch, ~XCL);
558 #endif
559 #ifdef  unix
560   } while( sched_yield(), 1 );
561 #else
562   } while( SwitchToThread(), 1 );
563 #endif
564 }
565
566 //      try to obtain write lock
567
568 //      return 1 if obtained,
569 //              0 otherwise
570
571 int bt_spinwritetry(BtSpinLatch *latch)
572 {
573 ushort prev;
574
575 #ifdef  unix
576         prev = __sync_fetch_and_or((ushort *)latch, XCL);
577 #else
578         prev = _InterlockedOr16((ushort *)latch, XCL);
579 #endif
580         //      take write access if all bits are clear
581
582         if( !(prev & XCL) )
583           if( !(prev & ~BOTH) )
584                 return 1;
585           else
586 #ifdef unix
587                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
588 #else
589                 _InterlockedAnd16((ushort *)latch, ~XCL);
590 #endif
591         return 0;
592 }
593
594 //      clear write mode
595
596 void bt_spinreleasewrite(BtSpinLatch *latch)
597 {
598 #ifdef unix
599         __sync_fetch_and_and((ushort *)latch, ~BOTH);
600 #else
601         _InterlockedAnd16((ushort *)latch, ~BOTH);
602 #endif
603 }
604
605 //      decrement reader count
606
607 void bt_spinreleaseread(BtSpinLatch *latch)
608 {
609 #ifdef unix
610         __sync_fetch_and_add((ushort *)latch, -SHARE);
611 #else
612         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
613 #endif
614 }
615
616 //      read page from permanent location in Btree file
617
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
619 {
620 off64_t off = page_no << mgr->page_bits;
621
622 #ifdef unix
623         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
625                 return BTERR_read;
626         }
627 #else
628 OVERLAPPED ovl[1];
629 uint amt[1];
630
631         memset (ovl, 0, sizeof(OVERLAPPED));
632         ovl->Offset = off;
633         ovl->OffsetHigh = off >> 32;
634
635         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
637                 return BTERR_read;
638         }
639         if( *amt <  mgr->page_size ) {
640                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
641                 return BTERR_read;
642         }
643 #endif
644         return 0;
645 }
646
647 //      write page to permanent location in Btree file
648 //      clear the dirty bit
649
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
651 {
652 off64_t off = page_no << mgr->page_bits;
653
654 #ifdef unix
655         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
656                 return BTERR_wrt;
657 #else
658 OVERLAPPED ovl[1];
659 uint amt[1];
660
661         memset (ovl, 0, sizeof(OVERLAPPED));
662         ovl->Offset = off;
663         ovl->OffsetHigh = off >> 32;
664
665         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666                 return BTERR_wrt;
667
668         if( *amt <  mgr->page_size )
669                 return BTERR_wrt;
670 #endif
671         return 0;
672 }
673
674 //      link latch table entry into head of latch hash table
675
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
677 {
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
680
681         if( latch->next = bt->mgr->hashtable[hashidx].slot )
682                 bt->mgr->latchsets[latch->next].prev = slot;
683
684         bt->mgr->hashtable[hashidx].slot = slot;
685         latch->page_no = page_no;
686         latch->entry = slot;
687         latch->split = 0;
688         latch->prev = 0;
689         latch->pin = 1;
690
691         if( loadit )
692           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
693                 return bt->err;
694           else
695                 bt->reads++;
696
697         return bt->err = 0;
698 }
699
700 //      set CLOCK bit in latch
701 //      decrement pin count
702
703 void bt_unpinlatch (BtLatchSet *latch)
704 {
705 #ifdef unix
706         if( ~latch->pin & CLOCK_bit )
707                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708         __sync_fetch_and_add(&latch->pin, -1);
709 #else
710         if( ~latch->pin & CLOCK_bit )
711                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712         _InterlockedDecrement16 (&latch->pin);
713 #endif
714 }
715
716 //  return the btree cached page address
717
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
719 {
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
721
722         return page;
723 }
724
725 //      find existing latchset or inspire new one
726 //      return with latchset pinned
727
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
729 {
730 uint hashidx = page_no % bt->mgr->latchhash;
731 BtLatchSet *latch;
732 uint slot, idx;
733 uint lvl, cnt;
734 off64_t off;
735 uint amt[1];
736 BtPage page;
737
738   //  try to find our entry
739
740   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
741
742   if( slot = bt->mgr->hashtable[hashidx].slot ) do
743   {
744         latch = bt->mgr->latchsets + slot;
745         if( page_no == latch->page_no )
746                 break;
747   } while( slot = latch->next );
748
749   //  found our entry
750   //  increment clock
751
752   if( slot ) {
753         latch = bt->mgr->latchsets + slot;
754 #ifdef unix
755         __sync_fetch_and_add(&latch->pin, 1);
756 #else
757         _InterlockedIncrement16 (&latch->pin);
758 #endif
759         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
760         return latch;
761   }
762
763         //  see if there are any unused pool entries
764 #ifdef unix
765         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
766 #else
767         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 #endif
769
770         if( slot < bt->mgr->latchtotal ) {
771                 latch = bt->mgr->latchsets + slot;
772                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
773                         return NULL;
774                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
775                 return latch;
776         }
777
778 #ifdef unix
779         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
780 #else
781         _InterlockedDecrement (&bt->mgr->latchdeployed);
782 #endif
783   //  find and reuse previous entry on victim
784
785   while( 1 ) {
786 #ifdef unix
787         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
788 #else
789         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
790 #endif
791         // try to get write lock on hash chain
792         //      skip entry if not obtained
793         //      or has outstanding pins
794
795         slot %= bt->mgr->latchtotal;
796
797         if( !slot )
798                 continue;
799
800         latch = bt->mgr->latchsets + slot;
801         idx = latch->page_no % bt->mgr->latchhash;
802
803         //      see we are on same chain as hashidx
804
805         if( idx == hashidx )
806                 continue;
807
808         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809                 continue;
810
811         //  skip this slot if it is pinned
812         //      or the CLOCK bit is set
813
814         if( latch->pin ) {
815           if( latch->pin & CLOCK_bit ) {
816 #ifdef unix
817                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
818 #else
819                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 #endif
821           }
822           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
823           continue;
824         }
825
826         //  update permanent page area in btree from buffer pool
827
828         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829
830         if( latch->dirty )
831           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832                 return NULL;
833           else
834                 latch->dirty = 0, bt->writes++;
835
836         //  unlink our available slot from its hash chain
837
838         if( latch->prev )
839                 bt->mgr->latchsets[latch->prev].next = latch->next;
840         else
841                 bt->mgr->hashtable[idx].slot = latch->next;
842
843         if( latch->next )
844                 bt->mgr->latchsets[latch->next].prev = latch->prev;
845
846         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
847
848         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849                 return NULL;
850
851         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
852         return latch;
853   }
854 }
855
856 void bt_mgrclose (BtMgr *mgr)
857 {
858 BtLatchSet *latch;
859 uint num = 0;
860 BtPage page;
861 uint slot;
862
863         //      flush dirty pool pages to the btree
864
865         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867                 latch = mgr->latchsets + slot;
868
869                 if( latch->dirty ) {
870                         bt_writepage(mgr, page, latch->page_no);
871                         latch->dirty = 0, num++;
872                 }
873 //              madvise (page, mgr->page_size, MADV_DONTNEED);
874         }
875
876         fprintf(stderr, "%d buffer pool pages flushed\n", num);
877
878 #ifdef unix
879         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880         munmap (mgr->pagezero, mgr->page_size);
881 #else
882         FlushViewOfFile(mgr->pagezero, 0);
883         UnmapViewOfFile(mgr->pagezero);
884         UnmapViewOfFile(mgr->hashtable);
885         CloseHandle(mgr->halloc);
886         CloseHandle(mgr->hpool);
887 #endif
888 #ifdef unix
889         close (mgr->idx);
890         free (mgr);
891 #else
892         FlushFileBuffers(mgr->idx);
893         CloseHandle(mgr->idx);
894         GlobalFree (mgr);
895 #endif
896 }
897
898 //      close and release memory
899
900 void bt_close (BtDb *bt)
901 {
902 #ifdef unix
903         if( bt->mem )
904                 free (bt->mem);
905 #else
906         if( bt->mem)
907                 VirtualFree (bt->mem, 0, MEM_RELEASE);
908 #endif
909         free (bt);
910 }
911
912 //  open/create new btree buffer manager
913
914 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
915 //              size of page pool (e.g. 262144)
916
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
918 {
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
924 off64_t size;
925 uint amt[1];
926 BtMgr* mgr;
927 BtKey* key;
928 BtVal *val;
929
930         // determine sanity of page size and buffer pool
931
932         if( bits > BT_maxbits )
933                 bits = BT_maxbits;
934         else if( bits < BT_minbits )
935                 bits = BT_minbits;
936
937         if( nodemax < 16 ) {
938                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
939                 return NULL;
940         }
941
942 #ifdef unix
943         mgr = calloc (1, sizeof(BtMgr));
944
945         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
946
947         if( mgr->idx == -1 ) {
948                 fprintf (stderr, "Unable to open btree file\n");
949                 return free(mgr), NULL;
950         }
951 #else
952         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953         attr = FILE_ATTRIBUTE_NORMAL;
954         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
955
956         if( mgr->idx == INVALID_HANDLE_VALUE )
957                 return GlobalFree(mgr), NULL;
958 #endif
959
960 #ifdef unix
961         pagezero = valloc (BT_maxpage);
962         *amt = 0;
963
964         // read minimum page size to get root info
965         //      to support raw disk partition files
966         //      check if bits == 0 on the disk.
967
968         if( size = lseek (mgr->idx, 0L, 2) )
969                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970                         if( pagezero->alloc->bits )
971                                 bits = pagezero->alloc->bits;
972                         else
973                                 initit = 1;
974                 else
975                         return free(mgr), free(pagezero), NULL;
976         else
977                 initit = 1;
978 #else
979         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980         size = GetFileSize(mgr->idx, amt);
981
982         if( size || *amt ) {
983                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984                         return bt_mgrclose (mgr), NULL;
985                 bits = pagezero->alloc->bits;
986         } else
987                 initit = 1;
988 #endif
989
990         mgr->page_size = 1 << bits;
991         mgr->page_bits = bits;
992
993         //  calculate number of latch hash table entries
994
995         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
997
998         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
999         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000         mgr->latchtotal = nodemax;
1001
1002         if( !initit )
1003                 goto mgrlatch;
1004
1005         // initialize an empty b-tree with latch page, root page, page of leaves
1006         // and page(s) of latches and page pool cache
1007
1008         memset (pagezero, 0, 1 << bits);
1009         pagezero->alloc->bits = mgr->page_bits;
1010         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1011
1012         //  initialize left-most LEAF page in
1013         //      alloc->left.
1014
1015         bt_putid (pagezero->alloc->left, LEAF_page);
1016
1017         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018                 fprintf (stderr, "Unable to create btree page zero\n");
1019                 return bt_mgrclose (mgr), NULL;
1020         }
1021
1022         memset (pagezero, 0, 1 << bits);
1023         pagezero->alloc->bits = mgr->page_bits;
1024
1025         for( lvl=MIN_lvl; lvl--; ) {
1026                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027                 key = keyptr(pagezero->alloc, 1);
1028                 key->len = 2;           // create stopper key
1029                 key->key[0] = 0xff;
1030                 key->key[1] = 0xff;
1031
1032                 bt_putid(value, MIN_lvl - lvl + 1);
1033                 val = valptr(pagezero->alloc, 1);
1034                 val->len = lvl ? BtId : 0;
1035                 memcpy (val->value, value, val->len);
1036
1037                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038                 pagezero->alloc->lvl = lvl;
1039                 pagezero->alloc->cnt = 1;
1040                 pagezero->alloc->act = 1;
1041
1042                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043                         fprintf (stderr, "Unable to create btree page zero\n");
1044                         return bt_mgrclose (mgr), NULL;
1045                 }
1046         }
1047
1048 mgrlatch:
1049 #ifdef unix
1050         free (pagezero);
1051 #else
1052         VirtualFree (pagezero, 0, MEM_RELEASE);
1053 #endif
1054 #ifdef unix
1055         // mlock the pagezero page
1056
1057         flag = PROT_READ | PROT_WRITE;
1058         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059         if( mgr->pagezero == MAP_FAILED ) {
1060                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061                 return bt_mgrclose (mgr), NULL;
1062         }
1063         mlock (mgr->pagezero, mgr->page_size);
1064
1065         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066         if( mgr->hashtable == MAP_FAILED ) {
1067                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068                 return bt_mgrclose (mgr), NULL;
1069         }
1070 #else
1071         flag = PAGE_READWRITE;
1072         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073         if( !mgr->halloc ) {
1074                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075                 return bt_mgrclose (mgr), NULL;
1076         }
1077
1078         flag = FILE_MAP_WRITE;
1079         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080         if( !mgr->pagezero ) {
1081                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082                 return bt_mgrclose (mgr), NULL;
1083         }
1084
1085         flag = PAGE_READWRITE;
1086         size = (uid)mgr->nlatchpage << mgr->page_bits;
1087         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1088         if( !mgr->hpool ) {
1089                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090                 return bt_mgrclose (mgr), NULL;
1091         }
1092
1093         flag = FILE_MAP_WRITE;
1094         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095         if( !mgr->hashtable ) {
1096                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097                 return bt_mgrclose (mgr), NULL;
1098         }
1099 #endif
1100
1101         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1103
1104         return mgr;
1105 }
1106
1107 //      open BTree access method
1108 //      based on buffer manager
1109
1110 BtDb *bt_open (BtMgr *mgr)
1111 {
1112 BtDb *bt = malloc (sizeof(*bt));
1113
1114         memset (bt, 0, sizeof(*bt));
1115         bt->mgr = mgr;
1116 #ifdef unix
1117         bt->mem = valloc (2 *mgr->page_size);
1118 #else
1119         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 #endif
1121         bt->frame = (BtPage)bt->mem;
1122         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1123 #ifdef unix
1124         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1125 #else
1126         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1127 #endif
1128         return bt;
1129 }
1130
1131 //  compare two keys, return > 0, = 0, or < 0
1132 //  =0: keys are same
1133 //  -1: key2 > key1
1134 //  +1: key2 < key1
1135 //  as the comparison value
1136
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1138 {
1139 uint len1 = key1->len;
1140 int ans;
1141
1142         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1143                 return ans;
1144
1145         if( len1 > len2 )
1146                 return 1;
1147         if( len1 < len2 )
1148                 return -1;
1149
1150         return 0;
1151 }
1152
1153 // place write, read, or parent lock on requested page_no.
1154
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1156 {
1157         switch( mode ) {
1158         case BtLockRead:
1159                 ReadLock (latch->readwr);
1160                 break;
1161         case BtLockWrite:
1162                 WriteLock (latch->readwr);
1163                 break;
1164         case BtLockAccess:
1165                 ReadLock (latch->access);
1166                 break;
1167         case BtLockDelete:
1168                 WriteLock (latch->access);
1169                 break;
1170         case BtLockParent:
1171                 WriteOLock (latch->parent);
1172                 break;
1173         case BtLockAtomic:
1174                 WriteOLock (latch->atomic);
1175                 break;
1176         }
1177 }
1178
1179 // remove write, read, or parent lock on requested page
1180
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1182 {
1183         switch( mode ) {
1184         case BtLockRead:
1185                 ReadRelease (latch->readwr);
1186                 break;
1187         case BtLockWrite:
1188                 WriteRelease (latch->readwr);
1189                 break;
1190         case BtLockAccess:
1191                 ReadRelease (latch->access);
1192                 break;
1193         case BtLockDelete:
1194                 WriteRelease (latch->access);
1195                 break;
1196         case BtLockParent:
1197                 WriteORelease (latch->parent);
1198                 break;
1199         case BtLockAtomic:
1200                 WriteORelease (latch->atomic);
1201                 break;
1202         }
1203 }
1204
1205 //      allocate a new page
1206 //      return with page latched, but unlocked.
1207
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1209 {
1210 uid page_no;
1211 int blk;
1212
1213         //      lock allocation page
1214
1215         bt_spinwritelock(bt->mgr->lock);
1216
1217         // use empty chain first
1218         // else allocate empty page
1219
1220         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222                         set->page = bt_mappage (bt, set->latch);
1223                 else
1224                         return bt->err = BTERR_struct, -1;
1225
1226                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227                 bt_spinreleasewrite(bt->mgr->lock);
1228
1229                 memcpy (set->page, contents, bt->mgr->page_size);
1230                 set->latch->dirty = 1;
1231                 return 0;
1232         }
1233
1234         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1236
1237         // unlock allocation latch
1238
1239         bt_spinreleasewrite(bt->mgr->lock);
1240
1241         //      don't load cache from btree page
1242
1243         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244                 set->page = bt_mappage (bt, set->latch);
1245         else
1246                 return bt->err = BTERR_struct;
1247
1248         memcpy (set->page, contents, bt->mgr->page_size);
1249         set->latch->dirty = 1;
1250         return 0;
1251 }
1252
1253 //  find slot in page for given key at a given level
1254
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1256 {
1257 uint diff, higher = page->cnt, low = 1, slot;
1258 uint good = 0;
1259
1260         //        make stopper key an infinite fence value
1261
1262         if( bt_getid (page->right) )
1263                 higher++;
1264         else
1265                 good++;
1266
1267         //      low is the lowest candidate.
1268         //  loop ends when they meet
1269
1270         //  higher is already
1271         //      tested as .ge. the passed key.
1272
1273         while( diff = higher - low ) {
1274                 slot = low + ( diff >> 1 );
1275                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1276                         low = slot + 1;
1277                 else
1278                         higher = slot, good++;
1279         }
1280
1281         //      return zero if key is on right link page
1282
1283         return good ? higher : 0;
1284 }
1285
1286 //  find and load page at given level for given key
1287 //      leave page rd or wr locked as requested
1288
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1290 {
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1295
1296   //  start at root of btree and drill down
1297
1298   do {
1299         // determine lock mode of drill level
1300         mode = (drill == lvl) ? lock : BtLockRead; 
1301
1302         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1303           return 0;
1304
1305         // obtain access lock using lock chaining with Access mode
1306
1307         if( page_no > ROOT_page )
1308           bt_lockpage(bt, BtLockAccess, set->latch);
1309
1310         set->page = bt_mappage (bt, set->latch);
1311
1312         //      release & unpin parent or left sibling page
1313
1314         if( prevpage ) {
1315           bt_unlockpage(bt, prevmode, prevlatch);
1316           bt_unpinlatch (prevlatch);
1317           prevpage = 0;
1318         }
1319
1320         // obtain mode lock using lock chaining through AccessLock
1321
1322         bt_lockpage(bt, mode, set->latch);
1323
1324         if( set->page->free )
1325                 return bt->err = BTERR_struct, 0;
1326
1327         if( page_no > ROOT_page )
1328           bt_unlockpage(bt, BtLockAccess, set->latch);
1329
1330         // re-read and re-lock root after determining actual level of root
1331
1332         if( set->page->lvl != drill) {
1333                 if( set->latch->page_no != ROOT_page )
1334                         return bt->err = BTERR_struct, 0;
1335                         
1336                 drill = set->page->lvl;
1337
1338                 if( lock != BtLockRead && drill == lvl ) {
1339                   bt_unlockpage(bt, mode, set->latch);
1340                   bt_unpinlatch (set->latch);
1341                   continue;
1342                 }
1343         }
1344
1345         prevpage = set->latch->page_no;
1346         prevlatch = set->latch;
1347         prevmode = mode;
1348
1349         //  find key on page at this level
1350         //  and descend to requested level
1351
1352         if( !set->page->kill )
1353          if( slot = bt_findslot (set->page, key, len) ) {
1354           if( drill == lvl )
1355                 return slot;
1356
1357           // find next non-dead slot -- the fence key if nothing else
1358
1359           while( slotptr(set->page, slot)->dead )
1360                 if( slot++ < set->page->cnt )
1361                   continue;
1362                 else
1363                   return bt->err = BTERR_struct, 0;
1364
1365           page_no = bt_getid(valptr(set->page, slot)->value);
1366           drill--;
1367           continue;
1368          }
1369
1370         //  or slide right into next page
1371
1372         page_no = bt_getid(set->page->right);
1373   } while( page_no );
1374
1375   // return error on end of right chain
1376
1377   bt->err = BTERR_struct;
1378   return 0;     // return error
1379 }
1380
1381 //      return page to free list
1382 //      page must be delete & write locked
1383
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1385 {
1386         //      lock allocation page
1387
1388         bt_spinwritelock (bt->mgr->lock);
1389
1390         //      store chain
1391
1392         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394         set->latch->dirty = 1;
1395         set->page->free = 1;
1396
1397         // unlock released page
1398
1399         bt_unlockpage (bt, BtLockDelete, set->latch);
1400         bt_unlockpage (bt, BtLockWrite, set->latch);
1401         bt_unpinlatch (set->latch);
1402
1403         // unlock allocation page
1404
1405         bt_spinreleasewrite (bt->mgr->lock);
1406 }
1407
1408 //      a fence key was deleted from a page
1409 //      push new fence value upwards
1410
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1412 {
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1415 BtKey* ptr;
1416 uint idx;
1417
1418         //      remove the old fence value
1419
1420         ptr = keyptr(set->page, set->page->cnt);
1421         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423         set->latch->dirty = 1;
1424
1425         //  cache new fence value
1426
1427         ptr = keyptr(set->page, set->page->cnt);
1428         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1429
1430         bt_lockpage (bt, BtLockParent, set->latch);
1431         bt_unlockpage (bt, BtLockWrite, set->latch);
1432
1433         //      insert new (now smaller) fence key
1434
1435         bt_putid (value, set->latch->page_no);
1436         ptr = (BtKey*)leftkey;
1437
1438         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1439           return bt->err;
1440
1441         //      now delete old fence key
1442
1443         ptr = (BtKey*)rightkey;
1444
1445         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1446                 return bt->err;
1447
1448         bt_unlockpage (bt, BtLockParent, set->latch);
1449         bt_unpinlatch(set->latch);
1450         return 0;
1451 }
1452
1453 //      root has a single child
1454 //      collapse a level from the tree
1455
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1457 {
1458 BtPageSet child[1];
1459 uid page_no;
1460 uint idx;
1461
1462   // find the child entry and promote as new root contents
1463
1464   do {
1465         for( idx = 0; idx++ < root->page->cnt; )
1466           if( !slotptr(root->page, idx)->dead )
1467                 break;
1468
1469         page_no = bt_getid (valptr(root->page, idx)->value);
1470
1471         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472                 child->page = bt_mappage (bt, child->latch);
1473         else
1474                 return bt->err;
1475
1476         bt_lockpage (bt, BtLockDelete, child->latch);
1477         bt_lockpage (bt, BtLockWrite, child->latch);
1478
1479         memcpy (root->page, child->page, bt->mgr->page_size);
1480         root->latch->dirty = 1;
1481
1482         bt_freepage (bt, child);
1483
1484   } while( root->page->lvl > 1 && root->page->act == 1 );
1485
1486   bt_unlockpage (bt, BtLockWrite, root->latch);
1487   bt_unpinlatch (root->latch);
1488   return 0;
1489 }
1490
1491 //  delete a page and manage keys
1492 //  call with page writelocked
1493 //      returns with page unpinned
1494
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1496 {
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1500 BtPageSet right[1];
1501 uid page_no;
1502 BtKey *ptr;
1503
1504         //      cache copy of fence key
1505         //      to post in parent
1506
1507         ptr = keyptr(set->page, set->page->cnt);
1508         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1509
1510         //      obtain lock on right page
1511
1512         page_no = bt_getid(set->page->right);
1513
1514         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515                 right->page = bt_mappage (bt, right->latch);
1516         else
1517                 return 0;
1518
1519         bt_lockpage (bt, BtLockWrite, right->latch);
1520
1521         // cache copy of key to update
1522
1523         ptr = keyptr(right->page, right->page->cnt);
1524         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1525
1526         if( right->page->kill )
1527                 return bt->err = BTERR_struct;
1528
1529         // pull contents of right peer into our empty page
1530
1531         memcpy (set->page, right->page, bt->mgr->page_size);
1532         set->latch->dirty = 1;
1533
1534         // mark right page deleted and point it to left page
1535         //      until we can post parent updates that remove access
1536         //      to the deleted page.
1537
1538         bt_putid (right->page->right, set->latch->page_no);
1539         right->latch->dirty = 1;
1540         right->page->kill = 1;
1541
1542         bt_lockpage (bt, BtLockParent, right->latch);
1543         bt_unlockpage (bt, BtLockWrite, right->latch);
1544
1545         bt_lockpage (bt, BtLockParent, set->latch);
1546         bt_unlockpage (bt, BtLockWrite, set->latch);
1547
1548         // redirect higher key directly to our new node contents
1549
1550         bt_putid (value, set->latch->page_no);
1551         ptr = (BtKey*)higherfence;
1552
1553         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1554           return bt->err;
1555
1556         //      delete old lower key to our node
1557
1558         ptr = (BtKey*)lowerfence;
1559
1560         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1561           return bt->err;
1562
1563         //      obtain delete and write locks to right node
1564
1565         bt_unlockpage (bt, BtLockParent, right->latch);
1566         bt_lockpage (bt, BtLockDelete, right->latch);
1567         bt_lockpage (bt, BtLockWrite, right->latch);
1568         bt_freepage (bt, right);
1569
1570         bt_unlockpage (bt, BtLockParent, set->latch);
1571         bt_unpinlatch (set->latch);
1572         bt->found = 1;
1573         return 0;
1574 }
1575
1576 //  find and delete key on page by marking delete flag bit
1577 //  if page becomes empty, delete it from the btree
1578
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1580 {
1581 uint slot, idx, found, fence;
1582 BtPageSet set[1];
1583 BtKey *ptr;
1584 BtVal *val;
1585
1586         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587                 ptr = keyptr(set->page, slot);
1588         else
1589                 return bt->err;
1590
1591         // if librarian slot, advance to real slot
1592
1593         if( slotptr(set->page, slot)->type == Librarian )
1594                 ptr = keyptr(set->page, ++slot);
1595
1596         fence = slot == set->page->cnt;
1597
1598         // if key is found delete it, otherwise ignore request
1599
1600         if( found = !keycmp (ptr, key, len) )
1601           if( found = slotptr(set->page, slot)->dead == 0 ) {
1602                 val = valptr(set->page,slot);
1603                 slotptr(set->page, slot)->dead = 1;
1604                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1605                 set->page->act--;
1606
1607                 // collapse empty slots beneath the fence
1608
1609                 while( idx = set->page->cnt - 1 )
1610                   if( slotptr(set->page, idx)->dead ) {
1611                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1613                   } else
1614                         break;
1615           }
1616
1617         //      did we delete a fence key in an upper level?
1618
1619         if( found && lvl && set->page->act && fence )
1620           if( bt_fixfence (bt, set, lvl) )
1621                 return bt->err;
1622           else
1623                 return bt->found = found, 0;
1624
1625         //      do we need to collapse root?
1626
1627         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628           if( bt_collapseroot (bt, set) )
1629                 return bt->err;
1630           else
1631                 return bt->found = found, 0;
1632
1633         //      delete empty page
1634
1635         if( !set->page->act )
1636                 return bt_deletepage (bt, set);
1637
1638         set->latch->dirty = 1;
1639         bt_unlockpage(bt, BtLockWrite, set->latch);
1640         bt_unpinlatch (set->latch);
1641         return bt->found = found, 0;
1642 }
1643
1644 BtKey *bt_foundkey (BtDb *bt)
1645 {
1646         return (BtKey*)(bt->key);
1647 }
1648
1649 //      advance to next slot
1650
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1652 {
1653 BtLatchSet *prevlatch;
1654 uid page_no;
1655
1656         if( slot < set->page->cnt )
1657                 return slot + 1;
1658
1659         prevlatch = set->latch;
1660
1661         if( page_no = bt_getid(set->page->right) )
1662           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663                 set->page = bt_mappage (bt, set->latch);
1664           else
1665                 return 0;
1666         else
1667           return bt->err = BTERR_struct, 0;
1668
1669         // obtain access lock using lock chaining with Access mode
1670
1671         bt_lockpage(bt, BtLockAccess, set->latch);
1672
1673         bt_unlockpage(bt, BtLockRead, prevlatch);
1674         bt_unpinlatch (prevlatch);
1675
1676         bt_lockpage(bt, BtLockRead, set->latch);
1677         bt_unlockpage(bt, BtLockAccess, set->latch);
1678         return 1;
1679 }
1680
1681 //      find unique key or first duplicate key in
1682 //      leaf level and return number of value bytes
1683 //      or (-1) if not found.  Setup key for bt_foundkey
1684
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1686 {
1687 BtPageSet set[1];
1688 uint len, slot;
1689 int ret = -1;
1690 BtKey *ptr;
1691 BtVal *val;
1692
1693   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1694    do {
1695         ptr = keyptr(set->page, slot);
1696
1697         //      skip librarian slot place holder
1698
1699         if( slotptr(set->page, slot)->type == Librarian )
1700                 ptr = keyptr(set->page, ++slot);
1701
1702         //      return actual key found
1703
1704         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1705         len = ptr->len;
1706
1707         if( slotptr(set->page, slot)->type == Duplicate )
1708                 len -= BtId;
1709
1710         //      not there if we reach the stopper key
1711
1712         if( slot == set->page->cnt )
1713           if( !bt_getid (set->page->right) )
1714                 break;
1715
1716         // if key exists, return >= 0 value bytes copied
1717         //      otherwise return (-1)
1718
1719         if( slotptr(set->page, slot)->dead )
1720                 continue;
1721
1722         if( keylen == len )
1723           if( !memcmp (ptr->key, key, len) ) {
1724                 val = valptr (set->page,slot);
1725                 if( valmax > val->len )
1726                         valmax = val->len;
1727                 memcpy (value, val->value, valmax);
1728                 ret = valmax;
1729           }
1730
1731         break;
1732
1733    } while( slot = bt_findnext (bt, set, slot) );
1734
1735   bt_unlockpage (bt, BtLockRead, set->latch);
1736   bt_unpinlatch (set->latch);
1737   return ret;
1738 }
1739
1740 //      check page for space available,
1741 //      clean if necessary and return
1742 //      0 - page needs splitting
1743 //      >0  new slot value
1744
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1746 {
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1751 uint newslot = max;
1752 BtKey *key;
1753 BtVal *val;
1754
1755         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1756                 return slot;
1757
1758         //      skip cleanup and proceed to split
1759         //      if there's not enough garbage
1760         //      to bother with.
1761
1762         if( page->garbage < nxt / 5 )
1763                 return 0;
1764
1765         memcpy (bt->frame, page, bt->mgr->page_size);
1766
1767         // skip page info and set rest of page to zero
1768
1769         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770         set->latch->dirty = 1;
1771         page->garbage = 0;
1772         page->act = 0;
1773
1774         // clean up page first by
1775         // removing deleted keys
1776
1777         while( cnt++ < max ) {
1778                 if( cnt == slot )
1779                         newslot = idx + 2;
1780                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1781                         continue;
1782
1783                 // copy the value across
1784
1785                 val = valptr(bt->frame, cnt);
1786                 nxt -= val->len + sizeof(BtVal);
1787                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1788
1789                 // copy the key across
1790
1791                 key = keyptr(bt->frame, cnt);
1792                 nxt -= key->len + sizeof(BtKey);
1793                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1794
1795                 // make a librarian slot
1796
1797                 if( idx ) {
1798                         slotptr(page, ++idx)->off = nxt;
1799                         slotptr(page, idx)->type = Librarian;
1800                         slotptr(page, idx)->dead = 1;
1801                 }
1802
1803                 // set up the slot
1804
1805                 slotptr(page, ++idx)->off = nxt;
1806                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1807
1808                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1809                         page->act++;
1810         }
1811
1812         page->min = nxt;
1813         page->cnt = idx;
1814
1815         //      see if page has enough space now, or does it need splitting?
1816
1817         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1818                 return newslot;
1819
1820         return 0;
1821 }
1822
1823 // split the root and raise the height of the btree
1824
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1826 {  
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1830 BtPageSet left[1];
1831 uid left_page_no;
1832 BtKey *ptr;
1833 BtVal *val;
1834
1835         //      save left page fence key for new root
1836
1837         ptr = keyptr(root->page, root->page->cnt);
1838         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1839
1840         //  Obtain an empty page to use, and copy the current
1841         //  root contents into it, e.g. lower keys
1842
1843         if( bt_newpage(bt, left, root->page) )
1844                 return bt->err;
1845
1846         left_page_no = left->latch->page_no;
1847         bt_unpinlatch (left->latch);
1848
1849         // preserve the page info at the bottom
1850         // of higher keys and set rest to zero
1851
1852         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1853
1854         // insert stopper key at top of newroot page
1855         // and increase the root height
1856
1857         nxt -= BtId + sizeof(BtVal);
1858         bt_putid (value, right->page_no);
1859         val = (BtVal *)((unsigned char *)root->page + nxt);
1860         memcpy (val->value, value, BtId);
1861         val->len = BtId;
1862
1863         nxt -= 2 + sizeof(BtKey);
1864         slotptr(root->page, 2)->off = nxt;
1865         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1866         ptr->len = 2;
1867         ptr->key[0] = 0xff;
1868         ptr->key[1] = 0xff;
1869
1870         // insert lower keys page fence key on newroot page as first key
1871
1872         nxt -= BtId + sizeof(BtVal);
1873         bt_putid (value, left_page_no);
1874         val = (BtVal *)((unsigned char *)root->page + nxt);
1875         memcpy (val->value, value, BtId);
1876         val->len = BtId;
1877
1878         ptr = (BtKey *)leftkey;
1879         nxt -= ptr->len + sizeof(BtKey);
1880         slotptr(root->page, 1)->off = nxt;
1881         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1882         
1883         bt_putid(root->page->right, 0);
1884         root->page->min = nxt;          // reset lowest used offset and key count
1885         root->page->cnt = 2;
1886         root->page->act = 2;
1887         root->page->lvl++;
1888
1889         // release and unpin root pages
1890
1891         bt_unlockpage(bt, BtLockWrite, root->latch);
1892         bt_unpinlatch (root->latch);
1893
1894         bt_unpinlatch (right);
1895         return 0;
1896 }
1897
1898 //  split already locked full node
1899 //      leave it locked.
1900 //      return pool entry for new right
1901 //      page, unlocked
1902
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1904 {
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1907 BtPageSet right[1];
1908 BtKey *key, *ptr;
1909 BtVal *val, *src;
1910 uid right2;
1911 uint prev;
1912
1913         //  split higher half of keys to bt->frame
1914
1915         memset (bt->frame, 0, bt->mgr->page_size);
1916         max = set->page->cnt;
1917         cnt = max / 2;
1918         idx = 0;
1919
1920         while( cnt++ < max ) {
1921                 if( slotptr(set->page, cnt)->dead && cnt < max )
1922                         continue;
1923                 src = valptr(set->page, cnt);
1924                 nxt -= src->len + sizeof(BtVal);
1925                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1926
1927                 key = keyptr(set->page, cnt);
1928                 nxt -= key->len + sizeof(BtKey);
1929                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1930                 memcpy (ptr, key, key->len + sizeof(BtKey));
1931
1932                 //      add librarian slot
1933
1934                 if( idx ) {
1935                         slotptr(bt->frame, ++idx)->off = nxt;
1936                         slotptr(bt->frame, idx)->type = Librarian;
1937                         slotptr(bt->frame, idx)->dead = 1;
1938                 }
1939
1940                 //  add actual slot
1941
1942                 slotptr(bt->frame, ++idx)->off = nxt;
1943                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1944
1945                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1946                         bt->frame->act++;
1947         }
1948
1949         bt->frame->bits = bt->mgr->page_bits;
1950         bt->frame->min = nxt;
1951         bt->frame->cnt = idx;
1952         bt->frame->lvl = lvl;
1953
1954         // link right node
1955
1956         if( set->latch->page_no > ROOT_page )
1957                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1958
1959         //      get new free page and write higher keys to it.
1960
1961         if( bt_newpage(bt, right, bt->frame) )
1962                 return 0;
1963
1964         memcpy (bt->frame, set->page, bt->mgr->page_size);
1965         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966         set->latch->dirty = 1;
1967
1968         nxt = bt->mgr->page_size;
1969         set->page->garbage = 0;
1970         set->page->act = 0;
1971         max /= 2;
1972         cnt = 0;
1973         idx = 0;
1974
1975         if( slotptr(bt->frame, max)->type == Librarian )
1976                 max--;
1977
1978         //  assemble page of smaller keys
1979
1980         while( cnt++ < max ) {
1981                 if( slotptr(bt->frame, cnt)->dead )
1982                         continue;
1983                 val = valptr(bt->frame, cnt);
1984                 nxt -= val->len + sizeof(BtVal);
1985                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1986
1987                 key = keyptr(bt->frame, cnt);
1988                 nxt -= key->len + sizeof(BtKey);
1989                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1990
1991                 //      add librarian slot
1992
1993                 if( idx ) {
1994                         slotptr(set->page, ++idx)->off = nxt;
1995                         slotptr(set->page, idx)->type = Librarian;
1996                         slotptr(set->page, idx)->dead = 1;
1997                 }
1998
1999                 //      add actual slot
2000
2001                 slotptr(set->page, ++idx)->off = nxt;
2002                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2003                 set->page->act++;
2004         }
2005
2006         bt_putid(set->page->right, right->latch->page_no);
2007         set->page->min = nxt;
2008         set->page->cnt = idx;
2009
2010         return right->latch->entry;
2011 }
2012
2013 //      fix keys for newly split page
2014 //      call with page locked, return
2015 //      unlocked
2016
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2018 {
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2022 BtPage page;
2023 BtKey *ptr;
2024
2025         // if current page is the root page, split it
2026
2027         if( set->latch->page_no == ROOT_page )
2028                 return bt_splitroot (bt, set, right);
2029
2030         ptr = keyptr(set->page, set->page->cnt);
2031         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2032
2033         page = bt_mappage (bt, right);
2034
2035         ptr = keyptr(page, page->cnt);
2036         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2037
2038         // insert new fences in their parent pages
2039
2040         bt_lockpage (bt, BtLockParent, right);
2041
2042         bt_lockpage (bt, BtLockParent, set->latch);
2043         bt_unlockpage (bt, BtLockWrite, set->latch);
2044
2045         // insert new fence for reformulated left block of smaller keys
2046
2047         bt_putid (value, set->latch->page_no);
2048         ptr = (BtKey *)leftkey;
2049
2050         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2051                 return bt->err;
2052
2053         // switch fence for right block of larger keys to new right page
2054
2055         bt_putid (value, right->page_no);
2056         ptr = (BtKey *)rightkey;
2057
2058         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2059                 return bt->err;
2060
2061         bt_unlockpage (bt, BtLockParent, set->latch);
2062         bt_unpinlatch (set->latch);
2063
2064         bt_unlockpage (bt, BtLockParent, right);
2065         bt_unpinlatch (right);
2066         return 0;
2067 }
2068
2069 //      install new key and value onto page
2070 //      page must already be checked for
2071 //      adequate space
2072
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2074 {
2075 uint idx, librarian;
2076 BtSlot *node;
2077 BtKey *ptr;
2078 BtVal *val;
2079
2080         //      if found slot > desired slot and previous slot
2081         //      is a librarian slot, use it
2082
2083         if( slot > 1 )
2084           if( slotptr(set->page, slot-1)->type == Librarian )
2085                 slot--;
2086
2087         // copy value onto page
2088
2089         set->page->min -= vallen + sizeof(BtVal);
2090         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091         memcpy (val->value, value, vallen);
2092         val->len = vallen;
2093
2094         // copy key onto page
2095
2096         set->page->min -= keylen + sizeof(BtKey);
2097         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098         memcpy (ptr->key, key, keylen);
2099         ptr->len = keylen;
2100         
2101         //      find first empty slot
2102
2103         for( idx = slot; idx < set->page->cnt; idx++ )
2104           if( slotptr(set->page, idx)->dead )
2105                 break;
2106
2107         // now insert key into array before slot
2108
2109         if( idx == set->page->cnt )
2110                 idx += 2, set->page->cnt += 2, librarian = 2;
2111         else
2112                 librarian = 1;
2113
2114         set->latch->dirty = 1;
2115         set->page->act++;
2116
2117         while( idx > slot + librarian - 1 )
2118                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2119
2120         //      add librarian slot
2121
2122         if( librarian > 1 ) {
2123                 node = slotptr(set->page, slot++);
2124                 node->off = set->page->min;
2125                 node->type = Librarian;
2126                 node->dead = 1;
2127         }
2128
2129         //      fill in new slot
2130
2131         node = slotptr(set->page, slot);
2132         node->off = set->page->min;
2133         node->type = type;
2134         node->dead = 0;
2135
2136         if( release ) {
2137                 bt_unlockpage (bt, BtLockWrite, set->latch);
2138                 bt_unpinlatch (set->latch);
2139         }
2140
2141         return 0;
2142 }
2143
2144 //  Insert new key into the btree at given level.
2145 //      either add a new key or update/add an existing one
2146
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2148 {
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2151 BtPageSet set[1];
2152 BtKey *ptr, *ins;
2153 uid sequence;
2154 BtVal *val;
2155 uint type;
2156
2157   // set up the key we're working on
2158
2159   ins = (BtKey*)newkey;
2160   memcpy (ins->key, key, keylen);
2161   ins->len = keylen;
2162
2163   // is this a non-unique index value?
2164
2165   if( unique )
2166         type = Unique;
2167   else {
2168         type = Duplicate;
2169         sequence = bt_newdup (bt);
2170         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2171         ins->len += BtId;
2172   }
2173
2174   while( 1 ) { // find the page and slot for the current key
2175         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176                 ptr = keyptr(set->page, slot);
2177         else {
2178                 if( !bt->err )
2179                         bt->err = BTERR_ovflw;
2180                 return bt->err;
2181         }
2182
2183         // if librarian slot == found slot, advance to real slot
2184
2185         if( slotptr(set->page, slot)->type == Librarian )
2186           if( !keycmp (ptr, key, keylen) )
2187                 ptr = keyptr(set->page, ++slot);
2188
2189         len = ptr->len;
2190
2191         if( slotptr(set->page, slot)->type == Duplicate )
2192                 len -= BtId;
2193
2194         //  if inserting a duplicate key or unique key
2195         //      check for adequate space on the page
2196         //      and insert the new key before slot.
2197
2198         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2199           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200                 if( !(entry = bt_splitpage (bt, set)) )
2201                   return bt->err;
2202                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2203                   return bt->err;
2204                 else
2205                   continue;
2206
2207           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2208         }
2209
2210         // if key already exists, update value and return
2211
2212         val = valptr(set->page, slot);
2213
2214         if( val->len >= vallen ) {
2215                 if( slotptr(set->page, slot)->dead )
2216                         set->page->act++;
2217                 set->page->garbage += val->len - vallen;
2218                 set->latch->dirty = 1;
2219                 slotptr(set->page, slot)->dead = 0;
2220                 val->len = vallen;
2221                 memcpy (val->value, value, vallen);
2222                 bt_unlockpage(bt, BtLockWrite, set->latch);
2223                 bt_unpinlatch (set->latch);
2224                 return 0;
2225         }
2226
2227         //      new update value doesn't fit in existing value area
2228
2229         if( !slotptr(set->page, slot)->dead )
2230                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2231         else {
2232                 slotptr(set->page, slot)->dead = 0;
2233                 set->page->act++;
2234         }
2235
2236         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237           if( !(entry = bt_splitpage (bt, set)) )
2238                 return bt->err;
2239           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2240                 return bt->err;
2241           else
2242                 continue;
2243
2244         set->page->min -= vallen + sizeof(BtVal);
2245         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246         memcpy (val->value, value, vallen);
2247         val->len = vallen;
2248
2249         set->latch->dirty = 1;
2250         set->page->min -= keylen + sizeof(BtKey);
2251         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252         memcpy (ptr->key, key, keylen);
2253         ptr->len = keylen;
2254         
2255         slotptr(set->page, slot)->off = set->page->min;
2256         bt_unlockpage(bt, BtLockWrite, set->latch);
2257         bt_unpinlatch (set->latch);
2258         return 0;
2259   }
2260   return 0;
2261 }
2262
2263 typedef struct {
2264         uint entry;                     // latch table entry number
2265         uint slot:31;           // page slot number
2266         uint reuse:1;           // reused previous page
2267 } AtomicMod;
2268
2269 typedef struct {
2270         uid page_no;            // page number for split leaf
2271         void *next;                     // next key to insert
2272         uint entry:29;          // latch table entry number
2273         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2274         uint nounlock:1;        // don't unlock ParentModification
2275         unsigned char leafkey[BT_keyarray];
2276 } AtomicKey;
2277
2278 //  find and load leaf page for given key
2279 //      leave page Atomic locked and Read locked.
2280
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2282 {
2283 BtLatchSet *prevlatch;
2284 uid page_no;
2285 uint slot;
2286
2287   //  find level one slot
2288
2289   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2290         return 0;
2291
2292   // find next non-dead entry on this page
2293   //    it will be the fence key if nothing else
2294
2295   while( slotptr(set->page, slot)->dead )
2296         if( slot++ < set->page->cnt )
2297           continue;
2298         else
2299           return bt->err = BTERR_struct, 0;
2300
2301   page_no = bt_getid(valptr(set->page, slot)->value);
2302   prevlatch = set->latch;
2303
2304   while( page_no ) {
2305         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306           set->page = bt_mappage (bt, set->latch);
2307         else
2308           return 0;
2309
2310         // obtain read lock using lock chaining with Access mode
2311         //      release & unpin parent/left sibling page
2312
2313         bt_lockpage(bt, BtLockAccess, set->latch);
2314
2315         bt_unlockpage(bt, BtLockRead, prevlatch);
2316         bt_unpinlatch (prevlatch);
2317
2318         bt_lockpage(bt, BtLockRead, set->latch);
2319
2320         //  find key on page at this level
2321         //  and descend to requested level
2322
2323         if( !set->page->kill )
2324          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2325           bt_unlockpage(bt, BtLockRead, set->latch);
2326           bt_lockpage(bt, BtLockAtomic, set->latch);
2327           bt_lockpage(bt, BtLockRead, set->latch);
2328
2329           if( !set->page->kill )
2330            if( slot = bt_findslot (set->page, key, len) ) {
2331                 bt_unlockpage(bt, BtLockAccess, set->latch);
2332                 return slot;
2333            }
2334
2335           bt_unlockpage(bt, BtLockAtomic, set->latch);
2336           }
2337
2338         //  slide right into next page
2339
2340         bt_unlockpage(bt, BtLockAccess, set->latch);
2341         page_no = bt_getid(set->page->right);
2342         prevlatch = set->latch;
2343   }
2344
2345   // return error on end of right chain
2346
2347   bt->err = BTERR_struct;
2348   return 0;     // return error
2349 }
2350
2351 //      determine actual page where key is located
2352 //  return slot number
2353
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2355 {
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
2358 uint entry;
2359
2360         if( src > 1 && locks[src].reuse )
2361           entry = locks[src-1].entry, slot = 0;
2362         else
2363           entry = locks[src].entry;
2364
2365         if( slot ) {
2366                 set->latch = bt->mgr->latchsets + entry;
2367                 set->page = bt_mappage (bt, set->latch);
2368                 return slot;
2369         }
2370
2371         //      is locks->reuse set?
2372         //      if so, find where our key
2373         //      is located on previous page or split pages
2374
2375         do {
2376                 set->latch = bt->mgr->latchsets + entry;
2377                 set->page = bt_mappage (bt, set->latch);
2378
2379                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2380                   if( locks[src].reuse )
2381                         locks[src].entry = entry;
2382                   return slot;
2383                 }
2384         } while( entry = set->latch->split );
2385
2386         bt->err = BTERR_atomic;
2387         return 0;
2388 }
2389
2390 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2391 {
2392 BtKey *key = keyptr(source, src);
2393 BtVal *val = valptr(source, src);
2394 BtLatchSet *latch;
2395 BtPageSet set[1];
2396 uint entry;
2397
2398   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2399         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2400           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2401
2402         if( entry = bt_splitpage (bt, set) )
2403           latch = bt->mgr->latchsets + entry;
2404         else
2405           return bt->err;
2406
2407         //      splice right page into split chain
2408         //      and WriteLock it.
2409
2410         latch->split = set->latch->split;
2411         set->latch->split = entry;
2412         bt_lockpage(bt, BtLockWrite, latch);
2413   }
2414
2415   return bt->err = BTERR_atomic;
2416 }
2417
2418 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2419 {
2420 BtKey *key = keyptr(source, src);
2421 uint idx, entry, slot;
2422 BtPageSet set[1];
2423 BtKey *ptr;
2424 BtVal *val;
2425
2426         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2427                 slotptr(set->page, slot)->dead = 1;
2428         else
2429                 return bt->err = BTERR_struct;
2430
2431         ptr = keyptr(set->page, slot);
2432         val = valptr(set->page, slot);
2433
2434         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2435         set->latch->dirty = 1;
2436         set->page->act--;
2437         return 0;
2438 }
2439
2440 //      delete an empty master page for a transaction
2441
2442 //      note that the far right page never empties because
2443 //      it always contains (at least) the infinite stopper key
2444 //      and that all pages that don't contain any keys are
2445 //      deleted, or are being held under Atomic lock.
2446
2447 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2448 {
2449 BtPageSet right[1], temp[1];
2450 unsigned char value[BtId];
2451 uid right_page_no;
2452 BtKey *ptr;
2453
2454         bt_lockpage(bt, BtLockWrite, prev->latch);
2455
2456         //      grab the right sibling
2457
2458         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2459                 right->page = bt_mappage (bt, right->latch);
2460         else
2461                 return bt->err;
2462
2463         bt_lockpage(bt, BtLockAtomic, right->latch);
2464         bt_lockpage(bt, BtLockWrite, right->latch);
2465
2466         //      and pull contents over empty page
2467         //      while preserving master's left link
2468
2469         memcpy (right->page->left, prev->page->left, BtId);
2470         memcpy (prev->page, right->page, bt->mgr->page_size);
2471
2472         //      forward seekers to old right sibling
2473         //      to new page location in set
2474
2475         bt_putid (right->page->right, prev->latch->page_no);
2476         right->latch->dirty = 1;
2477         right->page->kill = 1;
2478
2479         //      remove pointer to right page for searchers by
2480         //      changing right fence key to point to the master page
2481
2482         ptr = keyptr(right->page,right->page->cnt);
2483         bt_putid (value, prev->latch->page_no);
2484
2485         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2486                 return bt->err;
2487
2488         //  now that master page is in good shape we can
2489         //      remove its locks.
2490
2491         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2492         bt_unlockpage (bt, BtLockWrite, prev->latch);
2493
2494         //  fix master's right sibling's left pointer
2495         //      to remove scanner's poiner to the right page
2496
2497         if( right_page_no = bt_getid (prev->page->right) ) {
2498           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2499                 temp->page = bt_mappage (bt, temp->latch);
2500
2501           bt_lockpage (bt, BtLockWrite, temp->latch);
2502           bt_putid (temp->page->left, prev->latch->page_no);
2503           temp->latch->dirty = 1;
2504
2505           bt_unlockpage (bt, BtLockWrite, temp->latch);
2506           bt_unpinlatch (temp->latch);
2507         } else {        // master is now the far right page
2508           bt_spinwritelock (bt->mgr->lock);
2509           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2510           bt_spinreleasewrite(bt->mgr->lock);
2511         }
2512
2513         //      now that there are no pointers to the right page
2514         //      we can delete it after the last read access occurs
2515
2516         bt_unlockpage (bt, BtLockWrite, right->latch);
2517         bt_unlockpage (bt, BtLockAtomic, right->latch);
2518         bt_lockpage (bt, BtLockDelete, right->latch);
2519         bt_lockpage (bt, BtLockWrite, right->latch);
2520         bt_freepage (bt, right);
2521         return 0;
2522 }
2523
2524 //      atomic modification of a batch of keys.
2525
2526 //      return -1 if BTERR is set
2527 //      otherwise return slot number
2528 //      causing the key constraint violation
2529 //      or zero on successful completion.
2530
2531 int bt_atomictxn (BtDb *bt, BtPage source)
2532 {
2533 uint src, idx, slot, samepage, entry;
2534 AtomicKey *head, *tail, *leaf;
2535 BtPageSet set[1], prev[1];
2536 unsigned char value[BtId];
2537 BtKey *key, *ptr, *key2;
2538 BtLatchSet *latch;
2539 AtomicMod *locks;
2540 int result = 0;
2541 BtSlot temp[1];
2542 BtPage page;
2543 BtVal *val;
2544 uid right;
2545 int type;
2546
2547   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2548   head = NULL;
2549   tail = NULL;
2550
2551   // stable sort the list of keys into order to
2552   //    prevent deadlocks between threads.
2553
2554   for( src = 1; src++ < source->cnt; ) {
2555         *temp = *slotptr(source,src);
2556         key = keyptr (source,src);
2557
2558         for( idx = src; --idx; ) {
2559           key2 = keyptr (source,idx);
2560           if( keycmp (key, key2->key, key2->len) < 0 ) {
2561                 *slotptr(source,idx+1) = *slotptr(source,idx);
2562                 *slotptr(source,idx) = *temp;
2563           } else
2564                 break;
2565         }
2566   }
2567
2568   // Load the leaf page for each key
2569   // group same page references with reuse bit
2570   // and determine any constraint violations
2571
2572   for( src = 0; src++ < source->cnt; ) {
2573         key = keyptr(source, src);
2574         slot = 0;
2575
2576         // first determine if this modification falls
2577         // on the same page as the previous modification
2578         //      note that the far right leaf page is a special case
2579
2580         if( samepage = src > 1 )
2581           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2582                 slot = bt_findslot(set->page, key->key, key->len);
2583           else // release read on previous page
2584                 bt_unlockpage(bt, BtLockRead, set->latch); 
2585
2586         if( !slot )
2587           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2588                 set->latch->split = 0;
2589           else
2590                 return -1;
2591
2592         if( slotptr(set->page, slot)->type == Librarian )
2593           ptr = keyptr(set->page, ++slot);
2594         else
2595           ptr = keyptr(set->page, slot);
2596
2597         if( !samepage ) {
2598           locks[src].entry = set->latch->entry;
2599           locks[src].slot = slot;
2600           locks[src].reuse = 0;
2601         } else {
2602           locks[src].entry = 0;
2603           locks[src].slot = 0;
2604           locks[src].reuse = 1;
2605         }
2606
2607         switch( slotptr(source, src)->type ) {
2608         case Duplicate:
2609         case Unique:
2610           if( !slotptr(set->page, slot)->dead )
2611            if( slot < set->page->cnt || bt_getid (set->page->right) )
2612             if( !keycmp (ptr, key->key, key->len) ) {
2613
2614                   // return constraint violation if key already exists
2615
2616                   bt_unlockpage(bt, BtLockRead, set->latch);
2617                   result = src;
2618
2619                   while( src ) {
2620                         if( locks[src].entry ) {
2621                           set->latch = bt->mgr->latchsets + locks[src].entry;
2622                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2623                           bt_unpinlatch (set->latch);
2624                         }
2625                         src--;
2626                   }
2627                   free (locks);
2628                   return result;
2629                 }
2630           break;
2631         }
2632   }
2633
2634   //  unlock last loadpage lock
2635
2636   if( source->cnt > 1 )
2637         bt_unlockpage(bt, BtLockRead, set->latch);
2638
2639   //  obtain write lock for each master page
2640
2641   for( src = 0; src++ < source->cnt; )
2642         if( locks[src].reuse )
2643           continue;
2644         else
2645           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2646
2647   // insert or delete each key
2648   // process any splits or merges
2649   // release Write & Atomic latches
2650   // set ParentModifications and build
2651   // queue of keys to insert for split pages
2652   // or delete for deleted pages.
2653
2654   // run through txn list backwards
2655
2656   samepage = source->cnt + 1;
2657
2658   for( src = source->cnt; src; src-- ) {
2659         if( locks[src].reuse )
2660           continue;
2661
2662         //  perform the txn operations
2663         //      from smaller to larger on
2664         //  the same page
2665
2666         for( idx = src; idx < samepage; idx++ )
2667          switch( slotptr(source,idx)->type ) {
2668          case Delete:
2669           if( bt_atomicdelete (bt, source, locks, idx) )
2670                 return -1;
2671           break;
2672
2673         case Duplicate:
2674         case Unique:
2675           if( bt_atomicinsert (bt, source, locks, idx) )
2676                 return -1;
2677           break;
2678         }
2679
2680         //      after the same page operations have finished,
2681         //  process master page for splits or deletion.
2682
2683         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2684         prev->page = bt_mappage (bt, prev->latch);
2685         samepage = src;
2686
2687         //  pick-up all splits from master page
2688         //      each one is already WriteLocked.
2689
2690         entry = prev->latch->split;
2691
2692         while( entry ) {
2693           set->latch = bt->mgr->latchsets + entry;
2694           set->page = bt_mappage (bt, set->latch);
2695           entry = set->latch->split;
2696
2697           // delete empty master page by undoing its split
2698           //  (this is potentially another empty page)
2699           //  note that there are no new left pointers yet
2700
2701           if( !prev->page->act ) {
2702                 memcpy (set->page->left, prev->page->left, BtId);
2703                 memcpy (prev->page, set->page, bt->mgr->page_size);
2704                 bt_lockpage (bt, BtLockDelete, set->latch);
2705                 bt_freepage (bt, set);
2706
2707                 prev->latch->dirty = 1;
2708                 continue;
2709           }
2710
2711           // remove empty page from the split chain
2712
2713           if( !set->page->act ) {
2714                 memcpy (prev->page->right, set->page->right, BtId);
2715                 prev->latch->split = set->latch->split;
2716                 bt_lockpage (bt, BtLockDelete, set->latch);
2717                 bt_freepage (bt, set);
2718                 continue;
2719           }
2720
2721           //  schedule prev fence key update
2722
2723           ptr = keyptr(prev->page,prev->page->cnt);
2724           leaf = calloc (sizeof(AtomicKey), 1);
2725
2726           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2727           leaf->page_no = prev->latch->page_no;
2728           leaf->entry = prev->latch->entry;
2729           leaf->type = 0;
2730
2731           if( tail )
2732                 tail->next = leaf;
2733           else
2734                 head = leaf;
2735
2736           tail = leaf;
2737
2738           // splice in the left link into the split page
2739
2740           bt_putid (set->page->left, prev->latch->page_no);
2741           bt_lockpage(bt, BtLockParent, prev->latch);
2742           bt_unlockpage(bt, BtLockWrite, prev->latch);
2743           *prev = *set;
2744         }
2745
2746         //  update left pointer in next right page from last split page
2747         //      (if all splits were reversed, latch->split == 0)
2748
2749         if( latch->split ) {
2750           //  fix left pointer in master's original (now split)
2751           //  far right sibling or set rightmost page in page zero
2752
2753           if( right = bt_getid (prev->page->right) ) {
2754                 if( set->latch = bt_pinlatch (bt, right, 1) )
2755                   set->page = bt_mappage (bt, set->latch);
2756                 else
2757                   return -1;
2758
2759             bt_lockpage (bt, BtLockWrite, set->latch);
2760             bt_putid (set->page->left, prev->latch->page_no);
2761                 set->latch->dirty = 1;
2762             bt_unlockpage (bt, BtLockWrite, set->latch);
2763                 bt_unpinlatch (set->latch);
2764           } else {      // prev is rightmost page
2765             bt_spinwritelock (bt->mgr->lock);
2766                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2767             bt_spinreleasewrite(bt->mgr->lock);
2768           }
2769
2770           //  Process last page split in chain
2771
2772           ptr = keyptr(prev->page,prev->page->cnt);
2773           leaf = calloc (sizeof(AtomicKey), 1);
2774
2775           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2776           leaf->page_no = prev->latch->page_no;
2777           leaf->entry = prev->latch->entry;
2778           leaf->type = 0;
2779   
2780           if( tail )
2781                 tail->next = leaf;
2782           else
2783                 head = leaf;
2784
2785           tail = leaf;
2786
2787           bt_lockpage(bt, BtLockParent, prev->latch);
2788           bt_unlockpage(bt, BtLockWrite, prev->latch);
2789
2790           //  remove atomic lock on master page
2791
2792           bt_unlockpage(bt, BtLockAtomic, latch);
2793           continue;
2794         }
2795
2796         //  finished if prev page occupied (either master or final split)
2797
2798         if( prev->page->act ) {
2799           bt_unlockpage(bt, BtLockWrite, latch);
2800           bt_unlockpage(bt, BtLockAtomic, latch);
2801           bt_unpinlatch(latch);
2802           continue;
2803         }
2804
2805         // any and all splits were reversed, and the
2806         // master page located in prev is empty, delete it
2807         // by pulling over master's right sibling.
2808
2809         // Remove empty master's fence key
2810
2811         ptr = keyptr(prev->page,prev->page->cnt);
2812
2813         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2814                 return -1;
2815
2816         //      perform the remainder of the delete
2817         //      from the FIFO queue
2818
2819         leaf = calloc (sizeof(AtomicKey), 1);
2820
2821         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2822         leaf->page_no = prev->latch->page_no;
2823         leaf->entry = prev->latch->entry;
2824         leaf->nounlock = 1;
2825         leaf->type = 2;
2826   
2827         if( tail )
2828           tail->next = leaf;
2829         else
2830           head = leaf;
2831
2832         tail = leaf;
2833
2834         //      leave atomic lock in place until
2835         //      deletion completes in next phase.
2836
2837         bt_unlockpage(bt, BtLockWrite, prev->latch);
2838   }
2839   
2840   //  add & delete keys for any pages split or merged during transaction
2841
2842   if( leaf = head )
2843     do {
2844           set->latch = bt->mgr->latchsets + leaf->entry;
2845           set->page = bt_mappage (bt, set->latch);
2846
2847           bt_putid (value, leaf->page_no);
2848           ptr = (BtKey *)leaf->leafkey;
2849
2850           switch( leaf->type ) {
2851           case 0:       // insert key
2852             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2853                   return -1;
2854
2855                 break;
2856
2857           case 1:       // delete key
2858                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2859                   return -1;
2860
2861                 break;
2862
2863           case 2:       // free page
2864                 if( bt_atomicfree (bt, set) )
2865                   return -1;
2866
2867                 break;
2868           }
2869
2870           if( !leaf->nounlock )
2871             bt_unlockpage (bt, BtLockParent, set->latch);
2872
2873           bt_unpinlatch (set->latch);
2874           tail = leaf->next;
2875           free (leaf);
2876         } while( leaf = tail );
2877
2878   // return success
2879
2880   free (locks);
2881   return 0;
2882 }
2883
2884 //      set cursor to highest slot on highest page
2885
2886 uint bt_lastkey (BtDb *bt)
2887 {
2888 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2889 BtPageSet set[1];
2890
2891         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2892                 set->page = bt_mappage (bt, set->latch);
2893         else
2894                 return 0;
2895
2896     bt_lockpage(bt, BtLockRead, set->latch);
2897         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2898     bt_unlockpage(bt, BtLockRead, set->latch);
2899         bt_unpinlatch (set->latch);
2900
2901         bt->cursor_page = page_no;
2902         return bt->cursor->cnt;
2903 }
2904
2905 //      return previous slot on cursor page
2906
2907 uint bt_prevkey (BtDb *bt, uint slot)
2908 {
2909 uid ourright, next, us = bt->cursor_page;
2910 BtPageSet set[1];
2911
2912         if( --slot )
2913                 return slot;
2914
2915         ourright = bt_getid(bt->cursor->right);
2916
2917 goleft:
2918         if( !(next = bt_getid(bt->cursor->left)) )
2919                 return 0;
2920
2921 findourself:
2922         bt->cursor_page = next;
2923
2924         if( set->latch = bt_pinlatch (bt, next, 1) )
2925                 set->page = bt_mappage (bt, set->latch);
2926         else
2927                 return 0;
2928
2929     bt_lockpage(bt, BtLockRead, set->latch);
2930         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2931         bt_unlockpage(bt, BtLockRead, set->latch);
2932         bt_unpinlatch (set->latch);
2933         
2934         next = bt_getid (bt->cursor->right);
2935
2936         if( bt->cursor->kill )
2937                 goto findourself;
2938
2939         if( next != us )
2940           if( next == ourright )
2941                 goto goleft;
2942           else
2943                 goto findourself;
2944
2945         return bt->cursor->cnt;
2946 }
2947
2948 //  return next slot on cursor page
2949 //  or slide cursor right into next page
2950
2951 uint bt_nextkey (BtDb *bt, uint slot)
2952 {
2953 BtPageSet set[1];
2954 uid right;
2955
2956   do {
2957         right = bt_getid(bt->cursor->right);
2958
2959         while( slot++ < bt->cursor->cnt )
2960           if( slotptr(bt->cursor,slot)->dead )
2961                 continue;
2962           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2963                 return slot;
2964           else
2965                 break;
2966
2967         if( !right )
2968                 break;
2969
2970         bt->cursor_page = right;
2971
2972         if( set->latch = bt_pinlatch (bt, right, 1) )
2973                 set->page = bt_mappage (bt, set->latch);
2974         else
2975                 return 0;
2976
2977     bt_lockpage(bt, BtLockRead, set->latch);
2978
2979         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2980
2981         bt_unlockpage(bt, BtLockRead, set->latch);
2982         bt_unpinlatch (set->latch);
2983         slot = 0;
2984
2985   } while( 1 );
2986
2987   return bt->err = 0;
2988 }
2989
2990 //  cache page of keys into cursor and return starting slot for given key
2991
2992 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2993 {
2994 BtPageSet set[1];
2995 uint slot;
2996
2997         // cache page for retrieval
2998
2999         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3000           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3001         else
3002           return 0;
3003
3004         bt->cursor_page = set->latch->page_no;
3005
3006         bt_unlockpage(bt, BtLockRead, set->latch);
3007         bt_unpinlatch (set->latch);
3008         return slot;
3009 }
3010
3011 BtKey *bt_key(BtDb *bt, uint slot)
3012 {
3013         return keyptr(bt->cursor, slot);
3014 }
3015
3016 BtVal *bt_val(BtDb *bt, uint slot)
3017 {
3018         return valptr(bt->cursor,slot);
3019 }
3020
3021 #ifdef STANDALONE
3022
3023 #ifndef unix
3024 double getCpuTime(int type)
3025 {
3026 FILETIME crtime[1];
3027 FILETIME xittime[1];
3028 FILETIME systime[1];
3029 FILETIME usrtime[1];
3030 SYSTEMTIME timeconv[1];
3031 double ans = 0;
3032
3033         memset (timeconv, 0, sizeof(SYSTEMTIME));
3034
3035         switch( type ) {
3036         case 0:
3037                 GetSystemTimeAsFileTime (xittime);
3038                 FileTimeToSystemTime (xittime, timeconv);
3039                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3040                 break;
3041         case 1:
3042                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3043                 FileTimeToSystemTime (usrtime, timeconv);
3044                 break;
3045         case 2:
3046                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3047                 FileTimeToSystemTime (systime, timeconv);
3048                 break;
3049         }
3050
3051         ans += (double)timeconv->wHour * 3600;
3052         ans += (double)timeconv->wMinute * 60;
3053         ans += (double)timeconv->wSecond;
3054         ans += (double)timeconv->wMilliseconds / 1000;
3055         return ans;
3056 }
3057 #else
3058 #include <time.h>
3059 #include <sys/resource.h>
3060
3061 double getCpuTime(int type)
3062 {
3063 struct rusage used[1];
3064 struct timeval tv[1];
3065
3066         switch( type ) {
3067         case 0:
3068                 gettimeofday(tv, NULL);
3069                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3070
3071         case 1:
3072                 getrusage(RUSAGE_SELF, used);
3073                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3074
3075         case 2:
3076                 getrusage(RUSAGE_SELF, used);
3077                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3078         }
3079
3080         return 0;
3081 }
3082 #endif
3083
3084 void bt_poolaudit (BtMgr *mgr)
3085 {
3086 BtLatchSet *latch;
3087 uint slot = 0;
3088
3089         while( slot++ < mgr->latchdeployed ) {
3090                 latch = mgr->latchsets + slot;
3091
3092                 if( *latch->readwr->rin & MASK )
3093                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3094                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3095
3096                 if( *latch->access->rin & MASK )
3097                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3098                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3099
3100                 if( *latch->parent->ticket != *latch->parent->serving )
3101                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3102                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3103
3104                 if( latch->pin & ~CLOCK_bit ) {
3105                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3106                         latch->pin = 0;
3107                 }
3108         }
3109 }
3110
3111 uint bt_latchaudit (BtDb *bt)
3112 {
3113 ushort idx, hashidx;
3114 uid next, page_no;
3115 BtLatchSet *latch;
3116 uint cnt = 0;
3117 BtKey *ptr;
3118
3119         if( *(ushort *)(bt->mgr->lock) )
3120                 fprintf(stderr, "Alloc page locked\n");
3121         *(ushort *)(bt->mgr->lock) = 0;
3122
3123         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3124                 latch = bt->mgr->latchsets + idx;
3125                 if( *latch->readwr->rin & MASK )
3126                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3127                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3128
3129                 if( *latch->access->rin & MASK )
3130                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3131                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3132
3133                 if( *latch->parent->ticket != *latch->parent->serving )
3134                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3135                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3136
3137                 if( latch->pin ) {
3138                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3139                         latch->pin = 0;
3140                 }
3141         }
3142
3143         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3144           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3145                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3146
3147           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3148
3149           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3150                 latch = bt->mgr->latchsets + idx;
3151                 if( latch->pin )
3152                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3153           } while( idx = latch->next );
3154         }
3155
3156         page_no = LEAF_page;
3157
3158         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3159         uid off = page_no << bt->mgr->page_bits;
3160 #ifdef unix
3161           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3162 #else
3163         DWORD amt[1];
3164
3165           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3166
3167           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3168                 return bt->err = BTERR_map;
3169
3170           if( *amt <  bt->mgr->page_size )
3171                 return bt->err = BTERR_map;
3172 #endif
3173                 if( !bt->frame->free && !bt->frame->lvl )
3174                         cnt += bt->frame->act;
3175                 page_no++;
3176         }
3177                 
3178         cnt--;  // remove stopper key
3179         fprintf(stderr, " Total keys read %d\n", cnt);
3180
3181         bt_close (bt);
3182         return 0;
3183 }
3184
3185 typedef struct {
3186         char idx;
3187         char *type;
3188         char *infile;
3189         BtMgr *mgr;
3190         int num;
3191 } ThreadArg;
3192
3193 //  standalone program to index file of keys
3194 //  then list them onto std-out
3195
3196 #ifdef unix
3197 void *index_file (void *arg)
3198 #else
3199 uint __stdcall index_file (void *arg)
3200 #endif
3201 {
3202 int line = 0, found = 0, cnt = 0, idx;
3203 uid next, page_no = LEAF_page;  // start on first page of leaves
3204 int ch, len = 0, slot, type = 0;
3205 unsigned char key[BT_maxkey];
3206 unsigned char txn[65536];
3207 ThreadArg *args = arg;
3208 BtPageSet set[1];
3209 uint nxt = 65536;
3210 BtPage page;
3211 BtKey *ptr;
3212 BtVal *val;
3213 BtDb *bt;
3214 FILE *in;
3215
3216         bt = bt_open (args->mgr);
3217         page = (BtPage)txn;
3218
3219         if( args->idx < strlen (args->type) )
3220                 ch = args->type[args->idx];
3221         else
3222                 ch = args->type[strlen(args->type) - 1];
3223
3224         switch(ch | 0x20)
3225         {
3226         case 'a':
3227                 fprintf(stderr, "started latch mgr audit\n");
3228                 cnt = bt_latchaudit (bt);
3229                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3230                 break;
3231
3232         case 'd':
3233                 type = Delete;
3234
3235         case 'p':
3236                 if( !type )
3237                         type = Unique;
3238
3239                 if( args->num )
3240                  if( type == Delete )
3241                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3242                  else
3243                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3244                 else
3245                  if( type == Delete )
3246                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3247                  else
3248                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3249
3250                 if( in = fopen (args->infile, "rb") )
3251                   while( ch = getc(in), ch != EOF )
3252                         if( ch == '\n' )
3253                         {
3254                           line++;
3255
3256                           if( !args->num ) {
3257                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3258                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3259                             len = 0;
3260                                 continue;
3261                           }
3262
3263                           nxt -= len - 10;
3264                           memcpy (txn + nxt, key + 10, len - 10);
3265                           nxt -= 1;
3266                           txn[nxt] = len - 10;
3267                           nxt -= 10;
3268                           memcpy (txn + nxt, key, 10);
3269                           nxt -= 1;
3270                           txn[nxt] = 10;
3271                           slotptr(page,++cnt)->off  = nxt;
3272                           slotptr(page,cnt)->type = type;
3273                           len = 0;
3274
3275                           if( cnt < args->num )
3276                                 continue;
3277
3278                           page->cnt = cnt;
3279                           page->act = cnt;
3280                           page->min = nxt;
3281
3282                           if( bt_atomictxn (bt, page) )
3283                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3284                           nxt = sizeof(txn);
3285                           cnt = 0;
3286
3287                         }
3288                         else if( len < BT_maxkey )
3289                                 key[len++] = ch;
3290                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3291                 break;
3292
3293         case 'w':
3294                 fprintf(stderr, "started indexing for %s\n", args->infile);
3295                 if( in = fopen (args->infile, "r") )
3296                   while( ch = getc(in), ch != EOF )
3297                         if( ch == '\n' )
3298                         {
3299                           line++;
3300
3301                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3302                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3303                           len = 0;
3304                         }
3305                         else if( len < BT_maxkey )
3306                                 key[len++] = ch;
3307                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3308                 break;
3309
3310         case 'f':
3311                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3312                 if( in = fopen (args->infile, "rb") )
3313                   while( ch = getc(in), ch != EOF )
3314                         if( ch == '\n' )
3315                         {
3316                           line++;
3317                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3318                                 found++;
3319                           else if( bt->err )
3320                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3321                           len = 0;
3322                         }
3323                         else if( len < BT_maxkey )
3324                                 key[len++] = ch;
3325                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3326                 break;
3327
3328         case 's':
3329                 fprintf(stderr, "started scanning\n");
3330                 do {
3331                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3332                                 set->page = bt_mappage (bt, set->latch);
3333                         else
3334                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3335                         bt_lockpage (bt, BtLockRead, set->latch);
3336                         next = bt_getid (set->page->right);
3337
3338                         for( slot = 0; slot++ < set->page->cnt; )
3339                          if( next || slot < set->page->cnt )
3340                           if( !slotptr(set->page, slot)->dead ) {
3341                                 ptr = keyptr(set->page, slot);
3342                                 len = ptr->len;
3343
3344                             if( slotptr(set->page, slot)->type == Duplicate )
3345                                         len -= BtId;
3346
3347                                 fwrite (ptr->key, len, 1, stdout);
3348                                 val = valptr(set->page, slot);
3349                                 fwrite (val->value, val->len, 1, stdout);
3350                                 fputc ('\n', stdout);
3351                                 cnt++;
3352                            }
3353
3354                         bt_unlockpage (bt, BtLockRead, set->latch);
3355                         bt_unpinlatch (set->latch);
3356                 } while( page_no = next );
3357
3358                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3359                 break;
3360
3361         case 'r':
3362                 fprintf(stderr, "started reverse scan\n");
3363                 if( slot = bt_lastkey (bt) )
3364                    while( slot = bt_prevkey (bt, slot) ) {
3365                         if( slotptr(bt->cursor, slot)->dead )
3366                           continue;
3367
3368                         ptr = keyptr(bt->cursor, slot);
3369                         len = ptr->len;
3370
3371                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3372                                 len -= BtId;
3373
3374                         fwrite (ptr->key, len, 1, stdout);
3375                         val = valptr(bt->cursor, slot);
3376                         fwrite (val->value, val->len, 1, stdout);
3377                         fputc ('\n', stdout);
3378                         cnt++;
3379                   }
3380
3381                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3382                 break;
3383
3384         case 'c':
3385 #ifdef unix
3386                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3387 #endif
3388                 fprintf(stderr, "started counting\n");
3389                 page_no = LEAF_page;
3390
3391                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3392                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3393                                 break;
3394
3395                         if( !bt->frame->free && !bt->frame->lvl )
3396                                 cnt += bt->frame->act;
3397
3398                         bt->reads++;
3399                         page_no++;
3400                 }
3401                 
3402                 cnt--;  // remove stopper key
3403                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3404                 break;
3405         }
3406
3407         bt_close (bt);
3408 #ifdef unix
3409         return NULL;
3410 #else
3411         return 0;
3412 #endif
3413 }
3414
3415 typedef struct timeval timer;
3416
3417 int main (int argc, char **argv)
3418 {
3419 int idx, cnt, len, slot, err;
3420 int segsize, bits = 16;
3421 double start, stop;
3422 #ifdef unix
3423 pthread_t *threads;
3424 #else
3425 HANDLE *threads;
3426 #endif
3427 ThreadArg *args;
3428 uint poolsize = 0;
3429 float elapsed;
3430 int num = 0;
3431 char key[1];
3432 BtMgr *mgr;
3433 BtKey *ptr;
3434 BtDb *bt;
3435
3436         if( argc < 3 ) {
3437                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3438                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3439                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3440                 fprintf (stderr, "  page_bits is the page size in bits\n");
3441                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3442                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3443                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3444                 exit(0);
3445         }
3446
3447         start = getCpuTime(0);
3448
3449         if( argc > 3 )
3450                 bits = atoi(argv[3]);
3451
3452         if( argc > 4 )
3453                 poolsize = atoi(argv[4]);
3454
3455         if( !poolsize )
3456                 fprintf (stderr, "Warning: no mapped_pool\n");
3457
3458         if( argc > 5 )
3459                 num = atoi(argv[5]);
3460
3461         cnt = argc - 6;
3462 #ifdef unix
3463         threads = malloc (cnt * sizeof(pthread_t));
3464 #else
3465         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3466 #endif
3467         args = malloc (cnt * sizeof(ThreadArg));
3468
3469         mgr = bt_mgr ((argv[1]), bits, poolsize);
3470
3471         if( !mgr ) {
3472                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3473                 exit (1);
3474         }
3475
3476         //      fire off threads
3477
3478         for( idx = 0; idx < cnt; idx++ ) {
3479                 args[idx].infile = argv[idx + 6];
3480                 args[idx].type = argv[2];
3481                 args[idx].mgr = mgr;
3482                 args[idx].num = num;
3483                 args[idx].idx = idx;
3484 #ifdef unix
3485                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3486                         fprintf(stderr, "Error creating thread %d\n", err);
3487 #else
3488                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3489 #endif
3490         }
3491
3492         //      wait for termination
3493
3494 #ifdef unix
3495         for( idx = 0; idx < cnt; idx++ )
3496                 pthread_join (threads[idx], NULL);
3497 #else
3498         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3499
3500         for( idx = 0; idx < cnt; idx++ )
3501                 CloseHandle(threads[idx]);
3502
3503 #endif
3504         bt_poolaudit(mgr);
3505         bt_mgrclose (mgr);
3506
3507         elapsed = getCpuTime(0) - start;
3508         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3509         elapsed = getCpuTime(1);
3510         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3511         elapsed = getCpuTime(2);
3512         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3513 }
3514
3515 #endif  //STANDALONE