2 #include "lfds710_queue_unbounded_manyproducer_manyconsumer_internal.h"
8 /****************************************************************************/
9 int lfds710_queue_umm_dequeue( struct lfds710_queue_umm_state *qumms,
10 struct lfds710_queue_umm_element **qumme )
15 enum lfds710_misc_flag
16 finished_flag = LFDS710_MISC_FLAG_LOWERED;
18 enum lfds710_queue_umm_queue_state
19 state = LFDS710_QUEUE_UMM_QUEUE_STATE_UNKNOWN;
25 backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE;
27 struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER)
36 LFDS710_PAL_ASSERT( qumms != NULL );
37 LFDS710_PAL_ASSERT( qumme != NULL );
39 LFDS710_MISC_BARRIER_LOAD;
43 /* TRD : note here the deviation from the white paper
44 in the white paper, next is loaded from dequeue, not from qumms->dequeue
45 what concerns me is that between the load of dequeue and the load of
46 enqueue->next, the element can be dequeued by another thread *and freed*
48 by ordering the loads (load barriers), and loading both from qumms,
49 the following if(), which checks dequeue is still the same as qumms->enqueue
50 still continues to ensure next belongs to enqueue, while avoiding the
54 dequeue[COUNTER] = qumms->dequeue[COUNTER];
55 dequeue[POINTER] = qumms->dequeue[POINTER];
57 LFDS710_MISC_BARRIER_LOAD;
59 enqueue[COUNTER] = qumms->enqueue[COUNTER];
60 enqueue[POINTER] = qumms->enqueue[POINTER];
62 next[COUNTER] = qumms->dequeue[POINTER]->next[COUNTER];
63 next[POINTER] = qumms->dequeue[POINTER]->next[POINTER];
65 LFDS710_MISC_BARRIER_LOAD;
67 if( qumms->dequeue[COUNTER] == dequeue[COUNTER] and qumms->dequeue[POINTER] == dequeue[POINTER] )
69 if( enqueue[POINTER] == dequeue[POINTER] and next[POINTER] == NULL )
70 state = LFDS710_QUEUE_UMM_QUEUE_STATE_EMPTY;
72 if( enqueue[POINTER] == dequeue[POINTER] and next[POINTER] != NULL )
73 state = LFDS710_QUEUE_UMM_QUEUE_STATE_ENQUEUE_OUT_OF_PLACE;
75 if( enqueue[POINTER] != dequeue[POINTER] )
76 state = LFDS710_QUEUE_UMM_QUEUE_STATE_ATTEMPT_DEQUEUE;
80 case LFDS710_QUEUE_UMM_QUEUE_STATE_UNKNOWN:
81 // TRD : eliminates compiler warning
84 case LFDS710_QUEUE_UMM_QUEUE_STATE_EMPTY:
88 finished_flag = LFDS710_MISC_FLAG_RAISED;
91 case LFDS710_QUEUE_UMM_QUEUE_STATE_ENQUEUE_OUT_OF_PLACE:
92 next[COUNTER] = enqueue[COUNTER] + 1;
93 LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, next, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
94 // TRD : in fact if result is 1 (successful) I think we can now simply drop down into the dequeue attempt
97 case LFDS710_QUEUE_UMM_QUEUE_STATE_ATTEMPT_DEQUEUE:
98 key = next[POINTER]->key;
99 value = next[POINTER]->value;
101 next[COUNTER] = dequeue[COUNTER] + 1;
102 LFDS710_PAL_ATOMIC_DWCAS( qumms->dequeue, dequeue, next, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
105 finished_flag = LFDS710_MISC_FLAG_RAISED;
113 LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->dequeue_backoff, backoff_iteration );
115 while( finished_flag == LFDS710_MISC_FLAG_LOWERED );
119 *qumme = dequeue[POINTER];
121 (*qumme)->value = value;
124 LFDS710_BACKOFF_AUTOTUNE( qumms->dequeue_backoff, backoff_iteration );