]> pd.if.org Git - btree/blob - threadskv8.c
Fix latching protocol for bt_atomicload
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         ushort thread_no[1];            // next thread number
279         BtHashEntry *hashtable;         // the buffer pool hash table entries
280         BtLatchSet *latchsets;          // mapped latch set from buffer pool
281         unsigned char *pagepool;        // mapped to the buffer pool pages
282 #ifndef unix
283         HANDLE halloc;                          // allocation handle
284         HANDLE hpool;                           // buffer pool handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                                // current cursor page number   
293         unsigned char *mem;                             // frame, cursor, page memory buffer
294         unsigned char key[BT_keyarray]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297         int reads, writes;              // number of reads and writes from the btree
298         ushort thread_no;               // thread number
299 } BtDb;
300
301 typedef enum {
302         BTERR_ok = 0,
303         BTERR_struct,
304         BTERR_ovflw,
305         BTERR_lock,
306         BTERR_map,
307         BTERR_read,
308         BTERR_wrt,
309         BTERR_atomic
310 } BTERR;
311
312 #define CLOCK_bit 0x8000
313
314 // B-Tree functions
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey   (BtDb *bt, uint slot);
323
324 //      manager functions
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
327
328 //  Helper functions to return slot values
329 //      from the cursor page.
330
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
333
334 //  The page is allocated from low and hi ends.
335 //  The key slots are allocated from the bottom,
336 //      while the text and value of the key
337 //  are allocated from the top.  When the two
338 //  areas meet, the page is split into two.
339
340 //  A key consists of a length byte, two bytes of
341 //  index number (0 - 65535), and up to 253 bytes
342 //  of key value.
343
344 //  Associated with each key is a value byte string
345 //      containing any value desired.
346
347 //  The b-tree root is always located at page 1.
348 //      The first leaf page of level zero is always
349 //      located on page 2.
350
351 //      The b-tree pages are linked with next
352 //      pointers to facilitate enumerators,
353 //      and provide for concurrency.
354
355 //      When to root page fills, it is split in two and
356 //      the tree height is raised by a new root at page
357 //      one with two keys.
358
359 //      Deleted keys are marked with a dead bit until
360 //      page cleanup. The fence key for a leaf node is
361 //      always present
362
363 //  To achieve maximum concurrency one page is locked at a time
364 //  as the tree is traversed to find leaf key in question. The right
365 //  page numbers are used in cases where the page is being split,
366 //      or consolidated.
367
368 //  Page 0 is dedicated to lock for new page extensions,
369 //      and chains empty pages together for reuse. It also
370 //      contains the latch manager hash table.
371
372 //      The ParentModification lock on a node is obtained to serialize posting
373 //      or changing the fence key for a node.
374
375 //      Empty pages are chained together through the ALLOC page and reused.
376
377 //      Access macros to address slot and key values from the page
378 //      Page slots use 1 based indexing.
379
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
383
384 void bt_putid(unsigned char *dest, uid id)
385 {
386 int i = BtId;
387
388         while( i-- )
389                 dest[i] = (unsigned char)id, id >>= 8;
390 }
391
392 uid bt_getid(unsigned char *src)
393 {
394 uid id = 0;
395 int i;
396
397         for( i = 0; i < BtId; i++ )
398                 id <<= 8, id |= *src++; 
399
400         return id;
401 }
402
403 uid bt_newdup (BtDb *bt)
404 {
405 #ifdef unix
406         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
407 #else
408         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
409 #endif
410 }
411
412 //      Write-Only Queue Lock
413
414 void WriteOLock (WOLock *lock)
415 {
416 ushort tix;
417 #ifdef unix
418         tix = __sync_fetch_and_add (lock->ticket, 1);
419 #else
420         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
421 #endif
422         // wait for our ticket to come up
423
424         while( tix != lock->serving[0] )
425 #ifdef unix
426                 sched_yield();
427 #else
428                 SwitchToThread ();
429 #endif
430 }
431
432 void WriteORelease (WOLock *lock)
433 {
434         lock->serving[0]++;
435 }
436
437 //      Phase-Fair reader/writer lock implementation
438
439 void WriteLock (RWLock *lock)
440 {
441 ushort w, r, tix;
442
443 #ifdef unix
444         tix = __sync_fetch_and_add (lock->ticket, 1);
445 #else
446         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
447 #endif
448         // wait for our ticket to come up
449
450         while( tix != lock->serving[0] )
451 #ifdef unix
452                 sched_yield();
453 #else
454                 SwitchToThread ();
455 #endif
456
457         w = PRES | (tix & PHID);
458 #ifdef  unix
459         r = __sync_fetch_and_add (lock->rin, w);
460 #else
461         r = _InterlockedExchangeAdd16 (lock->rin, w);
462 #endif
463         while( r != *lock->rout )
464 #ifdef unix
465                 sched_yield();
466 #else
467                 SwitchToThread();
468 #endif
469 }
470
471 void WriteRelease (RWLock *lock)
472 {
473 #ifdef unix
474         __sync_fetch_and_and (lock->rin, ~MASK);
475 #else
476         _InterlockedAnd16 (lock->rin, ~MASK);
477 #endif
478         lock->serving[0]++;
479 }
480
481 void ReadLock (RWLock *lock)
482 {
483 ushort w;
484 #ifdef unix
485         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
486 #else
487         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 #endif
489         if( w )
490           while( w == (*lock->rin & MASK) )
491 #ifdef unix
492                 sched_yield ();
493 #else
494                 SwitchToThread ();
495 #endif
496 }
497
498 void ReadRelease (RWLock *lock)
499 {
500 #ifdef unix
501         __sync_fetch_and_add (lock->rout, RINC);
502 #else
503         _InterlockedExchangeAdd16 (lock->rout, RINC);
504 #endif
505 }
506
507 //      Spin Latch Manager
508
509 //      wait until write lock mode is clear
510 //      and add 1 to the share count
511
512 void bt_spinreadlock(BtSpinLatch *latch)
513 {
514 ushort prev;
515
516   do {
517 #ifdef unix
518         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
519 #else
520         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
521 #endif
522         //  see if exclusive request is granted or pending
523
524         if( !(prev & BOTH) )
525                 return;
526 #ifdef unix
527         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
528 #else
529         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 #endif
531 #ifdef  unix
532   } while( sched_yield(), 1 );
533 #else
534   } while( SwitchToThread(), 1 );
535 #endif
536 }
537
538 //      wait for other read and write latches to relinquish
539
540 void bt_spinwritelock(BtSpinLatch *latch)
541 {
542 ushort prev;
543
544   do {
545 #ifdef  unix
546         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
547 #else
548         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 #endif
550         if( !(prev & XCL) )
551           if( !(prev & ~BOTH) )
552                 return;
553           else
554 #ifdef unix
555                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 #else
557                 _InterlockedAnd16((ushort *)latch, ~XCL);
558 #endif
559 #ifdef  unix
560   } while( sched_yield(), 1 );
561 #else
562   } while( SwitchToThread(), 1 );
563 #endif
564 }
565
566 //      try to obtain write lock
567
568 //      return 1 if obtained,
569 //              0 otherwise
570
571 int bt_spinwritetry(BtSpinLatch *latch)
572 {
573 ushort prev;
574
575 #ifdef  unix
576         prev = __sync_fetch_and_or((ushort *)latch, XCL);
577 #else
578         prev = _InterlockedOr16((ushort *)latch, XCL);
579 #endif
580         //      take write access if all bits are clear
581
582         if( !(prev & XCL) )
583           if( !(prev & ~BOTH) )
584                 return 1;
585           else
586 #ifdef unix
587                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
588 #else
589                 _InterlockedAnd16((ushort *)latch, ~XCL);
590 #endif
591         return 0;
592 }
593
594 //      clear write mode
595
596 void bt_spinreleasewrite(BtSpinLatch *latch)
597 {
598 #ifdef unix
599         __sync_fetch_and_and((ushort *)latch, ~BOTH);
600 #else
601         _InterlockedAnd16((ushort *)latch, ~BOTH);
602 #endif
603 }
604
605 //      decrement reader count
606
607 void bt_spinreleaseread(BtSpinLatch *latch)
608 {
609 #ifdef unix
610         __sync_fetch_and_add((ushort *)latch, -SHARE);
611 #else
612         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
613 #endif
614 }
615
616 //      read page from permanent location in Btree file
617
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
619 {
620 off64_t off = page_no << mgr->page_bits;
621
622 #ifdef unix
623         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
625                 return BTERR_read;
626         }
627 #else
628 OVERLAPPED ovl[1];
629 uint amt[1];
630
631         memset (ovl, 0, sizeof(OVERLAPPED));
632         ovl->Offset = off;
633         ovl->OffsetHigh = off >> 32;
634
635         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
637                 return BTERR_read;
638         }
639         if( *amt <  mgr->page_size ) {
640                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
641                 return BTERR_read;
642         }
643 #endif
644         return 0;
645 }
646
647 //      write page to permanent location in Btree file
648 //      clear the dirty bit
649
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
651 {
652 off64_t off = page_no << mgr->page_bits;
653
654 #ifdef unix
655         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
656                 return BTERR_wrt;
657 #else
658 OVERLAPPED ovl[1];
659 uint amt[1];
660
661         memset (ovl, 0, sizeof(OVERLAPPED));
662         ovl->Offset = off;
663         ovl->OffsetHigh = off >> 32;
664
665         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666                 return BTERR_wrt;
667
668         if( *amt <  mgr->page_size )
669                 return BTERR_wrt;
670 #endif
671         return 0;
672 }
673
674 //      link latch table entry into head of latch hash table
675
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
677 {
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
680
681         if( latch->next = bt->mgr->hashtable[hashidx].slot )
682                 bt->mgr->latchsets[latch->next].prev = slot;
683
684         bt->mgr->hashtable[hashidx].slot = slot;
685         latch->page_no = page_no;
686         latch->entry = slot;
687         latch->split = 0;
688         latch->prev = 0;
689         latch->pin = 1;
690
691         if( loadit )
692           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
693                 return bt->err;
694           else
695                 bt->reads++;
696
697         return bt->err = 0;
698 }
699
700 //      set CLOCK bit in latch
701 //      decrement pin count
702
703 void bt_unpinlatch (BtLatchSet *latch)
704 {
705 #ifdef unix
706         if( ~latch->pin & CLOCK_bit )
707                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708         __sync_fetch_and_add(&latch->pin, -1);
709 #else
710         if( ~latch->pin & CLOCK_bit )
711                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712         _InterlockedDecrement16 (&latch->pin);
713 #endif
714 }
715
716 //  return the btree cached page address
717
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
719 {
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
721
722         return page;
723 }
724
725 //      find existing latchset or inspire new one
726 //      return with latchset pinned
727
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
729 {
730 uint hashidx = page_no % bt->mgr->latchhash;
731 BtLatchSet *latch;
732 uint slot, idx;
733 uint lvl, cnt;
734 off64_t off;
735 uint amt[1];
736 BtPage page;
737
738   //  try to find our entry
739
740   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
741
742   if( slot = bt->mgr->hashtable[hashidx].slot ) do
743   {
744         latch = bt->mgr->latchsets + slot;
745         if( page_no == latch->page_no )
746                 break;
747   } while( slot = latch->next );
748
749   //  found our entry
750   //  increment clock
751
752   if( slot ) {
753         latch = bt->mgr->latchsets + slot;
754 #ifdef unix
755         __sync_fetch_and_add(&latch->pin, 1);
756 #else
757         _InterlockedIncrement16 (&latch->pin);
758 #endif
759         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
760         return latch;
761   }
762
763         //  see if there are any unused pool entries
764 #ifdef unix
765         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
766 #else
767         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 #endif
769
770         if( slot < bt->mgr->latchtotal ) {
771                 latch = bt->mgr->latchsets + slot;
772                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
773                         return NULL;
774                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
775                 return latch;
776         }
777
778 #ifdef unix
779         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
780 #else
781         _InterlockedDecrement (&bt->mgr->latchdeployed);
782 #endif
783   //  find and reuse previous entry on victim
784
785   while( 1 ) {
786 #ifdef unix
787         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
788 #else
789         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
790 #endif
791         // try to get write lock on hash chain
792         //      skip entry if not obtained
793         //      or has outstanding pins
794
795         slot %= bt->mgr->latchtotal;
796
797         if( !slot )
798                 continue;
799
800         latch = bt->mgr->latchsets + slot;
801         idx = latch->page_no % bt->mgr->latchhash;
802
803         //      see we are on same chain as hashidx
804
805         if( idx == hashidx )
806                 continue;
807
808         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809                 continue;
810
811         //  skip this slot if it is pinned
812         //      or the CLOCK bit is set
813
814         if( latch->pin ) {
815           if( latch->pin & CLOCK_bit ) {
816 #ifdef unix
817                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
818 #else
819                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 #endif
821           }
822           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
823           continue;
824         }
825
826         //  update permanent page area in btree from buffer pool
827
828         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829
830         if( latch->dirty )
831           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832                 return NULL;
833           else
834                 latch->dirty = 0, bt->writes++;
835
836         //  unlink our available slot from its hash chain
837
838         if( latch->prev )
839                 bt->mgr->latchsets[latch->prev].next = latch->next;
840         else
841                 bt->mgr->hashtable[idx].slot = latch->next;
842
843         if( latch->next )
844                 bt->mgr->latchsets[latch->next].prev = latch->prev;
845
846         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
847
848         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849                 return NULL;
850
851         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
852         return latch;
853   }
854 }
855
856 void bt_mgrclose (BtMgr *mgr)
857 {
858 BtLatchSet *latch;
859 uint num = 0;
860 BtPage page;
861 uint slot;
862
863         //      flush dirty pool pages to the btree
864
865         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867                 latch = mgr->latchsets + slot;
868
869                 if( latch->dirty ) {
870                         bt_writepage(mgr, page, latch->page_no);
871                         latch->dirty = 0, num++;
872                 }
873 //              madvise (page, mgr->page_size, MADV_DONTNEED);
874         }
875
876         fprintf(stderr, "%d buffer pool pages flushed\n", num);
877
878 #ifdef unix
879         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880         munmap (mgr->pagezero, mgr->page_size);
881 #else
882         FlushViewOfFile(mgr->pagezero, 0);
883         UnmapViewOfFile(mgr->pagezero);
884         UnmapViewOfFile(mgr->hashtable);
885         CloseHandle(mgr->halloc);
886         CloseHandle(mgr->hpool);
887 #endif
888 #ifdef unix
889         close (mgr->idx);
890         free (mgr);
891 #else
892         FlushFileBuffers(mgr->idx);
893         CloseHandle(mgr->idx);
894         GlobalFree (mgr);
895 #endif
896 }
897
898 //      close and release memory
899
900 void bt_close (BtDb *bt)
901 {
902 #ifdef unix
903         if( bt->mem )
904                 free (bt->mem);
905 #else
906         if( bt->mem)
907                 VirtualFree (bt->mem, 0, MEM_RELEASE);
908 #endif
909         free (bt);
910 }
911
912 //  open/create new btree buffer manager
913
914 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
915 //              size of page pool (e.g. 262144)
916
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
918 {
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
924 off64_t size;
925 uint amt[1];
926 BtMgr* mgr;
927 BtKey* key;
928 BtVal *val;
929
930         // determine sanity of page size and buffer pool
931
932         if( bits > BT_maxbits )
933                 bits = BT_maxbits;
934         else if( bits < BT_minbits )
935                 bits = BT_minbits;
936
937         if( nodemax < 16 ) {
938                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
939                 return NULL;
940         }
941
942 #ifdef unix
943         mgr = calloc (1, sizeof(BtMgr));
944
945         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
946
947         if( mgr->idx == -1 ) {
948                 fprintf (stderr, "Unable to open btree file\n");
949                 return free(mgr), NULL;
950         }
951 #else
952         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953         attr = FILE_ATTRIBUTE_NORMAL;
954         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
955
956         if( mgr->idx == INVALID_HANDLE_VALUE )
957                 return GlobalFree(mgr), NULL;
958 #endif
959
960 #ifdef unix
961         pagezero = valloc (BT_maxpage);
962         *amt = 0;
963
964         // read minimum page size to get root info
965         //      to support raw disk partition files
966         //      check if bits == 0 on the disk.
967
968         if( size = lseek (mgr->idx, 0L, 2) )
969                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970                         if( pagezero->alloc->bits )
971                                 bits = pagezero->alloc->bits;
972                         else
973                                 initit = 1;
974                 else
975                         return free(mgr), free(pagezero), NULL;
976         else
977                 initit = 1;
978 #else
979         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980         size = GetFileSize(mgr->idx, amt);
981
982         if( size || *amt ) {
983                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984                         return bt_mgrclose (mgr), NULL;
985                 bits = pagezero->alloc->bits;
986         } else
987                 initit = 1;
988 #endif
989
990         mgr->page_size = 1 << bits;
991         mgr->page_bits = bits;
992
993         //  calculate number of latch hash table entries
994
995         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
997
998         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
999         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000         mgr->latchtotal = nodemax;
1001
1002         if( !initit )
1003                 goto mgrlatch;
1004
1005         // initialize an empty b-tree with latch page, root page, page of leaves
1006         // and page(s) of latches and page pool cache
1007
1008         memset (pagezero, 0, 1 << bits);
1009         pagezero->alloc->bits = mgr->page_bits;
1010         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1011
1012         //  initialize left-most LEAF page in
1013         //      alloc->left.
1014
1015         bt_putid (pagezero->alloc->left, LEAF_page);
1016
1017         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018                 fprintf (stderr, "Unable to create btree page zero\n");
1019                 return bt_mgrclose (mgr), NULL;
1020         }
1021
1022         memset (pagezero, 0, 1 << bits);
1023         pagezero->alloc->bits = mgr->page_bits;
1024
1025         for( lvl=MIN_lvl; lvl--; ) {
1026                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027                 key = keyptr(pagezero->alloc, 1);
1028                 key->len = 2;           // create stopper key
1029                 key->key[0] = 0xff;
1030                 key->key[1] = 0xff;
1031
1032                 bt_putid(value, MIN_lvl - lvl + 1);
1033                 val = valptr(pagezero->alloc, 1);
1034                 val->len = lvl ? BtId : 0;
1035                 memcpy (val->value, value, val->len);
1036
1037                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038                 pagezero->alloc->lvl = lvl;
1039                 pagezero->alloc->cnt = 1;
1040                 pagezero->alloc->act = 1;
1041
1042                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043                         fprintf (stderr, "Unable to create btree page zero\n");
1044                         return bt_mgrclose (mgr), NULL;
1045                 }
1046         }
1047
1048 mgrlatch:
1049 #ifdef unix
1050         free (pagezero);
1051 #else
1052         VirtualFree (pagezero, 0, MEM_RELEASE);
1053 #endif
1054 #ifdef unix
1055         // mlock the pagezero page
1056
1057         flag = PROT_READ | PROT_WRITE;
1058         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059         if( mgr->pagezero == MAP_FAILED ) {
1060                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061                 return bt_mgrclose (mgr), NULL;
1062         }
1063         mlock (mgr->pagezero, mgr->page_size);
1064
1065         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066         if( mgr->hashtable == MAP_FAILED ) {
1067                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068                 return bt_mgrclose (mgr), NULL;
1069         }
1070 #else
1071         flag = PAGE_READWRITE;
1072         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073         if( !mgr->halloc ) {
1074                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075                 return bt_mgrclose (mgr), NULL;
1076         }
1077
1078         flag = FILE_MAP_WRITE;
1079         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080         if( !mgr->pagezero ) {
1081                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082                 return bt_mgrclose (mgr), NULL;
1083         }
1084
1085         flag = PAGE_READWRITE;
1086         size = (uid)mgr->nlatchpage << mgr->page_bits;
1087         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1088         if( !mgr->hpool ) {
1089                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090                 return bt_mgrclose (mgr), NULL;
1091         }
1092
1093         flag = FILE_MAP_WRITE;
1094         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095         if( !mgr->hashtable ) {
1096                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097                 return bt_mgrclose (mgr), NULL;
1098         }
1099 #endif
1100
1101         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1103
1104         return mgr;
1105 }
1106
1107 //      open BTree access method
1108 //      based on buffer manager
1109
1110 BtDb *bt_open (BtMgr *mgr)
1111 {
1112 BtDb *bt = malloc (sizeof(*bt));
1113
1114         memset (bt, 0, sizeof(*bt));
1115         bt->mgr = mgr;
1116 #ifdef unix
1117         bt->mem = valloc (2 *mgr->page_size);
1118 #else
1119         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 #endif
1121         bt->frame = (BtPage)bt->mem;
1122         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1123 #ifdef unix
1124         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1125 #else
1126         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1127 #endif
1128         return bt;
1129 }
1130
1131 //  compare two keys, return > 0, = 0, or < 0
1132 //  =0: keys are same
1133 //  -1: key2 > key1
1134 //  +1: key2 < key1
1135 //  as the comparison value
1136
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1138 {
1139 uint len1 = key1->len;
1140 int ans;
1141
1142         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1143                 return ans;
1144
1145         if( len1 > len2 )
1146                 return 1;
1147         if( len1 < len2 )
1148                 return -1;
1149
1150         return 0;
1151 }
1152
1153 // place write, read, or parent lock on requested page_no.
1154
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1156 {
1157         switch( mode ) {
1158         case BtLockRead:
1159                 ReadLock (latch->readwr);
1160                 break;
1161         case BtLockWrite:
1162                 WriteLock (latch->readwr);
1163                 break;
1164         case BtLockAccess:
1165                 ReadLock (latch->access);
1166                 break;
1167         case BtLockDelete:
1168                 WriteLock (latch->access);
1169                 break;
1170         case BtLockParent:
1171                 WriteOLock (latch->parent);
1172                 break;
1173         case BtLockAtomic:
1174                 WriteOLock (latch->atomic);
1175                 break;
1176         }
1177 }
1178
1179 // remove write, read, or parent lock on requested page
1180
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1182 {
1183         switch( mode ) {
1184         case BtLockRead:
1185                 ReadRelease (latch->readwr);
1186                 break;
1187         case BtLockWrite:
1188                 WriteRelease (latch->readwr);
1189                 break;
1190         case BtLockAccess:
1191                 ReadRelease (latch->access);
1192                 break;
1193         case BtLockDelete:
1194                 WriteRelease (latch->access);
1195                 break;
1196         case BtLockParent:
1197                 WriteORelease (latch->parent);
1198                 break;
1199         case BtLockAtomic:
1200                 WriteORelease (latch->atomic);
1201                 break;
1202         }
1203 }
1204
1205 //      allocate a new page
1206 //      return with page latched, but unlocked.
1207
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1209 {
1210 uid page_no;
1211 int blk;
1212
1213         //      lock allocation page
1214
1215         bt_spinwritelock(bt->mgr->lock);
1216
1217         // use empty chain first
1218         // else allocate empty page
1219
1220         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222                         set->page = bt_mappage (bt, set->latch);
1223                 else
1224                         return bt->err = BTERR_struct, -1;
1225
1226                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227                 bt_spinreleasewrite(bt->mgr->lock);
1228
1229                 memcpy (set->page, contents, bt->mgr->page_size);
1230                 set->latch->dirty = 1;
1231                 return 0;
1232         }
1233
1234         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1236
1237         // unlock allocation latch
1238
1239         bt_spinreleasewrite(bt->mgr->lock);
1240
1241         //      don't load cache from btree page
1242
1243         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244                 set->page = bt_mappage (bt, set->latch);
1245         else
1246                 return bt->err = BTERR_struct;
1247
1248         memcpy (set->page, contents, bt->mgr->page_size);
1249         set->latch->dirty = 1;
1250         return 0;
1251 }
1252
1253 //  find slot in page for given key at a given level
1254
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1256 {
1257 uint diff, higher = page->cnt, low = 1, slot;
1258 uint good = 0;
1259
1260         //        make stopper key an infinite fence value
1261
1262         if( bt_getid (page->right) )
1263                 higher++;
1264         else
1265                 good++;
1266
1267         //      low is the lowest candidate.
1268         //  loop ends when they meet
1269
1270         //  higher is already
1271         //      tested as .ge. the passed key.
1272
1273         while( diff = higher - low ) {
1274                 slot = low + ( diff >> 1 );
1275                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1276                         low = slot + 1;
1277                 else
1278                         higher = slot, good++;
1279         }
1280
1281         //      return zero if key is on right link page
1282
1283         return good ? higher : 0;
1284 }
1285
1286 //  find and load page at given level for given key
1287 //      leave page rd or wr locked as requested
1288
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1290 {
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1295
1296   //  start at root of btree and drill down
1297
1298   do {
1299         // determine lock mode of drill level
1300         mode = (drill == lvl) ? lock : BtLockRead; 
1301
1302         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1303           return 0;
1304
1305         // obtain access lock using lock chaining with Access mode
1306
1307         if( page_no > ROOT_page )
1308           bt_lockpage(bt, BtLockAccess, set->latch);
1309
1310         set->page = bt_mappage (bt, set->latch);
1311
1312         //      release & unpin parent or left sibling page
1313
1314         if( prevpage ) {
1315           bt_unlockpage(bt, prevmode, prevlatch);
1316           bt_unpinlatch (prevlatch);
1317           prevpage = 0;
1318         }
1319
1320         // obtain mode lock using lock chaining through AccessLock
1321
1322         bt_lockpage(bt, mode, set->latch);
1323
1324         if( set->page->free )
1325                 return bt->err = BTERR_struct, 0;
1326
1327         if( page_no > ROOT_page )
1328           bt_unlockpage(bt, BtLockAccess, set->latch);
1329
1330         // re-read and re-lock root after determining actual level of root
1331
1332         if( set->page->lvl != drill) {
1333                 if( set->latch->page_no != ROOT_page )
1334                         return bt->err = BTERR_struct, 0;
1335                         
1336                 drill = set->page->lvl;
1337
1338                 if( lock != BtLockRead && drill == lvl ) {
1339                   bt_unlockpage(bt, mode, set->latch);
1340                   bt_unpinlatch (set->latch);
1341                   continue;
1342                 }
1343         }
1344
1345         prevpage = set->latch->page_no;
1346         prevlatch = set->latch;
1347         prevmode = mode;
1348
1349         //  find key on page at this level
1350         //  and descend to requested level
1351
1352         if( !set->page->kill )
1353          if( slot = bt_findslot (set->page, key, len) ) {
1354           if( drill == lvl )
1355                 return slot;
1356
1357           // find next non-dead slot -- the fence key if nothing else
1358
1359           while( slotptr(set->page, slot)->dead )
1360                 if( slot++ < set->page->cnt )
1361                   continue;
1362                 else
1363                   return bt->err = BTERR_struct, 0;
1364
1365           page_no = bt_getid(valptr(set->page, slot)->value);
1366           drill--;
1367           continue;
1368          }
1369
1370         //  or slide right into next page
1371
1372         page_no = bt_getid(set->page->right);
1373   } while( page_no );
1374
1375   // return error on end of right chain
1376
1377   bt->err = BTERR_struct;
1378   return 0;     // return error
1379 }
1380
1381 //      return page to free list
1382 //      page must be delete & write locked
1383
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1385 {
1386         //      lock allocation page
1387
1388         bt_spinwritelock (bt->mgr->lock);
1389
1390         //      store chain
1391
1392         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394         set->latch->dirty = 1;
1395         set->page->free = 1;
1396
1397         // unlock released page
1398
1399         bt_unlockpage (bt, BtLockDelete, set->latch);
1400         bt_unlockpage (bt, BtLockWrite, set->latch);
1401         bt_unpinlatch (set->latch);
1402
1403         // unlock allocation page
1404
1405         bt_spinreleasewrite (bt->mgr->lock);
1406 }
1407
1408 //      a fence key was deleted from a page
1409 //      push new fence value upwards
1410
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1412 {
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1415 BtKey* ptr;
1416 uint idx;
1417
1418         //      remove the old fence value
1419
1420         ptr = keyptr(set->page, set->page->cnt);
1421         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423         set->latch->dirty = 1;
1424
1425         //  cache new fence value
1426
1427         ptr = keyptr(set->page, set->page->cnt);
1428         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1429
1430         bt_lockpage (bt, BtLockParent, set->latch);
1431         bt_unlockpage (bt, BtLockWrite, set->latch);
1432
1433         //      insert new (now smaller) fence key
1434
1435         bt_putid (value, set->latch->page_no);
1436         ptr = (BtKey*)leftkey;
1437
1438         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1439           return bt->err;
1440
1441         //      now delete old fence key
1442
1443         ptr = (BtKey*)rightkey;
1444
1445         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1446                 return bt->err;
1447
1448         bt_unlockpage (bt, BtLockParent, set->latch);
1449         bt_unpinlatch(set->latch);
1450         return 0;
1451 }
1452
1453 //      root has a single child
1454 //      collapse a level from the tree
1455
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1457 {
1458 BtPageSet child[1];
1459 uid page_no;
1460 uint idx;
1461
1462   // find the child entry and promote as new root contents
1463
1464   do {
1465         for( idx = 0; idx++ < root->page->cnt; )
1466           if( !slotptr(root->page, idx)->dead )
1467                 break;
1468
1469         page_no = bt_getid (valptr(root->page, idx)->value);
1470
1471         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472                 child->page = bt_mappage (bt, child->latch);
1473         else
1474                 return bt->err;
1475
1476         bt_lockpage (bt, BtLockDelete, child->latch);
1477         bt_lockpage (bt, BtLockWrite, child->latch);
1478
1479         memcpy (root->page, child->page, bt->mgr->page_size);
1480         root->latch->dirty = 1;
1481
1482         bt_freepage (bt, child);
1483
1484   } while( root->page->lvl > 1 && root->page->act == 1 );
1485
1486   bt_unlockpage (bt, BtLockWrite, root->latch);
1487   bt_unpinlatch (root->latch);
1488   return 0;
1489 }
1490
1491 //  delete a page and manage keys
1492 //  call with page writelocked
1493 //      returns with page unpinned
1494
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1496 {
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1500 BtPageSet right[1];
1501 uid page_no;
1502 BtKey *ptr;
1503
1504         //      cache copy of fence key
1505         //      to post in parent
1506
1507         ptr = keyptr(set->page, set->page->cnt);
1508         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1509
1510         //      obtain lock on right page
1511
1512         page_no = bt_getid(set->page->right);
1513
1514         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515                 right->page = bt_mappage (bt, right->latch);
1516         else
1517                 return 0;
1518
1519         bt_lockpage (bt, BtLockWrite, right->latch);
1520
1521         // cache copy of key to update
1522
1523         ptr = keyptr(right->page, right->page->cnt);
1524         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1525
1526         if( right->page->kill )
1527                 return bt->err = BTERR_struct;
1528
1529         // pull contents of right peer into our empty page
1530
1531         memcpy (set->page, right->page, bt->mgr->page_size);
1532         set->latch->dirty = 1;
1533
1534         // mark right page deleted and point it to left page
1535         //      until we can post parent updates that remove access
1536         //      to the deleted page.
1537
1538         bt_putid (right->page->right, set->latch->page_no);
1539         right->latch->dirty = 1;
1540         right->page->kill = 1;
1541
1542         bt_lockpage (bt, BtLockParent, right->latch);
1543         bt_unlockpage (bt, BtLockWrite, right->latch);
1544
1545         bt_lockpage (bt, BtLockParent, set->latch);
1546         bt_unlockpage (bt, BtLockWrite, set->latch);
1547
1548         // redirect higher key directly to our new node contents
1549
1550         bt_putid (value, set->latch->page_no);
1551         ptr = (BtKey*)higherfence;
1552
1553         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1554           return bt->err;
1555
1556         //      delete old lower key to our node
1557
1558         ptr = (BtKey*)lowerfence;
1559
1560         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1561           return bt->err;
1562
1563         //      obtain delete and write locks to right node
1564
1565         bt_unlockpage (bt, BtLockParent, right->latch);
1566         bt_lockpage (bt, BtLockDelete, right->latch);
1567         bt_lockpage (bt, BtLockWrite, right->latch);
1568         bt_freepage (bt, right);
1569
1570         bt_unlockpage (bt, BtLockParent, set->latch);
1571         bt_unpinlatch (set->latch);
1572         bt->found = 1;
1573         return 0;
1574 }
1575
1576 //  find and delete key on page by marking delete flag bit
1577 //  if page becomes empty, delete it from the btree
1578
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1580 {
1581 uint slot, idx, found, fence;
1582 BtPageSet set[1];
1583 BtKey *ptr;
1584 BtVal *val;
1585
1586         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587                 ptr = keyptr(set->page, slot);
1588         else
1589                 return bt->err;
1590
1591         // if librarian slot, advance to real slot
1592
1593         if( slotptr(set->page, slot)->type == Librarian )
1594                 ptr = keyptr(set->page, ++slot);
1595
1596         fence = slot == set->page->cnt;
1597
1598         // if key is found delete it, otherwise ignore request
1599
1600         if( found = !keycmp (ptr, key, len) )
1601           if( found = slotptr(set->page, slot)->dead == 0 ) {
1602                 val = valptr(set->page,slot);
1603                 slotptr(set->page, slot)->dead = 1;
1604                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1605                 set->page->act--;
1606
1607                 // collapse empty slots beneath the fence
1608
1609                 while( idx = set->page->cnt - 1 )
1610                   if( slotptr(set->page, idx)->dead ) {
1611                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1613                   } else
1614                         break;
1615           }
1616
1617         //      did we delete a fence key in an upper level?
1618
1619         if( found && lvl && set->page->act && fence )
1620           if( bt_fixfence (bt, set, lvl) )
1621                 return bt->err;
1622           else
1623                 return bt->found = found, 0;
1624
1625         //      do we need to collapse root?
1626
1627         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628           if( bt_collapseroot (bt, set) )
1629                 return bt->err;
1630           else
1631                 return bt->found = found, 0;
1632
1633         //      delete empty page
1634
1635         if( !set->page->act )
1636                 return bt_deletepage (bt, set);
1637
1638         set->latch->dirty = 1;
1639         bt_unlockpage(bt, BtLockWrite, set->latch);
1640         bt_unpinlatch (set->latch);
1641         return bt->found = found, 0;
1642 }
1643
1644 BtKey *bt_foundkey (BtDb *bt)
1645 {
1646         return (BtKey*)(bt->key);
1647 }
1648
1649 //      advance to next slot
1650
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1652 {
1653 BtLatchSet *prevlatch;
1654 uid page_no;
1655
1656         if( slot < set->page->cnt )
1657                 return slot + 1;
1658
1659         prevlatch = set->latch;
1660
1661         if( page_no = bt_getid(set->page->right) )
1662           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663                 set->page = bt_mappage (bt, set->latch);
1664           else
1665                 return 0;
1666         else
1667           return bt->err = BTERR_struct, 0;
1668
1669         // obtain access lock using lock chaining with Access mode
1670
1671         bt_lockpage(bt, BtLockAccess, set->latch);
1672
1673         bt_unlockpage(bt, BtLockRead, prevlatch);
1674         bt_unpinlatch (prevlatch);
1675
1676         bt_lockpage(bt, BtLockRead, set->latch);
1677         bt_unlockpage(bt, BtLockAccess, set->latch);
1678         return 1;
1679 }
1680
1681 //      find unique key or first duplicate key in
1682 //      leaf level and return number of value bytes
1683 //      or (-1) if not found.  Setup key for bt_foundkey
1684
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1686 {
1687 BtPageSet set[1];
1688 uint len, slot;
1689 int ret = -1;
1690 BtKey *ptr;
1691 BtVal *val;
1692
1693   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1694    do {
1695         ptr = keyptr(set->page, slot);
1696
1697         //      skip librarian slot place holder
1698
1699         if( slotptr(set->page, slot)->type == Librarian )
1700                 ptr = keyptr(set->page, ++slot);
1701
1702         //      return actual key found
1703
1704         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1705         len = ptr->len;
1706
1707         if( slotptr(set->page, slot)->type == Duplicate )
1708                 len -= BtId;
1709
1710         //      not there if we reach the stopper key
1711
1712         if( slot == set->page->cnt )
1713           if( !bt_getid (set->page->right) )
1714                 break;
1715
1716         // if key exists, return >= 0 value bytes copied
1717         //      otherwise return (-1)
1718
1719         if( slotptr(set->page, slot)->dead )
1720                 continue;
1721
1722         if( keylen == len )
1723           if( !memcmp (ptr->key, key, len) ) {
1724                 val = valptr (set->page,slot);
1725                 if( valmax > val->len )
1726                         valmax = val->len;
1727                 memcpy (value, val->value, valmax);
1728                 ret = valmax;
1729           }
1730
1731         break;
1732
1733    } while( slot = bt_findnext (bt, set, slot) );
1734
1735   bt_unlockpage (bt, BtLockRead, set->latch);
1736   bt_unpinlatch (set->latch);
1737   return ret;
1738 }
1739
1740 //      check page for space available,
1741 //      clean if necessary and return
1742 //      0 - page needs splitting
1743 //      >0  new slot value
1744
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1746 {
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1751 uint newslot = max;
1752 BtKey *key;
1753 BtVal *val;
1754
1755         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1756                 return slot;
1757
1758         //      skip cleanup and proceed to split
1759         //      if there's not enough garbage
1760         //      to bother with.
1761
1762         if( page->garbage < nxt / 5 )
1763                 return 0;
1764
1765         memcpy (bt->frame, page, bt->mgr->page_size);
1766
1767         // skip page info and set rest of page to zero
1768
1769         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770         set->latch->dirty = 1;
1771         page->garbage = 0;
1772         page->act = 0;
1773
1774         // clean up page first by
1775         // removing deleted keys
1776
1777         while( cnt++ < max ) {
1778                 if( cnt == slot )
1779                         newslot = idx + 2;
1780                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1781                         continue;
1782
1783                 // copy the value across
1784
1785                 val = valptr(bt->frame, cnt);
1786                 nxt -= val->len + sizeof(BtVal);
1787                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1788
1789                 // copy the key across
1790
1791                 key = keyptr(bt->frame, cnt);
1792                 nxt -= key->len + sizeof(BtKey);
1793                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1794
1795                 // make a librarian slot
1796
1797                 if( idx ) {
1798                         slotptr(page, ++idx)->off = nxt;
1799                         slotptr(page, idx)->type = Librarian;
1800                         slotptr(page, idx)->dead = 1;
1801                 }
1802
1803                 // set up the slot
1804
1805                 slotptr(page, ++idx)->off = nxt;
1806                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1807
1808                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1809                         page->act++;
1810         }
1811
1812         page->min = nxt;
1813         page->cnt = idx;
1814
1815         //      see if page has enough space now, or does it need splitting?
1816
1817         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1818                 return newslot;
1819
1820         return 0;
1821 }
1822
1823 // split the root and raise the height of the btree
1824
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1826 {  
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1830 BtPageSet left[1];
1831 uid left_page_no;
1832 BtKey *ptr;
1833 BtVal *val;
1834
1835         //      save left page fence key for new root
1836
1837         ptr = keyptr(root->page, root->page->cnt);
1838         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1839
1840         //  Obtain an empty page to use, and copy the current
1841         //  root contents into it, e.g. lower keys
1842
1843         if( bt_newpage(bt, left, root->page) )
1844                 return bt->err;
1845
1846         left_page_no = left->latch->page_no;
1847         bt_unpinlatch (left->latch);
1848
1849         // preserve the page info at the bottom
1850         // of higher keys and set rest to zero
1851
1852         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1853
1854         // insert stopper key at top of newroot page
1855         // and increase the root height
1856
1857         nxt -= BtId + sizeof(BtVal);
1858         bt_putid (value, right->page_no);
1859         val = (BtVal *)((unsigned char *)root->page + nxt);
1860         memcpy (val->value, value, BtId);
1861         val->len = BtId;
1862
1863         nxt -= 2 + sizeof(BtKey);
1864         slotptr(root->page, 2)->off = nxt;
1865         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1866         ptr->len = 2;
1867         ptr->key[0] = 0xff;
1868         ptr->key[1] = 0xff;
1869
1870         // insert lower keys page fence key on newroot page as first key
1871
1872         nxt -= BtId + sizeof(BtVal);
1873         bt_putid (value, left_page_no);
1874         val = (BtVal *)((unsigned char *)root->page + nxt);
1875         memcpy (val->value, value, BtId);
1876         val->len = BtId;
1877
1878         ptr = (BtKey *)leftkey;
1879         nxt -= ptr->len + sizeof(BtKey);
1880         slotptr(root->page, 1)->off = nxt;
1881         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1882         
1883         bt_putid(root->page->right, 0);
1884         root->page->min = nxt;          // reset lowest used offset and key count
1885         root->page->cnt = 2;
1886         root->page->act = 2;
1887         root->page->lvl++;
1888
1889         // release and unpin root pages
1890
1891         bt_unlockpage(bt, BtLockWrite, root->latch);
1892         bt_unpinlatch (root->latch);
1893
1894         bt_unpinlatch (right);
1895         return 0;
1896 }
1897
1898 //  split already locked full node
1899 //      leave it locked.
1900 //      return pool entry for new right
1901 //      page, unlocked
1902
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1904 {
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1907 BtPageSet right[1];
1908 BtKey *key, *ptr;
1909 BtVal *val, *src;
1910 uid right2;
1911 uint prev;
1912
1913         //  split higher half of keys to bt->frame
1914
1915         memset (bt->frame, 0, bt->mgr->page_size);
1916         max = set->page->cnt;
1917         cnt = max / 2;
1918         idx = 0;
1919
1920         while( cnt++ < max ) {
1921                 if( slotptr(set->page, cnt)->dead && cnt < max )
1922                         continue;
1923                 src = valptr(set->page, cnt);
1924                 nxt -= src->len + sizeof(BtVal);
1925                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1926
1927                 key = keyptr(set->page, cnt);
1928                 nxt -= key->len + sizeof(BtKey);
1929                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1930                 memcpy (ptr, key, key->len + sizeof(BtKey));
1931
1932                 //      add librarian slot
1933
1934                 if( idx ) {
1935                         slotptr(bt->frame, ++idx)->off = nxt;
1936                         slotptr(bt->frame, idx)->type = Librarian;
1937                         slotptr(bt->frame, idx)->dead = 1;
1938                 }
1939
1940                 //  add actual slot
1941
1942                 slotptr(bt->frame, ++idx)->off = nxt;
1943                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1944
1945                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1946                         bt->frame->act++;
1947         }
1948
1949         bt->frame->bits = bt->mgr->page_bits;
1950         bt->frame->min = nxt;
1951         bt->frame->cnt = idx;
1952         bt->frame->lvl = lvl;
1953
1954         // link right node
1955
1956         if( set->latch->page_no > ROOT_page )
1957                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1958
1959         //      get new free page and write higher keys to it.
1960
1961         if( bt_newpage(bt, right, bt->frame) )
1962                 return 0;
1963
1964         memcpy (bt->frame, set->page, bt->mgr->page_size);
1965         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966         set->latch->dirty = 1;
1967
1968         nxt = bt->mgr->page_size;
1969         set->page->garbage = 0;
1970         set->page->act = 0;
1971         max /= 2;
1972         cnt = 0;
1973         idx = 0;
1974
1975         if( slotptr(bt->frame, max)->type == Librarian )
1976                 max--;
1977
1978         //  assemble page of smaller keys
1979
1980         while( cnt++ < max ) {
1981                 if( slotptr(bt->frame, cnt)->dead )
1982                         continue;
1983                 val = valptr(bt->frame, cnt);
1984                 nxt -= val->len + sizeof(BtVal);
1985                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1986
1987                 key = keyptr(bt->frame, cnt);
1988                 nxt -= key->len + sizeof(BtKey);
1989                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1990
1991                 //      add librarian slot
1992
1993                 if( idx ) {
1994                         slotptr(set->page, ++idx)->off = nxt;
1995                         slotptr(set->page, idx)->type = Librarian;
1996                         slotptr(set->page, idx)->dead = 1;
1997                 }
1998
1999                 //      add actual slot
2000
2001                 slotptr(set->page, ++idx)->off = nxt;
2002                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2003                 set->page->act++;
2004         }
2005
2006         bt_putid(set->page->right, right->latch->page_no);
2007         set->page->min = nxt;
2008         set->page->cnt = idx;
2009
2010         return right->latch->entry;
2011 }
2012
2013 //      fix keys for newly split page
2014 //      call with page locked, return
2015 //      unlocked
2016
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2018 {
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2022 BtPage page;
2023 BtKey *ptr;
2024
2025         // if current page is the root page, split it
2026
2027         if( set->latch->page_no == ROOT_page )
2028                 return bt_splitroot (bt, set, right);
2029
2030         ptr = keyptr(set->page, set->page->cnt);
2031         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2032
2033         page = bt_mappage (bt, right);
2034
2035         ptr = keyptr(page, page->cnt);
2036         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2037
2038         // insert new fences in their parent pages
2039
2040         bt_lockpage (bt, BtLockParent, right);
2041
2042         bt_lockpage (bt, BtLockParent, set->latch);
2043         bt_unlockpage (bt, BtLockWrite, set->latch);
2044
2045         // insert new fence for reformulated left block of smaller keys
2046
2047         bt_putid (value, set->latch->page_no);
2048         ptr = (BtKey *)leftkey;
2049
2050         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2051                 return bt->err;
2052
2053         // switch fence for right block of larger keys to new right page
2054
2055         bt_putid (value, right->page_no);
2056         ptr = (BtKey *)rightkey;
2057
2058         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2059                 return bt->err;
2060
2061         bt_unlockpage (bt, BtLockParent, set->latch);
2062         bt_unpinlatch (set->latch);
2063
2064         bt_unlockpage (bt, BtLockParent, right);
2065         bt_unpinlatch (right);
2066         return 0;
2067 }
2068
2069 //      install new key and value onto page
2070 //      page must already be checked for
2071 //      adequate space
2072
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2074 {
2075 uint idx, librarian;
2076 BtSlot *node;
2077 BtKey *ptr;
2078 BtVal *val;
2079
2080         //      if found slot > desired slot and previous slot
2081         //      is a librarian slot, use it
2082
2083         if( slot > 1 )
2084           if( slotptr(set->page, slot-1)->type == Librarian )
2085                 slot--;
2086
2087         // copy value onto page
2088
2089         set->page->min -= vallen + sizeof(BtVal);
2090         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091         memcpy (val->value, value, vallen);
2092         val->len = vallen;
2093
2094         // copy key onto page
2095
2096         set->page->min -= keylen + sizeof(BtKey);
2097         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098         memcpy (ptr->key, key, keylen);
2099         ptr->len = keylen;
2100         
2101         //      find first empty slot
2102
2103         for( idx = slot; idx < set->page->cnt; idx++ )
2104           if( slotptr(set->page, idx)->dead )
2105                 break;
2106
2107         // now insert key into array before slot
2108
2109         if( idx == set->page->cnt )
2110                 idx += 2, set->page->cnt += 2, librarian = 2;
2111         else
2112                 librarian = 1;
2113
2114         set->latch->dirty = 1;
2115         set->page->act++;
2116
2117         while( idx > slot + librarian - 1 )
2118                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2119
2120         //      add librarian slot
2121
2122         if( librarian > 1 ) {
2123                 node = slotptr(set->page, slot++);
2124                 node->off = set->page->min;
2125                 node->type = Librarian;
2126                 node->dead = 1;
2127         }
2128
2129         //      fill in new slot
2130
2131         node = slotptr(set->page, slot);
2132         node->off = set->page->min;
2133         node->type = type;
2134         node->dead = 0;
2135
2136         if( release ) {
2137                 bt_unlockpage (bt, BtLockWrite, set->latch);
2138                 bt_unpinlatch (set->latch);
2139         }
2140
2141         return 0;
2142 }
2143
2144 //  Insert new key into the btree at given level.
2145 //      either add a new key or update/add an existing one
2146
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2148 {
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2151 BtPageSet set[1];
2152 BtKey *ptr, *ins;
2153 uid sequence;
2154 BtVal *val;
2155 uint type;
2156
2157   // set up the key we're working on
2158
2159   ins = (BtKey*)newkey;
2160   memcpy (ins->key, key, keylen);
2161   ins->len = keylen;
2162
2163   // is this a non-unique index value?
2164
2165   if( unique )
2166         type = Unique;
2167   else {
2168         type = Duplicate;
2169         sequence = bt_newdup (bt);
2170         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2171         ins->len += BtId;
2172   }
2173
2174   while( 1 ) { // find the page and slot for the current key
2175         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176                 ptr = keyptr(set->page, slot);
2177         else {
2178                 if( !bt->err )
2179                         bt->err = BTERR_ovflw;
2180                 return bt->err;
2181         }
2182
2183         // if librarian slot == found slot, advance to real slot
2184
2185         if( slotptr(set->page, slot)->type == Librarian )
2186           if( !keycmp (ptr, key, keylen) )
2187                 ptr = keyptr(set->page, ++slot);
2188
2189         len = ptr->len;
2190
2191         if( slotptr(set->page, slot)->type == Duplicate )
2192                 len -= BtId;
2193
2194         //  if inserting a duplicate key or unique key
2195         //      check for adequate space on the page
2196         //      and insert the new key before slot.
2197
2198         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2199           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200                 if( !(entry = bt_splitpage (bt, set)) )
2201                   return bt->err;
2202                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2203                   return bt->err;
2204                 else
2205                   continue;
2206
2207           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2208         }
2209
2210         // if key already exists, update value and return
2211
2212         val = valptr(set->page, slot);
2213
2214         if( val->len >= vallen ) {
2215                 if( slotptr(set->page, slot)->dead )
2216                         set->page->act++;
2217                 set->page->garbage += val->len - vallen;
2218                 set->latch->dirty = 1;
2219                 slotptr(set->page, slot)->dead = 0;
2220                 val->len = vallen;
2221                 memcpy (val->value, value, vallen);
2222                 bt_unlockpage(bt, BtLockWrite, set->latch);
2223                 bt_unpinlatch (set->latch);
2224                 return 0;
2225         }
2226
2227         //      new update value doesn't fit in existing value area
2228
2229         if( !slotptr(set->page, slot)->dead )
2230                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2231         else {
2232                 slotptr(set->page, slot)->dead = 0;
2233                 set->page->act++;
2234         }
2235
2236         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237           if( !(entry = bt_splitpage (bt, set)) )
2238                 return bt->err;
2239           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2240                 return bt->err;
2241           else
2242                 continue;
2243
2244         set->page->min -= vallen + sizeof(BtVal);
2245         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246         memcpy (val->value, value, vallen);
2247         val->len = vallen;
2248
2249         set->latch->dirty = 1;
2250         set->page->min -= keylen + sizeof(BtKey);
2251         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252         memcpy (ptr->key, key, keylen);
2253         ptr->len = keylen;
2254         
2255         slotptr(set->page, slot)->off = set->page->min;
2256         bt_unlockpage(bt, BtLockWrite, set->latch);
2257         bt_unpinlatch (set->latch);
2258         return 0;
2259   }
2260   return 0;
2261 }
2262
2263 typedef struct {
2264         uint entry;                     // latch table entry number
2265         uint slot:31;           // page slot number
2266         uint reuse:1;           // reused previous page
2267 } AtomicMod;
2268
2269 typedef struct {
2270         uid page_no;            // page number for split leaf
2271         void *next;                     // next key to insert
2272         uint entry:29;          // latch table entry number
2273         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2274         uint nounlock:1;        // don't unlock ParentModification
2275         unsigned char leafkey[BT_keyarray];
2276 } AtomicKey;
2277
2278 //  find and load leaf page for given key
2279 //      leave page Atomic locked and Read locked.
2280
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2282 {
2283 BtLatchSet *prevlatch;
2284 uid page_no;
2285 uint slot;
2286
2287   //  find level one slot
2288
2289   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2290         return 0;
2291
2292   // find next non-dead entry on this page
2293   //    it will be the fence key if nothing else
2294
2295   while( slotptr(set->page, slot)->dead )
2296         if( slot++ < set->page->cnt )
2297           continue;
2298         else
2299           return bt->err = BTERR_struct, 0;
2300
2301   page_no = bt_getid(valptr(set->page, slot)->value);
2302   prevlatch = set->latch;
2303
2304   while( page_no ) {
2305         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306           set->page = bt_mappage (bt, set->latch);
2307         else
2308           return 0;
2309
2310         // obtain read lock using lock chaining with Access mode
2311         //      release & unpin parent/left sibling page
2312
2313         bt_lockpage(bt, BtLockAccess, set->latch);
2314
2315         bt_unlockpage(bt, BtLockRead, prevlatch);
2316         bt_unpinlatch (prevlatch);
2317
2318         bt_lockpage(bt, BtLockRead, set->latch);
2319
2320         //  find key on page at this level
2321         //  and descend to requested level
2322
2323         if( !set->page->kill )
2324          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2325           bt_unlockpage(bt, BtLockRead, set->latch);
2326           bt_lockpage(bt, BtLockAtomic, set->latch);
2327           bt_lockpage(bt, BtLockRead, set->latch);
2328           bt_unlockpage(bt, BtLockAccess, set->latch);
2329
2330           if( !set->page->kill )
2331            if( slot = bt_findslot (set->page, key, len) )
2332                 return slot;
2333
2334           bt_unlockpage(bt, BtLockAtomic, set->latch);
2335           }
2336
2337         //  slide right into next page
2338
2339         page_no = bt_getid(set->page->right);
2340         prevlatch = set->latch;
2341   }
2342
2343   // return error on end of right chain
2344
2345   bt->err = BTERR_struct;
2346   return 0;     // return error
2347 }
2348
2349 //      determine actual page where key is located
2350 //  return slot number
2351
2352 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2353 {
2354 BtKey *key = keyptr(source,src);
2355 uint slot = locks[src].slot;
2356 uint entry;
2357
2358         if( src > 1 && locks[src].reuse )
2359           entry = locks[src-1].entry, slot = 0;
2360         else
2361           entry = locks[src].entry;
2362
2363         if( slot ) {
2364                 set->latch = bt->mgr->latchsets + entry;
2365                 set->page = bt_mappage (bt, set->latch);
2366                 return slot;
2367         }
2368
2369         //      is locks->reuse set?
2370         //      if so, find where our key
2371         //      is located on previous page or split pages
2372
2373         do {
2374                 set->latch = bt->mgr->latchsets + entry;
2375                 set->page = bt_mappage (bt, set->latch);
2376
2377                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2378                   if( locks[src].reuse )
2379                         locks[src].entry = entry;
2380                   return slot;
2381                 }
2382         } while( entry = set->latch->split );
2383
2384         bt->err = BTERR_atomic;
2385         return 0;
2386 }
2387
2388 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2389 {
2390 BtKey *key = keyptr(source, src);
2391 BtVal *val = valptr(source, src);
2392 BtLatchSet *latch;
2393 BtPageSet set[1];
2394 uint entry;
2395
2396   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2397         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2398           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2399
2400         if( entry = bt_splitpage (bt, set) )
2401           latch = bt->mgr->latchsets + entry;
2402         else
2403           return bt->err;
2404
2405         //      splice right page into split chain
2406         //      and WriteLock it.
2407
2408         latch->split = set->latch->split;
2409         set->latch->split = entry;
2410         bt_lockpage(bt, BtLockWrite, latch);
2411   }
2412
2413   return bt->err = BTERR_atomic;
2414 }
2415
2416 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2417 {
2418 BtKey *key = keyptr(source, src);
2419 uint idx, entry, slot;
2420 BtPageSet set[1];
2421 BtKey *ptr;
2422 BtVal *val;
2423
2424         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2425                 slotptr(set->page, slot)->dead = 1;
2426         else
2427                 return bt->err = BTERR_struct;
2428
2429         ptr = keyptr(set->page, slot);
2430         val = valptr(set->page, slot);
2431
2432         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2433         set->latch->dirty = 1;
2434         set->page->act--;
2435         return 0;
2436 }
2437
2438 //      delete an empty master page for a transaction
2439
2440 //      note that the far right page never empties because
2441 //      it always contains (at least) the infinite stopper key
2442 //      and that all pages that don't contain any keys are
2443 //      deleted, or are being held under Atomic lock.
2444
2445 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2446 {
2447 BtPageSet right[1], temp[1];
2448 unsigned char value[BtId];
2449 uid right_page_no;
2450 BtKey *ptr;
2451
2452         bt_lockpage(bt, BtLockWrite, prev->latch);
2453
2454         //      grab the right sibling
2455
2456         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2457                 right->page = bt_mappage (bt, right->latch);
2458         else
2459                 return bt->err;
2460
2461         bt_lockpage(bt, BtLockAtomic, right->latch);
2462         bt_lockpage(bt, BtLockWrite, right->latch);
2463
2464         //      and pull contents over empty page
2465         //      while preserving master's left link
2466
2467         memcpy (right->page->left, prev->page->left, BtId);
2468         memcpy (prev->page, right->page, bt->mgr->page_size);
2469
2470         //      forward seekers to old right sibling
2471         //      to new page location in set
2472
2473         bt_putid (right->page->right, prev->latch->page_no);
2474         right->latch->dirty = 1;
2475         right->page->kill = 1;
2476
2477         //      remove pointer to right page for searchers by
2478         //      changing right fence key to point to the master page
2479
2480         ptr = keyptr(right->page,right->page->cnt);
2481         bt_putid (value, prev->latch->page_no);
2482
2483         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2484                 return bt->err;
2485
2486         //  now that master page is in good shape we can
2487         //      remove its locks.
2488
2489         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2490         bt_unlockpage (bt, BtLockWrite, prev->latch);
2491
2492         //  fix master's right sibling's left pointer
2493         //      to remove scanner's poiner to the right page
2494
2495         if( right_page_no = bt_getid (prev->page->right) ) {
2496           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2497                 temp->page = bt_mappage (bt, temp->latch);
2498
2499           bt_lockpage (bt, BtLockWrite, temp->latch);
2500           bt_putid (temp->page->left, prev->latch->page_no);
2501           temp->latch->dirty = 1;
2502
2503           bt_unlockpage (bt, BtLockWrite, temp->latch);
2504           bt_unpinlatch (temp->latch);
2505         } else {        // master is now the far right page
2506           bt_spinwritelock (bt->mgr->lock);
2507           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2508           bt_spinreleasewrite(bt->mgr->lock);
2509         }
2510
2511         //      now that there are no pointers to the right page
2512         //      we can delete it after the last read access occurs
2513
2514         bt_unlockpage (bt, BtLockWrite, right->latch);
2515         bt_unlockpage (bt, BtLockAtomic, right->latch);
2516         bt_lockpage (bt, BtLockDelete, right->latch);
2517         bt_lockpage (bt, BtLockWrite, right->latch);
2518         bt_freepage (bt, right);
2519         return 0;
2520 }
2521
2522 //      atomic modification of a batch of keys.
2523
2524 //      return -1 if BTERR is set
2525 //      otherwise return slot number
2526 //      causing the key constraint violation
2527 //      or zero on successful completion.
2528
2529 int bt_atomictxn (BtDb *bt, BtPage source)
2530 {
2531 uint src, idx, slot, samepage, entry;
2532 AtomicKey *head, *tail, *leaf;
2533 BtPageSet set[1], prev[1];
2534 unsigned char value[BtId];
2535 BtKey *key, *ptr, *key2;
2536 BtLatchSet *latch;
2537 AtomicMod *locks;
2538 int result = 0;
2539 BtSlot temp[1];
2540 BtPage page;
2541 BtVal *val;
2542 uid right;
2543 int type;
2544
2545   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2546   head = NULL;
2547   tail = NULL;
2548
2549   // stable sort the list of keys into order to
2550   //    prevent deadlocks between threads.
2551
2552   for( src = 1; src++ < source->cnt; ) {
2553         *temp = *slotptr(source,src);
2554         key = keyptr (source,src);
2555
2556         for( idx = src; --idx; ) {
2557           key2 = keyptr (source,idx);
2558           if( keycmp (key, key2->key, key2->len) < 0 ) {
2559                 *slotptr(source,idx+1) = *slotptr(source,idx);
2560                 *slotptr(source,idx) = *temp;
2561           } else
2562                 break;
2563         }
2564   }
2565
2566   // Load the leaf page for each key
2567   // group same page references with reuse bit
2568   // and determine any constraint violations
2569
2570   for( src = 0; src++ < source->cnt; ) {
2571         key = keyptr(source, src);
2572         slot = 0;
2573
2574         // first determine if this modification falls
2575         // on the same page as the previous modification
2576         //      note that the far right leaf page is a special case
2577
2578         if( samepage = src > 1 )
2579           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2580                 slot = bt_findslot(set->page, key->key, key->len);
2581           else // release read on previous page
2582                 bt_unlockpage(bt, BtLockRead, set->latch); 
2583
2584         if( !slot )
2585           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2586                 set->latch->split = 0;
2587           else
2588                 return -1;
2589
2590         if( slotptr(set->page, slot)->type == Librarian )
2591           ptr = keyptr(set->page, ++slot);
2592         else
2593           ptr = keyptr(set->page, slot);
2594
2595         if( !samepage ) {
2596           locks[src].entry = set->latch->entry;
2597           locks[src].slot = slot;
2598           locks[src].reuse = 0;
2599         } else {
2600           locks[src].entry = 0;
2601           locks[src].slot = 0;
2602           locks[src].reuse = 1;
2603         }
2604
2605         switch( slotptr(source, src)->type ) {
2606         case Duplicate:
2607         case Unique:
2608           if( !slotptr(set->page, slot)->dead )
2609            if( slot < set->page->cnt || bt_getid (set->page->right) )
2610             if( !keycmp (ptr, key->key, key->len) ) {
2611
2612                   // return constraint violation if key already exists
2613
2614                   bt_unlockpage(bt, BtLockRead, set->latch);
2615                   result = src;
2616
2617                   while( src ) {
2618                         if( locks[src].entry ) {
2619                           set->latch = bt->mgr->latchsets + locks[src].entry;
2620                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2621                           bt_unpinlatch (set->latch);
2622                         }
2623                         src--;
2624                   }
2625                   free (locks);
2626                   return result;
2627                 }
2628           break;
2629         }
2630   }
2631
2632   //  unlock last loadpage lock
2633
2634   if( source->cnt > 1 )
2635         bt_unlockpage(bt, BtLockRead, set->latch);
2636
2637   //  obtain write lock for each master page
2638
2639   for( src = 0; src++ < source->cnt; )
2640         if( locks[src].reuse )
2641           continue;
2642         else
2643           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2644
2645   // insert or delete each key
2646   // process any splits or merges
2647   // release Write & Atomic latches
2648   // set ParentModifications and build
2649   // queue of keys to insert for split pages
2650   // or delete for deleted pages.
2651
2652   // run through txn list backwards
2653
2654   samepage = source->cnt + 1;
2655
2656   for( src = source->cnt; src; src-- ) {
2657         if( locks[src].reuse )
2658           continue;
2659
2660         //  perform the txn operations
2661         //      from smaller to larger on
2662         //  the same page
2663
2664         for( idx = src; idx < samepage; idx++ )
2665          switch( slotptr(source,idx)->type ) {
2666          case Delete:
2667           if( bt_atomicdelete (bt, source, locks, idx) )
2668                 return -1;
2669           break;
2670
2671         case Duplicate:
2672         case Unique:
2673           if( bt_atomicinsert (bt, source, locks, idx) )
2674                 return -1;
2675           break;
2676         }
2677
2678         //      after the same page operations have finished,
2679         //  process master page for splits or deletion.
2680
2681         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2682         prev->page = bt_mappage (bt, prev->latch);
2683         samepage = src;
2684
2685         //  pick-up all splits from master page
2686         //      each one is already WriteLocked.
2687
2688         entry = prev->latch->split;
2689
2690         while( entry ) {
2691           set->latch = bt->mgr->latchsets + entry;
2692           set->page = bt_mappage (bt, set->latch);
2693           entry = set->latch->split;
2694
2695           // delete empty master page by undoing its split
2696           //  (this is potentially another empty page)
2697           //  note that there are no new left pointers yet
2698
2699           if( !prev->page->act ) {
2700                 memcpy (set->page->left, prev->page->left, BtId);
2701                 memcpy (prev->page, set->page, bt->mgr->page_size);
2702                 bt_lockpage (bt, BtLockDelete, set->latch);
2703                 bt_freepage (bt, set);
2704
2705                 prev->latch->dirty = 1;
2706                 continue;
2707           }
2708
2709           // remove empty page from the split chain
2710
2711           if( !set->page->act ) {
2712                 memcpy (prev->page->right, set->page->right, BtId);
2713                 prev->latch->split = set->latch->split;
2714                 bt_lockpage (bt, BtLockDelete, set->latch);
2715                 bt_freepage (bt, set);
2716                 continue;
2717           }
2718
2719           //  schedule prev fence key update
2720
2721           ptr = keyptr(prev->page,prev->page->cnt);
2722           leaf = calloc (sizeof(AtomicKey), 1);
2723
2724           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2725           leaf->page_no = prev->latch->page_no;
2726           leaf->entry = prev->latch->entry;
2727           leaf->type = 0;
2728
2729           if( tail )
2730                 tail->next = leaf;
2731           else
2732                 head = leaf;
2733
2734           tail = leaf;
2735
2736           // splice in the left link into the split page
2737
2738           bt_putid (set->page->left, prev->latch->page_no);
2739           bt_lockpage(bt, BtLockParent, prev->latch);
2740           bt_unlockpage(bt, BtLockWrite, prev->latch);
2741           *prev = *set;
2742         }
2743
2744         //  update left pointer in next right page from last split page
2745         //      (if all splits were reversed, latch->split == 0)
2746
2747         if( latch->split ) {
2748           //  fix left pointer in master's original (now split)
2749           //  far right sibling or set rightmost page in page zero
2750
2751           if( right = bt_getid (prev->page->right) ) {
2752                 if( set->latch = bt_pinlatch (bt, right, 1) )
2753                   set->page = bt_mappage (bt, set->latch);
2754                 else
2755                   return -1;
2756
2757             bt_lockpage (bt, BtLockWrite, set->latch);
2758             bt_putid (set->page->left, prev->latch->page_no);
2759                 set->latch->dirty = 1;
2760             bt_unlockpage (bt, BtLockWrite, set->latch);
2761                 bt_unpinlatch (set->latch);
2762           } else {      // prev is rightmost page
2763             bt_spinwritelock (bt->mgr->lock);
2764                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2765             bt_spinreleasewrite(bt->mgr->lock);
2766           }
2767
2768           //  Process last page split in chain
2769
2770           ptr = keyptr(prev->page,prev->page->cnt);
2771           leaf = calloc (sizeof(AtomicKey), 1);
2772
2773           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2774           leaf->page_no = prev->latch->page_no;
2775           leaf->entry = prev->latch->entry;
2776           leaf->type = 0;
2777   
2778           if( tail )
2779                 tail->next = leaf;
2780           else
2781                 head = leaf;
2782
2783           tail = leaf;
2784
2785           bt_lockpage(bt, BtLockParent, prev->latch);
2786           bt_unlockpage(bt, BtLockWrite, prev->latch);
2787
2788           //  remove atomic lock on master page
2789
2790           bt_unlockpage(bt, BtLockAtomic, latch);
2791           continue;
2792         }
2793
2794         //  finished if prev page occupied (either master or final split)
2795
2796         if( prev->page->act ) {
2797           bt_unlockpage(bt, BtLockWrite, latch);
2798           bt_unlockpage(bt, BtLockAtomic, latch);
2799           bt_unpinlatch(latch);
2800           continue;
2801         }
2802
2803         // any and all splits were reversed, and the
2804         // master page located in prev is empty, delete it
2805         // by pulling over master's right sibling.
2806
2807         // Remove empty master's fence key
2808
2809         ptr = keyptr(prev->page,prev->page->cnt);
2810
2811         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2812                 return -1;
2813
2814         //      perform the remainder of the delete
2815         //      from the FIFO queue
2816
2817         leaf = calloc (sizeof(AtomicKey), 1);
2818
2819         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2820         leaf->page_no = prev->latch->page_no;
2821         leaf->entry = prev->latch->entry;
2822         leaf->nounlock = 1;
2823         leaf->type = 2;
2824   
2825         if( tail )
2826           tail->next = leaf;
2827         else
2828           head = leaf;
2829
2830         tail = leaf;
2831
2832         //      leave atomic lock in place until
2833         //      deletion completes in next phase.
2834
2835         bt_unlockpage(bt, BtLockWrite, prev->latch);
2836   }
2837   
2838   //  add & delete keys for any pages split or merged during transaction
2839
2840   if( leaf = head )
2841     do {
2842           set->latch = bt->mgr->latchsets + leaf->entry;
2843           set->page = bt_mappage (bt, set->latch);
2844
2845           bt_putid (value, leaf->page_no);
2846           ptr = (BtKey *)leaf->leafkey;
2847
2848           switch( leaf->type ) {
2849           case 0:       // insert key
2850             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2851                   return -1;
2852
2853                 break;
2854
2855           case 1:       // delete key
2856                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2857                   return -1;
2858
2859                 break;
2860
2861           case 2:       // free page
2862                 if( bt_atomicfree (bt, set) )
2863                   return -1;
2864
2865                 break;
2866           }
2867
2868           if( !leaf->nounlock )
2869             bt_unlockpage (bt, BtLockParent, set->latch);
2870
2871           bt_unpinlatch (set->latch);
2872           tail = leaf->next;
2873           free (leaf);
2874         } while( leaf = tail );
2875
2876   // return success
2877
2878   free (locks);
2879   return 0;
2880 }
2881
2882 //      set cursor to highest slot on highest page
2883
2884 uint bt_lastkey (BtDb *bt)
2885 {
2886 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2887 BtPageSet set[1];
2888
2889         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2890                 set->page = bt_mappage (bt, set->latch);
2891         else
2892                 return 0;
2893
2894     bt_lockpage(bt, BtLockRead, set->latch);
2895         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2896     bt_unlockpage(bt, BtLockRead, set->latch);
2897         bt_unpinlatch (set->latch);
2898
2899         bt->cursor_page = page_no;
2900         return bt->cursor->cnt;
2901 }
2902
2903 //      return previous slot on cursor page
2904
2905 uint bt_prevkey (BtDb *bt, uint slot)
2906 {
2907 uid ourright, next, us = bt->cursor_page;
2908 BtPageSet set[1];
2909
2910         if( --slot )
2911                 return slot;
2912
2913         ourright = bt_getid(bt->cursor->right);
2914
2915 goleft:
2916         if( !(next = bt_getid(bt->cursor->left)) )
2917                 return 0;
2918
2919 findourself:
2920         bt->cursor_page = next;
2921
2922         if( set->latch = bt_pinlatch (bt, next, 1) )
2923                 set->page = bt_mappage (bt, set->latch);
2924         else
2925                 return 0;
2926
2927     bt_lockpage(bt, BtLockRead, set->latch);
2928         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2929         bt_unlockpage(bt, BtLockRead, set->latch);
2930         bt_unpinlatch (set->latch);
2931         
2932         next = bt_getid (bt->cursor->right);
2933
2934         if( bt->cursor->kill )
2935                 goto findourself;
2936
2937         if( next != us )
2938           if( next == ourright )
2939                 goto goleft;
2940           else
2941                 goto findourself;
2942
2943         return bt->cursor->cnt;
2944 }
2945
2946 //  return next slot on cursor page
2947 //  or slide cursor right into next page
2948
2949 uint bt_nextkey (BtDb *bt, uint slot)
2950 {
2951 BtPageSet set[1];
2952 uid right;
2953
2954   do {
2955         right = bt_getid(bt->cursor->right);
2956
2957         while( slot++ < bt->cursor->cnt )
2958           if( slotptr(bt->cursor,slot)->dead )
2959                 continue;
2960           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2961                 return slot;
2962           else
2963                 break;
2964
2965         if( !right )
2966                 break;
2967
2968         bt->cursor_page = right;
2969
2970         if( set->latch = bt_pinlatch (bt, right, 1) )
2971                 set->page = bt_mappage (bt, set->latch);
2972         else
2973                 return 0;
2974
2975     bt_lockpage(bt, BtLockRead, set->latch);
2976
2977         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2978
2979         bt_unlockpage(bt, BtLockRead, set->latch);
2980         bt_unpinlatch (set->latch);
2981         slot = 0;
2982
2983   } while( 1 );
2984
2985   return bt->err = 0;
2986 }
2987
2988 //  cache page of keys into cursor and return starting slot for given key
2989
2990 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2991 {
2992 BtPageSet set[1];
2993 uint slot;
2994
2995         // cache page for retrieval
2996
2997         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2998           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2999         else
3000           return 0;
3001
3002         bt->cursor_page = set->latch->page_no;
3003
3004         bt_unlockpage(bt, BtLockRead, set->latch);
3005         bt_unpinlatch (set->latch);
3006         return slot;
3007 }
3008
3009 BtKey *bt_key(BtDb *bt, uint slot)
3010 {
3011         return keyptr(bt->cursor, slot);
3012 }
3013
3014 BtVal *bt_val(BtDb *bt, uint slot)
3015 {
3016         return valptr(bt->cursor,slot);
3017 }
3018
3019 #ifdef STANDALONE
3020
3021 #ifndef unix
3022 double getCpuTime(int type)
3023 {
3024 FILETIME crtime[1];
3025 FILETIME xittime[1];
3026 FILETIME systime[1];
3027 FILETIME usrtime[1];
3028 SYSTEMTIME timeconv[1];
3029 double ans = 0;
3030
3031         memset (timeconv, 0, sizeof(SYSTEMTIME));
3032
3033         switch( type ) {
3034         case 0:
3035                 GetSystemTimeAsFileTime (xittime);
3036                 FileTimeToSystemTime (xittime, timeconv);
3037                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3038                 break;
3039         case 1:
3040                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3041                 FileTimeToSystemTime (usrtime, timeconv);
3042                 break;
3043         case 2:
3044                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3045                 FileTimeToSystemTime (systime, timeconv);
3046                 break;
3047         }
3048
3049         ans += (double)timeconv->wHour * 3600;
3050         ans += (double)timeconv->wMinute * 60;
3051         ans += (double)timeconv->wSecond;
3052         ans += (double)timeconv->wMilliseconds / 1000;
3053         return ans;
3054 }
3055 #else
3056 #include <time.h>
3057 #include <sys/resource.h>
3058
3059 double getCpuTime(int type)
3060 {
3061 struct rusage used[1];
3062 struct timeval tv[1];
3063
3064         switch( type ) {
3065         case 0:
3066                 gettimeofday(tv, NULL);
3067                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3068
3069         case 1:
3070                 getrusage(RUSAGE_SELF, used);
3071                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3072
3073         case 2:
3074                 getrusage(RUSAGE_SELF, used);
3075                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3076         }
3077
3078         return 0;
3079 }
3080 #endif
3081
3082 void bt_poolaudit (BtMgr *mgr)
3083 {
3084 BtLatchSet *latch;
3085 uint slot = 0;
3086
3087         while( slot++ < mgr->latchdeployed ) {
3088                 latch = mgr->latchsets + slot;
3089
3090                 if( *latch->readwr->rin & MASK )
3091                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3092                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3093
3094                 if( *latch->access->rin & MASK )
3095                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3096                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3097
3098                 if( *latch->parent->ticket != *latch->parent->serving )
3099                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3100                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3101
3102                 if( latch->pin & ~CLOCK_bit ) {
3103                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3104                         latch->pin = 0;
3105                 }
3106         }
3107 }
3108
3109 uint bt_latchaudit (BtDb *bt)
3110 {
3111 ushort idx, hashidx;
3112 uid next, page_no;
3113 BtLatchSet *latch;
3114 uint cnt = 0;
3115 BtKey *ptr;
3116
3117         if( *(ushort *)(bt->mgr->lock) )
3118                 fprintf(stderr, "Alloc page locked\n");
3119         *(ushort *)(bt->mgr->lock) = 0;
3120
3121         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3122                 latch = bt->mgr->latchsets + idx;
3123                 if( *latch->readwr->rin & MASK )
3124                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3125                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3126
3127                 if( *latch->access->rin & MASK )
3128                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3129                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3130
3131                 if( *latch->parent->ticket != *latch->parent->serving )
3132                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3133                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3134
3135                 if( latch->pin ) {
3136                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3137                         latch->pin = 0;
3138                 }
3139         }
3140
3141         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3142           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3143                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3144
3145           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3146
3147           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3148                 latch = bt->mgr->latchsets + idx;
3149                 if( latch->pin )
3150                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3151           } while( idx = latch->next );
3152         }
3153
3154         page_no = LEAF_page;
3155
3156         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3157         uid off = page_no << bt->mgr->page_bits;
3158 #ifdef unix
3159           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3160 #else
3161         DWORD amt[1];
3162
3163           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3164
3165           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3166                 return bt->err = BTERR_map;
3167
3168           if( *amt <  bt->mgr->page_size )
3169                 return bt->err = BTERR_map;
3170 #endif
3171                 if( !bt->frame->free && !bt->frame->lvl )
3172                         cnt += bt->frame->act;
3173                 page_no++;
3174         }
3175                 
3176         cnt--;  // remove stopper key
3177         fprintf(stderr, " Total keys read %d\n", cnt);
3178
3179         bt_close (bt);
3180         return 0;
3181 }
3182
3183 typedef struct {
3184         char idx;
3185         char *type;
3186         char *infile;
3187         BtMgr *mgr;
3188         int num;
3189 } ThreadArg;
3190
3191 //  standalone program to index file of keys
3192 //  then list them onto std-out
3193
3194 #ifdef unix
3195 void *index_file (void *arg)
3196 #else
3197 uint __stdcall index_file (void *arg)
3198 #endif
3199 {
3200 int line = 0, found = 0, cnt = 0, idx;
3201 uid next, page_no = LEAF_page;  // start on first page of leaves
3202 int ch, len = 0, slot, type = 0;
3203 unsigned char key[BT_maxkey];
3204 unsigned char txn[65536];
3205 ThreadArg *args = arg;
3206 BtPageSet set[1];
3207 uint nxt = 65536;
3208 BtPage page;
3209 BtKey *ptr;
3210 BtVal *val;
3211 BtDb *bt;
3212 FILE *in;
3213
3214         bt = bt_open (args->mgr);
3215         page = (BtPage)txn;
3216
3217         if( args->idx < strlen (args->type) )
3218                 ch = args->type[args->idx];
3219         else
3220                 ch = args->type[strlen(args->type) - 1];
3221
3222         switch(ch | 0x20)
3223         {
3224         case 'a':
3225                 fprintf(stderr, "started latch mgr audit\n");
3226                 cnt = bt_latchaudit (bt);
3227                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3228                 break;
3229
3230         case 'd':
3231                 type = Delete;
3232
3233         case 'p':
3234                 if( !type )
3235                         type = Unique;
3236
3237                 if( args->num )
3238                  if( type == Delete )
3239                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3240                  else
3241                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3242                 else
3243                  if( type == Delete )
3244                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3245                  else
3246                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3247
3248                 if( in = fopen (args->infile, "rb") )
3249                   while( ch = getc(in), ch != EOF )
3250                         if( ch == '\n' )
3251                         {
3252                           line++;
3253
3254                           if( !args->num ) {
3255                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3256                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3257                             len = 0;
3258                                 continue;
3259                           }
3260
3261                           nxt -= len - 10;
3262                           memcpy (txn + nxt, key + 10, len - 10);
3263                           nxt -= 1;
3264                           txn[nxt] = len - 10;
3265                           nxt -= 10;
3266                           memcpy (txn + nxt, key, 10);
3267                           nxt -= 1;
3268                           txn[nxt] = 10;
3269                           slotptr(page,++cnt)->off  = nxt;
3270                           slotptr(page,cnt)->type = type;
3271                           len = 0;
3272
3273                           if( cnt < args->num )
3274                                 continue;
3275
3276                           page->cnt = cnt;
3277                           page->act = cnt;
3278                           page->min = nxt;
3279
3280                           if( bt_atomictxn (bt, page) )
3281                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3282                           nxt = sizeof(txn);
3283                           cnt = 0;
3284
3285                         }
3286                         else if( len < BT_maxkey )
3287                                 key[len++] = ch;
3288                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3289                 break;
3290
3291         case 'w':
3292                 fprintf(stderr, "started indexing for %s\n", args->infile);
3293                 if( in = fopen (args->infile, "r") )
3294                   while( ch = getc(in), ch != EOF )
3295                         if( ch == '\n' )
3296                         {
3297                           line++;
3298
3299                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3300                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3301                           len = 0;
3302                         }
3303                         else if( len < BT_maxkey )
3304                                 key[len++] = ch;
3305                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3306                 break;
3307
3308         case 'f':
3309                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3310                 if( in = fopen (args->infile, "rb") )
3311                   while( ch = getc(in), ch != EOF )
3312                         if( ch == '\n' )
3313                         {
3314                           line++;
3315                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3316                                 found++;
3317                           else if( bt->err )
3318                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3319                           len = 0;
3320                         }
3321                         else if( len < BT_maxkey )
3322                                 key[len++] = ch;
3323                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3324                 break;
3325
3326         case 's':
3327                 fprintf(stderr, "started scanning\n");
3328                 do {
3329                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3330                                 set->page = bt_mappage (bt, set->latch);
3331                         else
3332                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3333                         bt_lockpage (bt, BtLockRead, set->latch);
3334                         next = bt_getid (set->page->right);
3335
3336                         for( slot = 0; slot++ < set->page->cnt; )
3337                          if( next || slot < set->page->cnt )
3338                           if( !slotptr(set->page, slot)->dead ) {
3339                                 ptr = keyptr(set->page, slot);
3340                                 len = ptr->len;
3341
3342                             if( slotptr(set->page, slot)->type == Duplicate )
3343                                         len -= BtId;
3344
3345                                 fwrite (ptr->key, len, 1, stdout);
3346                                 val = valptr(set->page, slot);
3347                                 fwrite (val->value, val->len, 1, stdout);
3348                                 fputc ('\n', stdout);
3349                                 cnt++;
3350                            }
3351
3352                         bt_unlockpage (bt, BtLockRead, set->latch);
3353                         bt_unpinlatch (set->latch);
3354                 } while( page_no = next );
3355
3356                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3357                 break;
3358
3359         case 'r':
3360                 fprintf(stderr, "started reverse scan\n");
3361                 if( slot = bt_lastkey (bt) )
3362                    while( slot = bt_prevkey (bt, slot) ) {
3363                         if( slotptr(bt->cursor, slot)->dead )
3364                           continue;
3365
3366                         ptr = keyptr(bt->cursor, slot);
3367                         len = ptr->len;
3368
3369                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3370                                 len -= BtId;
3371
3372                         fwrite (ptr->key, len, 1, stdout);
3373                         val = valptr(bt->cursor, slot);
3374                         fwrite (val->value, val->len, 1, stdout);
3375                         fputc ('\n', stdout);
3376                         cnt++;
3377                   }
3378
3379                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3380                 break;
3381
3382         case 'c':
3383 #ifdef unix
3384                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3385 #endif
3386                 fprintf(stderr, "started counting\n");
3387                 page_no = LEAF_page;
3388
3389                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3390                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3391                                 break;
3392
3393                         if( !bt->frame->free && !bt->frame->lvl )
3394                                 cnt += bt->frame->act;
3395
3396                         bt->reads++;
3397                         page_no++;
3398                 }
3399                 
3400                 cnt--;  // remove stopper key
3401                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3402                 break;
3403         }
3404
3405         bt_close (bt);
3406 #ifdef unix
3407         return NULL;
3408 #else
3409         return 0;
3410 #endif
3411 }
3412
3413 typedef struct timeval timer;
3414
3415 int main (int argc, char **argv)
3416 {
3417 int idx, cnt, len, slot, err;
3418 int segsize, bits = 16;
3419 double start, stop;
3420 #ifdef unix
3421 pthread_t *threads;
3422 #else
3423 HANDLE *threads;
3424 #endif
3425 ThreadArg *args;
3426 uint poolsize = 0;
3427 float elapsed;
3428 int num = 0;
3429 char key[1];
3430 BtMgr *mgr;
3431 BtKey *ptr;
3432 BtDb *bt;
3433
3434         if( argc < 3 ) {
3435                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3436                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3437                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3438                 fprintf (stderr, "  page_bits is the page size in bits\n");
3439                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3440                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3441                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3442                 exit(0);
3443         }
3444
3445         start = getCpuTime(0);
3446
3447         if( argc > 3 )
3448                 bits = atoi(argv[3]);
3449
3450         if( argc > 4 )
3451                 poolsize = atoi(argv[4]);
3452
3453         if( !poolsize )
3454                 fprintf (stderr, "Warning: no mapped_pool\n");
3455
3456         if( argc > 5 )
3457                 num = atoi(argv[5]);
3458
3459         cnt = argc - 6;
3460 #ifdef unix
3461         threads = malloc (cnt * sizeof(pthread_t));
3462 #else
3463         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3464 #endif
3465         args = malloc (cnt * sizeof(ThreadArg));
3466
3467         mgr = bt_mgr ((argv[1]), bits, poolsize);
3468
3469         if( !mgr ) {
3470                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3471                 exit (1);
3472         }
3473
3474         //      fire off threads
3475
3476         for( idx = 0; idx < cnt; idx++ ) {
3477                 args[idx].infile = argv[idx + 6];
3478                 args[idx].type = argv[2];
3479                 args[idx].mgr = mgr;
3480                 args[idx].num = num;
3481                 args[idx].idx = idx;
3482 #ifdef unix
3483                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3484                         fprintf(stderr, "Error creating thread %d\n", err);
3485 #else
3486                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3487 #endif
3488         }
3489
3490         //      wait for termination
3491
3492 #ifdef unix
3493         for( idx = 0; idx < cnt; idx++ )
3494                 pthread_join (threads[idx], NULL);
3495 #else
3496         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3497
3498         for( idx = 0; idx < cnt; idx++ )
3499                 CloseHandle(threads[idx]);
3500
3501 #endif
3502         bt_poolaudit(mgr);
3503         bt_mgrclose (mgr);
3504
3505         elapsed = getCpuTime(0) - start;
3506         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3507         elapsed = getCpuTime(1);
3508         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3509         elapsed = getCpuTime(2);
3510         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3511 }
3512
3513 #endif  //STANDALONE