2 #include "lfds710_queue_unbounded_manyproducer_manyconsumer_internal.h"
8 /****************************************************************************/
// lfds710_queue_umm_enqueue : add the element qumme to the queue whose state is qumms.
//
// Parameters
//   qumms : queue state; asserted non-NULL. The members used here are the
//           enqueue pair, aba_counter and enqueue_backoff.
//   qumme : element to add; asserted non-NULL, and the address of its next
//           member is asserted to be a multiple of LFDS710_PAL_ALIGN_DOUBLE_POINTER.
// Returns nothing (void).
//
// Outline, as far as this view shows:
//   1. initialise qumme->next (NULL pointer plus a counter drawn from aba_counter)
//   2. repeatedly snapshot qumms->enqueue and the next pair of the element it
//      points at, re-validate the snapshot, then either try to link qumme after
//      that element (its next pointer is NULL) or try to move qumms->enqueue
//      forward (its next pointer is not NULL)
//   3. once linked, make one attempt to move qumms->enqueue on to qumme
//
// Pairs are arrays of PAC_SIZE entries indexed with POINTER and COUNTER.
//
// NOTE(review): this view of the function contains no braces, no loop opener
// for the while near the end, no declaration of result and no type for
// backoff_iteration, and every line begins with a stray number. Lines appear
// to have been dropped by whatever produced this text - confirm against the
// full file before relying on the control flow described in these comments.
9 void lfds710_queue_umm_enqueue( struct lfds710_queue_umm_state *qumms,
10 struct lfds710_queue_umm_element *qumme )
// Loop control: starts lowered, and the retry loop runs while it stays lowered.
15 enum lfds710_misc_flag
16 finished_flag = LFDS710_MISC_FLAG_LOWERED;
// Passed to the back-off macros below together with qumms->enqueue_backoff.
19 backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE;
// Three local pointer/counter pairs, declared with double-pointer alignment:
//   enqueue     - snapshot of qumms->enqueue
//   new_enqueue - the pair that refers to qumme, used as the CAS replacement value
//   next        - snapshot of the next pair of the element qumms->enqueue points at
21 struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER)
22 *volatile enqueue[PAC_SIZE],
23 *new_enqueue[PAC_SIZE],
24 *volatile next[PAC_SIZE];
// Argument checks: both pointers non-NULL, and qumme->next suitably aligned
// (presumably a requirement of the double-word CAS - confirm).
26 LFDS710_PAL_ASSERT( qumms != NULL );
27 LFDS710_PAL_ASSERT( qumme != NULL );
28 LFDS710_PAL_ASSERT( (lfds710_pal_uint_t) qumme->next % LFDS710_PAL_ALIGN_DOUBLE_POINTER == 0 );
// Prepare the new element: its next pointer is NULL, and its next counter
// comes from an atomic add of 1 on the queue-wide aba_counter.
// NOTE(review): presumably the macro writes its result into qumme->next[COUNTER],
// cast to the pointer type given as the last argument - confirm against the macro.
30 qumme->next[POINTER] = NULL;
31 LFDS710_PAL_ATOMIC_ADD( &qumms->aba_counter, 1, qumme->next[COUNTER], struct lfds710_queue_umm_element * );
// Store barrier issued after the next pair of the new element has been written.
32 LFDS710_MISC_BARRIER_STORE;
// The pointer half of the replacement pair is fixed; the counter half is set
// just before each CAS that uses new_enqueue.
34 new_enqueue[POINTER] = qumme;
// Load barrier issued before the first read of qumms->enqueue.
36 LFDS710_MISC_BARRIER_LOAD;
// NOTE(review): the block comment that follows is cut off in this view - its
// last sentence is unfinished and no closing delimiter is visible. Confirm
// against the full file.
40 /* TRD : note here the deviation from the white paper
41 in the white paper, next is loaded from enqueue, not from qumms->enqueue
42 what concerns me is that between the load of enqueue and the load of
43 enqueue->next, the element can be dequeued by another thread *and freed*
45 by ordering the loads (load barriers), and loading both from qumms,
46 the following if(), which checks enqueue is still the same as qumms->enqueue
47 still continues to ensure next belongs to enqueue, while avoiding the
// Snapshot the current enqueue pair of the queue.
51 enqueue[COUNTER] = qumms->enqueue[COUNTER];
52 enqueue[POINTER] = qumms->enqueue[POINTER];
54 LFDS710_MISC_BARRIER_LOAD;
// Read the next pair through qumms->enqueue rather than through the local
// snapshot, as the TRD note above explains.
56 next[COUNTER] = qumms->enqueue[POINTER]->next[COUNTER];
57 next[POINTER] = qumms->enqueue[POINTER]->next[POINTER];
59 LFDS710_MISC_BARRIER_LOAD;
// Re-validate: continue only if qumms->enqueue still equals the snapshot, so
// that next was read from the element the snapshot names.
// NOTE(review): the and operator is presumably supplied by a macro or by
// iso646.h - not visible here.
61 if( qumms->enqueue[COUNTER] == enqueue[COUNTER] and qumms->enqueue[POINTER] == enqueue[POINTER] )
// A NULL next pointer means nothing is linked after the snapshot element.
63 if( next[POINTER] == NULL )
// Try to replace the next pair of the snapshot element with the pair that
// refers to qumme, using the observed counter plus one.
// NOTE(review): the macro is presumably a double-word compare-and-swap taking
// (destination, expected, replacement, strength, result) - confirm.
65 new_enqueue[COUNTER] = next[COUNTER] + 1;
66 LFDS710_PAL_ATOMIC_DWCAS( enqueue[POINTER]->next, next, new_enqueue, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
// Raising the flag ends the retry loop.
// NOTE(review): no test of result is visible before this line in this view;
// presumably the flag is raised only when the CAS succeeded - confirm.
68 finished_flag = LFDS710_MISC_FLAG_RAISED;
// NOTE(review): presumably this is the branch for a non-NULL next pointer (no
// else is visible in this view). Here the local next pair is reused as the
// replacement value: its pointer half is the element after the snapshot, its
// counter half becomes the snapshot counter plus one, and the CAS tries to
// move qumms->enqueue forward to it.
72 next[COUNTER] = enqueue[COUNTER] + 1;
73 // TRD : strictly, this is a weak CAS, but we do an extra iteration of the main loop on a fake failure, so we set it to be strong
74 LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, next, LFDS710_MISC_CAS_STRENGTH_STRONG, result );
// Back off using the per-queue enqueue_backoff state.
// NOTE(review): presumably guarded by a test of result that is not visible here.
81 LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->enqueue_backoff, backoff_iteration );
// Retry until the flag has been raised.
83 while( finished_flag == LFDS710_MISC_FLAG_LOWERED );
85 // TRD : move enqueue along; only a weak CAS as the dequeue will solve this if it's out of place
// Single attempt to swing qumms->enqueue from the snapshot to the pair that
// refers to qumme; it is not retried in the code visible here.
86 new_enqueue[COUNTER] = enqueue[COUNTER] + 1;
87 LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, new_enqueue, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
// NOTE(review): presumably guarded by a test of result that is not visible here.
90 LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->enqueue_backoff, backoff_iteration );
// Hand the iteration count from this call to the back-off autotune macro.
92 LFDS710_BACKOFF_AUTOTUNE( qumms->enqueue_backoff, backoff_iteration );