]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/test_and_benchmark/libtest/src/libtest_tests/libtest_tests_queue_bounded_manyproducer_manyconsumer_enqueuing.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / test_and_benchmark / libtest / src / libtest_tests / libtest_tests_queue_bounded_manyproducer_manyconsumer_enqueuing.c
1 /***** includes *****/
2 #include "libtest_tests_internal.h"
3
4 /***** structs *****/
5 struct test_per_thread_state
6 {
7   lfds710_pal_uint_t
8     thread_number;
9
10   struct lfds710_queue_bmm_state
11     *qbmms;
12 };
13
14 /***** private prototypes *****/
15 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_enqueuer( void *libtest_threadset_per_thread_state );
16
17
18
19
20
21 /****************************************************************************/
22 void libtest_tests_queue_bmm_enqueuing( struct lfds710_list_asu_state *list_of_logical_processors, struct libshared_memory_state *ms, enum lfds710_misc_validity *dvs )
23 {
24   lfds710_pal_uint_t
25     counter_number,
26     loop = 0,
27     *per_thread_counters,
28     power_of_two_number_elements = 1,
29     number_elements,
30     number_logical_processors,
31     thread_number;
32
33   struct lfds710_list_asu_element
34     *lasue = NULL;
35
36   struct lfds710_queue_bmm_element
37     *qbmme_array;
38
39   struct lfds710_queue_bmm_state
40     qbmms;
41
42   struct lfds710_misc_validation_info
43     vi;
44
45   struct libtest_logical_processor
46     *lp;
47
48   struct libtest_threadset_per_thread_state
49     *pts;
50
51   struct libtest_threadset_state
52     ts;
53
54   struct test_per_thread_state
55     *tpts;
56
57   LFDS710_PAL_ASSERT( list_of_logical_processors != NULL );
58   LFDS710_PAL_ASSERT( ms != NULL );
59   LFDS710_PAL_ASSERT( dvs != NULL );
60
61   /* TRD : create an empty queue, with the largest possible number of elements
62            then run one thread per CPU
63            where each thread busy-works, enqueuing key/values, where the key is the thread ID and the value is an incrementing per-thread counter
64            run until the queue is full
65
66            when we're done, we check that all the elements are present
67            and increment on a per-thread basis
68   */
69
70   *dvs = LFDS710_MISC_VALIDITY_VALID;
71
72   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
73
74   // TRD : allocate
75   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
76   tpts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
77   pts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct libtest_threadset_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
78   per_thread_counters = libshared_memory_alloc_from_unknown_node( ms, sizeof(lfds710_pal_uint_t) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
79   qbmme_array = libshared_memory_alloc_largest_possible_array_from_unknown_node( ms, sizeof(struct lfds710_queue_bmm_element), LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES, &number_elements );
80
81   // TRD : need to only use a power of 2 number of elements
82   number_elements >>= 1;
83
84   while( number_elements != 0 )
85   {
86     number_elements >>= 1;
87     power_of_two_number_elements <<= 1;
88   }
89
90   lfds710_queue_bmm_init_valid_on_current_logical_core( &qbmms, qbmme_array, power_of_two_number_elements, NULL );
91
92   libtest_threadset_init( &ts, NULL );
93
94   while( LFDS710_LIST_ASU_GET_START_AND_THEN_NEXT(*list_of_logical_processors,lasue) )
95   {
96     lp = LFDS710_LIST_ASU_GET_VALUE_FROM_ELEMENT( *lasue );
97     (tpts+loop)->qbmms = &qbmms;
98     (tpts+loop)->thread_number = loop;
99     libtest_threadset_add_thread( &ts, &pts[loop], lp, thread_simple_enqueuer, &tpts[loop] );
100     loop++;
101   }
102
103   LFDS710_MISC_BARRIER_STORE;
104
105   lfds710_misc_force_store();
106
107   // TRD : run the test
108   libtest_threadset_run( &ts );
109
110   libtest_threadset_cleanup( &ts );
111
112   // TRD : validate
113   LFDS710_MISC_BARRIER_LOAD;
114
115   /* TRD : first, validate the queue
116            then dequeue
117            we expect to find element numbers increment on a per thread basis
118   */
119
120   vi.min_elements = vi.max_elements = power_of_two_number_elements;
121
122   lfds710_queue_bmm_query( &qbmms, LFDS710_QUEUE_BMM_QUERY_SINGLETHREADED_VALIDATE, &vi, dvs );
123
124   for( loop = 0 ; loop < number_logical_processors ; loop++ )
125     *(per_thread_counters+loop) = 0;
126
127   while( *dvs == LFDS710_MISC_VALIDITY_VALID and lfds710_queue_bmm_dequeue(&qbmms, (void **) &thread_number, (void **) &counter_number) )
128   {
129     if( thread_number >= number_logical_processors )
130     {
131       *dvs = LFDS710_MISC_VALIDITY_INVALID_TEST_DATA;
132       break;
133     }
134
135     if( counter_number > per_thread_counters[thread_number] )
136       *dvs = LFDS710_MISC_VALIDITY_INVALID_MISSING_ELEMENTS;
137
138     if( counter_number < per_thread_counters[thread_number] )
139       *dvs = LFDS710_MISC_VALIDITY_INVALID_ADDITIONAL_ELEMENTS;
140
141     if( counter_number == per_thread_counters[thread_number] )
142       per_thread_counters[thread_number]++;
143   }
144
145   lfds710_queue_bmm_cleanup( &qbmms, NULL );
146
147   return;
148 }
149
150
151
152
153
154 /****************************************************************************/
155 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_enqueuer( void *libtest_threadset_per_thread_state )
156 {
157   lfds710_pal_uint_t
158     counter = 0;
159
160   struct test_per_thread_state
161     *tpts;
162
163   struct libtest_threadset_per_thread_state
164     *pts;
165
166   LFDS710_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
167
168   LFDS710_PAL_ASSERT( libtest_threadset_per_thread_state != NULL );
169
170   pts = (struct libtest_threadset_per_thread_state *) libtest_threadset_per_thread_state;
171   tpts = LIBTEST_THREADSET_GET_USER_STATE_FROM_PER_THREAD_STATE( *pts );
172
173   libtest_threadset_thread_ready_and_wait( pts );
174
175   while( lfds710_queue_bmm_enqueue(tpts->qbmms, (void *) (tpts->thread_number), (void *) (counter++)) );
176
177   LFDS710_MISC_BARRIER_STORE;
178
179   lfds710_misc_force_store();
180
181   return LIBSHARED_PAL_THREAD_RETURN_CAST(RETURN_SUCCESS);
182 }
183