]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/test_and_benchmark/libtest/src/libtest_tests/libtest_tests_queue_unbounded_manyproducer_manyconsumer_dequeuing.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / test_and_benchmark / libtest / src / libtest_tests / libtest_tests_queue_unbounded_manyproducer_manyconsumer_dequeuing.c
1 /***** includes *****/
2 #include "libtest_tests_internal.h"
3
4 /***** structs *****/
5 struct test_per_thread_state
6 {
7   enum flag
8     error_flag;
9
10   struct lfds710_queue_umm_state
11     *qs;
12 };
13
14 struct test_element
15 {
16   struct lfds710_queue_umm_element
17     qe;
18 };
19
20 /***** private prototypes *****/
21 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_dequeuer( void *libtest_threadset_per_thread_state );
22
23
24
25
26
27 /****************************************************************************/
28 void libtest_tests_queue_umm_dequeuing( struct lfds710_list_asu_state *list_of_logical_processors, struct libshared_memory_state *ms, enum lfds710_misc_validity *dvs )
29 {
30   lfds710_pal_uint_t
31     loop,
32     number_elements,
33     number_logical_processors;
34
35   struct lfds710_list_asu_element
36     *lasue = NULL;
37
38   struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES)
39     qe_dummy;
40
41   struct lfds710_queue_umm_state
42     qs;
43
44   struct lfds710_misc_validation_info
45     vi = { 0, 0 };
46
47   struct libtest_logical_processor
48     *lp;
49
50   struct libtest_threadset_per_thread_state
51     *pts;
52
53   struct libtest_threadset_state
54     ts;
55
56   struct test_element
57     *te_array;
58
59   struct test_per_thread_state
60     *tpts;
61
62   LFDS710_PAL_ASSERT( list_of_logical_processors != NULL );
63   LFDS710_PAL_ASSERT( ms != NULL );
64   LFDS710_PAL_ASSERT( dvs != NULL );
65
66   /* TRD : create a queue, add 1,000,000 elements
67
68            use a single thread to enqueue every element
69            each elements user data is an incrementing counter
70
71            then run one thread per CPU
72            where each busy-works dequeuing
73
74            when an element is dequeued, we check (on a per-thread basis) the
75            value dequeued is greater than the element previously dequeued
76   */
77
78   *dvs = LFDS710_MISC_VALIDITY_VALID;
79
80   // TRD : allocate
81   lfds710_list_asu_query( list_of_logical_processors, LFDS710_LIST_ASU_QUERY_GET_POTENTIALLY_INACCURATE_COUNT, NULL, (void **) &number_logical_processors );
82   tpts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct test_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
83   pts = libshared_memory_alloc_from_unknown_node( ms, sizeof(struct libtest_threadset_per_thread_state) * number_logical_processors, LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES );
84   te_array = libshared_memory_alloc_largest_possible_array_from_unknown_node( ms, sizeof(struct test_element), LFDS710_PAL_ATOMIC_ISOLATION_IN_BYTES, &number_elements );
85
86   lfds710_queue_umm_init_valid_on_current_logical_core( &qs, &qe_dummy, NULL );
87
88   for( loop = 0 ; loop < number_elements ; loop++ )
89   {
90     LFDS710_QUEUE_UMM_SET_VALUE_IN_ELEMENT( (te_array+loop)->qe, loop );
91     lfds710_queue_umm_enqueue( &qs, &(te_array+loop)->qe );
92   }
93
94   libtest_threadset_init( &ts, NULL );
95
96   loop = 0;
97
98   while( LFDS710_LIST_ASU_GET_START_AND_THEN_NEXT(*list_of_logical_processors,lasue) )
99   {
100     lp = LFDS710_LIST_ASU_GET_VALUE_FROM_ELEMENT( *lasue );
101     (tpts+loop)->qs = &qs;
102     (tpts+loop)->error_flag = LOWERED;
103     libtest_threadset_add_thread( &ts, &pts[loop], lp, thread_simple_dequeuer, &tpts[loop] );
104     loop++;
105   }
106
107   LFDS710_MISC_BARRIER_STORE;
108
109   lfds710_misc_force_store();
110
111   // TRD : run the test
112   libtest_threadset_run( &ts );
113
114   libtest_threadset_cleanup( &ts );
115
116   // TRD : validate
117   LFDS710_MISC_BARRIER_LOAD;
118
119   // TRD : check queue is empty
120   lfds710_queue_umm_query( &qs, LFDS710_QUEUE_UMM_QUERY_SINGLETHREADED_VALIDATE, &vi, dvs );
121
122   // TRD : check for raised error flags
123   for( loop = 0 ; loop < number_logical_processors ; loop++ )
124     if( (tpts+loop)->error_flag == RAISED )
125       *dvs = LFDS710_MISC_VALIDITY_INVALID_TEST_DATA;
126
127   lfds710_queue_umm_cleanup( &qs, NULL );
128
129   return;
130 }
131
132
133
134
135
136 /****************************************************************************/
137 static libshared_pal_thread_return_t LIBSHARED_PAL_THREAD_CALLING_CONVENTION thread_simple_dequeuer( void *libtest_threadset_per_thread_state )
138 {
139   lfds710_pal_uint_t
140     *prev_value,
141     *value;
142
143   struct lfds710_queue_umm_element
144     *qe;
145
146   struct test_per_thread_state
147     *tpts;
148
149   struct libtest_threadset_per_thread_state
150     *pts;
151
152   LFDS710_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
153
154   LFDS710_PAL_ASSERT( libtest_threadset_per_thread_state != NULL );
155
156   pts = (struct libtest_threadset_per_thread_state *) libtest_threadset_per_thread_state;
157   tpts = LIBTEST_THREADSET_GET_USER_STATE_FROM_PER_THREAD_STATE( *pts );
158
159   lfds710_queue_umm_dequeue( tpts->qs, &qe );
160   prev_value = LFDS710_QUEUE_UMM_GET_VALUE_FROM_ELEMENT( *qe );
161
162   libtest_threadset_thread_ready_and_wait( pts );
163
164   while( lfds710_queue_umm_dequeue(tpts->qs, &qe) )
165   {
166     value = LFDS710_QUEUE_UMM_GET_VALUE_FROM_ELEMENT( *qe );
167
168     if( value <= prev_value )
169       tpts->error_flag = RAISED;
170
171     prev_value = value;
172   }
173
174   LFDS710_MISC_BARRIER_STORE;
175
176   lfds710_misc_force_store();
177
178   return LIBSHARED_PAL_THREAD_RETURN_CAST(RETURN_SUCCESS);
179 }
180