]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/test_and_benchmark/libtest/src/libtest_tests/libtest_tests_queue_bounded_manyproducer_manyconsumer_rapid_enqueuing_and_dequeuing.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / test_and_benchmark / libtest / src / libtest_tests / libtest_tests_queue_bounded_manyproducer_manyconsumer_rapid_enqueuing_and_dequeuing.c
1 /***** includes *****/
2 #include "libtest_tests_internal.h"
3
4 /***** structs *****/
5 struct test_per_thread_state
6 {
7   lfds710_pal_uint_t
8     thread_number,
9     total_dequeues,
10     total_enqueues;
11
12   struct lfds710_queue_bmm_state
13     *qbmms;
14 };
15
16 /***** private prototypes *****/
17 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_rapid_enqueuer_and_dequeuer( void *libtest_threadset_per_thread_state );
18
19
20
21
22
23 /****************************************************************************/
24 void libtest_tests_queue_bmm_rapid_enqueuing_and_dequeuing( struct lfds710_list_asu_state *list_of_logical_processors, struct libshared_memory_state *ms, enum lfds710_misc_validity *dvs )
25 {
26   lfds710_pal_uint_t
27     counter,
28     loop,
29     number_logical_processors,
30     *per_thread_counters,
31     thread_number;
32
33   struct lfds710_list_asu_element
34     *lasue = NULL;
35
36   struct lfds710_queue_bmm_element
37     *qbmme_array;
38
39   struct lfds710_misc_validation_info
40     vi;
41
42   struct lfds710_queue_bmm_state
43     qbmms;
44
45   struct libtest_logical_processor
46     *lp;
47
48   struct libtest_threadset_per_thread_state
49     *pts;
50
51   struct libtest_threadset_state
52     ts;
53
54   struct test_per_thread_state
55     *tpts;
56
57   LFDS710_PAL_ASSERT( list_of_logical_processors != NULL );
58   LFDS710_PAL_ASSERT( ms != NULL );
59   LFDS710_PAL_ASSERT( dvs != NULL );
60
61   /* TRD : we create a single queue with 50,000 elements
62            we don't want too many elements, so we ensure plenty of element re-use
63            each thread simply loops dequeuing and enqueuing
64            where the user data indicates thread number and an increment counter
65            vertification is that the counter increments on a per-thread basis
66   */
67
68   *dvs = LFDS710_MISC_VALIDITY_VALID;
69
70   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
71
72   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
73   tpts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
74   pts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct libtest_threadset_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
75   per_thread_counters = libshared_memory_alloc_from_unknown_node( ms, sizeof(lfds710_pal_uint_t) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
76   // TRD : must be a power of 2
77   qbmme_array = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct lfds710_queue_bmm_element) * 8192, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
78
79   lfds710_queue_bmm_init_valid_on_current_logical_core( &qbmms, qbmme_array, 8192, NULL );
80
81   // TRD : we assume the test will iterate at least once (or we'll have a false negative)
82   for( loop = 0 ; loop < 8192 ; loop++ )
83     lfds710_queue_bmm_enqueue( &qbmms, (void *) 0, (void *) 0 );
84
85   libtest_threadset_init( &ts, NULL );
86
87   loop = 0;
88
89   while( LFDS710_LIST_ASU_GET_START_AND_THEN_NEXT(*list_of_logical_processors,lasue) )
90   {
91     lp = LFDS710_LIST_ASU_GET_VALUE_FROM_ELEMENT( *lasue );
92     (tpts+loop)->qbmms = &qbmms;
93     (tpts+loop)->thread_number = loop;
94     (tpts+loop)->total_dequeues = 0;
95     (tpts+loop)->total_enqueues = 0;
96     libtest_threadset_add_thread( &ts, &pts[loop], lp, thread_rapid_enqueuer_and_dequeuer, &tpts[loop] );
97     loop++;
98   }
99
100   // TRD : run the test
101   libtest_threadset_run( &ts );
102
103   libtest_threadset_cleanup( &ts );
104
105   // TRD : validate
106   LFDS710_MISC_BARRIER_LOAD;
107
108   vi.min_elements = 8192;
109
110   for( loop = 0 ; loop < number_logical_processors ; loop++ )
111   {
112     vi.min_elements -= tpts[loop].total_dequeues;
113     vi.min_elements += tpts[loop].total_enqueues;
114   }
115
116   vi.max_elements = vi.min_elements;
117
118   lfds710_queue_bmm_query( &qbmms, LFDS710_QUEUE_BMM_QUERY_SINGLETHREADED_VALIDATE, &vi, dvs );
119
120   // TRD : now check results
121   for( loop = 0 ; loop < number_logical_processors ; loop++ )
122     *(per_thread_counters+loop) = 0;
123
124   while( *dvs == LFDS710_MISC_VALIDITY_VALID and lfds710_queue_bmm_dequeue(&qbmms, (void **) &thread_number, (void **) &counter) )
125   {
126     if( thread_number >= number_logical_processors )
127     {
128       *dvs = LFDS710_MISC_VALIDITY_INVALID_TEST_DATA;
129       break;
130     }
131
132     if( per_thread_counters[thread_number] == 0 )
133       per_thread_counters[thread_number] = counter;
134
135     if( counter > per_thread_counters[thread_number] )
136       *dvs = LFDS710_MISC_VALIDITY_INVALID_MISSING_ELEMENTS;
137
138     if( counter < per_thread_counters[thread_number] )
139       *dvs = LFDS710_MISC_VALIDITY_INVALID_ADDITIONAL_ELEMENTS;
140
141     if( counter == per_thread_counters[thread_number] )
142       per_thread_counters[thread_number]++;
143   }
144
145   lfds710_queue_bmm_cleanup( &qbmms, NULL );
146
147   return;
148 }
149
150
151
152
153
154 /****************************************************************************/
155 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_rapid_enqueuer_and_dequeuer( void *libtest_threadset_per_thread_state )
156 {
157   int
158     rv;
159
160   lfds710_pal_uint_t
161     key,
162     value,
163     time_loop = 0;
164
165   struct test_per_thread_state
166     *tpts;
167
168   struct libtest_threadset_per_thread_state
169     *pts;
170
171   time_t
172     current_time,
173     start_time;
174
175   LFDS710_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
176
177   LFDS710_PAL_ASSERT( libtest_threadset_per_thread_state != NULL );
178
179   pts = (struct libtest_threadset_per_thread_state *) libtest_threadset_per_thread_state;
180   tpts = LIBTEST_THREADSET_GET_USER_STATE_FROM_PER_THREAD_STATE( *pts );
181
182   libtest_threadset_thread_ready_and_wait( pts );
183
184   current_time = start_time = time( NULL );
185
186   while( current_time < start_time + TEST_DURATION_IN_SECONDS )
187   {
188     // TRD : with BMM, both dequeue and so enqueue may spuriously fail, so we only increment counter if we did enqueue
189
190     tpts->total_dequeues += lfds710_queue_bmm_dequeue( tpts->qbmms, (void **) &key, (void **) &value );
191
192     // TRD : disgard the dequeue content - this is the rapid test
193
194     // TRD : counter (total_enqueues works in the same way, so using that) needs to increment, for validation checks
195     rv = lfds710_queue_bmm_enqueue( tpts->qbmms, (void *) tpts->thread_number, (void *) (tpts->total_enqueues) );
196
197     if( rv == 1 )
198       tpts->total_enqueues++;
199
200     if( time_loop++ == TIME_LOOP_COUNT )
201     {
202       time_loop = 0;
203       time( &current_time );
204     }
205   }
206
207   LFDS710_MISC_BARRIER_STORE;
208
209   lfds710_misc_force_store();
210
211   return LIBSHARED_PAL_THREAD_RETURN_CAST(RETURN_SUCCESS);
212 }
213