]> pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/liblfds710/src/lfds710_queue_unbounded_manyproducer_manyconsumer/lfds710_queue_unbounded_manyproducer_manyconsumer_dequeue.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / liblfds710 / src / lfds710_queue_unbounded_manyproducer_manyconsumer / lfds710_queue_unbounded_manyproducer_manyconsumer_dequeue.c
1 /***** includes *****/
2 #include "lfds710_queue_unbounded_manyproducer_manyconsumer_internal.h"
3
4
5
6
7
8 /****************************************************************************/
9 int lfds710_queue_umm_dequeue( struct lfds710_queue_umm_state *qumms,
10                                struct lfds710_queue_umm_element **qumme )
11 {
12   char unsigned
13     result = 0;
14
15   enum lfds710_misc_flag
16     finished_flag = LFDS710_MISC_FLAG_LOWERED;
17
18   enum lfds710_queue_umm_queue_state
19     state = LFDS710_QUEUE_UMM_QUEUE_STATE_UNKNOWN;
20
21   int
22     rv = 1;
23
24   lfds710_pal_uint_t
25     backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE;
26
27   struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER)
28     *dequeue[PAC_SIZE],
29     *enqueue[PAC_SIZE],
30     *next[PAC_SIZE];
31
32   void
33     *key = NULL,
34     *value = NULL;
35
36   LFDS710_PAL_ASSERT( qumms != NULL );
37   LFDS710_PAL_ASSERT( qumme != NULL );
38
39   LFDS710_MISC_BARRIER_LOAD;
40
41   do
42   {
43     /* TRD : note here the deviation from the white paper
44              in the white paper, next is loaded from dequeue, not from qumms->dequeue
45              what concerns me is that between the load of dequeue and the load of
46              enqueue->next, the element can be dequeued by another thread *and freed*
47
48              by ordering the loads (load barriers), and loading both from qumms,
49              the following if(), which checks dequeue is still the same as qumms->enqueue
50              still continues to ensure next belongs to enqueue, while avoiding the
51              problem with free
52     */
53
54     dequeue[COUNTER] = qumms->dequeue[COUNTER];
55     dequeue[POINTER] = qumms->dequeue[POINTER];
56
57     LFDS710_MISC_BARRIER_LOAD;
58
59     enqueue[COUNTER] = qumms->enqueue[COUNTER];
60     enqueue[POINTER] = qumms->enqueue[POINTER];
61
62     next[COUNTER] = qumms->dequeue[POINTER]->next[COUNTER];
63     next[POINTER] = qumms->dequeue[POINTER]->next[POINTER];
64
65     LFDS710_MISC_BARRIER_LOAD;
66
67     if( qumms->dequeue[COUNTER] == dequeue[COUNTER] and qumms->dequeue[POINTER] == dequeue[POINTER] )
68     {
69       if( enqueue[POINTER] == dequeue[POINTER] and next[POINTER] == NULL )
70         state = LFDS710_QUEUE_UMM_QUEUE_STATE_EMPTY;
71
72       if( enqueue[POINTER] == dequeue[POINTER] and next[POINTER] != NULL )
73         state = LFDS710_QUEUE_UMM_QUEUE_STATE_ENQUEUE_OUT_OF_PLACE;
74
75       if( enqueue[POINTER] != dequeue[POINTER] )
76         state = LFDS710_QUEUE_UMM_QUEUE_STATE_ATTEMPT_DEQUEUE;
77
78       switch( state )
79       {
80         case LFDS710_QUEUE_UMM_QUEUE_STATE_UNKNOWN:
81           // TRD : eliminates compiler warning
82         break;
83
84         case LFDS710_QUEUE_UMM_QUEUE_STATE_EMPTY:
85           rv = 0;
86           *qumme = NULL;
87           result = 1;
88           finished_flag = LFDS710_MISC_FLAG_RAISED;
89         break;
90
91         case LFDS710_QUEUE_UMM_QUEUE_STATE_ENQUEUE_OUT_OF_PLACE:
92           next[COUNTER] = enqueue[COUNTER] + 1;
93           LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, next, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
94           // TRD : in fact if result is 1 (successful) I think we can now simply drop down into the dequeue attempt
95         break;
96
97         case LFDS710_QUEUE_UMM_QUEUE_STATE_ATTEMPT_DEQUEUE:
98           key = next[POINTER]->key;
99           value = next[POINTER]->value;
100
101           next[COUNTER] = dequeue[COUNTER] + 1;
102           LFDS710_PAL_ATOMIC_DWCAS( qumms->dequeue, dequeue, next, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
103
104           if( result == 1 )
105             finished_flag = LFDS710_MISC_FLAG_RAISED;
106         break;
107       }
108     }
109     else
110       result = 0;
111
112     if( result == 0 )
113       LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->dequeue_backoff, backoff_iteration );
114   }
115   while( finished_flag == LFDS710_MISC_FLAG_LOWERED );
116
117   if( result == 1 )
118   {
119     *qumme = dequeue[POINTER];
120     (*qumme)->key = key;
121     (*qumme)->value = value;
122   }
123
124   LFDS710_BACKOFF_AUTOTUNE( qumms->dequeue_backoff, backoff_iteration );
125
126   return rv;
127 }
128