]> pd.if.org Git - btree/blob - threadskv8.c
Introduce threadskv8.c for atomic-consistent inserts/deletes for a set of keys in...
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      traditional buffer pool manager
7 //      and ACID batched key updates
8
9 // 21 SEP 2014
10
11 // author: karl malbrain, malbrain@cal.berkeley.edu
12
13 /*
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
16
17 The orginal author is Karl Malbrain.
18
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
25 */
26
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
29
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
32
33 #ifdef linux
34 #define _GNU_SOURCE
35 #endif
36
37 #ifdef unix
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <fcntl.h>
42 #include <sys/time.h>
43 #include <sys/mman.h>
44 #include <errno.h>
45 #include <pthread.h>
46 #else
47 #define WIN32_LEAN_AND_MEAN
48 #include <windows.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <time.h>
52 #include <fcntl.h>
53 #include <process.h>
54 #include <intrin.h>
55 #endif
56
57 #include <memory.h>
58 #include <string.h>
59 #include <stddef.h>
60
61 typedef unsigned long long      uid;
62
63 #ifndef unix
64 typedef unsigned long long      off64_t;
65 typedef unsigned short          ushort;
66 typedef unsigned int            uint;
67 #endif
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 //  BTree page number constants
78 #define ALLOC_page              0       // allocation page
79 #define ROOT_page               1       // root of the btree
80 #define LEAF_page               2       // first page of leaves
81
82 //      Number of levels to create in a new BTree
83
84 #define MIN_lvl                 2
85
86 /*
87 There are six lock types for each node in four independent sets: 
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
93 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
94 */
95
96 typedef enum{
97         BtLockAccess = 1,
98         BtLockDelete = 2,
99         BtLockRead   = 4,
100         BtLockWrite  = 8,
101         BtLockParent = 16,
102         BtLockAtomic = 32
103 } BtLock;
104
105 //      definition for phase-fair reader/writer lock implementation
106
107 typedef struct {
108         ushort rin[1];
109         ushort rout[1];
110         ushort ticket[1];
111         ushort serving[1];
112 } RWLock;
113
114 #define PHID 0x1
115 #define PRES 0x2
116 #define MASK 0x3
117 #define RINC 0x4
118
119 //      definition for spin latch implementation
120
121 // exclusive is set for write access
122 // share is count of read accessors
123 // grant write lock when share == 0
124
125 volatile typedef struct {
126         ushort exclusive:1;
127         ushort pending:1;
128         ushort share:14;
129 } BtSpinLatch;
130
131 #define XCL 1
132 #define PEND 2
133 #define BOTH 3
134 #define SHARE 4
135
136 //  hash table entries
137
138 typedef struct {
139         volatile uint slot;             // Latch table entry at head of chain
140         BtSpinLatch latch[1];
141 } BtHashEntry;
142
143 //      latch manager table structure
144
145 typedef struct {
146         uid page_no;                    // latch set page number
147         RWLock readwr[1];               // read/write page lock
148         RWLock access[1];               // Access Intent/Page delete
149         RWLock parent[1];               // Posting of fence key in parent
150         RWLock atomic[1];               // Atomic update in progress
151         uint split;                             // right split page atomic insert
152         uint entry;                             // entry slot in latch table
153         uint next;                              // next entry in hash table chain
154         uint prev;                              // prev entry in hash table chain
155         volatile ushort pin;    // number of outstanding threads
156         ushort dirty:1;                 // page in cache is dirty
157 #ifdef unix
158         pthread_t atomictid;    // pid holding atomic lock
159 #else
160         uint atomictid;                 // pid holding atomic lock
161 #endif
162 } BtLatchSet;
163
164 //      Define the length of the page record numbers
165
166 #define BtId 6
167
168 //      Page key slot definition.
169
170 //      Keys are marked dead, but remain on the page until
171 //      it cleanup is called. The fence key (highest key) for
172 //      a leaf page is always present, even after cleanup.
173
174 //      Slot types
175
176 //      In addition to the Unique keys that occupy slots
177 //      there are Librarian and Duplicate key
178 //      slots occupying the key slot array.
179
180 //      The Librarian slots are dead keys that
181 //      serve as filler, available to add new Unique
182 //      or Dup slots that are inserted into the B-tree.
183
184 //      The Duplicate slots have had their key bytes extended
185 //      by 6 bytes to contain a binary duplicate key uniqueifier.
186
187 typedef enum {
188         Unique,
189         Librarian,
190         Duplicate,
191         Delete
192 } BtSlotType;
193
194 typedef struct {
195         uint off:BT_maxbits;    // page offset for key start
196         uint type:3;                    // type of slot
197         uint dead:1;                    // set for deleted slot
198 } BtSlot;
199
200 //      The key structure occupies space at the upper end of
201 //      each page.  It's a length byte followed by the key
202 //      bytes.
203
204 typedef struct {
205         unsigned char len;              // this can be changed to a ushort or uint
206         unsigned char key[0];
207 } BtKey;
208
209 //      the value structure also occupies space at the upper
210 //      end of the page. Each key is immediately followed by a value.
211
212 typedef struct {
213         unsigned char len;              // this can be changed to a ushort or uint
214         unsigned char value[0];
215 } BtVal;
216
217 #define BT_maxkey       255             // maximum number of bytes in a key
218 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
219
220 //      The first part of an index page.
221 //      It is immediately followed
222 //      by the BtSlot array of keys.
223
224 //      note that this structure size
225 //      must be a multiple of 8 bytes
226 //      in order to place dups correctly.
227
228 typedef struct BtPage_ {
229         uint cnt;                                       // count of keys in page
230         uint act;                                       // count of active keys
231         uint min;                                       // next key offset
232         uint garbage;                           // page garbage in bytes
233         unsigned char bits:7;           // page size in bits
234         unsigned char free:1;           // page is on free chain
235         unsigned char lvl:7;            // level of page
236         unsigned char kill:1;           // page is being deleted
237         unsigned char right[BtId];      // page number to right
238 } *BtPage;
239
240 //  The loadpage interface object
241
242 typedef struct {
243         BtPage page;            // current page pointer
244         BtLatchSet *latch;      // current page latch set
245 } BtPageSet;
246
247 //      structure for latch manager on ALLOC_page
248
249 typedef struct {
250         struct BtPage_ alloc[1];        // next page_no in right ptr
251         unsigned long long dups[1];     // global duplicate key uniqueifier
252         unsigned char chain[BtId];      // head of free page_nos chain
253 } BtPageZero;
254
255 //      The object structure for Btree access
256
257 typedef struct {
258         uint page_size;                         // page size    
259         uint page_bits;                         // page size in bits    
260 #ifdef unix
261         int idx;
262 #else
263         HANDLE idx;
264 #endif
265         BtPageZero *pagezero;           // mapped allocation page
266         BtSpinLatch lock[1];            // allocation area lite latch
267         uint latchdeployed;                     // highest number of latch entries deployed
268         uint nlatchpage;                        // number of latch pages at BT_latch
269         uint latchtotal;                        // number of page latch entries
270         uint latchhash;                         // number of latch hash table slots
271         uint latchvictim;                       // next latch entry to examine
272         BtHashEntry *hashtable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         unsigned char *pagepool;        // mapped to the buffer pool pages
275 #ifndef unix
276         HANDLE halloc;                          // allocation handle
277         HANDLE hpool;                           // buffer pool handle
278 #endif
279 } BtMgr;
280
281 typedef struct {
282         BtMgr *mgr;                             // buffer manager for thread
283         BtPage cursor;                  // cached frame for start/next (never mapped)
284         BtPage frame;                   // spare frame for the page split (never mapped)
285         uid cursor_page;                                // current cursor page number   
286         unsigned char *mem;                             // frame, cursor, page memory buffer
287         unsigned char key[BT_keyarray]; // last found complete key
288         int found;                              // last delete or insert was found
289         int err;                                // last error
290         int reads, writes;              // number of reads and writes from the btree
291 } BtDb;
292
293 typedef enum {
294         BTERR_ok = 0,
295         BTERR_struct,
296         BTERR_ovflw,
297         BTERR_lock,
298         BTERR_map,
299         BTERR_read,
300         BTERR_wrt,
301         BTERR_atomic
302 } BTERR;
303
304 #define CLOCK_bit 0x8000
305
306 // B-Tree functions
307 extern void bt_close (BtDb *bt);
308 extern BtDb *bt_open (BtMgr *mgr);
309 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
310 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
311 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
312 extern BtKey *bt_foundkey (BtDb *bt);
313 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
314 extern uint bt_nextkey   (BtDb *bt, uint slot);
315
316 //      manager functions
317 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
318 void bt_mgrclose (BtMgr *mgr);
319
320 //  Helper functions to return slot values
321 //      from the cursor page.
322
323 extern BtKey *bt_key (BtDb *bt, uint slot);
324 extern BtVal *bt_val (BtDb *bt, uint slot);
325
326 //  The page is allocated from low and hi ends.
327 //  The key slots are allocated from the bottom,
328 //      while the text and value of the key
329 //  are allocated from the top.  When the two
330 //  areas meet, the page is split into two.
331
332 //  A key consists of a length byte, two bytes of
333 //  index number (0 - 65535), and up to 253 bytes
334 //  of key value.
335
336 //  Associated with each key is a value byte string
337 //      containing any value desired.
338
339 //  The b-tree root is always located at page 1.
340 //      The first leaf page of level zero is always
341 //      located on page 2.
342
343 //      The b-tree pages are linked with next
344 //      pointers to facilitate enumerators,
345 //      and provide for concurrency.
346
347 //      When to root page fills, it is split in two and
348 //      the tree height is raised by a new root at page
349 //      one with two keys.
350
351 //      Deleted keys are marked with a dead bit until
352 //      page cleanup. The fence key for a leaf node is
353 //      always present
354
355 //  To achieve maximum concurrency one page is locked at a time
356 //  as the tree is traversed to find leaf key in question. The right
357 //  page numbers are used in cases where the page is being split,
358 //      or consolidated.
359
360 //  Page 0 is dedicated to lock for new page extensions,
361 //      and chains empty pages together for reuse. It also
362 //      contains the latch manager hash table.
363
364 //      The ParentModification lock on a node is obtained to serialize posting
365 //      or changing the fence key for a node.
366
367 //      Empty pages are chained together through the ALLOC page and reused.
368
369 //      Access macros to address slot and key values from the page
370 //      Page slots use 1 based indexing.
371
372 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
373 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
374 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
375
376 void bt_putid(unsigned char *dest, uid id)
377 {
378 int i = BtId;
379
380         while( i-- )
381                 dest[i] = (unsigned char)id, id >>= 8;
382 }
383
384 uid bt_getid(unsigned char *src)
385 {
386 uid id = 0;
387 int i;
388
389         for( i = 0; i < BtId; i++ )
390                 id <<= 8, id |= *src++; 
391
392         return id;
393 }
394
395 uid bt_newdup (BtDb *bt)
396 {
397 #ifdef unix
398         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
399 #else
400         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
401 #endif
402 }
403
404 //      Phase-Fair reader/writer lock implementation
405
406 void WriteLock (RWLock *lock)
407 {
408 ushort w, r, tix;
409
410 #ifdef unix
411         tix = __sync_fetch_and_add (lock->ticket, 1);
412 #else
413         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
414 #endif
415         // wait for our ticket to come up
416
417         while( tix != lock->serving[0] )
418 #ifdef unix
419                 sched_yield();
420 #else
421                 SwitchToThread ();
422 #endif
423
424         w = PRES | (tix & PHID);
425 #ifdef  unix
426         r = __sync_fetch_and_add (lock->rin, w);
427 #else
428         r = _InterlockedExchangeAdd16 (lock->rin, w);
429 #endif
430         while( r != *lock->rout )
431 #ifdef unix
432                 sched_yield();
433 #else
434                 SwitchToThread();
435 #endif
436 }
437
438 void WriteRelease (RWLock *lock)
439 {
440 #ifdef unix
441         __sync_fetch_and_and (lock->rin, ~MASK);
442 #else
443         _InterlockedAnd16 (lock->rin, ~MASK);
444 #endif
445         lock->serving[0]++;
446 }
447
448 void ReadLock (RWLock *lock)
449 {
450 ushort w;
451 #ifdef unix
452         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
453 #else
454         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
455 #endif
456         if( w )
457           while( w == (*lock->rin & MASK) )
458 #ifdef unix
459                 sched_yield ();
460 #else
461                 SwitchToThread ();
462 #endif
463 }
464
465 void ReadRelease (RWLock *lock)
466 {
467 #ifdef unix
468         __sync_fetch_and_add (lock->rout, RINC);
469 #else
470         _InterlockedExchangeAdd16 (lock->rout, RINC);
471 #endif
472 }
473
474 //      Spin Latch Manager
475
476 //      wait until write lock mode is clear
477 //      and add 1 to the share count
478
479 void bt_spinreadlock(BtSpinLatch *latch)
480 {
481 ushort prev;
482
483   do {
484 #ifdef unix
485         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
486 #else
487         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
488 #endif
489         //  see if exclusive request is granted or pending
490
491         if( !(prev & BOTH) )
492                 return;
493 #ifdef unix
494         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
495 #else
496         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
497 #endif
498 #ifdef  unix
499   } while( sched_yield(), 1 );
500 #else
501   } while( SwitchToThread(), 1 );
502 #endif
503 }
504
505 //      wait for other read and write latches to relinquish
506
507 void bt_spinwritelock(BtSpinLatch *latch)
508 {
509 ushort prev;
510
511   do {
512 #ifdef  unix
513         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
514 #else
515         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
516 #endif
517         if( !(prev & XCL) )
518           if( !(prev & ~BOTH) )
519                 return;
520           else
521 #ifdef unix
522                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
523 #else
524                 _InterlockedAnd16((ushort *)latch, ~XCL);
525 #endif
526 #ifdef  unix
527   } while( sched_yield(), 1 );
528 #else
529   } while( SwitchToThread(), 1 );
530 #endif
531 }
532
533 //      try to obtain write lock
534
535 //      return 1 if obtained,
536 //              0 otherwise
537
538 int bt_spinwritetry(BtSpinLatch *latch)
539 {
540 ushort prev;
541
542 #ifdef  unix
543         prev = __sync_fetch_and_or((ushort *)latch, XCL);
544 #else
545         prev = _InterlockedOr16((ushort *)latch, XCL);
546 #endif
547         //      take write access if all bits are clear
548
549         if( !(prev & XCL) )
550           if( !(prev & ~BOTH) )
551                 return 1;
552           else
553 #ifdef unix
554                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 #else
556                 _InterlockedAnd16((ushort *)latch, ~XCL);
557 #endif
558         return 0;
559 }
560
561 //      clear write mode
562
563 void bt_spinreleasewrite(BtSpinLatch *latch)
564 {
565 #ifdef unix
566         __sync_fetch_and_and((ushort *)latch, ~BOTH);
567 #else
568         _InterlockedAnd16((ushort *)latch, ~BOTH);
569 #endif
570 }
571
572 //      decrement reader count
573
574 void bt_spinreleaseread(BtSpinLatch *latch)
575 {
576 #ifdef unix
577         __sync_fetch_and_add((ushort *)latch, -SHARE);
578 #else
579         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
580 #endif
581 }
582
583 //      read page from permanent location in Btree file
584
585 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
586 {
587 off64_t off = page_no << mgr->page_bits;
588
589 #ifdef unix
590         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
591                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
592                 return BTERR_read;
593         }
594 #else
595 OVERLAPPED ovl[1];
596 uint amt[1];
597
598         memset (ovl, 0, sizeof(OVERLAPPED));
599         ovl->Offset = off;
600         ovl->OffsetHigh = off >> 32;
601
602         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
603                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
604                 return BTERR_read;
605         }
606         if( *amt <  mgr->page_size ) {
607                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
608                 return BTERR_read;
609         }
610 #endif
611         return 0;
612 }
613
614 //      write page to permanent location in Btree file
615 //      clear the dirty bit
616
617 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
618 {
619 off64_t off = page_no << mgr->page_bits;
620
621 #ifdef unix
622         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
623                 return BTERR_wrt;
624 #else
625 OVERLAPPED ovl[1];
626 uint amt[1];
627
628         memset (ovl, 0, sizeof(OVERLAPPED));
629         ovl->Offset = off;
630         ovl->OffsetHigh = off >> 32;
631
632         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
633                 return BTERR_wrt;
634
635         if( *amt <  mgr->page_size )
636                 return BTERR_wrt;
637 #endif
638         return 0;
639 }
640
641 //      link latch table entry into head of latch hash table
642
643 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
644 {
645 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
646 BtLatchSet *latch = bt->mgr->latchsets + slot;
647
648         if( latch->next = bt->mgr->hashtable[hashidx].slot )
649                 bt->mgr->latchsets[latch->next].prev = slot;
650
651         bt->mgr->hashtable[hashidx].slot = slot;
652         memset (&latch->atomictid, 0, sizeof(latch->atomictid));
653         latch->page_no = page_no;
654         latch->entry = slot;
655         latch->prev = 0;
656         latch->pin = 1;
657
658         if( loadit )
659           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
660                 return bt->err;
661           else
662                 bt->reads++;
663
664         return bt->err = 0;
665 }
666
667 //      set CLOCK bit in latch
668 //      decrement pin count
669
670 void bt_unpinlatch (BtLatchSet *latch)
671 {
672 #ifdef unix
673         if( ~latch->pin & CLOCK_bit )
674                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
675         __sync_fetch_and_add(&latch->pin, -1);
676 #else
677         if( ~latch->pin & CLOCK_bit )
678                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
679         _InterlockedDecrement16 (&latch->pin);
680 #endif
681 }
682
683 //  return the btree cached page address
684
685 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
686 {
687 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
688
689         return page;
690 }
691
692 //      find existing latchset or inspire new one
693 //      return with latchset pinned
694
695 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
696 {
697 uint hashidx = page_no % bt->mgr->latchhash;
698 BtLatchSet *latch;
699 uint slot, idx;
700 uint lvl, cnt;
701 off64_t off;
702 uint amt[1];
703 BtPage page;
704
705   //  try to find our entry
706
707   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
708
709   if( slot = bt->mgr->hashtable[hashidx].slot ) do
710   {
711         latch = bt->mgr->latchsets + slot;
712         if( page_no == latch->page_no )
713                 break;
714   } while( slot = latch->next );
715
716   //  found our entry
717   //  increment clock
718
719   if( slot ) {
720         latch = bt->mgr->latchsets + slot;
721 #ifdef unix
722         __sync_fetch_and_add(&latch->pin, 1);
723 #else
724         _InterlockedIncrement16 (&latch->pin);
725 #endif
726         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
727         return latch;
728   }
729
730         //  see if there are any unused pool entries
731 #ifdef unix
732         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
733 #else
734         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
735 #endif
736
737         if( slot < bt->mgr->latchtotal ) {
738                 latch = bt->mgr->latchsets + slot;
739                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
740                         return NULL;
741                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
742                 return latch;
743         }
744
745 #ifdef unix
746         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
747 #else
748         _InterlockedDecrement (&bt->mgr->latchdeployed);
749 #endif
750   //  find and reuse previous entry on victim
751
752   while( 1 ) {
753 #ifdef unix
754         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
755 #else
756         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
757 #endif
758         // try to get write lock on hash chain
759         //      skip entry if not obtained
760         //      or has outstanding pins
761
762         slot %= bt->mgr->latchtotal;
763
764         if( !slot )
765                 continue;
766
767         latch = bt->mgr->latchsets + slot;
768         idx = latch->page_no % bt->mgr->latchhash;
769
770         //      see we are on same chain as hashidx
771
772         if( idx == hashidx )
773                 continue;
774
775         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
776                 continue;
777
778         //  skip this slot if it is pinned
779         //      or the CLOCK bit is set
780
781         if( latch->pin ) {
782           if( latch->pin & CLOCK_bit ) {
783 #ifdef unix
784                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
785 #else
786                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
787 #endif
788           }
789           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
790           continue;
791         }
792
793         //  update permanent page area in btree from buffer pool
794
795         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
796
797         if( latch->dirty )
798           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
799                 return NULL;
800           else
801                 latch->dirty = 0, bt->writes++;
802
803         //  unlink our available slot from its hash chain
804
805         if( latch->prev )
806                 bt->mgr->latchsets[latch->prev].next = latch->next;
807         else
808                 bt->mgr->hashtable[idx].slot = latch->next;
809
810         if( latch->next )
811                 bt->mgr->latchsets[latch->next].prev = latch->prev;
812
813         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
814
815         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
816                 return NULL;
817
818         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
819         return latch;
820   }
821 }
822
823 void bt_mgrclose (BtMgr *mgr)
824 {
825 BtLatchSet *latch;
826 uint num = 0;
827 BtPage page;
828 uint slot;
829
830         //      flush dirty pool pages to the btree
831
832         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
833                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
834                 latch = mgr->latchsets + slot;
835
836                 if( latch->dirty ) {
837                         bt_writepage(mgr, page, latch->page_no);
838                         latch->dirty = 0, num++;
839                 }
840 //              madvise (page, mgr->page_size, MADV_DONTNEED);
841         }
842
843         fprintf(stderr, "%d buffer pool pages flushed\n", num);
844
845 #ifdef unix
846         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
847         munmap (mgr->pagezero, mgr->page_size);
848 #else
849         FlushViewOfFile(mgr->pagezero, 0);
850         UnmapViewOfFile(mgr->pagezero);
851         UnmapViewOfFile(mgr->hashtable);
852         CloseHandle(mgr->halloc);
853         CloseHandle(mgr->hpool);
854 #endif
855 #ifdef unix
856         close (mgr->idx);
857         free (mgr);
858 #else
859         FlushFileBuffers(mgr->idx);
860         CloseHandle(mgr->idx);
861         GlobalFree (mgr);
862 #endif
863 }
864
865 //      close and release memory
866
867 void bt_close (BtDb *bt)
868 {
869 #ifdef unix
870         if( bt->mem )
871                 free (bt->mem);
872 #else
873         if( bt->mem)
874                 VirtualFree (bt->mem, 0, MEM_RELEASE);
875 #endif
876         free (bt);
877 }
878
879 //  open/create new btree buffer manager
880
881 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
882 //              size of page pool (e.g. 262144)
883
884 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
885 {
886 uint lvl, attr, last, slot, idx;
887 uint nlatchpage, latchhash;
888 unsigned char value[BtId];
889 int flag, initit = 0;
890 BtPageZero *pagezero;
891 off64_t size;
892 uint amt[1];
893 BtMgr* mgr;
894 BtKey* key;
895 BtVal *val;
896
897         // determine sanity of page size and buffer pool
898
899         if( bits > BT_maxbits )
900                 bits = BT_maxbits;
901         else if( bits < BT_minbits )
902                 bits = BT_minbits;
903
904         if( nodemax < 16 ) {
905                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
906                 return NULL;
907         }
908
909 #ifdef unix
910         mgr = calloc (1, sizeof(BtMgr));
911
912         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
913
914         if( mgr->idx == -1 ) {
915                 fprintf (stderr, "Unable to open btree file\n");
916                 return free(mgr), NULL;
917         }
918 #else
919         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
920         attr = FILE_ATTRIBUTE_NORMAL;
921         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
922
923         if( mgr->idx == INVALID_HANDLE_VALUE )
924                 return GlobalFree(mgr), NULL;
925 #endif
926
927 #ifdef unix
928         pagezero = valloc (BT_maxpage);
929         *amt = 0;
930
931         // read minimum page size to get root info
932         //      to support raw disk partition files
933         //      check if bits == 0 on the disk.
934
935         if( size = lseek (mgr->idx, 0L, 2) )
936                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
937                         if( pagezero->alloc->bits )
938                                 bits = pagezero->alloc->bits;
939                         else
940                                 initit = 1;
941                 else
942                         return free(mgr), free(pagezero), NULL;
943         else
944                 initit = 1;
945 #else
946         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
947         size = GetFileSize(mgr->idx, amt);
948
949         if( size || *amt ) {
950                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
951                         return bt_mgrclose (mgr), NULL;
952                 bits = pagezero->alloc->bits;
953         } else
954                 initit = 1;
955 #endif
956
957         mgr->page_size = 1 << bits;
958         mgr->page_bits = bits;
959
960         //  calculate number of latch hash table entries
961
962         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
963         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
964
965         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
966         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
967         mgr->latchtotal = nodemax;
968
969         if( !initit )
970                 goto mgrlatch;
971
972         // initialize an empty b-tree with latch page, root page, page of leaves
973         // and page(s) of latches and page pool cache
974
975         memset (pagezero, 0, 1 << bits);
976         pagezero->alloc->bits = mgr->page_bits;
977         bt_putid(pagezero->alloc->right, MIN_lvl+1);
978
979         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
980                 fprintf (stderr, "Unable to create btree page zero\n");
981                 return bt_mgrclose (mgr), NULL;
982         }
983
984         memset (pagezero, 0, 1 << bits);
985         pagezero->alloc->bits = mgr->page_bits;
986
987         for( lvl=MIN_lvl; lvl--; ) {
988                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
989                 key = keyptr(pagezero->alloc, 1);
990                 key->len = 2;           // create stopper key
991                 key->key[0] = 0xff;
992                 key->key[1] = 0xff;
993
994                 bt_putid(value, MIN_lvl - lvl + 1);
995                 val = valptr(pagezero->alloc, 1);
996                 val->len = lvl ? BtId : 0;
997                 memcpy (val->value, value, val->len);
998
999                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1000                 pagezero->alloc->lvl = lvl;
1001                 pagezero->alloc->cnt = 1;
1002                 pagezero->alloc->act = 1;
1003
1004                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1005                         fprintf (stderr, "Unable to create btree page zero\n");
1006                         return bt_mgrclose (mgr), NULL;
1007                 }
1008         }
1009
1010 mgrlatch:
1011 #ifdef unix
1012         free (pagezero);
1013 #else
1014         VirtualFree (pagezero, 0, MEM_RELEASE);
1015 #endif
1016 #ifdef unix
1017         // mlock the pagezero page
1018
1019         flag = PROT_READ | PROT_WRITE;
1020         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1021         if( mgr->pagezero == MAP_FAILED ) {
1022                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1023                 return bt_mgrclose (mgr), NULL;
1024         }
1025         mlock (mgr->pagezero, mgr->page_size);
1026
1027         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1028         if( mgr->hashtable == MAP_FAILED ) {
1029                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1030                 return bt_mgrclose (mgr), NULL;
1031         }
1032 #else
1033         flag = PAGE_READWRITE;
1034         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1035         if( !mgr->halloc ) {
1036                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1037                 return bt_mgrclose (mgr), NULL;
1038         }
1039
1040         flag = FILE_MAP_WRITE;
1041         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1042         if( !mgr->pagezero ) {
1043                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1044                 return bt_mgrclose (mgr), NULL;
1045         }
1046
1047         flag = PAGE_READWRITE;
1048         size = (uid)mgr->nlatchpage << mgr->page_bits;
1049         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1050         if( !mgr->hpool ) {
1051                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1052                 return bt_mgrclose (mgr), NULL;
1053         }
1054
1055         flag = FILE_MAP_WRITE;
1056         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1057         if( !mgr->hashtable ) {
1058                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1059                 return bt_mgrclose (mgr), NULL;
1060         }
1061 #endif
1062
1063         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1064         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1065
1066         return mgr;
1067 }
1068
1069 //      open BTree access method
1070 //      based on buffer manager
1071
1072 BtDb *bt_open (BtMgr *mgr)
1073 {
1074 BtDb *bt = malloc (sizeof(*bt));
1075
1076         memset (bt, 0, sizeof(*bt));
1077         bt->mgr = mgr;
1078 #ifdef unix
1079         bt->mem = valloc (2 *mgr->page_size);
1080 #else
1081         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1082 #endif
1083         bt->frame = (BtPage)bt->mem;
1084         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1085         return bt;
1086 }
1087
1088 //  compare two keys, return > 0, = 0, or < 0
1089 //  =0: keys are same
1090 //  -1: key2 > key1
1091 //  +1: key2 < key1
1092 //  as the comparison value
1093
1094 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1095 {
1096 uint len1 = key1->len;
1097 int ans;
1098
1099         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1100                 return ans;
1101
1102         if( len1 > len2 )
1103                 return 1;
1104         if( len1 < len2 )
1105                 return -1;
1106
1107         return 0;
1108 }
1109
1110 // place write, read, or parent lock on requested page_no.
1111
1112 void bt_lockpage(BtLock mode, BtLatchSet *set)
1113 {
1114         switch( mode ) {
1115         case BtLockRead:
1116                 ReadLock (set->readwr);
1117                 break;
1118         case BtLockWrite:
1119                 WriteLock (set->readwr);
1120                 break;
1121         case BtLockAccess:
1122                 ReadLock (set->access);
1123                 break;
1124         case BtLockDelete:
1125                 WriteLock (set->access);
1126                 break;
1127         case BtLockParent:
1128                 WriteLock (set->parent);
1129                 break;
1130         case BtLockAtomic:
1131                 WriteLock (set->atomic);
1132                 break;
1133         case BtLockAtomic | BtLockRead:
1134                 WriteLock (set->atomic);
1135                 ReadLock (set->readwr);
1136                 break;
1137         }
1138 }
1139
1140 // remove write, read, or parent lock on requested page
1141
1142 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1143 {
1144         switch( mode ) {
1145         case BtLockRead:
1146                 ReadRelease (set->readwr);
1147                 break;
1148         case BtLockWrite:
1149                 WriteRelease (set->readwr);
1150                 break;
1151         case BtLockAccess:
1152                 ReadRelease (set->access);
1153                 break;
1154         case BtLockDelete:
1155                 WriteRelease (set->access);
1156                 break;
1157         case BtLockParent:
1158                 WriteRelease (set->parent);
1159                 break;
1160         case BtLockAtomic:
1161                 memset (&set->atomictid, 0, sizeof(set->atomictid));
1162                 WriteRelease (set->atomic);
1163                 break;
1164         case BtLockAtomic | BtLockRead:
1165                 ReadRelease (set->readwr);
1166                 memset (&set->atomictid, 0, sizeof(set->atomictid));
1167                 WriteRelease (set->atomic);
1168                 break;
1169         }
1170 }
1171
1172 //      allocate a new page
1173 //      return with page latched, but unlocked.
1174
1175 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1176 {
1177 uid page_no;
1178 int blk;
1179
1180         //      lock allocation page
1181
1182         bt_spinwritelock(bt->mgr->lock);
1183
1184         // use empty chain first
1185         // else allocate empty page
1186
1187         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1188                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1189                         set->page = bt_mappage (bt, set->latch);
1190                 else
1191                         return bt->err = BTERR_struct, -1;
1192
1193                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1194                 bt_spinreleasewrite(bt->mgr->lock);
1195
1196                 memcpy (set->page, contents, bt->mgr->page_size);
1197                 set->latch->dirty = 1;
1198                 return 0;
1199         }
1200
1201         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1202         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1203
1204         // unlock allocation latch
1205
1206         bt_spinreleasewrite(bt->mgr->lock);
1207
1208         //      don't load cache from btree page
1209
1210         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1211                 set->page = bt_mappage (bt, set->latch);
1212         else
1213                 return bt->err = BTERR_struct;
1214
1215         memcpy (set->page, contents, bt->mgr->page_size);
1216         set->latch->dirty = 1;
1217         return 0;
1218 }
1219
1220 //  find slot in page for given key at a given level
1221
1222 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1223 {
1224 uint diff, higher = set->page->cnt, low = 1, slot;
1225 uint good = 0;
1226
1227         //        make stopper key an infinite fence value
1228
1229         if( bt_getid (set->page->right) )
1230                 higher++;
1231         else
1232                 good++;
1233
1234         //      low is the lowest candidate.
1235         //  loop ends when they meet
1236
1237         //  higher is already
1238         //      tested as .ge. the passed key.
1239
1240         while( diff = higher - low ) {
1241                 slot = low + ( diff >> 1 );
1242                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1243                         low = slot + 1;
1244                 else
1245                         higher = slot, good++;
1246         }
1247
1248         //      return zero if key is on right link page
1249
1250         return good ? higher : 0;
1251 }
1252
1253 //  find and load page at given level for given key
1254 //      leave page rd or wr locked as requested
1255
1256 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1257 {
1258 uid page_no = ROOT_page, prevpage = 0;
1259 uint drill = 0xff, slot;
1260 BtLatchSet *prevlatch;
1261 uint mode, prevmode;
1262
1263   //  start at root of btree and drill down
1264
1265   do {
1266         // determine lock mode of drill level
1267         mode = (drill == lvl) ? lock : BtLockRead; 
1268
1269         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1270           return 0;
1271
1272         // obtain access lock using lock chaining with Access mode
1273
1274         if( page_no > ROOT_page )
1275           bt_lockpage(BtLockAccess, set->latch);
1276
1277         set->page = bt_mappage (bt, set->latch);
1278
1279         //      release & unpin parent page
1280
1281         if( prevpage ) {
1282           bt_unlockpage(prevmode, prevlatch);
1283           bt_unpinlatch (prevlatch);
1284           prevpage = 0;
1285         }
1286
1287         //      skip Atomic lock on leaf page if already held
1288
1289         if( !drill )
1290          if( mode & BtLockAtomic )
1291           if( pthread_equal (set->latch->atomictid, pthread_self()) )
1292                 mode &= ~BtLockAtomic;
1293
1294         // obtain mode lock using lock chaining through AccessLock
1295
1296         bt_lockpage(mode, set->latch);
1297
1298         if( mode & BtLockAtomic )
1299                 set->latch->atomictid = pthread_self();
1300
1301         if( set->page->free )
1302                 return bt->err = BTERR_struct, 0;
1303
1304         if( page_no > ROOT_page )
1305           bt_unlockpage(BtLockAccess, set->latch);
1306
1307         // re-read and re-lock root after determining actual level of root
1308
1309         if( set->page->lvl != drill) {
1310                 if( set->latch->page_no != ROOT_page )
1311                         return bt->err = BTERR_struct, 0;
1312                         
1313                 drill = set->page->lvl;
1314
1315                 if( lock != BtLockRead && drill == lvl ) {
1316                   bt_unlockpage(mode, set->latch);
1317                   bt_unpinlatch (set->latch);
1318                   continue;
1319                 }
1320         }
1321
1322         prevpage = set->latch->page_no;
1323         prevlatch = set->latch;
1324         prevmode = mode;
1325
1326         //  find key on page at this level
1327         //  and descend to requested level
1328
1329         if( set->page->kill )
1330           goto slideright;
1331
1332         if( slot = bt_findslot (set, key, len) ) {
1333           if( drill == lvl )
1334                 return slot;
1335
1336           while( slotptr(set->page, slot)->dead )
1337                 if( slot++ < set->page->cnt )
1338                         continue;
1339                 else
1340                         goto slideright;
1341
1342           page_no = bt_getid(valptr(set->page, slot)->value);
1343           drill--;
1344           continue;
1345         }
1346
1347         //  or slide right into next page
1348
1349 slideright:
1350         page_no = bt_getid(set->page->right);
1351
1352   } while( page_no );
1353
1354   // return error on end of right chain
1355
1356   bt->err = BTERR_struct;
1357   return 0;     // return error
1358 }
1359
1360 //      return page to free list
1361 //      page must be delete & write locked
1362
1363 void bt_freepage (BtDb *bt, BtPageSet *set)
1364 {
1365         //      lock allocation page
1366
1367         bt_spinwritelock (bt->mgr->lock);
1368
1369         //      store chain
1370
1371         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1372         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1373         set->latch->dirty = 1;
1374         set->page->free = 1;
1375
1376         // unlock released page
1377
1378         bt_unlockpage (BtLockDelete, set->latch);
1379         bt_unlockpage (BtLockWrite, set->latch);
1380         bt_unpinlatch (set->latch);
1381
1382         // unlock allocation page
1383
1384         bt_spinreleasewrite (bt->mgr->lock);
1385 }
1386
1387 //      a fence key was deleted from a page
1388 //      push new fence value upwards
1389
1390 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1391 {
1392 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1393 unsigned char value[BtId];
1394 BtKey* ptr;
1395 uint idx;
1396
1397         //      remove the old fence value
1398
1399         ptr = keyptr(set->page, set->page->cnt);
1400         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1401         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1402         set->latch->dirty = 1;
1403
1404         //  cache new fence value
1405
1406         ptr = keyptr(set->page, set->page->cnt);
1407         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1408
1409         bt_lockpage (BtLockParent, set->latch);
1410         bt_unlockpage (BtLockWrite, set->latch);
1411
1412         //      insert new (now smaller) fence key
1413
1414         bt_putid (value, set->latch->page_no);
1415         ptr = (BtKey*)leftkey;
1416
1417         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1418           return bt->err;
1419
1420         //      now delete old fence key
1421
1422         ptr = (BtKey*)rightkey;
1423
1424         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1425                 return bt->err;
1426
1427         bt_unlockpage (BtLockParent, set->latch);
1428         bt_unpinlatch(set->latch);
1429         return 0;
1430 }
1431
1432 //      root has a single child
1433 //      collapse a level from the tree
1434
1435 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1436 {
1437 BtPageSet child[1];
1438 uid page_no;
1439 uint idx;
1440
1441   // find the child entry and promote as new root contents
1442
1443   do {
1444         for( idx = 0; idx++ < root->page->cnt; )
1445           if( !slotptr(root->page, idx)->dead )
1446                 break;
1447
1448         page_no = bt_getid (valptr(root->page, idx)->value);
1449
1450         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1451                 child->page = bt_mappage (bt, child->latch);
1452         else
1453                 return bt->err;
1454
1455         bt_lockpage (BtLockDelete, child->latch);
1456         bt_lockpage (BtLockWrite, child->latch);
1457
1458         memcpy (root->page, child->page, bt->mgr->page_size);
1459         root->latch->dirty = 1;
1460
1461         bt_freepage (bt, child);
1462
1463   } while( root->page->lvl > 1 && root->page->act == 1 );
1464
1465   bt_unlockpage (BtLockWrite, root->latch);
1466   bt_unpinlatch (root->latch);
1467   return 0;
1468 }
1469
1470 //  delete a page and manage keys
1471 //  call with page writelocked
1472 //      returns with page unpinned
1473
1474 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1475 {
1476 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1477 unsigned char value[BtId];
1478 uint lvl = set->page->lvl;
1479 BtPageSet right[1];
1480 uid page_no;
1481 BtKey *ptr;
1482
1483         //      cache copy of fence key
1484         //      to post in parent
1485
1486         ptr = keyptr(set->page, set->page->cnt);
1487         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1488
1489         //      obtain lock on right page
1490
1491         page_no = bt_getid(set->page->right);
1492
1493         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1494                 right->page = bt_mappage (bt, right->latch);
1495         else
1496                 return 0;
1497
1498         bt_lockpage (BtLockWrite, right->latch);
1499
1500         // cache copy of key to update
1501
1502         ptr = keyptr(right->page, right->page->cnt);
1503         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1504
1505         if( right->page->kill )
1506                 return bt->err = BTERR_struct;
1507
1508         // pull contents of right peer into our empty page
1509
1510         memcpy (set->page, right->page, bt->mgr->page_size);
1511         set->latch->dirty = 1;
1512
1513         // mark right page deleted and point it to left page
1514         //      until we can post parent updates that remove access
1515         //      to the deleted page.
1516
1517         bt_putid (right->page->right, set->latch->page_no);
1518         right->latch->dirty = 1;
1519         right->page->kill = 1;
1520
1521         bt_lockpage (BtLockParent, right->latch);
1522         bt_unlockpage (BtLockWrite, right->latch);
1523
1524         bt_lockpage (BtLockParent, set->latch);
1525         bt_unlockpage (BtLockWrite, set->latch);
1526
1527         // redirect higher key directly to our new node contents
1528
1529         bt_putid (value, set->latch->page_no);
1530         ptr = (BtKey*)higherfence;
1531
1532         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1533           return bt->err;
1534
1535         //      delete old lower key to our node
1536
1537         ptr = (BtKey*)lowerfence;
1538
1539         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1540           return bt->err;
1541
1542         //      obtain delete and write locks to right node
1543
1544         bt_unlockpage (BtLockParent, right->latch);
1545         bt_lockpage (BtLockDelete, right->latch);
1546         bt_lockpage (BtLockWrite, right->latch);
1547         bt_freepage (bt, right);
1548
1549         bt_unlockpage (BtLockParent, set->latch);
1550         bt_unpinlatch (set->latch);
1551         bt->found = 1;
1552         return 0;
1553 }
1554
1555 //  find and delete key on page by marking delete flag bit
1556 //  if page becomes empty, delete it from the btree
1557
1558 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1559 {
1560 uint slot, idx, found, fence;
1561 BtPageSet set[1];
1562 BtKey *ptr;
1563 BtVal *val;
1564
1565         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1566                 ptr = keyptr(set->page, slot);
1567         else
1568                 return bt->err;
1569
1570         // if librarian slot, advance to real slot
1571
1572         if( slotptr(set->page, slot)->type == Librarian )
1573                 ptr = keyptr(set->page, ++slot);
1574
1575         fence = slot == set->page->cnt;
1576
1577         // if key is found delete it, otherwise ignore request
1578
1579         if( found = !keycmp (ptr, key, len) )
1580           if( found = slotptr(set->page, slot)->dead == 0 ) {
1581                 val = valptr(set->page,slot);
1582                 slotptr(set->page, slot)->dead = 1;
1583                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1584                 set->page->act--;
1585
1586                 // collapse empty slots beneath the fence
1587
1588                 while( idx = set->page->cnt - 1 )
1589                   if( slotptr(set->page, idx)->dead ) {
1590                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1591                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1592                   } else
1593                         break;
1594           }
1595
1596         //      did we delete a fence key in an upper level?
1597
1598         if( found && lvl && set->page->act && fence )
1599           if( bt_fixfence (bt, set, lvl) )
1600                 return bt->err;
1601           else
1602                 return bt->found = found, 0;
1603
1604         //      do we need to collapse root?
1605
1606         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1607           if( bt_collapseroot (bt, set) )
1608                 return bt->err;
1609           else
1610                 return bt->found = found, 0;
1611
1612         //      delete empty page
1613
1614         if( !set->page->act )
1615                 return bt_deletepage (bt, set);
1616
1617         set->latch->dirty = 1;
1618         bt_unlockpage(BtLockWrite, set->latch);
1619         bt_unpinlatch (set->latch);
1620         return bt->found = found, 0;
1621 }
1622
1623 BtKey *bt_foundkey (BtDb *bt)
1624 {
1625         return (BtKey*)(bt->key);
1626 }
1627
1628 //      advance to next slot
1629
1630 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1631 {
1632 BtLatchSet *prevlatch;
1633 uid page_no;
1634
1635         if( slot < set->page->cnt )
1636                 return slot + 1;
1637
1638         prevlatch = set->latch;
1639
1640         if( page_no = bt_getid(set->page->right) )
1641           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1642                 set->page = bt_mappage (bt, set->latch);
1643           else
1644                 return 0;
1645         else
1646           return bt->err = BTERR_struct, 0;
1647
1648         // obtain access lock using lock chaining with Access mode
1649
1650         bt_lockpage(BtLockAccess, set->latch);
1651
1652         bt_unlockpage(BtLockRead, prevlatch);
1653         bt_unpinlatch (prevlatch);
1654
1655         bt_lockpage(BtLockRead, set->latch);
1656         bt_unlockpage(BtLockAccess, set->latch);
1657         return 1;
1658 }
1659
1660 //      find unique key or first duplicate key in
1661 //      leaf level and return number of value bytes
1662 //      or (-1) if not found.  Setup key for bt_foundkey
1663
1664 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1665 {
1666 BtPageSet set[1];
1667 uint len, slot;
1668 int ret = -1;
1669 BtKey *ptr;
1670 BtVal *val;
1671
1672   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1673    do {
1674         ptr = keyptr(set->page, slot);
1675
1676         //      skip librarian slot place holder
1677
1678         if( slotptr(set->page, slot)->type == Librarian )
1679                 ptr = keyptr(set->page, ++slot);
1680
1681         //      return actual key found
1682
1683         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1684         len = ptr->len;
1685
1686         if( slotptr(set->page, slot)->type == Duplicate )
1687                 len -= BtId;
1688
1689         //      not there if we reach the stopper key
1690
1691         if( slot == set->page->cnt )
1692           if( !bt_getid (set->page->right) )
1693                 break;
1694
1695         // if key exists, return >= 0 value bytes copied
1696         //      otherwise return (-1)
1697
1698         if( slotptr(set->page, slot)->dead )
1699                 continue;
1700
1701         if( keylen == len )
1702           if( !memcmp (ptr->key, key, len) ) {
1703                 val = valptr (set->page,slot);
1704                 if( valmax > val->len )
1705                         valmax = val->len;
1706                 memcpy (value, val->value, valmax);
1707                 ret = valmax;
1708           }
1709
1710         break;
1711
1712    } while( slot = bt_findnext (bt, set, slot) );
1713
1714   bt_unlockpage (BtLockRead, set->latch);
1715   bt_unpinlatch (set->latch);
1716   return ret;
1717 }
1718
1719 //      check page for space available,
1720 //      clean if necessary and return
1721 //      0 - page needs splitting
1722 //      >0  new slot value
1723
1724 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1725 {
1726 uint nxt = bt->mgr->page_size;
1727 BtPage page = set->page;
1728 uint cnt = 0, idx = 0;
1729 uint max = page->cnt;
1730 uint newslot = max;
1731 BtKey *key;
1732 BtVal *val;
1733
1734         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1735                 return slot;
1736
1737         //      skip cleanup and proceed to split
1738         //      if there's not enough garbage
1739         //      to bother with.
1740
1741         if( page->garbage < nxt / 5 )
1742                 return 0;
1743
1744         memcpy (bt->frame, page, bt->mgr->page_size);
1745
1746         // skip page info and set rest of page to zero
1747
1748         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1749         set->latch->dirty = 1;
1750         page->garbage = 0;
1751         page->act = 0;
1752
1753         // clean up page first by
1754         // removing deleted keys
1755
1756         while( cnt++ < max ) {
1757                 if( cnt == slot )
1758                         newslot = idx + 2;
1759                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1760                         continue;
1761
1762                 // copy the value across
1763
1764                 val = valptr(bt->frame, cnt);
1765                 nxt -= val->len + sizeof(BtVal);
1766                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1767
1768                 // copy the key across
1769
1770                 key = keyptr(bt->frame, cnt);
1771                 nxt -= key->len + sizeof(BtKey);
1772                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1773
1774                 // make a librarian slot
1775
1776                 if( idx ) {
1777                         slotptr(page, ++idx)->off = nxt;
1778                         slotptr(page, idx)->type = Librarian;
1779                         slotptr(page, idx)->dead = 1;
1780                 }
1781
1782                 // set up the slot
1783
1784                 slotptr(page, ++idx)->off = nxt;
1785                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1786
1787                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1788                         page->act++;
1789         }
1790
1791         page->min = nxt;
1792         page->cnt = idx;
1793
1794         //      see if page has enough space now, or does it need splitting?
1795
1796         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1797                 return newslot;
1798
1799         return 0;
1800 }
1801
1802 // split the root and raise the height of the btree
1803
1804 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1805 {  
1806 unsigned char leftkey[BT_keyarray];
1807 uint nxt = bt->mgr->page_size;
1808 unsigned char value[BtId];
1809 BtPageSet left[1];
1810 uid left_page_no;
1811 BtKey *ptr;
1812 BtVal *val;
1813
1814         //      save left page fence key for new root
1815
1816         ptr = keyptr(root->page, root->page->cnt);
1817         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1818
1819         //  Obtain an empty page to use, and copy the current
1820         //  root contents into it, e.g. lower keys
1821
1822         if( bt_newpage(bt, left, root->page) )
1823                 return bt->err;
1824
1825         left_page_no = left->latch->page_no;
1826         bt_unpinlatch (left->latch);
1827
1828         // preserve the page info at the bottom
1829         // of higher keys and set rest to zero
1830
1831         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1832
1833         // insert stopper key at top of newroot page
1834         // and increase the root height
1835
1836         nxt -= BtId + sizeof(BtVal);
1837         bt_putid (value, right->page_no);
1838         val = (BtVal *)((unsigned char *)root->page + nxt);
1839         memcpy (val->value, value, BtId);
1840         val->len = BtId;
1841
1842         nxt -= 2 + sizeof(BtKey);
1843         slotptr(root->page, 2)->off = nxt;
1844         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1845         ptr->len = 2;
1846         ptr->key[0] = 0xff;
1847         ptr->key[1] = 0xff;
1848
1849         // insert lower keys page fence key on newroot page as first key
1850
1851         nxt -= BtId + sizeof(BtVal);
1852         bt_putid (value, left_page_no);
1853         val = (BtVal *)((unsigned char *)root->page + nxt);
1854         memcpy (val->value, value, BtId);
1855         val->len = BtId;
1856
1857         ptr = (BtKey *)leftkey;
1858         nxt -= ptr->len + sizeof(BtKey);
1859         slotptr(root->page, 1)->off = nxt;
1860         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1861         
1862         bt_putid(root->page->right, 0);
1863         root->page->min = nxt;          // reset lowest used offset and key count
1864         root->page->cnt = 2;
1865         root->page->act = 2;
1866         root->page->lvl++;
1867
1868         // release and unpin root pages
1869
1870         bt_unlockpage(BtLockWrite, root->latch);
1871         bt_unpinlatch (root->latch);
1872
1873         bt_unpinlatch (right);
1874         return 0;
1875 }
1876
1877 //  split already locked full node
1878 //      leave it locked.
1879 //      return pool entry for new right
1880 //      page, unlocked
1881
1882 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1883 {
1884 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1885 uint lvl = set->page->lvl;
1886 BtPageSet right[1];
1887 BtKey *key, *ptr;
1888 BtVal *val, *src;
1889 uid right2;
1890 uint prev;
1891
1892         //  split higher half of keys to bt->frame
1893
1894         memset (bt->frame, 0, bt->mgr->page_size);
1895         max = set->page->cnt;
1896         cnt = max / 2;
1897         idx = 0;
1898
1899         while( cnt++ < max ) {
1900                 if( slotptr(set->page, cnt)->dead && cnt < max )
1901                         continue;
1902                 src = valptr(set->page, cnt);
1903                 nxt -= src->len + sizeof(BtVal);
1904                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1905
1906                 key = keyptr(set->page, cnt);
1907                 nxt -= key->len + sizeof(BtKey);
1908                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1909                 memcpy (ptr, key, key->len + sizeof(BtKey));
1910
1911                 //      add librarian slot
1912
1913                 if( idx ) {
1914                         slotptr(bt->frame, ++idx)->off = nxt;
1915                         slotptr(bt->frame, idx)->type = Librarian;
1916                         slotptr(bt->frame, idx)->dead = 1;
1917                 }
1918
1919                 //  add actual slot
1920
1921                 slotptr(bt->frame, ++idx)->off = nxt;
1922                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1923
1924                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1925                         bt->frame->act++;
1926         }
1927
1928         bt->frame->bits = bt->mgr->page_bits;
1929         bt->frame->min = nxt;
1930         bt->frame->cnt = idx;
1931         bt->frame->lvl = lvl;
1932
1933         // link right node
1934
1935         if( set->latch->page_no > ROOT_page )
1936                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1937
1938         //      get new free page and write higher keys to it.
1939
1940         if( bt_newpage(bt, right, bt->frame) )
1941                 return 0;
1942
1943         memcpy (bt->frame, set->page, bt->mgr->page_size);
1944         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1945         set->latch->dirty = 1;
1946
1947         nxt = bt->mgr->page_size;
1948         set->page->garbage = 0;
1949         set->page->act = 0;
1950         max /= 2;
1951         cnt = 0;
1952         idx = 0;
1953
1954         if( slotptr(bt->frame, max)->type == Librarian )
1955                 max--;
1956
1957         //  assemble page of smaller keys
1958
1959         while( cnt++ < max ) {
1960                 if( slotptr(bt->frame, cnt)->dead )
1961                         continue;
1962                 val = valptr(bt->frame, cnt);
1963                 nxt -= val->len + sizeof(BtVal);
1964                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1965
1966                 key = keyptr(bt->frame, cnt);
1967                 nxt -= key->len + sizeof(BtKey);
1968                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1969
1970                 //      add librarian slot
1971
1972                 if( idx ) {
1973                         slotptr(set->page, ++idx)->off = nxt;
1974                         slotptr(set->page, idx)->type = Librarian;
1975                         slotptr(set->page, idx)->dead = 1;
1976                 }
1977
1978                 //      add actual slot
1979
1980                 slotptr(set->page, ++idx)->off = nxt;
1981                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1982                 set->page->act++;
1983         }
1984
1985         bt_putid(set->page->right, right->latch->page_no);
1986         set->page->min = nxt;
1987         set->page->cnt = idx;
1988
1989         return right->latch->entry;
1990 }
1991
1992 //      fix keys for newly split page
1993 //      call with page locked, return
1994 //      unlocked
1995
1996 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
1997 {
1998 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1999 unsigned char value[BtId];
2000 uint lvl = set->page->lvl;
2001 BtPage page;
2002 BtKey *ptr;
2003
2004         // if current page is the root page, split it
2005
2006         if( set->latch->page_no == ROOT_page )
2007                 return bt_splitroot (bt, set, right);
2008
2009         ptr = keyptr(set->page, set->page->cnt);
2010         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2011
2012         page = bt_mappage (bt, right);
2013
2014         ptr = keyptr(page, page->cnt);
2015         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2016
2017         // insert new fences in their parent pages
2018
2019         bt_lockpage (BtLockParent, right);
2020
2021         bt_lockpage (BtLockParent, set->latch);
2022         bt_unlockpage (BtLockWrite, set->latch);
2023
2024         // insert new fence for reformulated left block of smaller keys
2025
2026         bt_putid (value, set->latch->page_no);
2027         ptr = (BtKey *)leftkey;
2028
2029         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2030                 return bt->err;
2031
2032         // switch fence for right block of larger keys to new right page
2033
2034         bt_putid (value, right->page_no);
2035         ptr = (BtKey *)rightkey;
2036
2037         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2038                 return bt->err;
2039
2040         bt_unlockpage (BtLockParent, set->latch);
2041         bt_unpinlatch (set->latch);
2042
2043         bt_unlockpage (BtLockParent, right);
2044         bt_unpinlatch (right);
2045         return 0;
2046 }
2047
2048 //      install new key and value onto page
2049 //      page must already be checked for
2050 //      adequate space
2051
2052 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2053 {
2054 uint idx, librarian;
2055 BtSlot *node;
2056 BtKey *ptr;
2057 BtVal *val;
2058
2059         //      if found slot > desired slot and previous slot
2060         //      is a librarian slot, use it
2061
2062         if( slot > 1 )
2063           if( slotptr(set->page, slot-1)->type == Librarian )
2064                 slot--;
2065
2066         // copy value onto page
2067
2068         set->page->min -= vallen + sizeof(BtVal);
2069         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2070         memcpy (val->value, value, vallen);
2071         val->len = vallen;
2072
2073         // copy key onto page
2074
2075         set->page->min -= keylen + sizeof(BtKey);
2076         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2077         memcpy (ptr->key, key, keylen);
2078         ptr->len = keylen;
2079         
2080         //      find first empty slot
2081
2082         for( idx = slot; idx < set->page->cnt; idx++ )
2083           if( slotptr(set->page, idx)->dead )
2084                 break;
2085
2086         // now insert key into array before slot
2087
2088         if( idx == set->page->cnt )
2089                 idx += 2, set->page->cnt += 2, librarian = 2;
2090         else
2091                 librarian = 1;
2092
2093         set->latch->dirty = 1;
2094         set->page->act++;
2095
2096         while( idx > slot + librarian - 1 )
2097                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2098
2099         //      add librarian slot
2100
2101         if( librarian > 1 ) {
2102                 node = slotptr(set->page, slot++);
2103                 node->off = set->page->min;
2104                 node->type = Librarian;
2105                 node->dead = 1;
2106         }
2107
2108         //      fill in new slot
2109
2110         node = slotptr(set->page, slot);
2111         node->off = set->page->min;
2112         node->type = type;
2113         node->dead = 0;
2114
2115         if( release ) {
2116                 bt_unlockpage (BtLockWrite, set->latch);
2117                 bt_unpinlatch (set->latch);
2118         }
2119
2120         return 0;
2121 }
2122
2123 //  Insert new key into the btree at given level.
2124 //      either add a new key or update/add an existing one
2125
2126 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2127 {
2128 unsigned char newkey[BT_keyarray];
2129 uint slot, idx, len, entry;
2130 BtPageSet set[1];
2131 BtKey *ptr, *ins;
2132 uid sequence;
2133 BtVal *val;
2134 uint type;
2135
2136   // set up the key we're working on
2137
2138   ins = (BtKey*)newkey;
2139   memcpy (ins->key, key, keylen);
2140   ins->len = keylen;
2141
2142   // is this a non-unique index value?
2143
2144   if( unique )
2145         type = Unique;
2146   else {
2147         type = Duplicate;
2148         sequence = bt_newdup (bt);
2149         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2150         ins->len += BtId;
2151   }
2152
2153   while( 1 ) { // find the page and slot for the current key
2154         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2155                 ptr = keyptr(set->page, slot);
2156         else {
2157                 if( !bt->err )
2158                         bt->err = BTERR_ovflw;
2159                 return bt->err;
2160         }
2161
2162         // if librarian slot == found slot, advance to real slot
2163
2164         if( slotptr(set->page, slot)->type == Librarian )
2165           if( !keycmp (ptr, key, keylen) )
2166                 ptr = keyptr(set->page, ++slot);
2167
2168         len = ptr->len;
2169
2170         if( slotptr(set->page, slot)->type == Duplicate )
2171                 len -= BtId;
2172
2173         //  if inserting a duplicate key or unique key
2174         //      check for adequate space on the page
2175         //      and insert the new key before slot.
2176
2177         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2178           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2179                 if( !(entry = bt_splitpage (bt, set)) )
2180                   return bt->err;
2181                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2182                   return bt->err;
2183                 else
2184                   continue;
2185
2186           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2187         }
2188
2189         // if key already exists, update value and return
2190
2191         val = valptr(set->page, slot);
2192
2193         if( val->len >= vallen ) {
2194                 if( slotptr(set->page, slot)->dead )
2195                         set->page->act++;
2196                 set->page->garbage += val->len - vallen;
2197                 set->latch->dirty = 1;
2198                 slotptr(set->page, slot)->dead = 0;
2199                 val->len = vallen;
2200                 memcpy (val->value, value, vallen);
2201                 bt_unlockpage(BtLockWrite, set->latch);
2202                 bt_unpinlatch (set->latch);
2203                 return 0;
2204         }
2205
2206         //      new update value doesn't fit in existing value area
2207
2208         if( !slotptr(set->page, slot)->dead )
2209                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2210         else {
2211                 slotptr(set->page, slot)->dead = 0;
2212                 set->page->act++;
2213         }
2214
2215         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2216           if( !(entry = bt_splitpage (bt, set)) )
2217                 return bt->err;
2218           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2219                 return bt->err;
2220           else
2221                 continue;
2222
2223         set->page->min -= vallen + sizeof(BtVal);
2224         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2225         memcpy (val->value, value, vallen);
2226         val->len = vallen;
2227
2228         set->latch->dirty = 1;
2229         set->page->min -= keylen + sizeof(BtKey);
2230         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2231         memcpy (ptr->key, key, keylen);
2232         ptr->len = keylen;
2233         
2234         slotptr(set->page, slot)->off = set->page->min;
2235         bt_unlockpage(BtLockWrite, set->latch);
2236         bt_unpinlatch (set->latch);
2237         return 0;
2238   }
2239   return 0;
2240 }
2241
2242 typedef struct {
2243         uint entry;                     // latch table entry number
2244         uint slot:30;           // page slot number
2245         uint reuse:1;           // reused previous page
2246         uint emptied:1;         // page was emptied
2247 } AtomicMod;
2248
2249 typedef struct {
2250         uid page_no;            // page number for split leaf
2251         void *next;                     // next key to insert
2252         uint entry;                     // latch table entry number
2253         unsigned char leafkey[BT_keyarray];
2254 } AtomicKey;
2255
2256 //      determine actual page where key is located
2257 //  return slot number with set page locked
2258
2259 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
2260 {
2261 BtKey *key = keyptr(source,src);
2262 uint slot = locks[src].slot;
2263 uint entry;
2264
2265         if( src > 1 && locks[src].reuse )
2266           entry = locks[src-1].entry, locks[src].entry = entry, slot = 0;
2267         else
2268           entry = locks[src].entry;
2269
2270         if( slot ) {
2271                 set->latch = bt->mgr->latchsets + entry;
2272                 set->page = bt_mappage (bt, set->latch);
2273                 return slot;
2274         }
2275
2276         //      is locks->reuse set?
2277         //      if so, find where our key
2278         //      is located on previous page or split pages
2279
2280         do {
2281                 set->latch = bt->mgr->latchsets + entry;
2282                 set->page = bt_mappage (bt, set->latch);
2283
2284                 if( slot = bt_findslot(set, key->key, key->len) )
2285                         return slot;
2286
2287         } while( entry = set->latch->split );
2288
2289         bt->err = BTERR_atomic;
2290         return 0;
2291 }
2292
2293 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2294 {
2295 BtKey *key = keyptr(source, src);
2296 BtVal *val = valptr(source, src);
2297 BtLatchSet *latch;
2298 BtPageSet set[1];
2299 uint slot, entry;
2300
2301   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2302         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2303           return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2304         if( entry = bt_splitpage (bt, set) )
2305           latch = bt->mgr->latchsets + entry;
2306         else
2307           return bt->err;
2308         latch->split = set->latch->split;
2309         set->latch->split = entry;
2310         bt_lockpage(BtLockWrite, latch);
2311   }
2312
2313   return bt->err = BTERR_atomic;
2314 }
2315
2316 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
2317 {
2318 BtKey *key = keyptr(source, src);
2319 uint idx, entry, slot;
2320 BtPageSet set[1];
2321 BtKey *ptr;
2322 BtVal *val;
2323
2324         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2325                 slotptr(set->page, slot)->dead = 1;
2326         else
2327                 return bt->err = BTERR_struct;
2328
2329         ptr = keyptr(set->page, slot);
2330         val = valptr(set->page, slot);
2331
2332         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2333         set->page->act--;
2334
2335         // collapse empty slots beneath the fence
2336
2337         while( idx = set->page->cnt - 1 )
2338           if( slotptr(set->page, idx)->dead ) {
2339                 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2340                 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2341           } else
2342                 break;
2343
2344         set->latch->dirty = 1;
2345         return 0;
2346 }
2347
2348 //      qsort comparison function
2349
2350 int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2351 {
2352 BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
2353 BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
2354
2355         return keycmp (key1, key2->key, key2->len);
2356 }
2357
2358 //      atomic modification of a batch of keys.
2359
2360 //      return -1 if BTERR is set
2361 //      otherwise return slot number
2362 //      causing the key constraint violation
2363 //      or zero on successful completion.
2364
2365 int bt_atomicmods (BtDb *bt, BtPage source)
2366 {
2367 uint src, idx, slot, samepage, entry, next, split;
2368 AtomicKey *head, *tail, *leaf;
2369 BtPageSet set[1], prev[1];
2370 unsigned char value[BtId];
2371 BtKey *key, *ptr, *key2;
2372 BtLatchSet *latch;
2373 AtomicMod *locks;
2374 int result = 0;
2375 BtSlot temp[1];
2376 BtPage page;
2377 BtVal *val;
2378 int type;
2379
2380   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
2381   head = NULL;
2382   tail = NULL;
2383
2384   // first sort the list of keys into order to
2385   //    prevent deadlocks between threads.
2386   qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
2387 /*
2388   for( src = 1; src++ < source->cnt; ) {
2389         *temp = *slotptr(source,src);
2390         key = keyptr (source,src);
2391
2392         for( idx = src; --idx; ) {
2393           key2 = keyptr (source,idx);
2394           if( keycmp (key, key2->key, key2->len) < 0 ) {
2395                 *slotptr(source,idx+1) = *slotptr(source,idx);
2396                 *slotptr(source,idx) = *temp;
2397           } else
2398                 break;
2399         }
2400   }
2401 */
2402   // Load the leafpage for each key
2403   // and determine any constraint violations
2404
2405   for( src = 0; src++ < source->cnt; ) {
2406         key = keyptr(source, src);
2407         val = valptr(source, src);
2408         slot = 0;
2409
2410         // first determine if this modification falls
2411         // on the same page as the previous modification
2412         //      note that the far right leaf page is a special case
2413
2414         if( samepage = src > 1 )
2415           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
2416                 slot = bt_findslot(set, key->key, key->len);
2417           else // release read on previous page
2418                 bt_unlockpage(BtLockRead, set->latch); 
2419
2420         if( !slot ) {
2421           slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead);
2422           set->latch->split = 0;
2423         }
2424
2425         if( !slot )
2426           return -1;
2427
2428         if( !samepage ) {
2429           for(idx = 0; idx++ < source->cnt; )
2430                 if( locks[idx].entry == set->latch->entry )
2431                   abort();
2432           locks[src].entry = set->latch->entry;
2433           locks[src].slot = slot;
2434           locks[src].reuse = 0;
2435         } else {
2436           locks[src].entry = 0;
2437           locks[src].slot = 0;
2438           locks[src].reuse = 1;
2439         }
2440
2441         if( slotptr(set->page, slot)->type == Librarian )
2442           ptr = keyptr(set->page, ++slot);
2443         else
2444           ptr = keyptr(set->page, slot);
2445
2446         switch( slotptr(source, src)->type ) {
2447         case Duplicate:
2448         case Unique:
2449           if( !slotptr(set->page, slot)->dead )
2450            if( slot < set->page->cnt || bt_getid (set->page->right) )
2451             if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
2452
2453                   // return constraint violation if key already exists
2454
2455                   result = src;
2456
2457                   while( src ) {
2458                         if( locks[src].entry ) {
2459                           set->latch = bt->mgr->latchsets + locks[src].entry;
2460                           bt_unlockpage(BtLockAtomic, set->latch);
2461                           bt_unlockpage(BtLockRead, set->latch);
2462                           bt_unpinlatch (set->latch);
2463                         }
2464                         src--;
2465                   }
2466
2467                   return result;
2468                 }
2469
2470           break;
2471
2472         case Delete:
2473           if( !slotptr(set->page, slot)->dead )
2474            if( ptr->len == key->len )
2475                 if( !memcmp (ptr->key, key->key, key->len) )
2476               break;
2477
2478           //  Key to delete doesn't exist
2479
2480           result = src;
2481
2482           while( src ) {
2483                 if( locks[src].entry ) {
2484                   set->latch = bt->mgr->latchsets + locks[src].entry;
2485                   bt_unlockpage(BtLockAtomic, set->latch);
2486                   bt_unlockpage(BtLockRead, set->latch);
2487                   bt_unpinlatch (set->latch);
2488                 }
2489                 src--;
2490           }
2491
2492           return result;
2493         }
2494   }
2495
2496   //  unlock last loadpage lock
2497
2498   if( source->cnt > 1 )
2499         bt_unlockpage(BtLockRead, set->latch);
2500
2501   //  obtain write lock for each page
2502
2503   for( src = 0; src++ < source->cnt; )
2504         if( locks[src].entry )
2505           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2506
2507   // insert or delete each key
2508
2509   for( src = 0; src++ < source->cnt; )
2510         if( slotptr(source,src)->type == Delete )
2511           if( bt_atomicdelete (bt, source, locks, src) )
2512                 return -1;
2513           else
2514                 continue;
2515           else if( bt_atomicinsert (bt, source, locks, src) )
2516                 return -1;
2517           else
2518                 continue;
2519
2520   // set ParentModification and release WriteLock
2521   // leave empty pages locked for later processing
2522   // build queue of keys to insert for split pages
2523
2524   for( src = 0; src++ < source->cnt; ) {
2525         if( locks[src].reuse )
2526           continue;
2527
2528         prev->latch = bt->mgr->latchsets + locks[src].entry;
2529         prev->page = bt_mappage (bt, prev->latch);
2530
2531         //  pick-up all splits from first entry
2532         //  save page that points to this entry
2533
2534         split = next = prev->latch->split;
2535
2536         if( !prev->page->act )
2537           locks[src].emptied = 1;
2538
2539         while( entry = next ) {
2540           set->latch = bt->mgr->latchsets + entry;
2541           set->page = bt_mappage (bt, set->latch);
2542           next = set->latch->split;
2543
2544           // delete empty previous page
2545
2546           if( !prev->page->act ) {
2547                 memcpy (prev->page, set->page, bt->mgr->page_size);
2548                 bt_lockpage (BtLockDelete, set->latch);
2549                 bt_freepage (bt, set);
2550
2551                 prev->latch->dirty = 1;
2552
2553                 if( prev->page->act )
2554                   locks[src].emptied = 0;
2555
2556                 continue;
2557           }
2558
2559           // prev page is not emptied
2560
2561           locks[src].emptied = 0;
2562
2563           //  schedule previous fence key update
2564
2565           ptr = keyptr(prev->page,prev->page->cnt);
2566           leaf = malloc (sizeof(AtomicKey));
2567
2568           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2569           leaf->page_no = prev->latch->page_no;
2570           leaf->entry = prev->latch->entry;
2571           leaf->next = NULL;
2572
2573           if( tail )
2574                 tail->next = leaf;
2575           else
2576                 head = leaf;
2577
2578           latch = prev->latch;
2579           tail = leaf;
2580
2581           // remove empty block from the split chain
2582
2583           if( !set->page->act ) {
2584                 memcpy (prev->page->right, set->page->right, BtId);
2585                 bt_lockpage (BtLockDelete, set->latch);
2586                 bt_freepage (bt, set);
2587           } else
2588                 *prev = *set;
2589
2590           bt_lockpage(BtLockParent, latch);
2591           bt_unlockpage(BtLockWrite, latch);
2592         }
2593
2594         if( !prev->page->act )
2595                 continue;
2596
2597         if( !split ) {
2598                 bt_unlockpage(BtLockWrite, prev->latch);
2599                 continue;
2600         }
2601
2602         //      Process last page split in chain
2603
2604         ptr = keyptr(prev->page,prev->page->cnt);
2605         leaf = malloc (sizeof(AtomicKey));
2606
2607         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2608         leaf->page_no = prev->latch->page_no;
2609         leaf->entry = prev->latch->entry;
2610         leaf->next = NULL;
2611
2612         if( tail )
2613           tail->next = leaf;
2614         else
2615           head = leaf;
2616
2617         tail = leaf;
2618
2619         bt_lockpage(BtLockParent, prev->latch);
2620         bt_unlockpage(BtLockWrite, prev->latch);
2621   }
2622   
2623   // remove Atomic locks and
2624   // process any empty pages
2625
2626   for( src = source->cnt; src; src-- ) {
2627         if( locks[src].reuse )
2628           continue;
2629
2630         //  delete page emptied by our atomic action
2631
2632         set->latch = bt->mgr->latchsets + locks[src].entry;
2633         set->page = bt_mappage (bt, set->latch);
2634
2635         if( locks[src].emptied ) {
2636           bt_unlockpage (BtLockAtomic, set->latch);
2637           if( bt_deletepage (bt, set) )
2638                 return bt->err;
2639           continue;
2640         }
2641
2642         bt_unlockpage (BtLockAtomic, set->latch);
2643
2644         if( !set->latch->split )
2645                 bt_unpinlatch (set->latch);
2646   }
2647
2648   //  add keys for any pages split during action
2649
2650   if( leaf = head )
2651     do {
2652           ptr = (BtKey *)leaf->leafkey;
2653           bt_putid (value, leaf->page_no);
2654
2655           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2656                 return bt->err;
2657
2658           bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
2659           bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
2660           tail = leaf->next;
2661           free (leaf);
2662         } while( leaf = tail );
2663
2664   // return success
2665
2666   free (locks);
2667   return 0;
2668 }
2669
2670 //  return next slot on cursor page
2671 //  or slide cursor right into next page
2672
2673 uint bt_nextkey (BtDb *bt, uint slot)
2674 {
2675 BtPageSet set[1];
2676 uid right;
2677
2678   do {
2679         right = bt_getid(bt->cursor->right);
2680
2681         while( slot++ < bt->cursor->cnt )
2682           if( slotptr(bt->cursor,slot)->dead )
2683                 continue;
2684           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2685                 return slot;
2686           else
2687                 break;
2688
2689         if( !right )
2690                 break;
2691
2692         bt->cursor_page = right;
2693
2694         if( set->latch = bt_pinlatch (bt, right, 1) )
2695                 set->page = bt_mappage (bt, set->latch);
2696         else
2697                 return 0;
2698
2699     bt_lockpage(BtLockRead, set->latch);
2700
2701         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2702
2703         bt_unlockpage(BtLockRead, set->latch);
2704         bt_unpinlatch (set->latch);
2705         slot = 0;
2706
2707   } while( 1 );
2708
2709   return bt->err = 0;
2710 }
2711
2712 //  cache page of keys into cursor and return starting slot for given key
2713
2714 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2715 {
2716 BtPageSet set[1];
2717 uint slot;
2718
2719         // cache page for retrieval
2720
2721         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2722           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2723         else
2724           return 0;
2725
2726         bt->cursor_page = set->latch->page_no;
2727
2728         bt_unlockpage(BtLockRead, set->latch);
2729         bt_unpinlatch (set->latch);
2730         return slot;
2731 }
2732
2733 BtKey *bt_key(BtDb *bt, uint slot)
2734 {
2735         return keyptr(bt->cursor, slot);
2736 }
2737
2738 BtVal *bt_val(BtDb *bt, uint slot)
2739 {
2740         return valptr(bt->cursor,slot);
2741 }
2742
2743 #ifdef STANDALONE
2744
2745 #ifndef unix
2746 double getCpuTime(int type)
2747 {
2748 FILETIME crtime[1];
2749 FILETIME xittime[1];
2750 FILETIME systime[1];
2751 FILETIME usrtime[1];
2752 SYSTEMTIME timeconv[1];
2753 double ans = 0;
2754
2755         memset (timeconv, 0, sizeof(SYSTEMTIME));
2756
2757         switch( type ) {
2758         case 0:
2759                 GetSystemTimeAsFileTime (xittime);
2760                 FileTimeToSystemTime (xittime, timeconv);
2761                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2762                 break;
2763         case 1:
2764                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2765                 FileTimeToSystemTime (usrtime, timeconv);
2766                 break;
2767         case 2:
2768                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2769                 FileTimeToSystemTime (systime, timeconv);
2770                 break;
2771         }
2772
2773         ans += (double)timeconv->wHour * 3600;
2774         ans += (double)timeconv->wMinute * 60;
2775         ans += (double)timeconv->wSecond;
2776         ans += (double)timeconv->wMilliseconds / 1000;
2777         return ans;
2778 }
2779 #else
2780 #include <time.h>
2781 #include <sys/resource.h>
2782
2783 double getCpuTime(int type)
2784 {
2785 struct rusage used[1];
2786 struct timeval tv[1];
2787
2788         switch( type ) {
2789         case 0:
2790                 gettimeofday(tv, NULL);
2791                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2792
2793         case 1:
2794                 getrusage(RUSAGE_SELF, used);
2795                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2796
2797         case 2:
2798                 getrusage(RUSAGE_SELF, used);
2799                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2800         }
2801
2802         return 0;
2803 }
2804 #endif
2805
2806 void bt_poolaudit (BtMgr *mgr)
2807 {
2808 BtLatchSet *latch;
2809 uint slot = 0;
2810
2811         while( slot++ < mgr->latchdeployed ) {
2812                 latch = mgr->latchsets + slot;
2813
2814                 if( *latch->readwr->rin & MASK )
2815                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2816                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2817
2818                 if( *latch->access->rin & MASK )
2819                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2820                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2821
2822                 if( *latch->parent->rin & MASK )
2823                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2824                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2825
2826                 if( latch->pin & ~CLOCK_bit ) {
2827                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2828                         latch->pin = 0;
2829                 }
2830         }
2831 }
2832
2833 uint bt_latchaudit (BtDb *bt)
2834 {
2835 ushort idx, hashidx;
2836 uid next, page_no;
2837 BtLatchSet *latch;
2838 uint cnt = 0;
2839 BtKey *ptr;
2840
2841         if( *(ushort *)(bt->mgr->lock) )
2842                 fprintf(stderr, "Alloc page locked\n");
2843         *(ushort *)(bt->mgr->lock) = 0;
2844
2845         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2846                 latch = bt->mgr->latchsets + idx;
2847                 if( *latch->readwr->rin & MASK )
2848                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2849                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2850
2851                 if( *latch->access->rin & MASK )
2852                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2853                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2854
2855                 if( *latch->parent->rin & MASK )
2856                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2857                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2858
2859                 if( latch->pin ) {
2860                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2861                         latch->pin = 0;
2862                 }
2863         }
2864
2865         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2866           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2867                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2868
2869           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2870
2871           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2872                 latch = bt->mgr->latchsets + idx;
2873                 if( latch->pin )
2874                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2875           } while( idx = latch->next );
2876         }
2877
2878         page_no = LEAF_page;
2879
2880         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2881         uid off = page_no << bt->mgr->page_bits;
2882 #ifdef unix
2883           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2884 #else
2885         DWORD amt[1];
2886
2887           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2888
2889           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2890                 return bt->err = BTERR_map;
2891
2892           if( *amt <  bt->mgr->page_size )
2893                 return bt->err = BTERR_map;
2894 #endif
2895                 if( !bt->frame->free && !bt->frame->lvl )
2896                         cnt += bt->frame->act;
2897                 page_no++;
2898         }
2899                 
2900         cnt--;  // remove stopper key
2901         fprintf(stderr, " Total keys read %d\n", cnt);
2902
2903         bt_close (bt);
2904         return 0;
2905 }
2906
2907 typedef struct {
2908         char idx;
2909         char *type;
2910         char *infile;
2911         BtMgr *mgr;
2912         int num;
2913 } ThreadArg;
2914
2915 //  standalone program to index file of keys
2916 //  then list them onto std-out
2917
2918 #ifdef unix
2919 void *index_file (void *arg)
2920 #else
2921 uint __stdcall index_file (void *arg)
2922 #endif
2923 {
2924 int line = 0, found = 0, cnt = 0, unique;
2925 uid next, page_no = LEAF_page;  // start on first page of leaves
2926 unsigned char key[BT_maxkey];
2927 unsigned char txn[65536];
2928 ThreadArg *args = arg;
2929 int ch, len = 0, slot;
2930 BtPageSet set[1];
2931 uint nxt = 65536;
2932 BtPage page;
2933 BtKey *ptr;
2934 BtVal *val;
2935 BtDb *bt;
2936 FILE *in;
2937
2938         bt = bt_open (args->mgr);
2939         page = (BtPage)txn;
2940
2941         unique = (args->type[1] | 0x20) != 'd';
2942
2943         switch(args->type[0] | 0x20)
2944         {
2945         case 'a':
2946                 fprintf(stderr, "started latch mgr audit\n");
2947                 cnt = bt_latchaudit (bt);
2948                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2949                 break;
2950
2951         case 't':
2952                 fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
2953                 if( in = fopen (args->infile, "rb") )
2954                   while( ch = getc(in), ch != EOF )
2955                         if( ch == '\n' )
2956                         {
2957                           line++;
2958                           nxt -= len - 10;
2959                           memcpy (txn + nxt, key + 10, len - 10);
2960                           nxt -= 1;
2961                           txn[nxt] = len - 10;
2962                           nxt -= 10;
2963                           memcpy (txn + nxt, key, 10);
2964                           nxt -= 1;
2965                           txn[nxt] = 10;
2966                           slotptr(page,++cnt)->off  = nxt;
2967                           slotptr(page,cnt)->type = Unique;
2968                           len = 0;
2969
2970                           if( cnt < 25 )
2971                                 continue;
2972
2973                           page->cnt = cnt;
2974                           page->act = cnt;
2975                           page->min = nxt;
2976
2977                           if( bt_atomicmods (bt, page) )
2978                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2979                           nxt = 65536;
2980                           cnt = 0;
2981
2982                         }
2983                         else if( len < BT_maxkey )
2984                                 key[len++] = ch;
2985                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2986                 break;
2987
2988         case 'p':
2989                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2990                 if( in = fopen (args->infile, "rb") )
2991                   while( ch = getc(in), ch != EOF )
2992                         if( ch == '\n' )
2993                         {
2994                           line++;
2995
2996                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2997                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2998                           len = 0;
2999                         }
3000                         else if( len < BT_maxkey )
3001                                 key[len++] = ch;
3002                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3003                 break;
3004
3005         case 'w':
3006                 fprintf(stderr, "started indexing for %s\n", args->infile);
3007                 if( in = fopen (args->infile, "r") )
3008                   while( ch = getc(in), ch != EOF )
3009                         if( ch == '\n' )
3010                         {
3011                           line++;
3012
3013                           if( args->num == 1 )
3014                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3015
3016                           else if( args->num )
3017                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3018
3019                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
3020                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3021                           len = 0;
3022                         }
3023                         else if( len < BT_maxkey )
3024                                 key[len++] = ch;
3025                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3026                 break;
3027
3028         case 'd':
3029                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
3030                 if( in = fopen (args->infile, "rb") )
3031                   while( ch = getc(in), ch != EOF )
3032                         if( ch == '\n' )
3033                         {
3034                           line++;
3035                           if( args->num == 1 )
3036                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3037
3038                           else if( args->num )
3039                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3040
3041                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
3042                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
3043                           ptr = (BtKey*)(bt->key);
3044                           found++;
3045
3046                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
3047                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3048                           len = 0;
3049                         }
3050                         else if( len < BT_maxkey )
3051                                 key[len++] = ch;
3052                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3053                 break;
3054
3055         case 'f':
3056                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3057                 if( in = fopen (args->infile, "rb") )
3058                   while( ch = getc(in), ch != EOF )
3059                         if( ch == '\n' )
3060                         {
3061                           line++;
3062                           if( args->num == 1 )
3063                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
3064
3065                           else if( args->num )
3066                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
3067
3068                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3069                                 found++;
3070                           else if( bt->err )
3071                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3072                           len = 0;
3073                         }
3074                         else if( len < BT_maxkey )
3075                                 key[len++] = ch;
3076                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3077                 break;
3078
3079         case 's':
3080                 fprintf(stderr, "started scanning\n");
3081                 do {
3082                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3083                                 set->page = bt_mappage (bt, set->latch);
3084                         else
3085                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3086                         bt_lockpage (BtLockRead, set->latch);
3087                         next = bt_getid (set->page->right);
3088
3089                         for( slot = 0; slot++ < set->page->cnt; )
3090                          if( next || slot < set->page->cnt )
3091                           if( !slotptr(set->page, slot)->dead ) {
3092                                 ptr = keyptr(set->page, slot);
3093                                 len = ptr->len;
3094
3095                             if( slotptr(set->page, slot)->type == Duplicate )
3096                                         len -= BtId;
3097
3098                                 fwrite (ptr->key, len, 1, stdout);
3099                                 val = valptr(set->page, slot);
3100                                 fwrite (val->value, val->len, 1, stdout);
3101                                 fputc ('\n', stdout);
3102                                 cnt++;
3103                            }
3104
3105                         bt_unlockpage (BtLockRead, set->latch);
3106                         bt_unpinlatch (set->latch);
3107                 } while( page_no = next );
3108
3109                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3110                 break;
3111
3112         case 'c':
3113 #ifdef unix
3114                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3115 #endif
3116                 fprintf(stderr, "started counting\n");
3117                 page_no = LEAF_page;
3118
3119                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3120                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3121                                 break;
3122
3123                         if( !bt->frame->free && !bt->frame->lvl )
3124                                 cnt += bt->frame->act;
3125
3126                         bt->reads++;
3127                         page_no++;
3128                 }
3129                 
3130                 cnt--;  // remove stopper key
3131                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3132                 break;
3133         }
3134
3135         bt_close (bt);
3136 #ifdef unix
3137         return NULL;
3138 #else
3139         return 0;
3140 #endif
3141 }
3142
3143 typedef struct timeval timer;
3144
3145 int main (int argc, char **argv)
3146 {
3147 int idx, cnt, len, slot, err;
3148 int segsize, bits = 16;
3149 double start, stop;
3150 #ifdef unix
3151 pthread_t *threads;
3152 #else
3153 HANDLE *threads;
3154 #endif
3155 ThreadArg *args;
3156 uint poolsize = 0;
3157 float elapsed;
3158 int num = 0;
3159 char key[1];
3160 BtMgr *mgr;
3161 BtKey *ptr;
3162 BtDb *bt;
3163
3164         if( argc < 3 ) {
3165                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3166                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3167                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3168                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3169                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3170                 exit(0);
3171         }
3172
3173         start = getCpuTime(0);
3174
3175         if( argc > 3 )
3176                 bits = atoi(argv[3]);
3177
3178         if( argc > 4 )
3179                 poolsize = atoi(argv[4]);
3180
3181         if( !poolsize )
3182                 fprintf (stderr, "Warning: no mapped_pool\n");
3183
3184         if( argc > 5 )
3185                 num = atoi(argv[5]);
3186
3187         cnt = argc - 6;
3188 #ifdef unix
3189         threads = malloc (cnt * sizeof(pthread_t));
3190 #else
3191         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3192 #endif
3193         args = malloc (cnt * sizeof(ThreadArg));
3194
3195         mgr = bt_mgr ((argv[1]), bits, poolsize);
3196
3197         if( !mgr ) {
3198                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3199                 exit (1);
3200         }
3201
3202         //      fire off threads
3203
3204         for( idx = 0; idx < cnt; idx++ ) {
3205                 args[idx].infile = argv[idx + 6];
3206                 args[idx].type = argv[2];
3207                 args[idx].mgr = mgr;
3208                 args[idx].num = num;
3209                 args[idx].idx = idx;
3210 #ifdef unix
3211                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3212                         fprintf(stderr, "Error creating thread %d\n", err);
3213 #else
3214                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3215 #endif
3216         }
3217
3218         //      wait for termination
3219
3220 #ifdef unix
3221         for( idx = 0; idx < cnt; idx++ )
3222                 pthread_join (threads[idx], NULL);
3223 #else
3224         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3225
3226         for( idx = 0; idx < cnt; idx++ )
3227                 CloseHandle(threads[idx]);
3228
3229 #endif
3230         bt_poolaudit(mgr);
3231         bt_mgrclose (mgr);
3232
3233         elapsed = getCpuTime(0) - start;
3234         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3235         elapsed = getCpuTime(1);
3236         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3237         elapsed = getCpuTime(2);
3238         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3239 }
3240
3241 #endif  //STANDALONE