]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/test_and_benchmark/libtest/src/libtest_tests/libtest_tests_queue_unbounded_manyproducer_manyconsumer_rapid_enqueuing_and_dequeuing.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / test_and_benchmark / libtest / src / libtest_tests / libtest_tests_queue_unbounded_manyproducer_manyconsumer_rapid_enqueuing_and_dequeuing.c
1 /***** includes *****/
2 #include "libtest_tests_internal.h"
3
4 /***** structs *****/
5 struct test_per_thread_state
6 {
7   lfds710_pal_uint_t
8     counter,
9     thread_number;
10
11   struct lfds710_queue_umm_state
12     *qs;
13 };
14
15 struct test_element
16 {
17   struct lfds710_queue_umm_element
18     qe,
19     *qe_use;
20
21   lfds710_pal_uint_t
22     counter,
23     thread_number;
24 };
25
26 /***** private prototypes *****/
27 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_rapid_enqueuer_and_dequeuer( void *libtest_threadset_per_thread_state );
28
29
30
31
32
33 /****************************************************************************/
34 void libtest_tests_queue_umm_rapid_enqueuing_and_dequeuing( struct lfds710_list_asu_state *list_of_logical_processors, struct libshared_memory_state *ms, enum lfds710_misc_validity *dvs )
35 {
36   lfds710_pal_uint_t
37     loop,
38     number_logical_processors,
39     *per_thread_counters;
40
41   struct lfds710_list_asu_element
42     *lasue = NULL;
43
44   struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES)
45     qe_dummy;
46
47   struct lfds710_queue_umm_element
48     *qe;
49
50   struct lfds710_misc_validation_info
51     vi;
52
53   struct lfds710_queue_umm_state
54     qs;
55
56   struct libtest_logical_processor
57     *lp;
58
59   struct libtest_threadset_per_thread_state
60     *pts;
61
62   struct libtest_threadset_state
63     ts;
64
65   struct test_element
66     *te_array,
67     *te;
68
69   struct test_per_thread_state
70     *tpts;
71
72   LFDS710_PAL_ASSERT( list_of_logical_processors != NULL );
73   LFDS710_PAL_ASSERT( ms != NULL );
74   LFDS710_PAL_ASSERT( dvs != NULL );
75
76   /* TRD : we create a single queue with 50,000 elements
77            we don't want too many elements, so we ensure plenty of element re-use
78            each thread simply loops dequeuing and enqueuing
79            where the user data indicates thread number and an increment counter
80            vertification is that the counter increments on a per-thread basis
81   */
82
83   *dvs = LFDS710_MISC_VALIDITY_VALID;
84
85   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
86
87   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
88   tpts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
89   pts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct libtest_threadset_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
90   per_thread_counters = libshared_memory_alloc_from_unknown_node( ms, sizeof(lfds710_pal_uint_t) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
91   te_array = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_element) * 10000 * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
92
93   vi.min_elements = vi.max_elements = 10000;
94
95   lfds710_queue_umm_init_valid_on_current_logical_core( &qs, &qe_dummy, NULL );
96
97   // TRD : we assume the test will iterate at least once (or we'll have a false negative)
98   for( loop = 0 ; loop < 10000 ; loop++ )
99   {
100     (te_array+loop)->thread_number = loop;
101     (te_array+loop)->counter = 0;
102     LFDS710_QUEUE_UMM_SET_VALUE_IN_ELEMENT( (te_array+loop)->qe, te_array+loop );
103     lfds710_queue_umm_enqueue( &qs, &(te_array+loop)->qe );
104   }
105
106   libtest_threadset_init( &ts, NULL );
107
108   loop = 0;
109
110   while( LFDS710_LIST_ASU_GET_START_AND_THEN_NEXT(*list_of_logical_processors,lasue) )
111   {
112     lp = LFDS710_LIST_ASU_GET_VALUE_FROM_ELEMENT( *lasue );
113     (tpts+loop)->qs = &qs;
114     (tpts+loop)->thread_number = loop;
115     (tpts+loop)->counter = 0;
116     libtest_threadset_add_thread( &ts, &pts[loop], lp, thread_rapid_enqueuer_and_dequeuer, &tpts[loop] );
117     loop++;
118   }
119
120   // TRD : run the test
121   libtest_threadset_run( &ts );
122
123   libtest_threadset_cleanup( &ts );
124
125   // TRD : validate
126   LFDS710_MISC_BARRIER_LOAD;
127
128   lfds710_queue_umm_query( &qs, LFDS710_QUEUE_UMM_QUERY_SINGLETHREADED_VALIDATE, &vi, dvs );
129
130   // TRD : now check results
131   for( loop = 0 ; loop < number_logical_processors ; loop++ )
132     *(per_thread_counters+loop) = 0;
133
134   while( *dvs == LFDS710_MISC_VALIDITY_VALID and lfds710_queue_umm_dequeue(&qs, &qe) )
135   {
136     te = LFDS710_QUEUE_UMM_GET_VALUE_FROM_ELEMENT( *qe );
137
138     if( te->thread_number >= number_logical_processors )
139     {
140       *dvs = LFDS710_MISC_VALIDITY_INVALID_TEST_DATA;
141       break;
142     }
143
144     if( per_thread_counters[te->thread_number] == 0 )
145       per_thread_counters[te->thread_number] = te->counter;
146
147     if( te->counter > per_thread_counters[te->thread_number] )
148       *dvs = LFDS710_MISC_VALIDITY_INVALID_MISSING_ELEMENTS;
149
150     if( te->counter < per_thread_counters[te->thread_number] )
151       *dvs = LFDS710_MISC_VALIDITY_INVALID_ADDITIONAL_ELEMENTS;
152
153     if( te->counter == per_thread_counters[te->thread_number] )
154       per_thread_counters[te->thread_number]++;
155   }
156
157   lfds710_queue_umm_cleanup( &qs, NULL );
158
159   return;
160 }
161
162
163
164
165
166 /****************************************************************************/
167 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_rapid_enqueuer_and_dequeuer( void *libtest_threadset_per_thread_state )
168 {
169   lfds710_pal_uint_t
170     time_loop = 0;
171
172   struct lfds710_queue_umm_element
173     *qe;
174
175   struct test_element
176     *te;
177
178   struct test_per_thread_state
179     *tpts;
180
181   struct libtest_threadset_per_thread_state
182     *pts;
183
184   time_t
185     current_time,
186     start_time;
187
188   LFDS710_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
189
190   LFDS710_PAL_ASSERT( libtest_threadset_per_thread_state != NULL );
191
192   pts = (struct libtest_threadset_per_thread_state *) libtest_threadset_per_thread_state;
193   tpts = LIBTEST_THREADSET_GET_USER_STATE_FROM_PER_THREAD_STATE( *pts );
194
195   libtest_threadset_thread_ready_and_wait( pts );
196
197   current_time = start_time = time( NULL );
198
199   while( current_time < start_time + TEST_DURATION_IN_SECONDS )
200   {
201     lfds710_queue_umm_dequeue( tpts->qs, &qe );
202     te = LFDS710_QUEUE_UMM_GET_VALUE_FROM_ELEMENT( *qe );
203
204     te->thread_number = tpts->thread_number;
205     te->counter = tpts->counter++;
206
207     LFDS710_QUEUE_UMM_SET_VALUE_IN_ELEMENT( *qe, te );
208     lfds710_queue_umm_enqueue( tpts->qs, qe );
209
210     if( time_loop++ == TIME_LOOP_COUNT )
211     {
212       time_loop = 0;
213       time( &current_time );
214     }
215   }
216
217   LFDS710_MISC_BARRIER_STORE;
218
219   lfds710_misc_force_store();
220
221   return LIBSHARED_PAL_THREAD_RETURN_CAST(RETURN_SUCCESS);
222 }
223