]> pd.if.org Git - btree/blob - threadskv6.c
ce037105c8055c674d3df8ed7c15a4852784914a
[btree] / threadskv6.c
1 // btree version threadskv6 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      and traditional buffer pool manager
8
9 // 10 SEP 2014
10
11 // author: karl malbrain, malbrain@cal.berkeley.edu
12
13 /*
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
16
17 The orginal author is Karl Malbrain.
18
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
25 */
26
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
29
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
32
33 #ifdef linux
34 #define _GNU_SOURCE
35 #endif
36
37 #ifdef unix
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <fcntl.h>
42 #include <sys/time.h>
43 #include <sys/mman.h>
44 #include <errno.h>
45 #include <pthread.h>
46 #else
47 #define WIN32_LEAN_AND_MEAN
48 #include <windows.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <time.h>
52 #include <fcntl.h>
53 #include <process.h>
54 #include <intrin.h>
55 #endif
56
57 #include <memory.h>
58 #include <string.h>
59 #include <stddef.h>
60
61 typedef unsigned long long      uid;
62
63 #ifndef unix
64 typedef unsigned long long      off64_t;
65 typedef unsigned short          ushort;
66 typedef unsigned int            uint;
67 #endif
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 //  BTree page number constants
78 #define ALLOC_page              0       // allocation page
79 #define ROOT_page               1       // root of the btree
80 #define LEAF_page               2       // first page of leaves
81
82 //      Number of levels to create in a new BTree
83
84 #define MIN_lvl                 2
85
86 /*
87 There are five lock types for each node in three independent sets: 
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
93 */
94
95 typedef enum{
96         BtLockAccess,
97         BtLockDelete,
98         BtLockRead,
99         BtLockWrite,
100         BtLockParent
101 } BtLock;
102
103 //      definition for phase-fair reader/writer lock implementation
104
105 typedef struct {
106         ushort rin[1];
107         ushort rout[1];
108         ushort ticket[1];
109         ushort serving[1];
110 } RWLock;
111
112 #define PHID 0x1
113 #define PRES 0x2
114 #define MASK 0x3
115 #define RINC 0x4
116
117 //      definition for spin latch implementation
118
119 // exclusive is set for write access
120 // share is count of read accessors
121 // grant write lock when share == 0
122
123 volatile typedef struct {
124         ushort exclusive:1;
125         ushort pending:1;
126         ushort share:14;
127 } BtSpinLatch;
128
129 #define XCL 1
130 #define PEND 2
131 #define BOTH 3
132 #define SHARE 4
133
134 //  hash table entries
135
136 typedef struct {
137         volatile uint slot;             // Latch table entry at head of chain
138         BtSpinLatch latch[1];
139 } BtHashEntry;
140
141 //      latch manager table structure
142
143 typedef struct {
144         volatile uid page_no;   // latch set page number
145         RWLock readwr[1];               // read/write page lock
146         RWLock access[1];               // Access Intent/Page delete
147         RWLock parent[1];               // Posting of fence key in parent
148         volatile uint next;             // next entry in hash table chain
149         volatile uint prev;             // prev entry in hash table chain
150         volatile ushort pin;    // number of outstanding latches
151         ushort dirty:1;                 // page in cache is dirty
152 } BtLatchSet;
153
154 //      Define the length of the page record numbers
155
156 #define BtId 6
157
158 //      Page key slot definition.
159
160 //      Keys are marked dead, but remain on the page until
161 //      it cleanup is called. The fence key (highest key) for
162 //      a leaf page is always present, even after cleanup.
163
164 //      Slot types
165
166 //      In addition to the Unique keys that occupy slots
167 //      there are Librarian and Duplicate key
168 //      slots occupying the key slot array.
169
170 //      The Librarian slots are dead keys that
171 //      serve as filler, available to add new Unique
172 //      or Dup slots that are inserted into the B-tree.
173
174 //      The Duplicate slots have had their key bytes extended
175 //      by 6 bytes to contain a binary duplicate key uniqueifier.
176
177 typedef enum {
178         Unique,
179         Librarian,
180         Duplicate
181 } BtSlotType;
182
183 typedef struct {
184         uint off:BT_maxbits;    // page offset for key start
185         uint type:3;                    // type of slot
186         uint dead:1;                    // set for deleted slot
187 } BtSlot;
188
189 //      The key structure occupies space at the upper end of
190 //      each page.  It's a length byte followed by the key
191 //      bytes.
192
193 typedef struct {
194         unsigned char len;              // this can be changed to a ushort or uint
195         unsigned char key[0];
196 } BtKey;
197
198 //      the value structure also occupies space at the upper
199 //      end of the page. Each key is immediately followed by a value.
200
201 typedef struct {
202         unsigned char len;              // this can be changed to a ushort or uint
203         unsigned char value[0];
204 } BtVal;
205
206 #define BT_maxkey       255             // maximum number of bytes in a key
207 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
208
209 //      The first part of an index page.
210 //      It is immediately followed
211 //      by the BtSlot array of keys.
212
213 //      note that this structure size
214 //      must be a multiple of 8 bytes
215 //      in order to place dups correctly.
216
217 typedef struct BtPage_ {
218         uint cnt;                                       // count of keys in page
219         uint act;                                       // count of active keys
220         uint min;                                       // next key offset
221         uint garbage;                           // page garbage in bytes
222         unsigned char bits:7;           // page size in bits
223         unsigned char free:1;           // page is on free chain
224         unsigned char lvl:7;            // level of page
225         unsigned char kill:1;           // page is being deleted
226         unsigned char left[BtId];       // page number to left
227         unsigned char filler[2];        // padding to multiple of 8
228         unsigned char right[BtId];      // page number to right
229 } *BtPage;
230
231 //  The loadpage interface object
232
233 typedef struct {
234         uid page_no;            // current page number
235         BtPage page;            // current page pointer
236         BtLatchSet *latch;      // current page latch set
237 } BtPageSet;
238
239 //      structure for latch manager on ALLOC_page
240
241 typedef struct {
242         struct BtPage_ alloc[1];        // next page_no in right ptr
243         unsigned long long dups[1];     // global duplicate key uniqueifier
244         unsigned char chain[BtId];      // head of free page_nos chain
245 } BtPageZero;
246
247 //      The object structure for Btree access
248
249 typedef struct {
250         uint page_size;                         // page size    
251         uint page_bits;                         // page size in bits    
252 #ifdef unix
253         int idx;
254 #else
255         HANDLE idx;
256 #endif
257         BtPageZero *pagezero;           // mapped allocation page
258         BtSpinLatch lock[1];            // allocation area lite latch
259         uint latchdeployed;                     // highest number of latch entries deployed
260         uint nlatchpage;                        // number of latch pages at BT_latch
261         uint latchtotal;                        // number of page latch entries
262         uint latchhash;                         // number of latch hash table slots
263         uint latchvictim;                       // next latch entry to examine
264         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
265         BtLatchSet *latchsets;          // mapped latch set from latch pages
266         unsigned char *pagepool;        // mapped to the buffer pool pages
267 #ifndef unix
268         HANDLE halloc;                          // allocation handle
269         HANDLE hpool;                           // buffer pool handle
270 #endif
271 } BtMgr;
272
273 typedef struct {
274         BtMgr *mgr;                             // buffer manager for thread
275         BtPage cursor;                  // cached frame for start/next (never mapped)
276         BtPage frame;                   // spare frame for the page split (never mapped)
277         uid cursor_page;                                // current cursor page number   
278         unsigned char *mem;                             // frame, cursor, page memory buffer
279         unsigned char key[BT_keyarray]; // last found complete key
280         int found;                              // last delete or insert was found
281         int err;                                // last error
282         int reads, writes;              // number of reads and writes from the btree
283 } BtDb;
284
285 typedef enum {
286         BTERR_ok = 0,
287         BTERR_struct,
288         BTERR_ovflw,
289         BTERR_lock,
290         BTERR_map,
291         BTERR_read,
292         BTERR_wrt,
293         BTERR_hash
294 } BTERR;
295
296 #define CLOCK_bit 0x8000
297
298 // B-Tree functions
299 extern void bt_close (BtDb *bt);
300 extern BtDb *bt_open (BtMgr *mgr);
301 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
302 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
303 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
304 extern BtKey *bt_foundkey (BtDb *bt);
305 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
306 extern uint bt_nextkey   (BtDb *bt, uint slot);
307
308 //      manager functions
309 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
310 void bt_mgrclose (BtMgr *mgr);
311
312 //  Helper functions to return slot values
313 //      from the cursor page.
314
315 extern BtKey *bt_key (BtDb *bt, uint slot);
316 extern BtVal *bt_val (BtDb *bt, uint slot);
317
318 //  The page is allocated from low and hi ends.
319 //  The key slots are allocated from the bottom,
320 //      while the text and value of the key
321 //  are allocated from the top.  When the two
322 //  areas meet, the page is split into two.
323
324 //  A key consists of a length byte, two bytes of
325 //  index number (0 - 65535), and up to 253 bytes
326 //  of key value.
327
328 //  Associated with each key is a value byte string
329 //      containing any value desired.
330
331 //  The b-tree root is always located at page 1.
332 //      The first leaf page of level zero is always
333 //      located on page 2.
334
335 //      The b-tree pages are linked with next
336 //      pointers to facilitate enumerators,
337 //      and provide for concurrency.
338
339 //      When to root page fills, it is split in two and
340 //      the tree height is raised by a new root at page
341 //      one with two keys.
342
343 //      Deleted keys are marked with a dead bit until
344 //      page cleanup. The fence key for a leaf node is
345 //      always present
346
347 //  To achieve maximum concurrency one page is locked at a time
348 //  as the tree is traversed to find leaf key in question. The right
349 //  page numbers are used in cases where the page is being split,
350 //      or consolidated.
351
352 //  Page 0 is dedicated to lock for new page extensions,
353 //      and chains empty pages together for reuse. It also
354 //      contains the latch manager hash table.
355
356 //      The ParentModification lock on a node is obtained to serialize posting
357 //      or changing the fence key for a node.
358
359 //      Empty pages are chained together through the ALLOC page and reused.
360
361 //      Access macros to address slot and key values from the page
362 //      Page slots use 1 based indexing.
363
364 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
365 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
366 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
367
368 void bt_putid(unsigned char *dest, uid id)
369 {
370 int i = BtId;
371
372         while( i-- )
373                 dest[i] = (unsigned char)id, id >>= 8;
374 }
375
376 uid bt_getid(unsigned char *src)
377 {
378 uid id = 0;
379 int i;
380
381         for( i = 0; i < BtId; i++ )
382                 id <<= 8, id |= *src++; 
383
384         return id;
385 }
386
387 uid bt_newdup (BtDb *bt)
388 {
389 #ifdef unix
390         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
391 #else
392         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
393 #endif
394 }
395
396 //      Phase-Fair reader/writer lock implementation
397
398 void WriteLock (RWLock *lock)
399 {
400 ushort w, r, tix;
401
402 #ifdef unix
403         tix = __sync_fetch_and_add (lock->ticket, 1);
404 #else
405         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
406 #endif
407         // wait for our ticket to come up
408
409         while( tix != lock->serving[0] )
410 #ifdef unix
411                 sched_yield();
412 #else
413                 SwitchToThread ();
414 #endif
415
416         w = PRES | (tix & PHID);
417 #ifdef  unix
418         r = __sync_fetch_and_add (lock->rin, w);
419 #else
420         r = _InterlockedExchangeAdd16 (lock->rin, w);
421 #endif
422         while( r != *lock->rout )
423 #ifdef unix
424                 sched_yield();
425 #else
426                 SwitchToThread();
427 #endif
428 }
429
430 void WriteRelease (RWLock *lock)
431 {
432 #ifdef unix
433         __sync_fetch_and_and (lock->rin, ~MASK);
434 #else
435         _InterlockedAnd16 (lock->rin, ~MASK);
436 #endif
437         lock->serving[0]++;
438 }
439
440 void ReadLock (RWLock *lock)
441 {
442 ushort w;
443 #ifdef unix
444         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
445 #else
446         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
447 #endif
448         if( w )
449           while( w == (*lock->rin & MASK) )
450 #ifdef unix
451                 sched_yield ();
452 #else
453                 SwitchToThread ();
454 #endif
455 }
456
457 void ReadRelease (RWLock *lock)
458 {
459 #ifdef unix
460         __sync_fetch_and_add (lock->rout, RINC);
461 #else
462         _InterlockedExchangeAdd16 (lock->rout, RINC);
463 #endif
464 }
465
466 //      Spin Latch Manager
467
468 //      wait until write lock mode is clear
469 //      and add 1 to the share count
470
471 void bt_spinreadlock(BtSpinLatch *latch)
472 {
473 ushort prev;
474
475   do {
476 #ifdef unix
477         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
478 #else
479         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
480 #endif
481         //  see if exclusive request is granted or pending
482
483         if( !(prev & BOTH) )
484                 return;
485 #ifdef unix
486         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
487 #else
488         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
489 #endif
490 #ifdef  unix
491   } while( sched_yield(), 1 );
492 #else
493   } while( SwitchToThread(), 1 );
494 #endif
495 }
496
497 //      wait for other read and write latches to relinquish
498
499 void bt_spinwritelock(BtSpinLatch *latch)
500 {
501 ushort prev;
502
503   do {
504 #ifdef  unix
505         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
506 #else
507         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
508 #endif
509         if( !(prev & XCL) )
510           if( !(prev & ~BOTH) )
511                 return;
512           else
513 #ifdef unix
514                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
515 #else
516                 _InterlockedAnd16((ushort *)latch, ~XCL);
517 #endif
518 #ifdef  unix
519   } while( sched_yield(), 1 );
520 #else
521   } while( SwitchToThread(), 1 );
522 #endif
523 }
524
525 //      try to obtain write lock
526
527 //      return 1 if obtained,
528 //              0 otherwise
529
530 int bt_spinwritetry(BtSpinLatch *latch)
531 {
532 ushort prev;
533
534 #ifdef  unix
535         prev = __sync_fetch_and_or((ushort *)latch, XCL);
536 #else
537         prev = _InterlockedOr16((ushort *)latch, XCL);
538 #endif
539         //      take write access if all bits are clear
540
541         if( !(prev & XCL) )
542           if( !(prev & ~BOTH) )
543                 return 1;
544           else
545 #ifdef unix
546                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
547 #else
548                 _InterlockedAnd16((ushort *)latch, ~XCL);
549 #endif
550         return 0;
551 }
552
553 //      clear write mode
554
555 void bt_spinreleasewrite(BtSpinLatch *latch)
556 {
557 #ifdef unix
558         __sync_fetch_and_and((ushort *)latch, ~BOTH);
559 #else
560         _InterlockedAnd16((ushort *)latch, ~BOTH);
561 #endif
562 }
563
564 //      decrement reader count
565
566 void bt_spinreleaseread(BtSpinLatch *latch)
567 {
568 #ifdef unix
569         __sync_fetch_and_add((ushort *)latch, -SHARE);
570 #else
571         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
572 #endif
573 }
574
575 //      read page from permanent location in Btree file
576
577 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
578 {
579 off64_t off = page_no << mgr->page_bits;
580
581 #ifdef unix
582         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
583                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
584                 return BTERR_read;
585         }
586 #else
587 OVERLAPPED ovl[1];
588 uint amt[1];
589
590         memset (ovl, 0, sizeof(OVERLAPPED));
591         ovl->Offset = off;
592         ovl->OffsetHigh = off >> 32;
593
594         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
595                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
596                 return BTERR_read;
597         }
598         if( *amt <  mgr->page_size ) {
599                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
600                 return BTERR_read;
601         }
602 #endif
603         return 0;
604 }
605
606 //      write page to permanent location in Btree file
607 //      clear the dirty bit
608
609 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
610 {
611 off64_t off = page_no << mgr->page_bits;
612
613 #ifdef unix
614         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
615                 return BTERR_wrt;
616 #else
617 OVERLAPPED ovl[1];
618 uint amt[1];
619
620         memset (ovl, 0, sizeof(OVERLAPPED));
621         ovl->Offset = off;
622         ovl->OffsetHigh = off >> 32;
623
624         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
625                 return BTERR_wrt;
626
627         if( *amt <  mgr->page_size )
628                 return BTERR_wrt;
629 #endif
630         return 0;
631 }
632
633 //      link latch table entry into head of latch hash table
634
635 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
636 {
637 BtPage page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
638 BtLatchSet *latch = bt->mgr->latchsets + slot;
639
640         if( latch->next = bt->mgr->hashtable[hashidx].slot )
641                 bt->mgr->latchsets[latch->next].prev = slot;
642
643         bt->mgr->hashtable[hashidx].slot = slot;
644         latch->pin = CLOCK_bit | 1;     // initialize clock
645         latch->page_no = page_no;
646         latch->prev = 0;
647
648         if( loadit )
649           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
650                 return bt->err;
651           else
652                 bt->reads++;
653
654         return bt->err = 0;
655 }
656
657 //      release latch pin
658
659 void bt_unpinlatch (BtLatchSet *set)
660 {
661 #ifdef unix
662         __sync_fetch_and_add(&set->pin, -1);
663 #else
664         _InterlockedDecrement16 (&set->pin);
665 #endif
666 }
667
668 //  map the btree cached page onto current page
669
670 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
671 {
672 BtPage page = (BtPage)((uid)(latch - bt->mgr->latchsets) * bt->mgr->page_size + bt->mgr->pagepool);
673
674         return page;
675 }
676
677 //      find existing latchset or inspire new one
678 //      return with latchset pinned
679
680 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
681 {
682 uint hashidx = page_no % bt->mgr->latchhash;
683 BtLatchSet *latch;
684 uint slot, idx;
685 uint lvl, cnt;
686 off64_t off;
687 uint amt[1];
688 BtPage page;
689
690   //  try to find our entry
691
692   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
693
694   if( slot = bt->mgr->hashtable[hashidx].slot ) do
695   {
696         latch = bt->mgr->latchsets + slot;
697         if( page_no == latch->page_no )
698                 break;
699   } while( slot = latch->next );
700
701   //  found our entry
702   //  increment clock
703
704   if( slot ) {
705         latch = bt->mgr->latchsets + slot;
706 #ifdef unix
707         __sync_fetch_and_add(&latch->pin, 1);
708         __sync_fetch_and_or(&latch->pin, CLOCK_bit);
709 #else
710         _InterlockedIncrement16 (&latch->pin);
711         _InterlockedOr16 (&latch->pin, CLOCK_bit);
712 #endif
713         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
714         return latch;
715   }
716
717         //  see if there are any unused pool entries
718 #ifdef unix
719         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
720 #else
721         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
722 #endif
723
724         if( slot < bt->mgr->latchtotal ) {
725                 latch = bt->mgr->latchsets + slot;
726                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
727                         return NULL;
728                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
729                 return latch;
730         }
731
732 #ifdef unix
733         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
734 #else
735         _InterlockedDecrement (&bt->mgr->latchdeployed);
736 #endif
737   //  find and reuse previous entry on victim
738
739   while( 1 ) {
740 #ifdef unix
741         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
742 #else
743         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
744 #endif
745         // try to get write lock on hash chain
746         //      skip entry if not obtained
747         //      or has outstanding pins
748
749         slot %= bt->mgr->latchtotal;
750
751         if( !slot )
752                 continue;
753
754         latch = bt->mgr->latchsets + slot;
755         idx = latch->page_no % bt->mgr->latchhash;
756
757         //      see we are on same chain as hashidx
758
759         if( idx == hashidx )
760                 continue;
761
762         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
763                 continue;
764
765         if( latch->pin & CLOCK_bit ) {
766 #ifdef unix
767                 __sync_fetch_and_add(&latch->pin, -CLOCK_bit);
768 #else
769                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_bit);
770 #endif
771           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
772           continue;
773         }
774
775         //  update permanent page area in btree
776
777         page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
778
779         if( latch->dirty )
780           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
781                 return NULL;
782           else
783                 latch->dirty = 0, bt->writes++;
784
785         //  unlink our available slot from its hash chain
786
787         if( latch->prev )
788                 bt->mgr->latchsets[latch->prev].next = latch->next;
789         else
790                 bt->mgr->hashtable[idx].slot = latch->next;
791
792         if( latch->next )
793                 bt->mgr->latchsets[latch->next].prev = latch->prev;
794
795         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
796
797         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
798                 return NULL;
799
800         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
801         return latch;
802   }
803 }
804
805 void bt_mgrclose (BtMgr *mgr)
806 {
807 BtLatchSet *latch;
808 uint num = 0;
809 BtPage page;
810 uint slot;
811
812         //      flush dirty pool pages to the btree
813
814         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
815                 page = (BtPage)((uid)slot * mgr->page_size + mgr->pagepool);
816                 latch = mgr->latchsets + slot;
817
818                 if( latch->dirty ) {
819                         bt_writepage(mgr, page, latch->page_no);
820                         latch->dirty = 0, num++;
821                 }
822         }
823
824         fprintf(stderr, "%d buffer pool pages flushed\n", num);
825
826 #ifdef unix
827         munmap (mgr->hashtable, mgr->nlatchpage * mgr->page_size);
828         munmap (mgr->pagezero, mgr->page_size);
829 #else
830         FlushViewOfFile(mgr->pagezero, 0);
831         UnmapViewOfFile(mgr->pagezero);
832         UnmapViewOfFile(mgr->hashtable);
833         CloseHandle(mgr->halloc);
834         CloseHandle(mgr->hpool);
835 #endif
836 #ifdef unix
837         close (mgr->idx);
838         free (mgr);
839 #else
840         FlushFileBuffers(mgr->idx);
841         CloseHandle(mgr->idx);
842         GlobalFree (mgr);
843 #endif
844 }
845
846 //      close and release memory
847
848 void bt_close (BtDb *bt)
849 {
850 #ifdef unix
851         if( bt->mem )
852                 free (bt->mem);
853 #else
854         if( bt->mem)
855                 VirtualFree (bt->mem, 0, MEM_RELEASE);
856 #endif
857         free (bt);
858 }
859
860 //  open/create new btree buffer manager
861
862 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
863 //              size of page pool (e.g. 262144)
864
865 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
866 {
867 uint lvl, attr, last, slot, idx;
868 uint nlatchpage, latchhash;
869 unsigned char value[BtId];
870 int flag, initit = 0;
871 BtPageZero *pagezero;
872 off64_t size;
873 uint amt[1];
874 BtMgr* mgr;
875 BtKey* key;
876 BtVal *val;
877
878         // determine sanity of page size and buffer pool
879
880         if( bits > BT_maxbits )
881                 bits = BT_maxbits;
882         else if( bits < BT_minbits )
883                 bits = BT_minbits;
884
885         if( nodemax < 16 ) {
886                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
887                 return NULL;
888         }
889
890 #ifdef unix
891         mgr = calloc (1, sizeof(BtMgr));
892
893         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
894
895         if( mgr->idx == -1 ) {
896                 fprintf (stderr, "Unable to open btree file\n");
897                 return free(mgr), NULL;
898         }
899 #else
900         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
901         attr = FILE_ATTRIBUTE_NORMAL;
902         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
903
904         if( mgr->idx == INVALID_HANDLE_VALUE )
905                 return GlobalFree(mgr), NULL;
906 #endif
907
908 #ifdef unix
909         pagezero = valloc (BT_maxpage);
910         *amt = 0;
911
912         // read minimum page size to get root info
913         //      to support raw disk partition files
914         //      check if bits == 0 on the disk.
915
916         if( size = lseek (mgr->idx, 0L, 2) )
917                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
918                         if( pagezero->alloc->bits )
919                                 bits = pagezero->alloc->bits;
920                         else
921                                 initit = 1;
922                 else
923                         return free(mgr), free(pagezero), NULL;
924         else
925                 initit = 1;
926 #else
927         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
928         size = GetFileSize(mgr->idx, amt);
929
930         if( size || *amt ) {
931                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
932                         return bt_mgrclose (mgr), NULL;
933                 bits = pagezero->alloc->bits;
934         } else
935                 initit = 1;
936 #endif
937
938         mgr->page_size = 1 << bits;
939         mgr->page_bits = bits;
940
941         //  calculate number of latch hash table entries
942
943         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
944         mgr->latchhash = mgr->nlatchpage * mgr->page_size / sizeof(BtHashEntry);
945
946         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
947         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
948         mgr->latchtotal = nodemax;
949
950         if( !initit )
951                 goto mgrlatch;
952
953         // initialize an empty b-tree with latch page, root page, page of leaves
954         // and page(s) of latches and page pool cache
955
956         memset (pagezero, 0, 1 << bits);
957         pagezero->alloc->bits = mgr->page_bits;
958         bt_putid(pagezero->alloc->right, MIN_lvl+1);
959
960         //  initialize left-most LEAF page in
961         //      alloc->left.
962
963         bt_putid (pagezero->alloc->left, LEAF_page);
964
965         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
966                 fprintf (stderr, "Unable to create btree page zero\n");
967                 return bt_mgrclose (mgr), NULL;
968         }
969
970         memset (pagezero, 0, 1 << bits);
971         pagezero->alloc->bits = mgr->page_bits;
972
973         for( lvl=MIN_lvl; lvl--; ) {
974                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
975                 key = keyptr(pagezero->alloc, 1);
976                 key->len = 2;           // create stopper key
977                 key->key[0] = 0xff;
978                 key->key[1] = 0xff;
979
980                 bt_putid(value, MIN_lvl - lvl + 1);
981                 val = valptr(pagezero->alloc, 1);
982                 val->len = lvl ? BtId : 0;
983                 memcpy (val->value, value, val->len);
984
985                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
986                 pagezero->alloc->lvl = lvl;
987                 pagezero->alloc->cnt = 1;
988                 pagezero->alloc->act = 1;
989
990                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
991                         fprintf (stderr, "Unable to create btree page zero\n");
992                         return bt_mgrclose (mgr), NULL;
993                 }
994         }
995
996 mgrlatch:
997 #ifdef unix
998         free (pagezero);
999 #else
1000         VirtualFree (pagezero, 0, MEM_RELEASE);
1001 #endif
1002 #ifdef unix
1003         // mlock the pagezero page
1004
1005         flag = PROT_READ | PROT_WRITE;
1006         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1007         if( mgr->pagezero == MAP_FAILED ) {
1008                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1009                 return bt_mgrclose (mgr), NULL;
1010         }
1011         mlock (mgr->pagezero, mgr->page_size);
1012
1013         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage * mgr->page_size, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1014         if( mgr->hashtable == MAP_FAILED ) {
1015                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1016                 return bt_mgrclose (mgr), NULL;
1017         }
1018 #else
1019         flag = PAGE_READWRITE;
1020         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1021         if( !mgr->halloc ) {
1022                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1023                 return bt_mgrclose (mgr), NULL;
1024         }
1025
1026         flag = FILE_MAP_WRITE;
1027         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1028         if( !mgr->pagezero ) {
1029                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1030                 return bt_mgrclose (mgr), NULL;
1031         }
1032
1033         flag = PAGE_READWRITE;
1034         size = (uid)mgr->nlatchpage << mgr->page_bits;
1035         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1036         if( !mgr->hpool ) {
1037                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1038                 return bt_mgrclose (mgr), NULL;
1039         }
1040
1041         flag = FILE_MAP_WRITE;
1042         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1043         if( !mgr->hashtable ) {
1044                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1045                 return bt_mgrclose (mgr), NULL;
1046         }
1047 #endif
1048
1049         mgr->pagepool = (unsigned char *)mgr->hashtable + (uid)(mgr->nlatchpage - mgr->latchtotal) * mgr->page_size;
1050         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1051
1052         return mgr;
1053 }
1054
1055 //      open BTree access method
1056 //      based on buffer manager
1057
1058 BtDb *bt_open (BtMgr *mgr)
1059 {
1060 BtDb *bt = malloc (sizeof(*bt));
1061
1062         memset (bt, 0, sizeof(*bt));
1063         bt->mgr = mgr;
1064 #ifdef unix
1065         bt->mem = valloc (2 *mgr->page_size);
1066 #else
1067         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1068 #endif
1069         bt->frame = (BtPage)bt->mem;
1070         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1071         return bt;
1072 }
1073
1074 //  compare two keys, returning > 0, = 0, or < 0
1075 //  as the comparison value
1076
1077 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1078 {
1079 uint len1 = key1->len;
1080 int ans;
1081
1082         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1083                 return ans;
1084
1085         if( len1 > len2 )
1086                 return 1;
1087         if( len1 < len2 )
1088                 return -1;
1089
1090         return 0;
1091 }
1092
1093 // place write, read, or parent lock on requested page_no.
1094
1095 void bt_lockpage(BtLock mode, BtLatchSet *set)
1096 {
1097         switch( mode ) {
1098         case BtLockRead:
1099                 ReadLock (set->readwr);
1100                 break;
1101         case BtLockWrite:
1102                 WriteLock (set->readwr);
1103                 break;
1104         case BtLockAccess:
1105                 ReadLock (set->access);
1106                 break;
1107         case BtLockDelete:
1108                 WriteLock (set->access);
1109                 break;
1110         case BtLockParent:
1111                 WriteLock (set->parent);
1112                 break;
1113         }
1114 }
1115
1116 // remove write, read, or parent lock on requested page
1117
1118 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1119 {
1120         switch( mode ) {
1121         case BtLockRead:
1122                 ReadRelease (set->readwr);
1123                 break;
1124         case BtLockWrite:
1125                 WriteRelease (set->readwr);
1126                 break;
1127         case BtLockAccess:
1128                 ReadRelease (set->access);
1129                 break;
1130         case BtLockDelete:
1131                 WriteRelease (set->access);
1132                 break;
1133         case BtLockParent:
1134                 WriteRelease (set->parent);
1135                 break;
1136         }
1137 }
1138
1139 //      allocate a new page
1140 //      return with page latched.
1141
1142 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1143 {
1144 int blk;
1145
1146         //      lock allocation page
1147
1148         bt_spinwritelock(bt->mgr->lock);
1149
1150         // use empty chain first
1151         // else allocate empty page
1152
1153         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1154                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1155                         set->page = bt_mappage (bt, set->latch);
1156                 else
1157                         return bt->err = BTERR_struct, -1;
1158
1159                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1160                 bt_spinreleasewrite(bt->mgr->lock);
1161
1162                 memcpy (set->page, contents, bt->mgr->page_size);
1163                 set->latch->dirty = 1;
1164                 return 0;
1165         }
1166
1167         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1168         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1169
1170         // unlock allocation latch
1171
1172         bt_spinreleasewrite(bt->mgr->lock);
1173
1174         //      don't load cache from btree page
1175
1176         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1177                 set->page = bt_mappage (bt, set->latch);
1178         else
1179                 return bt->err = BTERR_struct;
1180
1181         memcpy (set->page, contents, bt->mgr->page_size);
1182         set->latch->dirty = 1;
1183         return 0;
1184 }
1185
1186 //  find slot in page for given key at a given level
1187
1188 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1189 {
1190 uint diff, higher = set->page->cnt, low = 1, slot;
1191 uint good = 0;
1192
1193         //        make stopper key an infinite fence value
1194
1195         if( bt_getid (set->page->right) )
1196                 higher++;
1197         else
1198                 good++;
1199
1200         //      low is the lowest candidate.
1201         //  loop ends when they meet
1202
1203         //  higher is already
1204         //      tested as .ge. the passed key.
1205
1206         while( diff = higher - low ) {
1207                 slot = low + ( diff >> 1 );
1208                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1209                         low = slot + 1;
1210                 else
1211                         higher = slot, good++;
1212         }
1213
1214         //      return zero if key is on right link page
1215
1216         return good ? higher : 0;
1217 }
1218
1219 //  find and load page at given level for given key
1220 //      leave page rd or wr locked as requested
1221
1222 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1223 {
1224 uid page_no = ROOT_page, prevpage = 0;
1225 uint drill = 0xff, slot;
1226 BtLatchSet *prevlatch;
1227 uint mode, prevmode;
1228
1229   //  start at root of btree and drill down
1230
1231   do {
1232         // determine lock mode of drill level
1233         mode = (drill == lvl) ? lock : BtLockRead; 
1234
1235         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1236                 set->page_no = page_no;
1237         else
1238                 return 0;
1239
1240         // obtain access lock using lock chaining with Access mode
1241
1242         if( page_no > ROOT_page )
1243           bt_lockpage(BtLockAccess, set->latch);
1244
1245         set->page = bt_mappage (bt, set->latch);
1246
1247         //      release & unpin parent page
1248
1249         if( prevpage ) {
1250           bt_unlockpage(prevmode, prevlatch);
1251           bt_unpinlatch (prevlatch);
1252           prevpage = 0;
1253         }
1254
1255         // obtain read lock using lock chaining
1256
1257         bt_lockpage(mode, set->latch);
1258
1259         if( set->page->free )
1260                 return bt->err = BTERR_struct, 0;
1261
1262         if( page_no > ROOT_page )
1263           bt_unlockpage(BtLockAccess, set->latch);
1264
1265         // re-read and re-lock root after determining actual level of root
1266
1267         if( set->page->lvl != drill) {
1268                 if( set->page_no != ROOT_page )
1269                         return bt->err = BTERR_struct, 0;
1270                         
1271                 drill = set->page->lvl;
1272
1273                 if( lock != BtLockRead && drill == lvl ) {
1274                   bt_unlockpage(mode, set->latch);
1275                   bt_unpinlatch (set->latch);
1276                   continue;
1277                 }
1278         }
1279
1280         prevpage = set->page_no;
1281         prevlatch = set->latch;
1282         prevmode = mode;
1283
1284         //  find key on page at this level
1285         //  and descend to requested level
1286
1287         if( set->page->kill )
1288           goto slideright;
1289
1290         if( slot = bt_findslot (set, key, len) ) {
1291           if( drill == lvl )
1292                 return slot;
1293
1294           while( slotptr(set->page, slot)->dead )
1295                 if( slot++ < set->page->cnt )
1296                         continue;
1297                 else
1298                         goto slideright;
1299
1300           page_no = bt_getid(valptr(set->page, slot)->value);
1301           drill--;
1302           continue;
1303         }
1304
1305         //  or slide right into next page
1306
1307 slideright:
1308         page_no = bt_getid(set->page->right);
1309
1310   } while( page_no );
1311
1312   // return error on end of right chain
1313
1314   bt->err = BTERR_struct;
1315   return 0;     // return error
1316 }
1317
1318 //      return page to free list
1319 //      page must be delete & write locked
1320
1321 void bt_freepage (BtDb *bt, BtPageSet *set)
1322 {
1323         //      lock allocation page
1324
1325         bt_spinwritelock (bt->mgr->lock);
1326
1327         //      store chain
1328         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1329         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1330         set->latch->dirty = 1;
1331         set->page->free = 1;
1332
1333         // unlock released page
1334
1335         bt_unlockpage (BtLockDelete, set->latch);
1336         bt_unlockpage (BtLockWrite, set->latch);
1337         bt_unpinlatch (set->latch);
1338
1339         // unlock allocation page
1340
1341         bt_spinreleasewrite (bt->mgr->lock);
1342 }
1343
1344 //      a fence key was deleted from a page
1345 //      push new fence value upwards
1346
1347 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1348 {
1349 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1350 unsigned char value[BtId];
1351 BtKey* ptr;
1352 uint idx;
1353
1354         //      remove the old fence value
1355
1356         ptr = keyptr(set->page, set->page->cnt);
1357         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1358         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1359         set->latch->dirty = 1;
1360
1361         //  cache new fence value
1362
1363         ptr = keyptr(set->page, set->page->cnt);
1364         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1365
1366         bt_lockpage (BtLockParent, set->latch);
1367         bt_unlockpage (BtLockWrite, set->latch);
1368
1369         //      insert new (now smaller) fence key
1370
1371         bt_putid (value, set->page_no);
1372         ptr = (BtKey*)leftkey;
1373
1374         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1375           return bt->err;
1376
1377         //      now delete old fence key
1378
1379         ptr = (BtKey*)rightkey;
1380
1381         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1382                 return bt->err;
1383
1384         bt_unlockpage (BtLockParent, set->latch);
1385         bt_unpinlatch(set->latch);
1386         return 0;
1387 }
1388
1389 //      root has a single child
1390 //      collapse a level from the tree
1391
1392 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1393 {
1394 BtPageSet child[1];
1395 uint idx;
1396
1397   // find the child entry and promote as new root contents
1398
1399   do {
1400         for( idx = 0; idx++ < root->page->cnt; )
1401           if( !slotptr(root->page, idx)->dead )
1402                 break;
1403
1404         child->page_no = bt_getid (valptr(root->page, idx)->value);
1405
1406         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1407                 child->page = bt_mappage (bt, child->latch);
1408         else
1409                 return bt->err;
1410
1411         bt_lockpage (BtLockDelete, child->latch);
1412         bt_lockpage (BtLockWrite, child->latch);
1413
1414         memcpy (root->page, child->page, bt->mgr->page_size);
1415         root->latch->dirty = 1;
1416
1417         bt_freepage (bt, child);
1418
1419   } while( root->page->lvl > 1 && root->page->act == 1 );
1420
1421   bt_unlockpage (BtLockWrite, root->latch);
1422   bt_unpinlatch (root->latch);
1423   return 0;
1424 }
1425
1426 //      maintain reverse scan pointers by
1427 //      linking left pointer of far right node
1428
1429 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1430 {
1431 BtPageSet right2[1];
1432
1433         //      keep track of rightmost leaf page
1434
1435         if( !right_page_no ) {
1436           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1437           return 0;
1438         }
1439
1440         //      link right page to left page
1441
1442         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1443                 right2->page = bt_mappage (bt, right2->latch);
1444         else
1445                 return bt->err;
1446
1447         bt_lockpage (BtLockWrite, right2->latch);
1448
1449         bt_putid(right2->page->left, left_page_no);
1450         bt_unlockpage (BtLockWrite, right2->latch);
1451         bt_unpinlatch (right2->latch);
1452         return 0;
1453 }
1454
1455 //  find and delete key on page by marking delete flag bit
1456 //  if page becomes empty, delete it from the btree
1457
1458 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1459 {
1460 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1461 uint slot, idx, found, fence;
1462 BtPageSet set[1], right[1];
1463 unsigned char value[BtId];
1464 BtKey *ptr, *tst;
1465 BtVal *val;
1466
1467         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1468                 ptr = keyptr(set->page, slot);
1469         else
1470                 return bt->err;
1471
1472         // if librarian slot, advance to real slot
1473
1474         if( slotptr(set->page, slot)->type == Librarian )
1475                 ptr = keyptr(set->page, ++slot);
1476
1477         fence = slot == set->page->cnt;
1478
1479         // if key is found delete it, otherwise ignore request
1480
1481         if( found = !keycmp (ptr, key, len) )
1482           if( found = slotptr(set->page, slot)->dead == 0 ) {
1483                 val = valptr(set->page,slot);
1484                 slotptr(set->page, slot)->dead = 1;
1485                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1486                 set->page->act--;
1487
1488                 // collapse empty slots beneath the fence
1489
1490                 while( idx = set->page->cnt - 1 )
1491                   if( slotptr(set->page, idx)->dead ) {
1492                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1493                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1494                   } else
1495                         break;
1496           }
1497
1498         //      did we delete a fence key in an upper level?
1499
1500         if( found && lvl && set->page->act && fence )
1501           if( bt_fixfence (bt, set, lvl) )
1502                 return bt->err;
1503           else
1504                 return bt->found = found, 0;
1505
1506         //      do we need to collapse root?
1507
1508         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1509           if( bt_collapseroot (bt, set) )
1510                 return bt->err;
1511           else
1512                 return bt->found = found, 0;
1513
1514         //      return if page is not empty
1515
1516         if( set->page->act ) {
1517                 set->latch->dirty = 1;
1518                 bt_unlockpage(BtLockWrite, set->latch);
1519                 bt_unpinlatch (set->latch);
1520                 return bt->found = found, 0;
1521         }
1522
1523         //      cache copy of fence key
1524         //      to post in parent
1525
1526         ptr = keyptr(set->page, set->page->cnt);
1527         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1528
1529         //      obtain lock on right page
1530
1531         right->page_no = bt_getid(set->page->right);
1532
1533         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1534                 right->page = bt_mappage (bt, right->latch);
1535         else
1536                 return 0;
1537
1538         bt_lockpage (BtLockWrite, right->latch);
1539
1540         if( right->page->kill )
1541                 return bt->err = BTERR_struct;
1542
1543         // transfer left link
1544
1545         memcpy (right->page->left, set->page->left, BtId);
1546
1547         // pull contents of right peer into our empty page
1548
1549         memcpy (set->page, right->page, bt->mgr->page_size);
1550         set->latch->dirty = 1;
1551
1552         // update left link
1553
1554         if( !lvl )
1555           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1556                 return bt->err;
1557
1558         // cache copy of key to update
1559
1560         ptr = keyptr(right->page, right->page->cnt);
1561         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1562
1563         // mark right page deleted and point it to left page
1564         //      until we can post parent updates
1565
1566         bt_putid (right->page->right, set->page_no);
1567         right->latch->dirty = 1;
1568         right->page->kill = 1;
1569
1570         bt_lockpage (BtLockParent, right->latch);
1571         bt_unlockpage (BtLockWrite, right->latch);
1572
1573         bt_lockpage (BtLockParent, set->latch);
1574         bt_unlockpage (BtLockWrite, set->latch);
1575
1576         // redirect higher key directly to our new node contents
1577
1578         bt_putid (value, set->page_no);
1579         ptr = (BtKey*)higherfence;
1580
1581         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1582           return bt->err;
1583
1584         //      delete old lower key to our node
1585
1586         ptr = (BtKey*)lowerfence;
1587
1588         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1589           return bt->err;
1590
1591         //      obtain delete and write locks to right node
1592
1593         bt_unlockpage (BtLockParent, right->latch);
1594         bt_lockpage (BtLockDelete, right->latch);
1595         bt_lockpage (BtLockWrite, right->latch);
1596         bt_freepage (bt, right);
1597
1598         bt_unlockpage (BtLockParent, set->latch);
1599         bt_unpinlatch (set->latch);
1600         bt->found = found;
1601         return 0;
1602 }
1603
1604 BtKey *bt_foundkey (BtDb *bt)
1605 {
1606         return (BtKey*)(bt->key);
1607 }
1608
1609 //      advance to next slot
1610
1611 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1612 {
1613 BtLatchSet *prevlatch;
1614 uid page_no;
1615
1616         if( slot < set->page->cnt )
1617                 return slot + 1;
1618
1619         prevlatch = set->latch;
1620
1621         if( page_no = bt_getid(set->page->right) )
1622           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1623                 set->page = bt_mappage (bt, set->latch);
1624           else
1625                 return 0;
1626         else
1627           return bt->err = BTERR_struct, 0;
1628
1629         // obtain access lock using lock chaining with Access mode
1630
1631         bt_lockpage(BtLockAccess, set->latch);
1632
1633         bt_unlockpage(BtLockRead, prevlatch);
1634         bt_unpinlatch (prevlatch);
1635
1636         bt_lockpage(BtLockRead, set->latch);
1637         bt_unlockpage(BtLockAccess, set->latch);
1638
1639         set->page_no = page_no;
1640         return 1;
1641 }
1642
1643 //      find unique key or first duplicate key in
1644 //      leaf level and return number of value bytes
1645 //      or (-1) if not found.  Setup key for bt_foundkey
1646
1647 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1648 {
1649 BtPageSet set[1];
1650 uint len, slot;
1651 int ret = -1;
1652 BtKey *ptr;
1653 BtVal *val;
1654
1655   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1656    do {
1657         ptr = keyptr(set->page, slot);
1658
1659         //      skip librarian slot place holder
1660
1661         if( slotptr(set->page, slot)->type == Librarian )
1662                 ptr = keyptr(set->page, ++slot);
1663
1664         //      return actual key found
1665
1666         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1667         len = ptr->len;
1668
1669         if( slotptr(set->page, slot)->type == Duplicate )
1670                 len -= BtId;
1671
1672         //      not there if we reach the stopper key
1673
1674         if( slot == set->page->cnt )
1675           if( !bt_getid (set->page->right) )
1676                 break;
1677
1678         // if key exists, return >= 0 value bytes copied
1679         //      otherwise return (-1)
1680
1681         if( slotptr(set->page, slot)->dead )
1682                 continue;
1683
1684         if( keylen == len )
1685           if( !memcmp (ptr->key, key, len) ) {
1686                 val = valptr (set->page,slot);
1687                 if( valmax > val->len )
1688                         valmax = val->len;
1689                 memcpy (value, val->value, valmax);
1690                 ret = valmax;
1691           }
1692
1693         break;
1694
1695    } while( slot = bt_findnext (bt, set, slot) );
1696
1697   bt_unlockpage (BtLockRead, set->latch);
1698   bt_unpinlatch (set->latch);
1699   return ret;
1700 }
1701
1702 //      check page for space available,
1703 //      clean if necessary and return
1704 //      0 - page needs splitting
1705 //      >0  new slot value
1706
1707 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1708 {
1709 uint nxt = bt->mgr->page_size;
1710 BtPage page = set->page;
1711 uint cnt = 0, idx = 0;
1712 uint max = page->cnt;
1713 uint newslot = max;
1714 BtKey *key;
1715 BtVal *val;
1716
1717         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1718                 return slot;
1719
1720         //      skip cleanup and proceed to split
1721         //      if there's not enough garbage
1722         //      to bother with.
1723
1724         if( page->garbage < nxt / 5 )
1725                 return 0;
1726
1727         memcpy (bt->frame, page, bt->mgr->page_size);
1728
1729         // skip page info and set rest of page to zero
1730
1731         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1732         set->latch->dirty = 1;
1733         page->garbage = 0;
1734         page->act = 0;
1735
1736         // clean up page first by
1737         // removing deleted keys
1738
1739         while( cnt++ < max ) {
1740                 if( cnt == slot )
1741                         newslot = idx + 2;
1742                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1743                         continue;
1744
1745                 // copy the value across
1746
1747                 val = valptr(bt->frame, cnt);
1748                 nxt -= val->len + sizeof(BtVal);
1749                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1750
1751                 // copy the key across
1752
1753                 key = keyptr(bt->frame, cnt);
1754                 nxt -= key->len + sizeof(BtKey);
1755                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1756
1757                 // make a librarian slot
1758
1759                 if( idx ) {
1760                         slotptr(page, ++idx)->off = nxt;
1761                         slotptr(page, idx)->type = Librarian;
1762                         slotptr(page, idx)->dead = 1;
1763                 }
1764
1765                 // set up the slot
1766
1767                 slotptr(page, ++idx)->off = nxt;
1768                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1769
1770                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1771                         page->act++;
1772         }
1773
1774         page->min = nxt;
1775         page->cnt = idx;
1776
1777         //      see if page has enough space now, or does it need splitting?
1778
1779         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1780                 return newslot;
1781
1782         return 0;
1783 }
1784
1785 // split the root and raise the height of the btree
1786
1787 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1788 {
1789 uint nxt = bt->mgr->page_size;
1790 unsigned char value[BtId];
1791 BtPageSet left[1];
1792 BtKey *ptr;
1793 BtVal *val;
1794
1795         //  Obtain an empty page to use, and copy the current
1796         //  root contents into it, e.g. lower keys
1797
1798         if( bt_newpage(bt, left, root->page) )
1799                 return bt->err;
1800
1801         bt_unpinlatch (left->latch);
1802
1803         // set left from higher to lower page
1804
1805         bt_putid (right->page->left, left->page_no);
1806         bt_unpinlatch (right->latch);
1807
1808         // preserve the page info at the bottom
1809         // of higher keys and set rest to zero
1810
1811         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1812
1813         // insert stopper key at top of newroot page
1814         // and increase the root height
1815
1816         nxt -= BtId + sizeof(BtVal);
1817         bt_putid (value, right->page_no);
1818         val = (BtVal *)((unsigned char *)root->page + nxt);
1819         memcpy (val->value, value, BtId);
1820         val->len = BtId;
1821
1822         nxt -= 2 + sizeof(BtKey);
1823         slotptr(root->page, 2)->off = nxt;
1824         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1825         ptr->len = 2;
1826         ptr->key[0] = 0xff;
1827         ptr->key[1] = 0xff;
1828
1829         // insert lower keys page fence key on newroot page as first key
1830
1831         nxt -= BtId + sizeof(BtVal);
1832         bt_putid (value, left->page_no);
1833         val = (BtVal *)((unsigned char *)root->page + nxt);
1834         memcpy (val->value, value, BtId);
1835         val->len = BtId;
1836
1837         nxt -= leftkey->len + sizeof(BtKey);
1838         slotptr(root->page, 1)->off = nxt;
1839         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1840         
1841         bt_putid(root->page->right, 0);
1842         root->page->min = nxt;          // reset lowest used offset and key count
1843         root->page->cnt = 2;
1844         root->page->act = 2;
1845         root->page->lvl++;
1846
1847         // release and unpin root pages
1848
1849         bt_unlockpage(BtLockWrite, root->latch);
1850         bt_unpinlatch (root->latch);
1851         return 0;
1852 }
1853
1854 //  split already locked full node
1855 //      return unlocked.
1856
1857 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1858 {
1859 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1860 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1861 unsigned char value[BtId];
1862 uint lvl = set->page->lvl;
1863 BtPageSet right[1];
1864 BtKey *key, *ptr;
1865 BtVal *val, *src;
1866 uid right2;
1867 uint prev;
1868
1869         //  split higher half of keys to bt->frame
1870
1871         memset (bt->frame, 0, bt->mgr->page_size);
1872         max = set->page->cnt;
1873         cnt = max / 2;
1874         idx = 0;
1875
1876         while( cnt++ < max ) {
1877                 if( slotptr(set->page, cnt)->dead && cnt < max )
1878                         continue;
1879                 src = valptr(set->page, cnt);
1880                 nxt -= src->len + sizeof(BtVal);
1881                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1882
1883                 key = keyptr(set->page, cnt);
1884                 nxt -= key->len + sizeof(BtKey);
1885                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1886                 memcpy (ptr, key, key->len + sizeof(BtKey));
1887
1888                 //      add librarian slot
1889
1890                 if( idx ) {
1891                         slotptr(bt->frame, ++idx)->off = nxt;
1892                         slotptr(bt->frame, idx)->type = Librarian;
1893                         slotptr(bt->frame, idx)->dead = 1;
1894                 }
1895
1896                 //  add actual slot
1897
1898                 slotptr(bt->frame, ++idx)->off = nxt;
1899                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1900
1901                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1902                         bt->frame->act++;
1903         }
1904
1905         // remember existing fence key for new page to the right
1906
1907         memcpy (rightkey, key, key->len + sizeof(BtKey));
1908
1909         bt->frame->bits = bt->mgr->page_bits;
1910         bt->frame->min = nxt;
1911         bt->frame->cnt = idx;
1912         bt->frame->lvl = lvl;
1913
1914         // link right node
1915
1916         if( set->page_no > ROOT_page ) {
1917                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1918                 bt_putid(bt->frame->left, set->page_no);
1919         }
1920
1921         //      get new free page and write higher keys to it.
1922
1923         if( bt_newpage(bt, right, bt->frame) )
1924                 return bt->err;
1925
1926         // link left node
1927
1928         if( set->page_no > ROOT_page && !lvl )
1929           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1930                 return bt->err;
1931
1932         //      update lower keys to continue in old page
1933
1934         memcpy (bt->frame, set->page, bt->mgr->page_size);
1935         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1936         set->latch->dirty = 1;
1937
1938         nxt = bt->mgr->page_size;
1939         set->page->garbage = 0;
1940         set->page->act = 0;
1941         max /= 2;
1942         cnt = 0;
1943         idx = 0;
1944
1945         if( slotptr(bt->frame, max)->type == Librarian )
1946                 max--;
1947
1948         //  assemble page of smaller keys
1949
1950         while( cnt++ < max ) {
1951                 if( slotptr(bt->frame, cnt)->dead )
1952                         continue;
1953                 val = valptr(bt->frame, cnt);
1954                 nxt -= val->len + sizeof(BtVal);
1955                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1956
1957                 key = keyptr(bt->frame, cnt);
1958                 nxt -= key->len + sizeof(BtKey);
1959                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1960
1961                 //      add librarian slot
1962
1963                 if( idx ) {
1964                         slotptr(set->page, ++idx)->off = nxt;
1965                         slotptr(set->page, idx)->type = Librarian;
1966                         slotptr(set->page, idx)->dead = 1;
1967                 }
1968
1969                 //      add actual slot
1970
1971                 slotptr(set->page, ++idx)->off = nxt;
1972                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1973                 set->page->act++;
1974         }
1975
1976         // remember fence key for smaller page
1977
1978         memcpy(fencekey, key, key->len + sizeof(BtKey));
1979
1980         bt_putid(set->page->right, right->page_no);
1981         set->page->min = nxt;
1982         set->page->cnt = idx;
1983
1984         // if current page is the root page, split it
1985
1986         if( set->page_no == ROOT_page )
1987                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
1988
1989         // insert new fences in their parent pages
1990
1991         bt_lockpage (BtLockParent, right->latch);
1992
1993         bt_lockpage (BtLockParent, set->latch);
1994         bt_unlockpage (BtLockWrite, set->latch);
1995
1996         // insert new fence for reformulated left block of smaller keys
1997
1998         bt_putid (value, set->page_no);
1999
2000         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2001                 return bt->err;
2002
2003         // switch fence for right block of larger keys to new right page
2004
2005         bt_putid (value, right->page_no);
2006
2007         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2008                 return bt->err;
2009
2010         bt_unlockpage (BtLockParent, set->latch);
2011         bt_unpinlatch (set->latch);
2012
2013         bt_unlockpage (BtLockParent, right->latch);
2014         bt_unpinlatch (right->latch);
2015         return 0;
2016 }
2017
2018 //      install new key and value onto page
2019 //      page must already be checked for
2020 //      adequate space
2021
2022 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2023 {
2024 uint idx, librarian;
2025 BtSlot *node;
2026 BtKey *ptr;
2027 BtVal *val;
2028
2029         //      if found slot > desired slot and previous slot
2030         //      is a librarian slot, use it
2031
2032         if( slot > 1 )
2033           if( slotptr(set->page, slot-1)->type == Librarian )
2034                 slot--;
2035
2036         // copy value onto page
2037
2038         set->page->min -= vallen + sizeof(BtVal);
2039         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2040         memcpy (val->value, value, vallen);
2041         val->len = vallen;
2042
2043         // copy key onto page
2044
2045         set->page->min -= keylen + sizeof(BtKey);
2046         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2047         memcpy (ptr->key, key, keylen);
2048         ptr->len = keylen;
2049         
2050         //      find first empty slot
2051
2052         for( idx = slot; idx < set->page->cnt; idx++ )
2053           if( slotptr(set->page, idx)->dead )
2054                 break;
2055
2056         // now insert key into array before slot
2057
2058         if( idx == set->page->cnt )
2059                 idx += 2, set->page->cnt += 2, librarian = 2;
2060         else
2061                 librarian = 1;
2062
2063         set->latch->dirty = 1;
2064         set->page->act++;
2065
2066         while( idx > slot + librarian - 1 )
2067                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2068
2069         //      add librarian slot
2070
2071         if( librarian > 1 ) {
2072                 node = slotptr(set->page, slot++);
2073                 node->off = set->page->min;
2074                 node->type = Librarian;
2075                 node->dead = 1;
2076         }
2077
2078         //      fill in new slot
2079
2080         node = slotptr(set->page, slot);
2081         node->off = set->page->min;
2082         node->type = type;
2083         node->dead = 0;
2084
2085         bt_unlockpage (BtLockWrite, set->latch);
2086         bt_unpinlatch (set->latch);
2087         return 0;
2088 }
2089
2090 //  Insert new key into the btree at given level.
2091 //      either add a new key or update/add an existing one
2092
2093 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2094 {
2095 unsigned char newkey[BT_keyarray];
2096 uint slot, idx, len;
2097 BtPageSet set[1];
2098 BtKey *ptr, *ins;
2099 uid sequence;
2100 BtVal *val;
2101 uint type;
2102
2103   // set up the key we're working on
2104
2105   ins = (BtKey*)newkey;
2106   memcpy (ins->key, key, keylen);
2107   ins->len = keylen;
2108
2109   // is this a non-unique index value?
2110
2111   if( unique )
2112         type = Unique;
2113   else {
2114         type = Duplicate;
2115         sequence = bt_newdup (bt);
2116         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2117         ins->len += BtId;
2118   }
2119
2120   while( 1 ) { // find the page and slot for the current key
2121         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2122                 ptr = keyptr(set->page, slot);
2123         else {
2124                 if( !bt->err )
2125                         bt->err = BTERR_ovflw;
2126                 return bt->err;
2127         }
2128
2129         // if librarian slot == found slot, advance to real slot
2130
2131         if( slotptr(set->page, slot)->type == Librarian )
2132           if( !keycmp (ptr, key, keylen) )
2133                 ptr = keyptr(set->page, ++slot);
2134
2135         len = ptr->len;
2136
2137         if( slotptr(set->page, slot)->type == Duplicate )
2138                 len -= BtId;
2139
2140         //  if inserting a duplicate key or unique key
2141         //      check for adequate space on the page
2142         //      and insert the new key before slot.
2143
2144         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2145           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2146                 if( bt_splitpage (bt, set) )
2147                   return bt->err;
2148                 else
2149                   continue;
2150
2151           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2152         }
2153
2154         // if key already exists, update value and return
2155
2156         val = valptr(set->page, slot);
2157
2158         if( val->len >= vallen ) {
2159                 if( slotptr(set->page, slot)->dead )
2160                         set->page->act++;
2161                 set->page->garbage += val->len - vallen;
2162                 set->latch->dirty = 1;
2163                 slotptr(set->page, slot)->dead = 0;
2164                 val->len = vallen;
2165                 memcpy (val->value, value, vallen);
2166                 bt_unlockpage(BtLockWrite, set->latch);
2167                 bt_unpinlatch (set->latch);
2168                 return 0;
2169         }
2170
2171         //      new update value doesn't fit in existing value area
2172
2173         if( !slotptr(set->page, slot)->dead )
2174                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2175         else {
2176                 slotptr(set->page, slot)->dead = 0;
2177                 set->page->act++;
2178         }
2179
2180         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2181           if( bt_splitpage (bt, set) )
2182                 return bt->err;
2183           else
2184                 continue;
2185
2186         set->page->min -= vallen + sizeof(BtVal);
2187         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2188         memcpy (val->value, value, vallen);
2189         val->len = vallen;
2190
2191         set->latch->dirty = 1;
2192         set->page->min -= keylen + sizeof(BtKey);
2193         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2194         memcpy (ptr->key, key, keylen);
2195         ptr->len = keylen;
2196         
2197         slotptr(set->page, slot)->off = set->page->min;
2198         bt_unlockpage(BtLockWrite, set->latch);
2199         bt_unpinlatch (set->latch);
2200         return 0;
2201   }
2202   return 0;
2203 }
2204
2205 //      set cursor to highest slot on highest page
2206
2207 uint bt_lastkey (BtDb *bt)
2208 {
2209 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2210 BtPageSet set[1];
2211 uint slot;
2212
2213         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2214                 set->page = bt_mappage (bt, set->latch);
2215         else
2216                 return 0;
2217
2218     bt_lockpage(BtLockRead, set->latch);
2219
2220         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2221         slot = set->page->cnt;
2222
2223     bt_unlockpage(BtLockRead, set->latch);
2224         bt_unpinlatch (set->latch);
2225         return slot;
2226 }
2227
2228 //      return previous slot on cursor page
2229
2230 uint bt_prevkey (BtDb *bt, uint slot)
2231 {
2232 BtPageSet set[1];
2233 uid left;
2234
2235         if( --slot )
2236                 return slot;
2237
2238         if( left = bt_getid(bt->cursor->left) )
2239                 bt->cursor_page = left;
2240         else
2241                 return 0;
2242
2243         if( set->latch = bt_pinlatch (bt, left, 1) )
2244                 set->page = bt_mappage (bt, set->latch);
2245         else
2246                 return 0;
2247
2248     bt_lockpage(BtLockRead, set->latch);
2249         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2250         bt_unlockpage(BtLockRead, set->latch);
2251         bt_unpinlatch (set->latch);
2252         return bt->cursor->cnt;
2253 }
2254
2255 //  return next slot on cursor page
2256 //  or slide cursor right into next page
2257
2258 uint bt_nextkey (BtDb *bt, uint slot)
2259 {
2260 BtPageSet set[1];
2261 uid right;
2262
2263   do {
2264         right = bt_getid(bt->cursor->right);
2265
2266         while( slot++ < bt->cursor->cnt )
2267           if( slotptr(bt->cursor,slot)->dead )
2268                 continue;
2269           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2270                 return slot;
2271           else
2272                 break;
2273
2274         if( !right )
2275                 break;
2276
2277         bt->cursor_page = right;
2278
2279         if( set->latch = bt_pinlatch (bt, right, 1) )
2280                 set->page = bt_mappage (bt, set->latch);
2281         else
2282                 return 0;
2283
2284     bt_lockpage(BtLockRead, set->latch);
2285
2286         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2287
2288         bt_unlockpage(BtLockRead, set->latch);
2289         bt_unpinlatch (set->latch);
2290         slot = 0;
2291
2292   } while( 1 );
2293
2294   return bt->err = 0;
2295 }
2296
2297 //  cache page of keys into cursor and return starting slot for given key
2298
2299 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2300 {
2301 BtPageSet set[1];
2302 uint slot;
2303
2304         // cache page for retrieval
2305
2306         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2307           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2308         else
2309           return 0;
2310
2311         bt->cursor_page = set->page_no;
2312
2313         bt_unlockpage(BtLockRead, set->latch);
2314         bt_unpinlatch (set->latch);
2315         return slot;
2316 }
2317
2318 BtKey *bt_key(BtDb *bt, uint slot)
2319 {
2320         return keyptr(bt->cursor, slot);
2321 }
2322
2323 BtVal *bt_val(BtDb *bt, uint slot)
2324 {
2325         return valptr(bt->cursor,slot);
2326 }
2327
2328 #ifdef STANDALONE
2329
2330 #ifndef unix
2331 double getCpuTime(int type)
2332 {
2333 FILETIME crtime[1];
2334 FILETIME xittime[1];
2335 FILETIME systime[1];
2336 FILETIME usrtime[1];
2337 SYSTEMTIME timeconv[1];
2338 double ans = 0;
2339
2340         memset (timeconv, 0, sizeof(SYSTEMTIME));
2341
2342         switch( type ) {
2343         case 0:
2344                 GetSystemTimeAsFileTime (xittime);
2345                 FileTimeToSystemTime (xittime, timeconv);
2346                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2347                 break;
2348         case 1:
2349                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2350                 FileTimeToSystemTime (usrtime, timeconv);
2351                 break;
2352         case 2:
2353                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2354                 FileTimeToSystemTime (systime, timeconv);
2355                 break;
2356         }
2357
2358         ans += (double)timeconv->wHour * 3600;
2359         ans += (double)timeconv->wMinute * 60;
2360         ans += (double)timeconv->wSecond;
2361         ans += (double)timeconv->wMilliseconds / 1000;
2362         return ans;
2363 }
2364 #else
2365 #include <time.h>
2366 #include <sys/resource.h>
2367
2368 double getCpuTime(int type)
2369 {
2370 struct rusage used[1];
2371 struct timeval tv[1];
2372
2373         switch( type ) {
2374         case 0:
2375                 gettimeofday(tv, NULL);
2376                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2377
2378         case 1:
2379                 getrusage(RUSAGE_SELF, used);
2380                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2381
2382         case 2:
2383                 getrusage(RUSAGE_SELF, used);
2384                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2385         }
2386
2387         return 0;
2388 }
2389 #endif
2390
2391 void bt_poolaudit (BtMgr *mgr)
2392 {
2393 BtLatchSet *latch;
2394 uint slot = 0;
2395
2396         while( slot++ < mgr->latchdeployed ) {
2397                 latch = mgr->latchsets + slot;
2398
2399                 if( *latch->readwr->rin & MASK )
2400                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2401                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2402
2403                 if( *latch->access->rin & MASK )
2404                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2405                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2406
2407                 if( *latch->parent->rin & MASK )
2408                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2409                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2410
2411                 if( latch->pin & ~CLOCK_bit ) {
2412                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2413                         latch->pin = 0;
2414                 }
2415         }
2416 }
2417
2418 uint bt_latchaudit (BtDb *bt)
2419 {
2420 ushort idx, hashidx;
2421 uid next, page_no;
2422 BtLatchSet *latch;
2423 uint cnt = 0;
2424 BtKey *ptr;
2425
2426         if( *(ushort *)(bt->mgr->lock) )
2427                 fprintf(stderr, "Alloc page locked\n");
2428         *(ushort *)(bt->mgr->lock) = 0;
2429
2430         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2431                 latch = bt->mgr->latchsets + idx;
2432                 if( *latch->readwr->rin & MASK )
2433                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2434                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2435
2436                 if( *latch->access->rin & MASK )
2437                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2438                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2439
2440                 if( *latch->parent->rin & MASK )
2441                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2442                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2443
2444                 if( latch->pin ) {
2445                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2446                         latch->pin = 0;
2447                 }
2448         }
2449
2450         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2451           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2452                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2453
2454           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2455
2456           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2457                 latch = bt->mgr->latchsets + idx;
2458                 if( latch->pin )
2459                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2460           } while( idx = latch->next );
2461         }
2462
2463         page_no = LEAF_page;
2464
2465         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2466         uid off = page_no << bt->mgr->page_bits;
2467 #ifdef unix
2468           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2469 #else
2470         DWORD amt[1];
2471
2472           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2473
2474           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2475                 return bt->err = BTERR_map;
2476
2477           if( *amt <  bt->mgr->page_size )
2478                 return bt->err = BTERR_map;
2479 #endif
2480                 if( !bt->frame->free && !bt->frame->lvl )
2481                         cnt += bt->frame->act;
2482                 page_no++;
2483         }
2484                 
2485         cnt--;  // remove stopper key
2486         fprintf(stderr, " Total keys read %d\n", cnt);
2487
2488         bt_close (bt);
2489         return 0;
2490 }
2491
2492 typedef struct {
2493         char idx;
2494         char *type;
2495         char *infile;
2496         BtMgr *mgr;
2497         int num;
2498 } ThreadArg;
2499
2500 //  standalone program to index file of keys
2501 //  then list them onto std-out
2502
2503 #ifdef unix
2504 void *index_file (void *arg)
2505 #else
2506 uint __stdcall index_file (void *arg)
2507 #endif
2508 {
2509 int line = 0, found = 0, cnt = 0, unique;
2510 uid next, page_no = LEAF_page;  // start on first page of leaves
2511 unsigned char key[BT_maxkey];
2512 ThreadArg *args = arg;
2513 int ch, len = 0, slot;
2514 BtPageSet set[1];
2515 BtKey *ptr;
2516 BtVal *val;
2517 BtDb *bt;
2518 FILE *in;
2519
2520         bt = bt_open (args->mgr);
2521
2522         unique = (args->type[1] | 0x20) != 'd';
2523
2524         switch(args->type[0] | 0x20)
2525         {
2526         case 'a':
2527                 fprintf(stderr, "started latch mgr audit\n");
2528                 cnt = bt_latchaudit (bt);
2529                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2530                 break;
2531
2532         case 'p':
2533                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2534                 if( in = fopen (args->infile, "rb") )
2535                   while( ch = getc(in), ch != EOF )
2536                         if( ch == '\n' )
2537                         {
2538                           line++;
2539
2540                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2541                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2542                           len = 0;
2543                         }
2544                         else if( len < BT_maxkey )
2545                                 key[len++] = ch;
2546                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2547                 break;
2548
2549         case 'w':
2550                 fprintf(stderr, "started indexing for %s\n", args->infile);
2551                 if( in = fopen (args->infile, "rb") )
2552                   while( ch = getc(in), ch != EOF )
2553                         if( ch == '\n' )
2554                         {
2555                           line++;
2556
2557                           if( args->num == 1 )
2558                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2559
2560                           else if( args->num )
2561                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2562
2563                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2564                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2565                           len = 0;
2566                         }
2567                         else if( len < BT_maxkey )
2568                                 key[len++] = ch;
2569                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2570                 break;
2571
2572         case 'd':
2573                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2574                 if( in = fopen (args->infile, "rb") )
2575                   while( ch = getc(in), ch != EOF )
2576                         if( ch == '\n' )
2577                         {
2578                           line++;
2579                           if( args->num == 1 )
2580                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2581
2582                           else if( args->num )
2583                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2584
2585                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2586                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2587                           ptr = (BtKey*)(bt->key);
2588                           found++;
2589
2590                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2591                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2592                           len = 0;
2593                         }
2594                         else if( len < BT_maxkey )
2595                                 key[len++] = ch;
2596                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2597                 break;
2598
2599         case 'f':
2600                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2601                 if( in = fopen (args->infile, "rb") )
2602                   while( ch = getc(in), ch != EOF )
2603                         if( ch == '\n' )
2604                         {
2605                           line++;
2606                           if( args->num == 1 )
2607                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2608
2609                           else if( args->num )
2610                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2611
2612                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2613                                 found++;
2614                           else if( bt->err )
2615                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2616                           len = 0;
2617                         }
2618                         else if( len < BT_maxkey )
2619                                 key[len++] = ch;
2620                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2621                 break;
2622
2623         case 's':
2624                 fprintf(stderr, "started scanning\n");
2625                 do {
2626                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2627                                 set->page = bt_mappage (bt, set->latch);
2628                         else
2629                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2630                         bt_lockpage (BtLockRead, set->latch);
2631                         next = bt_getid (set->page->right);
2632
2633                         for( slot = 0; slot++ < set->page->cnt; )
2634                          if( next || slot < set->page->cnt )
2635                           if( !slotptr(set->page, slot)->dead ) {
2636                                 ptr = keyptr(set->page, slot);
2637                                 len = ptr->len;
2638
2639                             if( slotptr(set->page, slot)->type == Duplicate )
2640                                         len -= BtId;
2641
2642                                 fwrite (ptr->key, len, 1, stdout);
2643                                 val = valptr(set->page, slot);
2644                                 fwrite (val->value, val->len, 1, stdout);
2645                                 fputc ('\n', stdout);
2646                                 cnt++;
2647                            }
2648
2649                         bt_unlockpage (BtLockRead, set->latch);
2650                         bt_unpinlatch (set->latch);
2651                 } while( page_no = next );
2652
2653                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2654                 break;
2655
2656         case 'r':
2657                 fprintf(stderr, "started reverse scan\n");
2658                 if( slot = bt_lastkey (bt) )
2659                    while( slot = bt_prevkey (bt, slot) ) {
2660                         if( slotptr(bt->cursor, slot)->dead )
2661                           continue;
2662
2663                         ptr = keyptr(bt->cursor, slot);
2664                         len = ptr->len;
2665
2666                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2667                                 len -= BtId;
2668
2669                         fwrite (ptr->key, len, 1, stdout);
2670                         val = valptr(bt->cursor, slot);
2671                         fwrite (val->value, val->len, 1, stdout);
2672                         fputc ('\n', stdout);
2673                         cnt++;
2674                   }
2675
2676                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2677                 break;
2678
2679         case 'c':
2680 #ifdef unix
2681                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2682 #endif
2683                 fprintf(stderr, "started counting\n");
2684                 page_no = LEAF_page;
2685
2686                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2687                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2688                                 break;
2689
2690                         if( !bt->frame->free && !bt->frame->lvl )
2691                                 cnt += bt->frame->act;
2692
2693                         bt->reads++;
2694                         page_no++;
2695                 }
2696                 
2697                 cnt--;  // remove stopper key
2698                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2699                 break;
2700         }
2701
2702         bt_close (bt);
2703 #ifdef unix
2704         return NULL;
2705 #else
2706         return 0;
2707 #endif
2708 }
2709
2710 typedef struct timeval timer;
2711
2712 int main (int argc, char **argv)
2713 {
2714 int idx, cnt, len, slot, err;
2715 int segsize, bits = 16;
2716 double start, stop;
2717 #ifdef unix
2718 pthread_t *threads;
2719 #else
2720 HANDLE *threads;
2721 #endif
2722 ThreadArg *args;
2723 uint poolsize = 0;
2724 float elapsed;
2725 int num = 0;
2726 char key[1];
2727 BtMgr *mgr;
2728 BtKey *ptr;
2729 BtDb *bt;
2730
2731         if( argc < 3 ) {
2732                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2733                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2734                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2735                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2736                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2737                 exit(0);
2738         }
2739
2740         start = getCpuTime(0);
2741
2742         if( argc > 3 )
2743                 bits = atoi(argv[3]);
2744
2745         if( argc > 4 )
2746                 poolsize = atoi(argv[4]);
2747
2748         if( !poolsize )
2749                 fprintf (stderr, "Warning: no mapped_pool\n");
2750
2751         if( argc > 5 )
2752                 num = atoi(argv[5]);
2753
2754         cnt = argc - 6;
2755 #ifdef unix
2756         threads = malloc (cnt * sizeof(pthread_t));
2757 #else
2758         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2759 #endif
2760         args = malloc (cnt * sizeof(ThreadArg));
2761
2762         mgr = bt_mgr ((argv[1]), bits, poolsize);
2763
2764         if( !mgr ) {
2765                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2766                 exit (1);
2767         }
2768
2769         //      fire off threads
2770
2771         for( idx = 0; idx < cnt; idx++ ) {
2772                 args[idx].infile = argv[idx + 6];
2773                 args[idx].type = argv[2];
2774                 args[idx].mgr = mgr;
2775                 args[idx].num = num;
2776                 args[idx].idx = idx;
2777 #ifdef unix
2778                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2779                         fprintf(stderr, "Error creating thread %d\n", err);
2780 #else
2781                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2782 #endif
2783         }
2784
2785         //      wait for termination
2786
2787 #ifdef unix
2788         for( idx = 0; idx < cnt; idx++ )
2789                 pthread_join (threads[idx], NULL);
2790 #else
2791         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2792
2793         for( idx = 0; idx < cnt; idx++ )
2794                 CloseHandle(threads[idx]);
2795
2796 #endif
2797         bt_poolaudit(mgr);
2798         bt_mgrclose (mgr);
2799
2800         elapsed = getCpuTime(0) - start;
2801         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2802         elapsed = getCpuTime(1);
2803         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2804         elapsed = getCpuTime(2);
2805         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2806 }
2807
2808 #endif  //STANDALONE