]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/test_and_benchmark/libtest/src/libtest_tests/libtest_tests_queue_unbounded_manyproducer_manyconsumer_enqueuing.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / test_and_benchmark / libtest / src / libtest_tests / libtest_tests_queue_unbounded_manyproducer_manyconsumer_enqueuing.c
1 /***** includes *****/
2 #include "libtest_tests_internal.h"
3
4 /***** structs *****/
5 struct test_per_thread_state
6 {
7   lfds710_pal_uint_t
8     number_elements_per_thread,
9     thread_number;
10
11   struct lfds710_queue_umm_state
12     *qs;
13
14   struct test_element
15     *te_array;
16 };
17
18 struct test_element
19 {
20   struct lfds710_queue_umm_element
21     qe;
22
23   lfds710_pal_uint_t
24     counter,
25     thread_number;
26 };
27
28 /***** private prototypes *****/
29 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_enqueuer( void *libtest_threadset_per_thread_state );
30
31
32
33
34
35 /****************************************************************************/
36 void libtest_tests_queue_umm_enqueuing( struct lfds710_list_asu_state *list_of_logical_processors, struct libshared_memory_state *ms, enum lfds710_misc_validity *dvs )
37 {
38   lfds710_pal_uint_t
39     *per_thread_counters,
40     loop = 0,
41     number_elements,
42     number_elements_per_thread,
43     number_logical_processors;
44
45   struct lfds710_list_asu_element
46     *lasue = NULL;
47
48   struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES)
49     dummy_qe;
50
51   struct lfds710_queue_umm_element
52     *qe;
53
54   struct lfds710_queue_umm_state
55     qs;
56
57   struct lfds710_misc_validation_info
58     vi;
59
60   struct libtest_logical_processor
61     *lp;
62
63   struct libtest_threadset_per_thread_state
64     *pts;
65
66   struct libtest_threadset_state
67     ts;
68
69   struct test_element
70     *te,
71     *te_array;
72
73   struct test_per_thread_state
74     *tpts;
75
76   LFDS710_PAL_ASSERT( list_of_logical_processors != NULL );
77   LFDS710_PAL_ASSERT( ms != NULL );
78   LFDS710_PAL_ASSERT( dvs != NULL );
79
80   /* TRD : create an empty queue
81            then run one thread per CPU
82            where each thread busy-works, enqueuing elements from a freelist (one local freelist per thread)
83            until 100000 elements are enqueued, per thread
84            each element's void pointer of user data is a struct containing thread number and element number
85            where element_number is a thread-local counter starting at 0
86
87            when we're done, we check that all the elements are present
88            and increment on a per-thread basis
89   */
90
91   *dvs = LFDS710_MISC_VALIDITY_VALID;
92
93   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
94
95   // TRD : allocate
96   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
97   tpts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
98   pts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct libtest_threadset_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
99   per_thread_counters = libshared_memory_alloc_from_unknown_node( ms, sizeof(lfds710_pal_uint_t) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
100   te_array = libshared_memory_alloc_largest_possible_array_from_unknown_node( ms, sizeof(struct test_element), LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES, &number_elements );
101
102   number_elements_per_thread = number_elements / number_logical_processors;
103
104   lfds710_queue_umm_init_valid_on_current_logical_core( &qs, &dummy_qe, NULL );
105
106   libtest_threadset_init( &ts, NULL );
107
108   while( LFDS710_LIST_ASU_GET_START_AND_THEN_NEXT(*list_of_logical_processors,lasue) )
109   {
110     lp = LFDS710_LIST_ASU_GET_VALUE_FROM_ELEMENT( *lasue );
111     (tpts+loop)->qs = &qs;
112     (tpts+loop)->thread_number = loop;
113     (tpts+loop)->number_elements_per_thread = number_elements_per_thread;
114     (tpts+loop)->te_array = te_array + loop * number_elements_per_thread;
115     libtest_threadset_add_thread( &ts, &pts[loop], lp, thread_simple_enqueuer, &tpts[loop] );
116     loop++;
117   }
118
119   LFDS710_MISC_BARRIER_STORE;
120
121   lfds710_misc_force_store();
122
123   // TRD : run the test
124   libtest_threadset_run( &ts );
125
126   libtest_threadset_cleanup( &ts );
127
128   // TRD : validate
129   LFDS710_MISC_BARRIER_LOAD;
130
131   /* TRD : first, validate the queue
132
133            then dequeue
134            we expect to find element numbers increment on a per thread basis
135   */
136
137   vi.min_elements = vi.max_elements = number_elements_per_thread * number_logical_processors;
138
139   lfds710_queue_umm_query( &qs, LFDS710_QUEUE_UMM_QUERY_SINGLETHREADED_VALIDATE, &vi, dvs );
140
141   for( loop = 0 ; loop < number_logical_processors ; loop++ )
142     *(per_thread_counters+loop) = 0;
143
144   while( *dvs == LFDS710_MISC_VALIDITY_VALID and lfds710_queue_umm_dequeue(&qs, &qe) )
145   {
146     te = LFDS710_QUEUE_UMM_GET_VALUE_FROM_ELEMENT( *qe );
147
148     if( te->thread_number >= number_logical_processors )
149     {
150       *dvs = LFDS710_MISC_VALIDITY_INVALID_TEST_DATA;
151       break;
152     }
153
154     if( te->counter > per_thread_counters[te->thread_number] )
155       *dvs = LFDS710_MISC_VALIDITY_INVALID_MISSING_ELEMENTS;
156
157     if( te->counter < per_thread_counters[te->thread_number] )
158       *dvs = LFDS710_MISC_VALIDITY_INVALID_ADDITIONAL_ELEMENTS;
159
160     if( te->counter == per_thread_counters[te->thread_number] )
161       per_thread_counters[te->thread_number]++;
162   }
163
164   lfds710_queue_umm_cleanup( &qs, NULL );
165
166   return;
167 }
168
169
170
171
172
173 /****************************************************************************/
174 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_enqueuer( void *libtest_threadset_per_thread_state )
175 {
176   lfds710_pal_uint_t
177     loop;
178
179   struct test_per_thread_state
180     *tpts;
181
182   struct libtest_threadset_per_thread_state
183     *pts;
184
185   LFDS710_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
186
187   LFDS710_PAL_ASSERT( libtest_threadset_per_thread_state != NULL );
188
189   pts = (struct libtest_threadset_per_thread_state *) libtest_threadset_per_thread_state;
190   tpts = LIBTEST_THREADSET_GET_USER_STATE_FROM_PER_THREAD_STATE( *pts );
191
192   for( loop = 0 ; loop < tpts->number_elements_per_thread ; loop++ )
193   {
194     (tpts->te_array+loop)->thread_number = tpts->thread_number;
195     (tpts->te_array+loop)->counter = loop;
196   }
197
198   libtest_threadset_thread_ready_and_wait( pts );
199
200   for( loop = 0 ; loop < tpts->number_elements_per_thread ; loop++ )
201   {
202     LFDS710_QUEUE_UMM_SET_VALUE_IN_ELEMENT( (tpts->te_array+loop)->qe, tpts->te_array+loop );
203     lfds710_queue_umm_enqueue( tpts->qs, &(tpts->te_array+loop)->qe );
204   }
205
206   LFDS710_MISC_BARRIER_STORE;
207
208   lfds710_misc_force_store();
209
210   return LIBSHARED_PAL_THREAD_RETURN_CAST(RETURN_SUCCESS);
211 }
212