]> pd.if.org Git - btree/blob - threadskv8.c
44793a09d99365c97979c5bd79d44905ff555283
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 #define PHID 0x1
116 #define PRES 0x2
117 #define MASK 0x3
118 #define RINC 0x4
119
120 //      definition for spin latch implementation
121
122 // exclusive is set for write access
123 // share is count of read accessors
124 // grant write lock when share == 0
125
126 volatile typedef struct {
127         ushort exclusive:1;
128         ushort pending:1;
129         ushort share:14;
130 } BtSpinLatch;
131
132 #define XCL 1
133 #define PEND 2
134 #define BOTH 3
135 #define SHARE 4
136
137 //  hash table entries
138
139 typedef struct {
140         volatile uint slot;             // Latch table entry at head of chain
141         BtSpinLatch latch[1];
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock atomic[1];               // Atomic update in progress
152         uint split;                             // right split page atomic insert
153         uint entry;                             // entry slot in latch table
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of outstanding threads
157         ushort dirty:1;                 // page in cache is dirty
158 #ifdef unix
159         pthread_t atomictid;    // pid holding atomic lock
160 #else
161         uint atomictid;                 // pid holding atomic lock
162 #endif
163 } BtLatchSet;
164
165 //      Define the length of the page record numbers
166
167 #define BtId 6
168
169 //      Page key slot definition.
170
171 //      Keys are marked dead, but remain on the page until
172 //      it cleanup is called. The fence key (highest key) for
173 //      a leaf page is always present, even after cleanup.
174
175 //      Slot types
176
177 //      In addition to the Unique keys that occupy slots
178 //      there are Librarian and Duplicate key
179 //      slots occupying the key slot array.
180
181 //      The Librarian slots are dead keys that
182 //      serve as filler, available to add new Unique
183 //      or Dup slots that are inserted into the B-tree.
184
185 //      The Duplicate slots have had their key bytes extended
186 //      by 6 bytes to contain a binary duplicate key uniqueifier.
187
188 typedef enum {
189         Unique,
190         Librarian,
191         Duplicate,
192         Delete
193 } BtSlotType;
194
195 typedef struct {
196         uint off:BT_maxbits;    // page offset for key start
197         uint type:3;                    // type of slot
198         uint dead:1;                    // set for deleted slot
199 } BtSlot;
200
201 //      The key structure occupies space at the upper end of
202 //      each page.  It's a length byte followed by the key
203 //      bytes.
204
205 typedef struct {
206         unsigned char len;              // this can be changed to a ushort or uint
207         unsigned char key[0];
208 } BtKey;
209
210 //      the value structure also occupies space at the upper
211 //      end of the page. Each key is immediately followed by a value.
212
213 typedef struct {
214         unsigned char len;              // this can be changed to a ushort or uint
215         unsigned char value[0];
216 } BtVal;
217
218 #define BT_maxkey       255             // maximum number of bytes in a key
219 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
220
221 //      The first part of an index page.
222 //      It is immediately followed
223 //      by the BtSlot array of keys.
224
225 //      note that this structure size
226 //      must be a multiple of 8 bytes
227 //      in order to place dups correctly.
228
229 typedef struct BtPage_ {
230         uint cnt;                                       // count of keys in page
231         uint act;                                       // count of active keys
232         uint min;                                       // next key offset
233         uint garbage;                           // page garbage in bytes
234         unsigned char bits:7;           // page size in bits
235         unsigned char free:1;           // page is on free chain
236         unsigned char lvl:7;            // level of page
237         unsigned char kill:1;           // page is being deleted
238         unsigned char right[BtId];      // page number to right
239         unsigned char left[BtId];       // page number to left
240         unsigned char filler[2];        // padding to multiple of 8
241 } *BtPage;
242
243 //  The loadpage interface object
244
245 typedef struct {
246         BtPage page;            // current page pointer
247         BtLatchSet *latch;      // current page latch set
248 } BtPageSet;
249
250 //      structure for latch manager on ALLOC_page
251
252 typedef struct {
253         struct BtPage_ alloc[1];        // next page_no in right ptr
254         unsigned long long dups[1];     // global duplicate key uniqueifier
255         unsigned char chain[BtId];      // head of free page_nos chain
256 } BtPageZero;
257
258 //      The object structure for Btree access
259
260 typedef struct {
261         uint page_size;                         // page size    
262         uint page_bits;                         // page size in bits    
263 #ifdef unix
264         int idx;
265 #else
266         HANDLE idx;
267 #endif
268         BtPageZero *pagezero;           // mapped allocation page
269         BtSpinLatch lock[1];            // allocation area lite latch
270         uint latchdeployed;                     // highest number of latch entries deployed
271         uint nlatchpage;                        // number of latch pages at BT_latch
272         uint latchtotal;                        // number of page latch entries
273         uint latchhash;                         // number of latch hash table slots
274         uint latchvictim;                       // next latch entry to examine
275         BtHashEntry *hashtable;         // the buffer pool hash table entries
276         BtLatchSet *latchsets;          // mapped latch set from buffer pool
277         unsigned char *pagepool;        // mapped to the buffer pool pages
278 #ifndef unix
279         HANDLE halloc;                          // allocation handle
280         HANDLE hpool;                           // buffer pool handle
281 #endif
282 } BtMgr;
283
284 typedef struct {
285         BtMgr *mgr;                             // buffer manager for thread
286         BtPage cursor;                  // cached frame for start/next (never mapped)
287         BtPage frame;                   // spare frame for the page split (never mapped)
288         uid cursor_page;                                // current cursor page number   
289         unsigned char *mem;                             // frame, cursor, page memory buffer
290         unsigned char key[BT_keyarray]; // last found complete key
291         int found;                              // last delete or insert was found
292         int err;                                // last error
293         int reads, writes;              // number of reads and writes from the btree
294 } BtDb;
295
296 typedef enum {
297         BTERR_ok = 0,
298         BTERR_struct,
299         BTERR_ovflw,
300         BTERR_lock,
301         BTERR_map,
302         BTERR_read,
303         BTERR_wrt,
304         BTERR_atomic
305 } BTERR;
306
307 #define CLOCK_bit 0x8000
308
309 // B-Tree functions
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey *bt_foundkey (BtDb *bt);
316 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey   (BtDb *bt, uint slot);
318
319 //      manager functions
320 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
321 void bt_mgrclose (BtMgr *mgr);
322
323 //  Helper functions to return slot values
324 //      from the cursor page.
325
326 extern BtKey *bt_key (BtDb *bt, uint slot);
327 extern BtVal *bt_val (BtDb *bt, uint slot);
328
329 //  The page is allocated from low and hi ends.
330 //  The key slots are allocated from the bottom,
331 //      while the text and value of the key
332 //  are allocated from the top.  When the two
333 //  areas meet, the page is split into two.
334
335 //  A key consists of a length byte, two bytes of
336 //  index number (0 - 65535), and up to 253 bytes
337 //  of key value.
338
339 //  Associated with each key is a value byte string
340 //      containing any value desired.
341
342 //  The b-tree root is always located at page 1.
343 //      The first leaf page of level zero is always
344 //      located on page 2.
345
346 //      The b-tree pages are linked with next
347 //      pointers to facilitate enumerators,
348 //      and provide for concurrency.
349
350 //      When to root page fills, it is split in two and
351 //      the tree height is raised by a new root at page
352 //      one with two keys.
353
354 //      Deleted keys are marked with a dead bit until
355 //      page cleanup. The fence key for a leaf node is
356 //      always present
357
358 //  To achieve maximum concurrency one page is locked at a time
359 //  as the tree is traversed to find leaf key in question. The right
360 //  page numbers are used in cases where the page is being split,
361 //      or consolidated.
362
363 //  Page 0 is dedicated to lock for new page extensions,
364 //      and chains empty pages together for reuse. It also
365 //      contains the latch manager hash table.
366
367 //      The ParentModification lock on a node is obtained to serialize posting
368 //      or changing the fence key for a node.
369
370 //      Empty pages are chained together through the ALLOC page and reused.
371
372 //      Access macros to address slot and key values from the page
373 //      Page slots use 1 based indexing.
374
375 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
376 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
377 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
378
379 void bt_putid(unsigned char *dest, uid id)
380 {
381 int i = BtId;
382
383         while( i-- )
384                 dest[i] = (unsigned char)id, id >>= 8;
385 }
386
387 uid bt_getid(unsigned char *src)
388 {
389 uid id = 0;
390 int i;
391
392         for( i = 0; i < BtId; i++ )
393                 id <<= 8, id |= *src++; 
394
395         return id;
396 }
397
398 uid bt_newdup (BtDb *bt)
399 {
400 #ifdef unix
401         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
402 #else
403         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
404 #endif
405 }
406
407 //      Phase-Fair reader/writer lock implementation
408
409 void WriteLock (RWLock *lock)
410 {
411 ushort w, r, tix;
412
413 #ifdef unix
414         tix = __sync_fetch_and_add (lock->ticket, 1);
415 #else
416         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
417 #endif
418         // wait for our ticket to come up
419
420         while( tix != lock->serving[0] )
421 #ifdef unix
422                 sched_yield();
423 #else
424                 SwitchToThread ();
425 #endif
426
427         w = PRES | (tix & PHID);
428 #ifdef  unix
429         r = __sync_fetch_and_add (lock->rin, w);
430 #else
431         r = _InterlockedExchangeAdd16 (lock->rin, w);
432 #endif
433         while( r != *lock->rout )
434 #ifdef unix
435                 sched_yield();
436 #else
437                 SwitchToThread();
438 #endif
439 }
440
441 void WriteRelease (RWLock *lock)
442 {
443 #ifdef unix
444         __sync_fetch_and_and (lock->rin, ~MASK);
445 #else
446         _InterlockedAnd16 (lock->rin, ~MASK);
447 #endif
448         lock->serving[0]++;
449 }
450
451 void ReadLock (RWLock *lock)
452 {
453 ushort w;
454 #ifdef unix
455         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
456 #else
457         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
458 #endif
459         if( w )
460           while( w == (*lock->rin & MASK) )
461 #ifdef unix
462                 sched_yield ();
463 #else
464                 SwitchToThread ();
465 #endif
466 }
467
468 void ReadRelease (RWLock *lock)
469 {
470 #ifdef unix
471         __sync_fetch_and_add (lock->rout, RINC);
472 #else
473         _InterlockedExchangeAdd16 (lock->rout, RINC);
474 #endif
475 }
476
477 //      Spin Latch Manager
478
479 //      wait until write lock mode is clear
480 //      and add 1 to the share count
481
482 void bt_spinreadlock(BtSpinLatch *latch)
483 {
484 ushort prev;
485
486   do {
487 #ifdef unix
488         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
489 #else
490         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
491 #endif
492         //  see if exclusive request is granted or pending
493
494         if( !(prev & BOTH) )
495                 return;
496 #ifdef unix
497         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
498 #else
499         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
500 #endif
501 #ifdef  unix
502   } while( sched_yield(), 1 );
503 #else
504   } while( SwitchToThread(), 1 );
505 #endif
506 }
507
508 //      wait for other read and write latches to relinquish
509
510 void bt_spinwritelock(BtSpinLatch *latch)
511 {
512 ushort prev;
513
514   do {
515 #ifdef  unix
516         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
517 #else
518         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
519 #endif
520         if( !(prev & XCL) )
521           if( !(prev & ~BOTH) )
522                 return;
523           else
524 #ifdef unix
525                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
526 #else
527                 _InterlockedAnd16((ushort *)latch, ~XCL);
528 #endif
529 #ifdef  unix
530   } while( sched_yield(), 1 );
531 #else
532   } while( SwitchToThread(), 1 );
533 #endif
534 }
535
536 //      try to obtain write lock
537
538 //      return 1 if obtained,
539 //              0 otherwise
540
541 int bt_spinwritetry(BtSpinLatch *latch)
542 {
543 ushort prev;
544
545 #ifdef  unix
546         prev = __sync_fetch_and_or((ushort *)latch, XCL);
547 #else
548         prev = _InterlockedOr16((ushort *)latch, XCL);
549 #endif
550         //      take write access if all bits are clear
551
552         if( !(prev & XCL) )
553           if( !(prev & ~BOTH) )
554                 return 1;
555           else
556 #ifdef unix
557                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
558 #else
559                 _InterlockedAnd16((ushort *)latch, ~XCL);
560 #endif
561         return 0;
562 }
563
564 //      clear write mode
565
566 void bt_spinreleasewrite(BtSpinLatch *latch)
567 {
568 #ifdef unix
569         __sync_fetch_and_and((ushort *)latch, ~BOTH);
570 #else
571         _InterlockedAnd16((ushort *)latch, ~BOTH);
572 #endif
573 }
574
575 //      decrement reader count
576
577 void bt_spinreleaseread(BtSpinLatch *latch)
578 {
579 #ifdef unix
580         __sync_fetch_and_add((ushort *)latch, -SHARE);
581 #else
582         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
583 #endif
584 }
585
586 //      read page from permanent location in Btree file
587
588 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
589 {
590 off64_t off = page_no << mgr->page_bits;
591
592 #ifdef unix
593         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
594                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
595                 return BTERR_read;
596         }
597 #else
598 OVERLAPPED ovl[1];
599 uint amt[1];
600
601         memset (ovl, 0, sizeof(OVERLAPPED));
602         ovl->Offset = off;
603         ovl->OffsetHigh = off >> 32;
604
605         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
606                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
607                 return BTERR_read;
608         }
609         if( *amt <  mgr->page_size ) {
610                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
611                 return BTERR_read;
612         }
613 #endif
614         return 0;
615 }
616
617 //      write page to permanent location in Btree file
618 //      clear the dirty bit
619
620 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
621 {
622 off64_t off = page_no << mgr->page_bits;
623
624 #ifdef unix
625         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
626                 return BTERR_wrt;
627 #else
628 OVERLAPPED ovl[1];
629 uint amt[1];
630
631         memset (ovl, 0, sizeof(OVERLAPPED));
632         ovl->Offset = off;
633         ovl->OffsetHigh = off >> 32;
634
635         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
636                 return BTERR_wrt;
637
638         if( *amt <  mgr->page_size )
639                 return BTERR_wrt;
640 #endif
641         return 0;
642 }
643
644 //      link latch table entry into head of latch hash table
645
646 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
647 {
648 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
649 BtLatchSet *latch = bt->mgr->latchsets + slot;
650
651         if( latch->next = bt->mgr->hashtable[hashidx].slot )
652                 bt->mgr->latchsets[latch->next].prev = slot;
653
654         bt->mgr->hashtable[hashidx].slot = slot;
655         memset (&latch->atomictid, 0, sizeof(latch->atomictid));
656         latch->page_no = page_no;
657         latch->entry = slot;
658         latch->split = 0;
659         latch->prev = 0;
660         latch->pin = 1;
661
662         if( loadit )
663           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
664                 return bt->err;
665           else
666                 bt->reads++;
667
668         return bt->err = 0;
669 }
670
671 //      set CLOCK bit in latch
672 //      decrement pin count
673
674 void bt_unpinlatch (BtLatchSet *latch)
675 {
676 #ifdef unix
677         if( ~latch->pin & CLOCK_bit )
678                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
679         __sync_fetch_and_add(&latch->pin, -1);
680 #else
681         if( ~latch->pin & CLOCK_bit )
682                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
683         _InterlockedDecrement16 (&latch->pin);
684 #endif
685 }
686
687 //  return the btree cached page address
688
689 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
690 {
691 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
692
693         return page;
694 }
695
696 //      find existing latchset or inspire new one
697 //      return with latchset pinned
698
699 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
700 {
701 uint hashidx = page_no % bt->mgr->latchhash;
702 BtLatchSet *latch;
703 uint slot, idx;
704 uint lvl, cnt;
705 off64_t off;
706 uint amt[1];
707 BtPage page;
708
709   //  try to find our entry
710
711   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
712
713   if( slot = bt->mgr->hashtable[hashidx].slot ) do
714   {
715         latch = bt->mgr->latchsets + slot;
716         if( page_no == latch->page_no )
717                 break;
718   } while( slot = latch->next );
719
720   //  found our entry
721   //  increment clock
722
723   if( slot ) {
724         latch = bt->mgr->latchsets + slot;
725 #ifdef unix
726         __sync_fetch_and_add(&latch->pin, 1);
727 #else
728         _InterlockedIncrement16 (&latch->pin);
729 #endif
730         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
731         return latch;
732   }
733
734         //  see if there are any unused pool entries
735 #ifdef unix
736         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
737 #else
738         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
739 #endif
740
741         if( slot < bt->mgr->latchtotal ) {
742                 latch = bt->mgr->latchsets + slot;
743                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
744                         return NULL;
745                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
746                 return latch;
747         }
748
749 #ifdef unix
750         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
751 #else
752         _InterlockedDecrement (&bt->mgr->latchdeployed);
753 #endif
754   //  find and reuse previous entry on victim
755
756   while( 1 ) {
757 #ifdef unix
758         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
759 #else
760         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
761 #endif
762         // try to get write lock on hash chain
763         //      skip entry if not obtained
764         //      or has outstanding pins
765
766         slot %= bt->mgr->latchtotal;
767
768         if( !slot )
769                 continue;
770
771         latch = bt->mgr->latchsets + slot;
772         idx = latch->page_no % bt->mgr->latchhash;
773
774         //      see we are on same chain as hashidx
775
776         if( idx == hashidx )
777                 continue;
778
779         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
780                 continue;
781
782         //  skip this slot if it is pinned
783         //      or the CLOCK bit is set
784
785         if( latch->pin ) {
786           if( latch->pin & CLOCK_bit ) {
787 #ifdef unix
788                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
789 #else
790                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
791 #endif
792           }
793           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
794           continue;
795         }
796
797         //  update permanent page area in btree from buffer pool
798
799         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
800
801         if( latch->dirty )
802           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
803                 return NULL;
804           else
805                 latch->dirty = 0, bt->writes++;
806
807         //  unlink our available slot from its hash chain
808
809         if( latch->prev )
810                 bt->mgr->latchsets[latch->prev].next = latch->next;
811         else
812                 bt->mgr->hashtable[idx].slot = latch->next;
813
814         if( latch->next )
815                 bt->mgr->latchsets[latch->next].prev = latch->prev;
816
817         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
818
819         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
820                 return NULL;
821
822         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
823         return latch;
824   }
825 }
826
827 void bt_mgrclose (BtMgr *mgr)
828 {
829 BtLatchSet *latch;
830 uint num = 0;
831 BtPage page;
832 uint slot;
833
834         //      flush dirty pool pages to the btree
835
836         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
837                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
838                 latch = mgr->latchsets + slot;
839
840                 if( latch->dirty ) {
841                         bt_writepage(mgr, page, latch->page_no);
842                         latch->dirty = 0, num++;
843                 }
844 //              madvise (page, mgr->page_size, MADV_DONTNEED);
845         }
846
847         fprintf(stderr, "%d buffer pool pages flushed\n", num);
848
849 #ifdef unix
850         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
851         munmap (mgr->pagezero, mgr->page_size);
852 #else
853         FlushViewOfFile(mgr->pagezero, 0);
854         UnmapViewOfFile(mgr->pagezero);
855         UnmapViewOfFile(mgr->hashtable);
856         CloseHandle(mgr->halloc);
857         CloseHandle(mgr->hpool);
858 #endif
859 #ifdef unix
860         close (mgr->idx);
861         free (mgr);
862 #else
863         FlushFileBuffers(mgr->idx);
864         CloseHandle(mgr->idx);
865         GlobalFree (mgr);
866 #endif
867 }
868
869 //      close and release memory
870
871 void bt_close (BtDb *bt)
872 {
873 #ifdef unix
874         if( bt->mem )
875                 free (bt->mem);
876 #else
877         if( bt->mem)
878                 VirtualFree (bt->mem, 0, MEM_RELEASE);
879 #endif
880         free (bt);
881 }
882
883 //  open/create new btree buffer manager
884
885 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
886 //              size of page pool (e.g. 262144)
887
888 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
889 {
890 uint lvl, attr, last, slot, idx;
891 uint nlatchpage, latchhash;
892 unsigned char value[BtId];
893 int flag, initit = 0;
894 BtPageZero *pagezero;
895 off64_t size;
896 uint amt[1];
897 BtMgr* mgr;
898 BtKey* key;
899 BtVal *val;
900
901         // determine sanity of page size and buffer pool
902
903         if( bits > BT_maxbits )
904                 bits = BT_maxbits;
905         else if( bits < BT_minbits )
906                 bits = BT_minbits;
907
908         if( nodemax < 16 ) {
909                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
910                 return NULL;
911         }
912
913 #ifdef unix
914         mgr = calloc (1, sizeof(BtMgr));
915
916         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
917
918         if( mgr->idx == -1 ) {
919                 fprintf (stderr, "Unable to open btree file\n");
920                 return free(mgr), NULL;
921         }
922 #else
923         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
924         attr = FILE_ATTRIBUTE_NORMAL;
925         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
926
927         if( mgr->idx == INVALID_HANDLE_VALUE )
928                 return GlobalFree(mgr), NULL;
929 #endif
930
931 #ifdef unix
932         pagezero = valloc (BT_maxpage);
933         *amt = 0;
934
935         // read minimum page size to get root info
936         //      to support raw disk partition files
937         //      check if bits == 0 on the disk.
938
939         if( size = lseek (mgr->idx, 0L, 2) )
940                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
941                         if( pagezero->alloc->bits )
942                                 bits = pagezero->alloc->bits;
943                         else
944                                 initit = 1;
945                 else
946                         return free(mgr), free(pagezero), NULL;
947         else
948                 initit = 1;
949 #else
950         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
951         size = GetFileSize(mgr->idx, amt);
952
953         if( size || *amt ) {
954                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
955                         return bt_mgrclose (mgr), NULL;
956                 bits = pagezero->alloc->bits;
957         } else
958                 initit = 1;
959 #endif
960
961         mgr->page_size = 1 << bits;
962         mgr->page_bits = bits;
963
964         //  calculate number of latch hash table entries
965
966         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
967         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
968
969         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
970         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
971         mgr->latchtotal = nodemax;
972
973         if( !initit )
974                 goto mgrlatch;
975
976         // initialize an empty b-tree with latch page, root page, page of leaves
977         // and page(s) of latches and page pool cache
978
979         memset (pagezero, 0, 1 << bits);
980         pagezero->alloc->bits = mgr->page_bits;
981         bt_putid(pagezero->alloc->right, MIN_lvl+1);
982
983         //  initialize left-most LEAF page in
984         //      alloc->left.
985
986         bt_putid (pagezero->alloc->left, LEAF_page);
987
988         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
989                 fprintf (stderr, "Unable to create btree page zero\n");
990                 return bt_mgrclose (mgr), NULL;
991         }
992
993         memset (pagezero, 0, 1 << bits);
994         pagezero->alloc->bits = mgr->page_bits;
995
996         for( lvl=MIN_lvl; lvl--; ) {
997                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
998                 key = keyptr(pagezero->alloc, 1);
999                 key->len = 2;           // create stopper key
1000                 key->key[0] = 0xff;
1001                 key->key[1] = 0xff;
1002
1003                 bt_putid(value, MIN_lvl - lvl + 1);
1004                 val = valptr(pagezero->alloc, 1);
1005                 val->len = lvl ? BtId : 0;
1006                 memcpy (val->value, value, val->len);
1007
1008                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1009                 pagezero->alloc->lvl = lvl;
1010                 pagezero->alloc->cnt = 1;
1011                 pagezero->alloc->act = 1;
1012
1013                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1014                         fprintf (stderr, "Unable to create btree page zero\n");
1015                         return bt_mgrclose (mgr), NULL;
1016                 }
1017         }
1018
1019 mgrlatch:
1020 #ifdef unix
1021         free (pagezero);
1022 #else
1023         VirtualFree (pagezero, 0, MEM_RELEASE);
1024 #endif
1025 #ifdef unix
1026         // mlock the pagezero page
1027
1028         flag = PROT_READ | PROT_WRITE;
1029         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1030         if( mgr->pagezero == MAP_FAILED ) {
1031                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1032                 return bt_mgrclose (mgr), NULL;
1033         }
1034         mlock (mgr->pagezero, mgr->page_size);
1035
1036         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1037         if( mgr->hashtable == MAP_FAILED ) {
1038                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1039                 return bt_mgrclose (mgr), NULL;
1040         }
1041 #else
1042         flag = PAGE_READWRITE;
1043         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1044         if( !mgr->halloc ) {
1045                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1046                 return bt_mgrclose (mgr), NULL;
1047         }
1048
1049         flag = FILE_MAP_WRITE;
1050         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1051         if( !mgr->pagezero ) {
1052                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1053                 return bt_mgrclose (mgr), NULL;
1054         }
1055
1056         flag = PAGE_READWRITE;
1057         size = (uid)mgr->nlatchpage << mgr->page_bits;
1058         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1059         if( !mgr->hpool ) {
1060                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1061                 return bt_mgrclose (mgr), NULL;
1062         }
1063
1064         flag = FILE_MAP_WRITE;
1065         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1066         if( !mgr->hashtable ) {
1067                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1068                 return bt_mgrclose (mgr), NULL;
1069         }
1070 #endif
1071
1072         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1073         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1074
1075         return mgr;
1076 }
1077
1078 //      open BTree access method
1079 //      based on buffer manager
1080
1081 BtDb *bt_open (BtMgr *mgr)
1082 {
1083 BtDb *bt = malloc (sizeof(*bt));
1084
1085         memset (bt, 0, sizeof(*bt));
1086         bt->mgr = mgr;
1087 #ifdef unix
1088         bt->mem = valloc (2 *mgr->page_size);
1089 #else
1090         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1091 #endif
1092         bt->frame = (BtPage)bt->mem;
1093         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1094         return bt;
1095 }
1096
1097 //  compare two keys, return > 0, = 0, or < 0
1098 //  =0: keys are same
1099 //  -1: key2 > key1
1100 //  +1: key2 < key1
1101 //  as the comparison value
1102
1103 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1104 {
1105 uint len1 = key1->len;
1106 int ans;
1107
1108         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1109                 return ans;
1110
1111         if( len1 > len2 )
1112                 return 1;
1113         if( len1 < len2 )
1114                 return -1;
1115
1116         return 0;
1117 }
1118
1119 // place write, read, or parent lock on requested page_no.
1120
1121 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1122 {
1123         switch( mode ) {
1124         case BtLockRead:
1125                 ReadLock (latch->readwr);
1126                 break;
1127         case BtLockWrite:
1128                 WriteLock (latch->readwr);
1129                 break;
1130         case BtLockAccess:
1131                 ReadLock (latch->access);
1132                 break;
1133         case BtLockDelete:
1134                 WriteLock (latch->access);
1135                 break;
1136         case BtLockParent:
1137                 WriteLock (latch->parent);
1138                 break;
1139         case BtLockAtomic:
1140                 WriteLock (latch->atomic);
1141                 break;
1142         case BtLockAtomic | BtLockRead:
1143                 WriteLock (latch->atomic);
1144                 ReadLock (latch->readwr);
1145                 break;
1146         }
1147 }
1148
1149 // remove write, read, or parent lock on requested page
1150
1151 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1152 {
1153         switch( mode ) {
1154         case BtLockRead:
1155                 ReadRelease (latch->readwr);
1156                 break;
1157         case BtLockWrite:
1158                 WriteRelease (latch->readwr);
1159                 break;
1160         case BtLockAccess:
1161                 ReadRelease (latch->access);
1162                 break;
1163         case BtLockDelete:
1164                 WriteRelease (latch->access);
1165                 break;
1166         case BtLockParent:
1167                 WriteRelease (latch->parent);
1168                 break;
1169         case BtLockAtomic:
1170                 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1171                 WriteRelease (latch->atomic);
1172                 break;
1173         case BtLockAtomic | BtLockRead:
1174                 ReadRelease (latch->readwr);
1175                 memset (&latch->atomictid, 0, sizeof(latch->atomictid));
1176                 WriteRelease (latch->atomic);
1177                 break;
1178         }
1179 }
1180
1181 //      allocate a new page
1182 //      return with page latched, but unlocked.
1183
1184 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1185 {
1186 uid page_no;
1187 int blk;
1188
1189         //      lock allocation page
1190
1191         bt_spinwritelock(bt->mgr->lock);
1192
1193         // use empty chain first
1194         // else allocate empty page
1195
1196         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1197                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1198                         set->page = bt_mappage (bt, set->latch);
1199                 else
1200                         return bt->err = BTERR_struct, -1;
1201
1202                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1203                 bt_spinreleasewrite(bt->mgr->lock);
1204
1205                 memcpy (set->page, contents, bt->mgr->page_size);
1206                 set->latch->dirty = 1;
1207                 return 0;
1208         }
1209
1210         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1211         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1212
1213         // unlock allocation latch
1214
1215         bt_spinreleasewrite(bt->mgr->lock);
1216
1217         //      don't load cache from btree page
1218
1219         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1220                 set->page = bt_mappage (bt, set->latch);
1221         else
1222                 return bt->err = BTERR_struct;
1223
1224         memcpy (set->page, contents, bt->mgr->page_size);
1225         set->latch->dirty = 1;
1226         return 0;
1227 }
1228
1229 //  find slot in page for given key at a given level
1230
1231 int bt_findslot (BtPage page, unsigned char *key, uint len)
1232 {
1233 uint diff, higher = page->cnt, low = 1, slot;
1234 uint good = 0;
1235
1236         //        make stopper key an infinite fence value
1237
1238         if( bt_getid (page->right) )
1239                 higher++;
1240         else
1241                 good++;
1242
1243         //      low is the lowest candidate.
1244         //  loop ends when they meet
1245
1246         //  higher is already
1247         //      tested as .ge. the passed key.
1248
1249         while( diff = higher - low ) {
1250                 slot = low + ( diff >> 1 );
1251                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1252                         low = slot + 1;
1253                 else
1254                         higher = slot, good++;
1255         }
1256
1257         //      return zero if key is on right link page
1258
1259         return good ? higher : 0;
1260 }
1261
1262 //  find and load page at given level for given key
1263 //      leave page rd or wr locked as requested
1264
1265 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1266 {
1267 uid page_no = ROOT_page, prevpage = 0;
1268 uint drill = 0xff, slot;
1269 BtLatchSet *prevlatch;
1270 uint mode, prevmode;
1271
1272   //  start at root of btree and drill down
1273
1274   do {
1275         // determine lock mode of drill level
1276         mode = (drill == lvl) ? lock : BtLockRead; 
1277
1278         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1279           return 0;
1280
1281         // obtain access lock using lock chaining with Access mode
1282
1283         if( page_no > ROOT_page )
1284           bt_lockpage(BtLockAccess, set->latch);
1285
1286         set->page = bt_mappage (bt, set->latch);
1287
1288         //      release & unpin parent or left sibling page
1289
1290         if( prevpage ) {
1291           bt_unlockpage(prevmode, prevlatch);
1292           bt_unpinlatch (prevlatch);
1293           prevpage = 0;
1294         }
1295
1296         //      skip Atomic lock on leaf page we need to slide over
1297
1298         if( !drill )
1299          if( mode & BtLockAtomic )
1300           if( pthread_equal (set->latch->atomictid, pthread_self()) )
1301                 mode &= ~BtLockAtomic;
1302
1303         // obtain mode lock using lock chaining through AccessLock
1304
1305         bt_lockpage(mode, set->latch);
1306
1307         if( mode & BtLockAtomic )
1308                 set->latch->atomictid = pthread_self();
1309
1310         if( set->page->free )
1311                 return bt->err = BTERR_struct, 0;
1312
1313         if( page_no > ROOT_page )
1314           bt_unlockpage(BtLockAccess, set->latch);
1315
1316         // re-read and re-lock root after determining actual level of root
1317
1318         if( set->page->lvl != drill) {
1319                 if( set->latch->page_no != ROOT_page )
1320                         return bt->err = BTERR_struct, 0;
1321                         
1322                 drill = set->page->lvl;
1323
1324                 if( lock != BtLockRead && drill == lvl ) {
1325                   bt_unlockpage(mode, set->latch);
1326                   bt_unpinlatch (set->latch);
1327                   continue;
1328                 }
1329         }
1330
1331         prevpage = set->latch->page_no;
1332         prevlatch = set->latch;
1333         prevmode = mode;
1334
1335         //  find key on page at this level
1336         //  and descend to requested level
1337
1338         if( set->page->kill )
1339           goto slideright;
1340
1341         if( slot = bt_findslot (set->page, key, len) ) {
1342           if( drill == lvl )
1343                 return slot;
1344
1345           while( slotptr(set->page, slot)->dead )
1346                 if( slot++ < set->page->cnt )
1347                         continue;
1348                 else
1349                         goto slideright;
1350
1351           page_no = bt_getid(valptr(set->page, slot)->value);
1352           drill--;
1353           continue;
1354         }
1355
1356         //  or slide right into next page
1357
1358 slideright:
1359         page_no = bt_getid(set->page->right);
1360
1361   } while( page_no );
1362
1363   // return error on end of right chain
1364
1365   bt->err = BTERR_struct;
1366   return 0;     // return error
1367 }
1368
1369 //      return page to free list
1370 //      page must be delete & write locked
1371
1372 void bt_freepage (BtDb *bt, BtPageSet *set)
1373 {
1374         //      lock allocation page
1375
1376         bt_spinwritelock (bt->mgr->lock);
1377
1378         //      store chain
1379
1380         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1381         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1382         set->latch->dirty = 1;
1383         set->page->free = 1;
1384
1385         // unlock released page
1386
1387         bt_unlockpage (BtLockDelete, set->latch);
1388         bt_unlockpage (BtLockWrite, set->latch);
1389
1390         // unlock allocation page
1391
1392         bt_spinreleasewrite (bt->mgr->lock);
1393 }
1394
1395 //      a fence key was deleted from a page
1396 //      push new fence value upwards
1397
1398 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1399 {
1400 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1401 unsigned char value[BtId];
1402 BtKey* ptr;
1403 uint idx;
1404
1405         //      remove the old fence value
1406
1407         ptr = keyptr(set->page, set->page->cnt);
1408         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1409         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1410         set->latch->dirty = 1;
1411
1412         //  cache new fence value
1413
1414         ptr = keyptr(set->page, set->page->cnt);
1415         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1416
1417         bt_lockpage (BtLockParent, set->latch);
1418         bt_unlockpage (BtLockWrite, set->latch);
1419
1420         //      insert new (now smaller) fence key
1421
1422         bt_putid (value, set->latch->page_no);
1423         ptr = (BtKey*)leftkey;
1424
1425         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1426           return bt->err;
1427
1428         //      now delete old fence key
1429
1430         ptr = (BtKey*)rightkey;
1431
1432         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1433                 return bt->err;
1434
1435         bt_unlockpage (BtLockParent, set->latch);
1436         bt_unpinlatch(set->latch);
1437         return 0;
1438 }
1439
1440 //      root has a single child
1441 //      collapse a level from the tree
1442
1443 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1444 {
1445 BtPageSet child[1];
1446 uid page_no;
1447 uint idx;
1448
1449   // find the child entry and promote as new root contents
1450
1451   do {
1452         for( idx = 0; idx++ < root->page->cnt; )
1453           if( !slotptr(root->page, idx)->dead )
1454                 break;
1455
1456         page_no = bt_getid (valptr(root->page, idx)->value);
1457
1458         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1459                 child->page = bt_mappage (bt, child->latch);
1460         else
1461                 return bt->err;
1462
1463         bt_lockpage (BtLockDelete, child->latch);
1464         bt_lockpage (BtLockWrite, child->latch);
1465
1466         memcpy (root->page, child->page, bt->mgr->page_size);
1467         root->latch->dirty = 1;
1468
1469         bt_freepage (bt, child);
1470         bt_unpinlatch (child->latch);
1471
1472   } while( root->page->lvl > 1 && root->page->act == 1 );
1473
1474   bt_unlockpage (BtLockWrite, root->latch);
1475   bt_unpinlatch (root->latch);
1476   return 0;
1477 }
1478
1479 //  delete a page and manage keys
1480 //  call with page writelocked
1481 //      returns with page unpinned
1482
1483 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1484 {
1485 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1486 unsigned char value[BtId];
1487 uint lvl = set->page->lvl;
1488 BtPageSet right[1];
1489 uid page_no;
1490 BtKey *ptr;
1491
1492         //      cache copy of fence key
1493         //      to post in parent
1494
1495         ptr = keyptr(set->page, set->page->cnt);
1496         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1497
1498         //      obtain lock on right page
1499
1500         page_no = bt_getid(set->page->right);
1501
1502         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1503                 right->page = bt_mappage (bt, right->latch);
1504         else
1505                 return 0;
1506
1507         bt_lockpage (BtLockWrite, right->latch);
1508
1509         // cache copy of key to update
1510
1511         ptr = keyptr(right->page, right->page->cnt);
1512         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1513
1514         if( right->page->kill )
1515                 return bt->err = BTERR_struct;
1516
1517         // pull contents of right peer into our empty page
1518
1519         memcpy (set->page, right->page, bt->mgr->page_size);
1520         set->latch->dirty = 1;
1521
1522         // mark right page deleted and point it to left page
1523         //      until we can post parent updates that remove access
1524         //      to the deleted page.
1525
1526         bt_putid (right->page->right, set->latch->page_no);
1527         right->latch->dirty = 1;
1528         right->page->kill = 1;
1529
1530         bt_lockpage (BtLockParent, right->latch);
1531         bt_unlockpage (BtLockWrite, right->latch);
1532
1533         bt_lockpage (BtLockParent, set->latch);
1534         bt_unlockpage (BtLockWrite, set->latch);
1535
1536         // redirect higher key directly to our new node contents
1537
1538         bt_putid (value, set->latch->page_no);
1539         ptr = (BtKey*)higherfence;
1540
1541         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1542           return bt->err;
1543
1544         //      delete old lower key to our node
1545
1546         ptr = (BtKey*)lowerfence;
1547
1548         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1549           return bt->err;
1550
1551         //      obtain delete and write locks to right node
1552
1553         bt_unlockpage (BtLockParent, right->latch);
1554         bt_lockpage (BtLockDelete, right->latch);
1555         bt_lockpage (BtLockWrite, right->latch);
1556         bt_freepage (bt, right);
1557         bt_unpinlatch (right->latch);
1558
1559         bt_unlockpage (BtLockParent, set->latch);
1560         bt_unpinlatch (set->latch);
1561         bt->found = 1;
1562         return 0;
1563 }
1564
1565 //  find and delete key on page by marking delete flag bit
1566 //  if page becomes empty, delete it from the btree
1567
1568 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1569 {
1570 uint slot, idx, found, fence;
1571 BtPageSet set[1];
1572 BtKey *ptr;
1573 BtVal *val;
1574
1575         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1576                 ptr = keyptr(set->page, slot);
1577         else
1578                 return bt->err;
1579
1580         // if librarian slot, advance to real slot
1581
1582         if( slotptr(set->page, slot)->type == Librarian )
1583                 ptr = keyptr(set->page, ++slot);
1584
1585         fence = slot == set->page->cnt;
1586
1587         // if key is found delete it, otherwise ignore request
1588
1589         if( found = !keycmp (ptr, key, len) )
1590           if( found = slotptr(set->page, slot)->dead == 0 ) {
1591                 val = valptr(set->page,slot);
1592                 slotptr(set->page, slot)->dead = 1;
1593                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1594                 set->page->act--;
1595
1596                 // collapse empty slots beneath the fence
1597
1598                 while( idx = set->page->cnt - 1 )
1599                   if( slotptr(set->page, idx)->dead ) {
1600                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1601                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1602                   } else
1603                         break;
1604           }
1605
1606         //      did we delete a fence key in an upper level?
1607
1608         if( found && lvl && set->page->act && fence )
1609           if( bt_fixfence (bt, set, lvl) )
1610                 return bt->err;
1611           else
1612                 return bt->found = found, 0;
1613
1614         //      do we need to collapse root?
1615
1616         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1617           if( bt_collapseroot (bt, set) )
1618                 return bt->err;
1619           else
1620                 return bt->found = found, 0;
1621
1622         //      delete empty page
1623
1624         if( !set->page->act )
1625                 return bt_deletepage (bt, set);
1626
1627         set->latch->dirty = 1;
1628         bt_unlockpage(BtLockWrite, set->latch);
1629         bt_unpinlatch (set->latch);
1630         return bt->found = found, 0;
1631 }
1632
1633 BtKey *bt_foundkey (BtDb *bt)
1634 {
1635         return (BtKey*)(bt->key);
1636 }
1637
1638 //      advance to next slot
1639
1640 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1641 {
1642 BtLatchSet *prevlatch;
1643 uid page_no;
1644
1645         if( slot < set->page->cnt )
1646                 return slot + 1;
1647
1648         prevlatch = set->latch;
1649
1650         if( page_no = bt_getid(set->page->right) )
1651           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1652                 set->page = bt_mappage (bt, set->latch);
1653           else
1654                 return 0;
1655         else
1656           return bt->err = BTERR_struct, 0;
1657
1658         // obtain access lock using lock chaining with Access mode
1659
1660         bt_lockpage(BtLockAccess, set->latch);
1661
1662         bt_unlockpage(BtLockRead, prevlatch);
1663         bt_unpinlatch (prevlatch);
1664
1665         bt_lockpage(BtLockRead, set->latch);
1666         bt_unlockpage(BtLockAccess, set->latch);
1667         return 1;
1668 }
1669
1670 //      find unique key or first duplicate key in
1671 //      leaf level and return number of value bytes
1672 //      or (-1) if not found.  Setup key for bt_foundkey
1673
1674 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1675 {
1676 BtPageSet set[1];
1677 uint len, slot;
1678 int ret = -1;
1679 BtKey *ptr;
1680 BtVal *val;
1681
1682   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1683    do {
1684         ptr = keyptr(set->page, slot);
1685
1686         //      skip librarian slot place holder
1687
1688         if( slotptr(set->page, slot)->type == Librarian )
1689                 ptr = keyptr(set->page, ++slot);
1690
1691         //      return actual key found
1692
1693         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1694         len = ptr->len;
1695
1696         if( slotptr(set->page, slot)->type == Duplicate )
1697                 len -= BtId;
1698
1699         //      not there if we reach the stopper key
1700
1701         if( slot == set->page->cnt )
1702           if( !bt_getid (set->page->right) )
1703                 break;
1704
1705         // if key exists, return >= 0 value bytes copied
1706         //      otherwise return (-1)
1707
1708         if( slotptr(set->page, slot)->dead )
1709                 continue;
1710
1711         if( keylen == len )
1712           if( !memcmp (ptr->key, key, len) ) {
1713                 val = valptr (set->page,slot);
1714                 if( valmax > val->len )
1715                         valmax = val->len;
1716                 memcpy (value, val->value, valmax);
1717                 ret = valmax;
1718           }
1719
1720         break;
1721
1722    } while( slot = bt_findnext (bt, set, slot) );
1723
1724   bt_unlockpage (BtLockRead, set->latch);
1725   bt_unpinlatch (set->latch);
1726   return ret;
1727 }
1728
1729 //      check page for space available,
1730 //      clean if necessary and return
1731 //      0 - page needs splitting
1732 //      >0  new slot value
1733
1734 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1735 {
1736 uint nxt = bt->mgr->page_size;
1737 BtPage page = set->page;
1738 uint cnt = 0, idx = 0;
1739 uint max = page->cnt;
1740 uint newslot = max;
1741 BtKey *key;
1742 BtVal *val;
1743
1744         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1745                 return slot;
1746
1747         //      skip cleanup and proceed to split
1748         //      if there's not enough garbage
1749         //      to bother with.
1750
1751         if( page->garbage < nxt / 5 )
1752                 return 0;
1753
1754         memcpy (bt->frame, page, bt->mgr->page_size);
1755
1756         // skip page info and set rest of page to zero
1757
1758         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1759         set->latch->dirty = 1;
1760         page->garbage = 0;
1761         page->act = 0;
1762
1763         // clean up page first by
1764         // removing deleted keys
1765
1766         while( cnt++ < max ) {
1767                 if( cnt == slot )
1768                         newslot = idx + 2;
1769                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1770                         continue;
1771
1772                 // copy the value across
1773
1774                 val = valptr(bt->frame, cnt);
1775                 nxt -= val->len + sizeof(BtVal);
1776                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1777
1778                 // copy the key across
1779
1780                 key = keyptr(bt->frame, cnt);
1781                 nxt -= key->len + sizeof(BtKey);
1782                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1783
1784                 // make a librarian slot
1785
1786                 if( idx ) {
1787                         slotptr(page, ++idx)->off = nxt;
1788                         slotptr(page, idx)->type = Librarian;
1789                         slotptr(page, idx)->dead = 1;
1790                 }
1791
1792                 // set up the slot
1793
1794                 slotptr(page, ++idx)->off = nxt;
1795                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1796
1797                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1798                         page->act++;
1799         }
1800
1801         page->min = nxt;
1802         page->cnt = idx;
1803
1804         //      see if page has enough space now, or does it need splitting?
1805
1806         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1807                 return newslot;
1808
1809         return 0;
1810 }
1811
1812 // split the root and raise the height of the btree
1813
1814 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1815 {  
1816 unsigned char leftkey[BT_keyarray];
1817 uint nxt = bt->mgr->page_size;
1818 unsigned char value[BtId];
1819 BtPageSet left[1];
1820 uid left_page_no;
1821 BtKey *ptr;
1822 BtVal *val;
1823
1824         //      save left page fence key for new root
1825
1826         ptr = keyptr(root->page, root->page->cnt);
1827         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1828
1829         //  Obtain an empty page to use, and copy the current
1830         //  root contents into it, e.g. lower keys
1831
1832         if( bt_newpage(bt, left, root->page) )
1833                 return bt->err;
1834
1835         left_page_no = left->latch->page_no;
1836         bt_unpinlatch (left->latch);
1837
1838         // preserve the page info at the bottom
1839         // of higher keys and set rest to zero
1840
1841         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1842
1843         // insert stopper key at top of newroot page
1844         // and increase the root height
1845
1846         nxt -= BtId + sizeof(BtVal);
1847         bt_putid (value, right->page_no);
1848         val = (BtVal *)((unsigned char *)root->page + nxt);
1849         memcpy (val->value, value, BtId);
1850         val->len = BtId;
1851
1852         nxt -= 2 + sizeof(BtKey);
1853         slotptr(root->page, 2)->off = nxt;
1854         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1855         ptr->len = 2;
1856         ptr->key[0] = 0xff;
1857         ptr->key[1] = 0xff;
1858
1859         // insert lower keys page fence key on newroot page as first key
1860
1861         nxt -= BtId + sizeof(BtVal);
1862         bt_putid (value, left_page_no);
1863         val = (BtVal *)((unsigned char *)root->page + nxt);
1864         memcpy (val->value, value, BtId);
1865         val->len = BtId;
1866
1867         ptr = (BtKey *)leftkey;
1868         nxt -= ptr->len + sizeof(BtKey);
1869         slotptr(root->page, 1)->off = nxt;
1870         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1871         
1872         bt_putid(root->page->right, 0);
1873         root->page->min = nxt;          // reset lowest used offset and key count
1874         root->page->cnt = 2;
1875         root->page->act = 2;
1876         root->page->lvl++;
1877
1878         // release and unpin root pages
1879
1880         bt_unlockpage(BtLockWrite, root->latch);
1881         bt_unpinlatch (root->latch);
1882
1883         bt_unpinlatch (right);
1884         return 0;
1885 }
1886
1887 //  split already locked full node
1888 //      leave it locked.
1889 //      return pool entry for new right
1890 //      page, unlocked
1891
1892 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1893 {
1894 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1895 uint lvl = set->page->lvl;
1896 BtPageSet right[1];
1897 BtKey *key, *ptr;
1898 BtVal *val, *src;
1899 uid right2;
1900 uint prev;
1901
1902         //  split higher half of keys to bt->frame
1903
1904         memset (bt->frame, 0, bt->mgr->page_size);
1905         max = set->page->cnt;
1906         cnt = max / 2;
1907         idx = 0;
1908
1909         while( cnt++ < max ) {
1910                 if( slotptr(set->page, cnt)->dead && cnt < max )
1911                         continue;
1912                 src = valptr(set->page, cnt);
1913                 nxt -= src->len + sizeof(BtVal);
1914                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1915
1916                 key = keyptr(set->page, cnt);
1917                 nxt -= key->len + sizeof(BtKey);
1918                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1919                 memcpy (ptr, key, key->len + sizeof(BtKey));
1920
1921                 //      add librarian slot
1922
1923                 if( idx ) {
1924                         slotptr(bt->frame, ++idx)->off = nxt;
1925                         slotptr(bt->frame, idx)->type = Librarian;
1926                         slotptr(bt->frame, idx)->dead = 1;
1927                 }
1928
1929                 //  add actual slot
1930
1931                 slotptr(bt->frame, ++idx)->off = nxt;
1932                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1933
1934                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1935                         bt->frame->act++;
1936         }
1937
1938         bt->frame->bits = bt->mgr->page_bits;
1939         bt->frame->min = nxt;
1940         bt->frame->cnt = idx;
1941         bt->frame->lvl = lvl;
1942
1943         // link right node
1944
1945         if( set->latch->page_no > ROOT_page )
1946                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1947
1948         //      get new free page and write higher keys to it.
1949
1950         if( bt_newpage(bt, right, bt->frame) )
1951                 return 0;
1952
1953         memcpy (bt->frame, set->page, bt->mgr->page_size);
1954         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1955         set->latch->dirty = 1;
1956
1957         nxt = bt->mgr->page_size;
1958         set->page->garbage = 0;
1959         set->page->act = 0;
1960         max /= 2;
1961         cnt = 0;
1962         idx = 0;
1963
1964         if( slotptr(bt->frame, max)->type == Librarian )
1965                 max--;
1966
1967         //  assemble page of smaller keys
1968
1969         while( cnt++ < max ) {
1970                 if( slotptr(bt->frame, cnt)->dead )
1971                         continue;
1972                 val = valptr(bt->frame, cnt);
1973                 nxt -= val->len + sizeof(BtVal);
1974                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1975
1976                 key = keyptr(bt->frame, cnt);
1977                 nxt -= key->len + sizeof(BtKey);
1978                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1979
1980                 //      add librarian slot
1981
1982                 if( idx ) {
1983                         slotptr(set->page, ++idx)->off = nxt;
1984                         slotptr(set->page, idx)->type = Librarian;
1985                         slotptr(set->page, idx)->dead = 1;
1986                 }
1987
1988                 //      add actual slot
1989
1990                 slotptr(set->page, ++idx)->off = nxt;
1991                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1992                 set->page->act++;
1993         }
1994
1995         bt_putid(set->page->right, right->latch->page_no);
1996         set->page->min = nxt;
1997         set->page->cnt = idx;
1998
1999         return right->latch->entry;
2000 }
2001
2002 //      fix keys for newly split page
2003 //      call with page locked, return
2004 //      unlocked
2005
2006 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2007 {
2008 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2009 unsigned char value[BtId];
2010 uint lvl = set->page->lvl;
2011 BtPage page;
2012 BtKey *ptr;
2013
2014         // if current page is the root page, split it
2015
2016         if( set->latch->page_no == ROOT_page )
2017                 return bt_splitroot (bt, set, right);
2018
2019         ptr = keyptr(set->page, set->page->cnt);
2020         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2021
2022         page = bt_mappage (bt, right);
2023
2024         ptr = keyptr(page, page->cnt);
2025         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2026
2027         // insert new fences in their parent pages
2028
2029         bt_lockpage (BtLockParent, right);
2030
2031         bt_lockpage (BtLockParent, set->latch);
2032         bt_unlockpage (BtLockWrite, set->latch);
2033
2034         // insert new fence for reformulated left block of smaller keys
2035
2036         bt_putid (value, set->latch->page_no);
2037         ptr = (BtKey *)leftkey;
2038
2039         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2040                 return bt->err;
2041
2042         // switch fence for right block of larger keys to new right page
2043
2044         bt_putid (value, right->page_no);
2045         ptr = (BtKey *)rightkey;
2046
2047         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2048                 return bt->err;
2049
2050         bt_unlockpage (BtLockParent, set->latch);
2051         bt_unpinlatch (set->latch);
2052
2053         bt_unlockpage (BtLockParent, right);
2054         bt_unpinlatch (right);
2055         return 0;
2056 }
2057
2058 //      install new key and value onto page
2059 //      page must already be checked for
2060 //      adequate space
2061
2062 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2063 {
2064 uint idx, librarian;
2065 BtSlot *node;
2066 BtKey *ptr;
2067 BtVal *val;
2068
2069         //      if found slot > desired slot and previous slot
2070         //      is a librarian slot, use it
2071
2072         if( slot > 1 )
2073           if( slotptr(set->page, slot-1)->type == Librarian )
2074                 slot--;
2075
2076         // copy value onto page
2077
2078         set->page->min -= vallen + sizeof(BtVal);
2079         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2080         memcpy (val->value, value, vallen);
2081         val->len = vallen;
2082
2083         // copy key onto page
2084
2085         set->page->min -= keylen + sizeof(BtKey);
2086         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2087         memcpy (ptr->key, key, keylen);
2088         ptr->len = keylen;
2089         
2090         //      find first empty slot
2091
2092         for( idx = slot; idx < set->page->cnt; idx++ )
2093           if( slotptr(set->page, idx)->dead )
2094                 break;
2095
2096         // now insert key into array before slot
2097
2098         if( idx == set->page->cnt )
2099                 idx += 2, set->page->cnt += 2, librarian = 2;
2100         else
2101                 librarian = 1;
2102
2103         set->latch->dirty = 1;
2104         set->page->act++;
2105
2106         while( idx > slot + librarian - 1 )
2107                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2108
2109         //      add librarian slot
2110
2111         if( librarian > 1 ) {
2112                 node = slotptr(set->page, slot++);
2113                 node->off = set->page->min;
2114                 node->type = Librarian;
2115                 node->dead = 1;
2116         }
2117
2118         //      fill in new slot
2119
2120         node = slotptr(set->page, slot);
2121         node->off = set->page->min;
2122         node->type = type;
2123         node->dead = 0;
2124
2125         if( release ) {
2126                 bt_unlockpage (BtLockWrite, set->latch);
2127                 bt_unpinlatch (set->latch);
2128         }
2129
2130         return 0;
2131 }
2132
2133 //  Insert new key into the btree at given level.
2134 //      either add a new key or update/add an existing one
2135
2136 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2137 {
2138 unsigned char newkey[BT_keyarray];
2139 uint slot, idx, len, entry;
2140 BtPageSet set[1];
2141 BtKey *ptr, *ins;
2142 uid sequence;
2143 BtVal *val;
2144 uint type;
2145
2146   // set up the key we're working on
2147
2148   ins = (BtKey*)newkey;
2149   memcpy (ins->key, key, keylen);
2150   ins->len = keylen;
2151
2152   // is this a non-unique index value?
2153
2154   if( unique )
2155         type = Unique;
2156   else {
2157         type = Duplicate;
2158         sequence = bt_newdup (bt);
2159         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2160         ins->len += BtId;
2161   }
2162
2163   while( 1 ) { // find the page and slot for the current key
2164         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2165                 ptr = keyptr(set->page, slot);
2166         else {
2167                 if( !bt->err )
2168                         bt->err = BTERR_ovflw;
2169                 return bt->err;
2170         }
2171
2172         // if librarian slot == found slot, advance to real slot
2173
2174         if( slotptr(set->page, slot)->type == Librarian )
2175           if( !keycmp (ptr, key, keylen) )
2176                 ptr = keyptr(set->page, ++slot);
2177
2178         len = ptr->len;
2179
2180         if( slotptr(set->page, slot)->type == Duplicate )
2181                 len -= BtId;
2182
2183         //  if inserting a duplicate key or unique key
2184         //      check for adequate space on the page
2185         //      and insert the new key before slot.
2186
2187         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2188           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2189                 if( !(entry = bt_splitpage (bt, set)) )
2190                   return bt->err;
2191                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2192                   return bt->err;
2193                 else
2194                   continue;
2195
2196           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2197         }
2198
2199         // if key already exists, update value and return
2200
2201         val = valptr(set->page, slot);
2202
2203         if( val->len >= vallen ) {
2204                 if( slotptr(set->page, slot)->dead )
2205                         set->page->act++;
2206                 set->page->garbage += val->len - vallen;
2207                 set->latch->dirty = 1;
2208                 slotptr(set->page, slot)->dead = 0;
2209                 val->len = vallen;
2210                 memcpy (val->value, value, vallen);
2211                 bt_unlockpage(BtLockWrite, set->latch);
2212                 bt_unpinlatch (set->latch);
2213                 return 0;
2214         }
2215
2216         //      new update value doesn't fit in existing value area
2217
2218         if( !slotptr(set->page, slot)->dead )
2219                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2220         else {
2221                 slotptr(set->page, slot)->dead = 0;
2222                 set->page->act++;
2223         }
2224
2225         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2226           if( !(entry = bt_splitpage (bt, set)) )
2227                 return bt->err;
2228           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2229                 return bt->err;
2230           else
2231                 continue;
2232
2233         set->page->min -= vallen + sizeof(BtVal);
2234         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2235         memcpy (val->value, value, vallen);
2236         val->len = vallen;
2237
2238         set->latch->dirty = 1;
2239         set->page->min -= keylen + sizeof(BtKey);
2240         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2241         memcpy (ptr->key, key, keylen);
2242         ptr->len = keylen;
2243         
2244         slotptr(set->page, slot)->off = set->page->min;
2245         bt_unlockpage(BtLockWrite, set->latch);
2246         bt_unpinlatch (set->latch);
2247         return 0;
2248   }
2249   return 0;
2250 }
2251
2252 typedef struct {
2253         uint entry;                     // latch table entry number
2254         uint slot:31;           // page slot number
2255         uint reuse:1;           // reused previous page
2256 } AtomicMod;
2257
2258 typedef struct {
2259         uid page_no;            // page number for split leaf
2260         void *next;                     // next key to insert
2261         uint entry:30;          // latch table entry number
2262         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2263         unsigned char leafkey[BT_keyarray];
2264 } AtomicKey;
2265
2266 //      determine actual page where key is located
2267 //  return slot number
2268
2269 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2270 {
2271 BtKey *key = keyptr(source,src);
2272 uint slot = locks[src].slot;
2273 uint entry;
2274
2275         if( src > 1 && locks[src].reuse )
2276           entry = locks[src-1].entry, slot = 0;
2277         else
2278           entry = locks[src].entry;
2279
2280         if( slot ) {
2281                 set->latch = bt->mgr->latchsets + entry;
2282                 set->page = bt_mappage (bt, set->latch);
2283                 return slot;
2284         }
2285
2286         //      is locks->reuse set?
2287         //      if so, find where our key
2288         //      is located on previous page or split pages
2289
2290         do {
2291                 set->latch = bt->mgr->latchsets + entry;
2292                 set->page = bt_mappage (bt, set->latch);
2293
2294                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2295                   if( locks[src].reuse )
2296                         locks[src].entry = entry;
2297                   return slot;
2298                 }
2299         } while( entry = set->latch->split );
2300
2301         bt->err = BTERR_atomic;
2302         return 0;
2303 }
2304
2305 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2306 {
2307 BtKey *key = keyptr(source, src);
2308 BtVal *val = valptr(source, src);
2309 BtLatchSet *latch;
2310 BtPageSet set[1];
2311 uint entry;
2312
2313   while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
2314         if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
2315           return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2316
2317         if( entry = bt_splitpage (bt, set) )
2318           latch = bt->mgr->latchsets + entry;
2319         else
2320           return bt->err;
2321
2322         //      splice right page into split chain
2323         //      and WriteLock it.
2324
2325         latch->split = set->latch->split;
2326         set->latch->split = entry;
2327         bt_lockpage(BtLockWrite, latch);
2328   }
2329
2330   return bt->err = BTERR_atomic;
2331 }
2332
2333 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2334 {
2335 BtKey *key = keyptr(source, src);
2336 uint idx, entry, slot;
2337 BtPageSet set[1];
2338 BtKey *ptr;
2339 BtVal *val;
2340
2341         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2342                 slotptr(set->page, slot)->dead = 1;
2343         else
2344                 return bt->err = BTERR_struct;
2345
2346         ptr = keyptr(set->page, slot);
2347         val = valptr(set->page, slot);
2348
2349         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2350         set->latch->dirty = 1;
2351         set->page->act--;
2352         return 0;
2353 }
2354
2355 //      atomic modification of a batch of keys.
2356
2357 //      return -1 if BTERR is set
2358 //      otherwise return slot number
2359 //      causing the key constraint violation
2360 //      or zero on successful completion.
2361
2362 int bt_atomictxn (BtDb *bt, BtPage source)
2363 {
2364 uint src, idx, slot, samepage, entry;
2365 AtomicKey *head, *tail, *leaf;
2366 BtPageSet set[1], prev[1];
2367 unsigned char value[BtId];
2368 BtKey *key, *ptr, *key2;
2369 BtLatchSet *latch;
2370 AtomicMod *locks;
2371 int result = 0;
2372 BtSlot temp[1];
2373 BtPage page;
2374 BtVal *val;
2375 uid right;
2376 int type;
2377
2378   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2379   head = NULL;
2380   tail = NULL;
2381
2382   // stable sort the list of keys into order to
2383   //    prevent deadlocks between threads.
2384
2385   for( src = 1; src++ < source->cnt; ) {
2386         *temp = *slotptr(source,src);
2387         key = keyptr (source,src);
2388
2389         for( idx = src; --idx; ) {
2390           key2 = keyptr (source,idx);
2391           if( keycmp (key, key2->key, key2->len) < 0 ) {
2392                 *slotptr(source,idx+1) = *slotptr(source,idx);
2393                 *slotptr(source,idx) = *temp;
2394           } else
2395                 break;
2396         }
2397   }
2398
2399   // Load the leaf page for each key
2400   // group same page references with reuse bit
2401   // and determine any constraint violations
2402
2403   for( src = 0; src++ < source->cnt; ) {
2404         key = keyptr(source, src);
2405         slot = 0;
2406
2407         // first determine if this modification falls
2408         // on the same page as the previous modification
2409         //      note that the far right leaf page is a special case
2410
2411         if( samepage = src > 1 )
2412           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2413                 slot = bt_findslot(set->page, key->key, key->len);
2414           else // release read on previous page
2415                 bt_unlockpage(BtLockRead, set->latch); 
2416
2417         if( !slot )
2418           if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
2419                 set->latch->split = 0;
2420           else
2421                 return -1;
2422
2423         if( slotptr(set->page, slot)->type == Librarian )
2424           ptr = keyptr(set->page, ++slot);
2425         else
2426           ptr = keyptr(set->page, slot);
2427
2428         if( !samepage ) {
2429           locks[src].entry = set->latch->entry;
2430           locks[src].slot = slot;
2431           locks[src].reuse = 0;
2432         } else {
2433           locks[src].entry = 0;
2434           locks[src].slot = 0;
2435           locks[src].reuse = 1;
2436         }
2437
2438         switch( slotptr(source, src)->type ) {
2439         case Duplicate:
2440         case Unique:
2441           if( !slotptr(set->page, slot)->dead )
2442            if( slot < set->page->cnt || bt_getid (set->page->right) )
2443             if( !keycmp (ptr, key->key, key->len) ) {
2444
2445                   // return constraint violation if key already exists
2446
2447                   bt_unlockpage(BtLockRead, set->latch);
2448                   result = src;
2449
2450                   while( src ) {
2451                         if( locks[src].entry ) {
2452                           set->latch = bt->mgr->latchsets + locks[src].entry;
2453                           bt_unlockpage(BtLockAtomic, set->latch);
2454                           bt_unpinlatch (set->latch);
2455                         }
2456                         src--;
2457                   }
2458                   free (locks);
2459                   return result;
2460                 }
2461           break;
2462         }
2463   }
2464
2465   //  unlock last loadpage lock
2466
2467   if( source->cnt > 1 )
2468         bt_unlockpage(BtLockRead, set->latch);
2469
2470   //  obtain write lock for each master page
2471
2472   for( src = 0; src++ < source->cnt; )
2473         if( locks[src].reuse )
2474           continue;
2475         else
2476           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2477
2478   // insert or delete each key
2479   // process any splits or merges
2480   // release Write & Atomic latches
2481   // set ParentModifications and build
2482   // queue of keys to insert for split pages
2483   // or delete for deleted pages.
2484
2485   // run through txn list backwards
2486
2487   samepage = source->cnt + 1;
2488
2489   for( src = source->cnt; src; src-- ) {
2490         if( locks[src].reuse )
2491           continue;
2492
2493         //  perform the txn operations
2494         //      from smaller to larger on
2495         //  the same page
2496
2497         for( idx = src; idx < samepage; idx++ )
2498          switch( slotptr(source,idx)->type ) {
2499          case Delete:
2500           if( bt_atomicdelete (bt, source, locks, idx) )
2501                 return -1;
2502           break;
2503
2504         case Duplicate:
2505         case Unique:
2506           if( bt_atomicinsert (bt, source, locks, idx) )
2507                 return -1;
2508           break;
2509         }
2510
2511         //      after the same page operations have finished,
2512         //  process master page for splits or deletion.
2513
2514         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2515         prev->page = bt_mappage (bt, prev->latch);
2516         samepage = src;
2517
2518         //  pick-up all splits from master page
2519
2520         entry = prev->latch->split;
2521
2522         while( entry ) {
2523           set->latch = bt->mgr->latchsets + entry;
2524           set->page = bt_mappage (bt, set->latch);
2525           entry = set->latch->split;
2526
2527           // delete empty master page
2528
2529           if( !prev->page->act ) {
2530                 memcpy (set->page->left, prev->page->left, BtId);
2531                 memcpy (prev->page, set->page, bt->mgr->page_size);
2532                 bt_lockpage (BtLockDelete, set->latch);
2533                 bt_freepage (bt, set);
2534                 bt_unpinlatch (set->latch);
2535
2536                 prev->latch->dirty = 1;
2537                 continue;
2538           }
2539
2540           // remove empty page from the split chain
2541
2542           if( !set->page->act ) {
2543                 memcpy (prev->page->right, set->page->right, BtId);
2544                 prev->latch->split = set->latch->split;
2545                 bt_lockpage (BtLockDelete, set->latch);
2546                 bt_freepage (bt, set);
2547                 bt_unpinlatch (set->latch);
2548                 continue;
2549           }
2550
2551           //  schedule prev fence key update
2552
2553           ptr = keyptr(prev->page,prev->page->cnt);
2554           leaf = malloc (sizeof(AtomicKey));
2555
2556           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2557           leaf->page_no = prev->latch->page_no;
2558           leaf->entry = prev->latch->entry;
2559           leaf->next = NULL;
2560           leaf->type = 0;
2561
2562           if( tail )
2563                 tail->next = leaf;
2564           else
2565                 head = leaf;
2566
2567           tail = leaf;
2568
2569           bt_putid (set->page->left, prev->latch->page_no);
2570           bt_lockpage(BtLockParent, prev->latch);
2571           bt_unlockpage(BtLockWrite, prev->latch);
2572           *prev = *set;
2573         }
2574
2575         //  update left pointer in next right page
2576         //      if we did any now non-empty splits
2577
2578         if( latch->split ) {
2579           //  fix left pointer in master's original right sibling
2580           //    or set rightmost page in page zero
2581
2582           if( right = bt_getid (prev->page->right) ) {
2583                 if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
2584                   set->page = bt_mappage (bt, set->latch);
2585                 else
2586                   return -1;
2587
2588             bt_lockpage (BtLockWrite, set->latch);
2589             bt_putid (set->page->left, prev->latch->page_no);
2590                 set->latch->dirty = 1;
2591             bt_unlockpage (BtLockWrite, set->latch);
2592                 bt_unpinlatch (set->latch);
2593           } else
2594                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2595
2596           //  Process last page split in chain
2597
2598           ptr = keyptr(prev->page,prev->page->cnt);
2599           leaf = malloc (sizeof(AtomicKey));
2600
2601           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2602           leaf->page_no = prev->latch->page_no;
2603           leaf->entry = prev->latch->entry;
2604           leaf->next = NULL;
2605           leaf->type = 0;
2606   
2607           if( tail )
2608                 tail->next = leaf;
2609           else
2610                 head = leaf;
2611
2612           tail = leaf;
2613
2614           bt_lockpage(BtLockParent, prev->latch);
2615           bt_unlockpage(BtLockWrite, prev->latch);
2616           bt_unlockpage(BtLockAtomic, latch);
2617           continue;
2618         }
2619
2620         //  finished if prev page occupied (either master or final split)
2621
2622         if( prev->page->act ) {
2623           bt_unlockpage(BtLockWrite, latch);
2624           bt_unlockpage(BtLockAtomic, latch);
2625           bt_unpinlatch(latch);
2626           continue;
2627         }
2628
2629         // any splits were reversed, and the
2630         // master page located in prev is empty, delete it
2631         // by pulling over master's right sibling.
2632         // Delete empty master's fence
2633
2634         ptr = keyptr(prev->page,prev->page->cnt);
2635         leaf = malloc (sizeof(AtomicKey));
2636
2637         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2638         leaf->page_no = prev->latch->page_no;
2639         leaf->entry = prev->latch->entry;
2640         leaf->next = NULL;
2641         leaf->type = 1;
2642   
2643         if( tail )
2644           tail->next = leaf;
2645         else
2646           head = leaf;
2647
2648         tail = leaf;
2649
2650         bt_lockpage(BtLockParent, prev->latch);
2651         bt_unlockpage(BtLockWrite, prev->latch);
2652
2653         if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2654                 set->page = bt_mappage (bt, set->latch);
2655         else
2656                 return -1;
2657
2658         //      add page to our transaction
2659
2660         bt_lockpage(BtLockAtomic, set->latch);
2661         bt_lockpage(BtLockWrite, set->latch);
2662
2663         //      pull contents over empty page
2664
2665         memcpy (set->page->left, prev->page->left, BtId);
2666         memcpy (prev->page, set->page, bt->mgr->page_size);
2667
2668         bt_putid (set->page->right, prev->latch->page_no);
2669         set->latch->dirty = 1;
2670         set->page->kill = 1;
2671
2672         //      add new parent key for new master page contents
2673         //      delete it after parent posts the new master fence.
2674
2675         ptr = keyptr(set->page,set->page->cnt);
2676         leaf = malloc (sizeof(AtomicKey));
2677
2678         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2679         leaf->page_no = prev->latch->page_no;
2680         leaf->entry = set->latch->entry;
2681         leaf->next = NULL;
2682         leaf->type = 2;
2683   
2684         if( tail )
2685           tail->next = leaf;
2686         else
2687           head = leaf;
2688
2689         tail = leaf;
2690
2691 //      bt_lockpage(BtLockParent, set->latch);
2692         bt_unlockpage(BtLockWrite, set->latch);
2693
2694         //  fix master's far right sibling's left pointer
2695
2696         if( right = bt_getid (set->page->right) ) {
2697           if( set->latch = bt_pinlatch (bt, right, 1) )
2698                 set->page = bt_mappage (bt, set->latch);
2699
2700           bt_lockpage (BtLockWrite, set->latch);
2701           bt_putid (set->page->left, prev->latch->page_no);
2702           set->latch->dirty = 1;
2703
2704           bt_unlockpage (BtLockWrite, set->latch);
2705           bt_unpinlatch (set->latch);
2706         }
2707
2708         bt_unlockpage(BtLockAtomic, latch);
2709   }
2710   
2711   //  add & delete keys for any pages split or merged during transaction
2712
2713   if( leaf = head )
2714     do {
2715           set->latch = bt->mgr->latchsets + leaf->entry;
2716           set->page = bt_mappage (bt, set->latch);
2717
2718           bt_putid (value, leaf->page_no);
2719           ptr = (BtKey *)leaf->leafkey;
2720
2721           switch( leaf->type ) {
2722           case 0:       // insert key
2723             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2724                   return -1;
2725
2726             bt_unlockpage (BtLockParent, set->latch);
2727                 break;
2728
2729           case 1:       // delete key
2730                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2731                   return -1;
2732
2733             bt_unlockpage (BtLockParent, set->latch);
2734                 break;
2735
2736           case 2:       // insert key & free
2737             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2738                   return -1;
2739
2740                 bt_unlockpage (BtLockAtomic, set->latch);
2741                 bt_lockpage (BtLockDelete, set->latch);
2742                 bt_lockpage (BtLockWrite, set->latch);
2743                 bt_freepage (bt, set);
2744                 break;
2745           }
2746
2747           bt_unpinlatch (set->latch);
2748           tail = leaf->next;
2749           free (leaf);
2750         } while( leaf = tail );
2751
2752   // return success
2753
2754   free (locks);
2755   return 0;
2756 }
2757
2758 //      set cursor to highest slot on highest page
2759
2760 uint bt_lastkey (BtDb *bt)
2761 {
2762 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2763 BtPageSet set[1];
2764 uint slot;
2765
2766         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2767                 set->page = bt_mappage (bt, set->latch);
2768         else
2769                 return 0;
2770
2771     bt_lockpage(BtLockRead, set->latch);
2772
2773         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2774         slot = set->page->cnt;
2775
2776     bt_unlockpage(BtLockRead, set->latch);
2777         bt_unpinlatch (set->latch);
2778         return slot;
2779 }
2780
2781 //      return previous slot on cursor page
2782
2783 uint bt_prevkey (BtDb *bt, uint slot)
2784 {
2785 uid ourright, next, us = bt->cursor_page;
2786 BtPageSet set[1];
2787
2788         if( --slot )
2789                 return slot;
2790
2791         ourright = bt_getid(bt->cursor->right);
2792
2793 goleft:
2794         if( !(next = bt_getid(bt->cursor->left)) )
2795                 return 0;
2796
2797 findourself:
2798         bt->cursor_page = next;
2799
2800         if( set->latch = bt_pinlatch (bt, next, 1) )
2801                 set->page = bt_mappage (bt, set->latch);
2802         else
2803                 return 0;
2804
2805     bt_lockpage(BtLockRead, set->latch);
2806         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2807         bt_unlockpage(BtLockRead, set->latch);
2808         bt_unpinlatch (set->latch);
2809         
2810         next = bt_getid (bt->cursor->right);
2811
2812         if( next != us )
2813           if( next == ourright )
2814                 goto goleft;
2815           else
2816                 goto findourself;
2817
2818         return bt->cursor->cnt;
2819 }
2820
2821 //  return next slot on cursor page
2822 //  or slide cursor right into next page
2823
2824 uint bt_nextkey (BtDb *bt, uint slot)
2825 {
2826 BtPageSet set[1];
2827 uid right;
2828
2829   do {
2830         right = bt_getid(bt->cursor->right);
2831
2832         while( slot++ < bt->cursor->cnt )
2833           if( slotptr(bt->cursor,slot)->dead )
2834                 continue;
2835           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2836                 return slot;
2837           else
2838                 break;
2839
2840         if( !right )
2841                 break;
2842
2843         bt->cursor_page = right;
2844
2845         if( set->latch = bt_pinlatch (bt, right, 1) )
2846                 set->page = bt_mappage (bt, set->latch);
2847         else
2848                 return 0;
2849
2850     bt_lockpage(BtLockRead, set->latch);
2851
2852         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2853
2854         bt_unlockpage(BtLockRead, set->latch);
2855         bt_unpinlatch (set->latch);
2856         slot = 0;
2857
2858   } while( 1 );
2859
2860   return bt->err = 0;
2861 }
2862
2863 //  cache page of keys into cursor and return starting slot for given key
2864
2865 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2866 {
2867 BtPageSet set[1];
2868 uint slot;
2869
2870         // cache page for retrieval
2871
2872         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2873           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2874         else
2875           return 0;
2876
2877         bt->cursor_page = set->latch->page_no;
2878
2879         bt_unlockpage(BtLockRead, set->latch);
2880         bt_unpinlatch (set->latch);
2881         return slot;
2882 }
2883
2884 BtKey *bt_key(BtDb *bt, uint slot)
2885 {
2886         return keyptr(bt->cursor, slot);
2887 }
2888
2889 BtVal *bt_val(BtDb *bt, uint slot)
2890 {
2891         return valptr(bt->cursor,slot);
2892 }
2893
2894 #ifdef STANDALONE
2895
2896 #ifndef unix
2897 double getCpuTime(int type)
2898 {
2899 FILETIME crtime[1];
2900 FILETIME xittime[1];
2901 FILETIME systime[1];
2902 FILETIME usrtime[1];
2903 SYSTEMTIME timeconv[1];
2904 double ans = 0;
2905
2906         memset (timeconv, 0, sizeof(SYSTEMTIME));
2907
2908         switch( type ) {
2909         case 0:
2910                 GetSystemTimeAsFileTime (xittime);
2911                 FileTimeToSystemTime (xittime, timeconv);
2912                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2913                 break;
2914         case 1:
2915                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2916                 FileTimeToSystemTime (usrtime, timeconv);
2917                 break;
2918         case 2:
2919                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2920                 FileTimeToSystemTime (systime, timeconv);
2921                 break;
2922         }
2923
2924         ans += (double)timeconv->wHour * 3600;
2925         ans += (double)timeconv->wMinute * 60;
2926         ans += (double)timeconv->wSecond;
2927         ans += (double)timeconv->wMilliseconds / 1000;
2928         return ans;
2929 }
2930 #else
2931 #include <time.h>
2932 #include <sys/resource.h>
2933
2934 double getCpuTime(int type)
2935 {
2936 struct rusage used[1];
2937 struct timeval tv[1];
2938
2939         switch( type ) {
2940         case 0:
2941                 gettimeofday(tv, NULL);
2942                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2943
2944         case 1:
2945                 getrusage(RUSAGE_SELF, used);
2946                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2947
2948         case 2:
2949                 getrusage(RUSAGE_SELF, used);
2950                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2951         }
2952
2953         return 0;
2954 }
2955 #endif
2956
2957 void bt_poolaudit (BtMgr *mgr)
2958 {
2959 BtLatchSet *latch;
2960 uint slot = 0;
2961
2962         while( slot++ < mgr->latchdeployed ) {
2963                 latch = mgr->latchsets + slot;
2964
2965                 if( *latch->readwr->rin & MASK )
2966                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2967                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2968
2969                 if( *latch->access->rin & MASK )
2970                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2971                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2972
2973                 if( *latch->parent->rin & MASK )
2974                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2975                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2976
2977                 if( latch->pin & ~CLOCK_bit ) {
2978                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2979                         latch->pin = 0;
2980                 }
2981         }
2982 }
2983
2984 uint bt_latchaudit (BtDb *bt)
2985 {
2986 ushort idx, hashidx;
2987 uid next, page_no;
2988 BtLatchSet *latch;
2989 uint cnt = 0;
2990 BtKey *ptr;
2991
2992         if( *(ushort *)(bt->mgr->lock) )
2993                 fprintf(stderr, "Alloc page locked\n");
2994         *(ushort *)(bt->mgr->lock) = 0;
2995
2996         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2997                 latch = bt->mgr->latchsets + idx;
2998                 if( *latch->readwr->rin & MASK )
2999                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3000                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3001
3002                 if( *latch->access->rin & MASK )
3003                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3004                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3005
3006                 if( *latch->parent->rin & MASK )
3007                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3008                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3009
3010                 if( latch->pin ) {
3011                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3012                         latch->pin = 0;
3013                 }
3014         }
3015
3016         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3017           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3018                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3019
3020           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3021
3022           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3023                 latch = bt->mgr->latchsets + idx;
3024                 if( latch->pin )
3025                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3026           } while( idx = latch->next );
3027         }
3028
3029         page_no = LEAF_page;
3030
3031         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3032         uid off = page_no << bt->mgr->page_bits;
3033 #ifdef unix
3034           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3035 #else
3036         DWORD amt[1];
3037
3038           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3039
3040           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3041                 return bt->err = BTERR_map;
3042
3043           if( *amt <  bt->mgr->page_size )
3044                 return bt->err = BTERR_map;
3045 #endif
3046                 if( !bt->frame->free && !bt->frame->lvl )
3047                         cnt += bt->frame->act;
3048                 page_no++;
3049         }
3050                 
3051         cnt--;  // remove stopper key
3052         fprintf(stderr, " Total keys read %d\n", cnt);
3053
3054         bt_close (bt);
3055         return 0;
3056 }
3057
3058 typedef struct {
3059         char idx;
3060         char *type;
3061         char *infile;
3062         BtMgr *mgr;
3063         int num;
3064 } ThreadArg;
3065
3066 //  standalone program to index file of keys
3067 //  then list them onto std-out
3068
3069 #ifdef unix
3070 void *index_file (void *arg)
3071 #else
3072 uint __stdcall index_file (void *arg)
3073 #endif
3074 {
3075 int line = 0, found = 0, cnt = 0, unique;
3076 uid next, page_no = LEAF_page;  // start on first page of leaves
3077 unsigned char key[BT_maxkey];
3078 unsigned char txn[65536];
3079 ThreadArg *args = arg;
3080 int ch, len = 0, slot;
3081 BtPageSet set[1];
3082 uint nxt = 65536;
3083 BtPage page;
3084 BtKey *ptr;
3085 BtVal *val;
3086 BtDb *bt;
3087 FILE *in;
3088
3089         bt = bt_open (args->mgr);
3090         page = (BtPage)txn;
3091
3092         unique = (args->type[1] | 0x20) != 'd';
3093
3094         switch(args->type[0] | 0x20)
3095         {
3096         case 'a':
3097                 fprintf(stderr, "started latch mgr audit\n");
3098                 cnt = bt_latchaudit (bt);
3099                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3100                 break;
3101
3102         case 't':
3103                 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
3104                 if( in = fopen (args->infile, "rb") )
3105                   while( ch = getc(in), ch != EOF )
3106                         if( ch == '\n' )
3107                         {
3108                           line++;
3109                           nxt -= len - 10;
3110                           memcpy (txn + nxt, key + 10, len - 10);
3111                           nxt -= 1;
3112                           txn[nxt] = len - 10;
3113                           nxt -= 10;
3114                           memcpy (txn + nxt, key, 10);
3115                           nxt -= 1;
3116                           txn[nxt] = 10;
3117                           slotptr(page,++cnt)->off  = nxt;
3118                           slotptr(page,cnt)->type = Unique;
3119                           len = 0;
3120
3121                           if( cnt < args->num )
3122                                 continue;
3123
3124                           page->cnt = cnt;
3125                           page->act = cnt;
3126                           page->min = nxt;
3127
3128                           if( bt_atomictxn (bt, page) )
3129                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3130                           nxt = 65536;
3131                           cnt = 0;
3132
3133                         }
3134                         else if( len < BT_maxkey )
3135                                 key[len++] = ch;
3136                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3137                 break;
3138
3139         case 'p':
3140                 fprintf(stderr, "started pennysort for %s\n", args->infile);
3141                 if( in = fopen (args->infile, "rb") )
3142                   while( ch = getc(in), ch != EOF )
3143                         if( ch == '\n' )
3144                         {
3145                           line++;
3146
3147                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
3148                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3149                           len = 0;
3150                         }
3151                         else if( len < BT_maxkey )
3152                                 key[len++] = ch;
3153                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3154                 break;
3155
3156         case 'w':
3157                 fprintf(stderr, "started indexing for %s\n", args->infile);
3158                 if( in = fopen (args->infile, "r") )
3159                   while( ch = getc(in), ch != EOF )
3160                         if( ch == '\n' )
3161                         {
3162                           line++;
3163
3164                           if( args->num == 1 )
3165                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3166
3167                           else if( args->num )
3168                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3169
3170                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3171                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3172                           len = 0;
3173                         }
3174                         else if( len < BT_maxkey )
3175                                 key[len++] = ch;
3176                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3177                 break;
3178
3179         case 'd':
3180                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3181                 if( in = fopen (args->infile, "rb") )
3182                   while( ch = getc(in), ch != EOF )
3183                         if( ch == '\n' )
3184                         {
3185                           line++;
3186                           if( args->num == 1 )
3187                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3188
3189                           else if( args->num )
3190                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3191
3192                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3193                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3194                           ptr = (BtKey*)(bt->key);
3195                           found++;
3196
3197                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3198                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3199                           len = 0;
3200                         }
3201                         else if( len < BT_maxkey )
3202                                 key[len++] = ch;
3203                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3204                 break;
3205
3206         case 'f':
3207                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3208                 if( in = fopen (args->infile, "rb") )
3209                   while( ch = getc(in), ch != EOF )
3210                         if( ch == '\n' )
3211                         {
3212                           line++;
3213                           if( args->num == 1 )
3214                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3215
3216                           else if( args->num )
3217                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3218
3219                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3220                                 found++;
3221                           else if( bt->err )
3222                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3223                           len = 0;
3224                         }
3225                         else if( len < BT_maxkey )
3226                                 key[len++] = ch;
3227                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3228                 break;
3229
3230         case 's':
3231                 fprintf(stderr, "started scanning\n");
3232                 do {
3233                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3234                                 set->page = bt_mappage (bt, set->latch);
3235                         else
3236                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3237                         bt_lockpage (BtLockRead, set->latch);
3238                         next = bt_getid (set->page->right);
3239
3240                         for( slot = 0; slot++ < set->page->cnt; )
3241                          if( next || slot < set->page->cnt )
3242                           if( !slotptr(set->page, slot)->dead ) {
3243                                 ptr = keyptr(set->page, slot);
3244                                 len = ptr->len;
3245
3246                             if( slotptr(set->page, slot)->type == Duplicate )
3247                                         len -= BtId;
3248
3249                                 fwrite (ptr->key, len, 1, stdout);
3250                                 val = valptr(set->page, slot);
3251                                 fwrite (val->value, val->len, 1, stdout);
3252                                 fputc ('\n', stdout);
3253                                 cnt++;
3254                            }
3255
3256                         bt_unlockpage (BtLockRead, set->latch);
3257                         bt_unpinlatch (set->latch);
3258                 } while( page_no = next );
3259
3260                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3261                 break;
3262
3263         case 'c':
3264 #ifdef unix
3265                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3266 #endif
3267                 fprintf(stderr, "started counting\n");
3268                 page_no = LEAF_page;
3269
3270                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3271                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3272                                 break;
3273
3274                         if( !bt->frame->free && !bt->frame->lvl )
3275                                 cnt += bt->frame->act;
3276
3277                         bt->reads++;
3278                         page_no++;
3279                 }
3280                 
3281                 cnt--;  // remove stopper key
3282                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3283                 break;
3284         }
3285
3286         bt_close (bt);
3287 #ifdef unix
3288         return NULL;
3289 #else
3290         return 0;
3291 #endif
3292 }
3293
3294 typedef struct timeval timer;
3295
3296 int main (int argc, char **argv)
3297 {
3298 int idx, cnt, len, slot, err;
3299 int segsize, bits = 16;
3300 double start, stop;
3301 #ifdef unix
3302 pthread_t *threads;
3303 #else
3304 HANDLE *threads;
3305 #endif
3306 ThreadArg *args;
3307 uint poolsize = 0;
3308 float elapsed;
3309 int num = 0;
3310 char key[1];
3311 BtMgr *mgr;
3312 BtKey *ptr;
3313 BtDb *bt;
3314
3315         if( argc < 3 ) {
3316                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3317                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3318                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3319                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3320                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3321                 exit(0);
3322         }
3323
3324         start = getCpuTime(0);
3325
3326         if( argc > 3 )
3327                 bits = atoi(argv[3]);
3328
3329         if( argc > 4 )
3330                 poolsize = atoi(argv[4]);
3331
3332         if( !poolsize )
3333                 fprintf (stderr, "Warning: no mapped_pool\n");
3334
3335         if( argc > 5 )
3336                 num = atoi(argv[5]);
3337
3338         cnt = argc - 6;
3339 #ifdef unix
3340         threads = malloc (cnt * sizeof(pthread_t));
3341 #else
3342         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3343 #endif
3344         args = malloc (cnt * sizeof(ThreadArg));
3345
3346         mgr = bt_mgr ((argv[1]), bits, poolsize);
3347
3348         if( !mgr ) {
3349                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3350                 exit (1);
3351         }
3352
3353         //      fire off threads
3354
3355         for( idx = 0; idx < cnt; idx++ ) {
3356                 args[idx].infile = argv[idx + 6];
3357                 args[idx].type = argv[2];
3358                 args[idx].mgr = mgr;
3359                 args[idx].num = num;
3360                 args[idx].idx = idx;
3361 #ifdef unix
3362                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3363                         fprintf(stderr, "Error creating thread %d\n", err);
3364 #else
3365                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3366 #endif
3367         }
3368
3369         //      wait for termination
3370
3371 #ifdef unix
3372         for( idx = 0; idx < cnt; idx++ )
3373                 pthread_join (threads[idx], NULL);
3374 #else
3375         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3376
3377         for( idx = 0; idx < cnt; idx++ )
3378                 CloseHandle(threads[idx]);
3379
3380 #endif
3381         bt_poolaudit(mgr);
3382         bt_mgrclose (mgr);
3383
3384         elapsed = getCpuTime(0) - start;
3385         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3386         elapsed = getCpuTime(1);
3387         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3388         elapsed = getCpuTime(2);
3389         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3390 }
3391
3392 #endif  //STANDALONE