pd.if.org Git - liblfds/blob - liblfds/liblfds7.1.0/liblfds710/src/lfds710_queue_unbounded_manyproducer_manyconsumer/lfds710_queue_unbounded_manyproducer_manyconsumer_enqueue.c
Initial import (all versions, including the new 7.1.0)
[liblfds] / liblfds / liblfds7.1.0 / liblfds710 / src / lfds710_queue_unbounded_manyproducer_manyconsumer / lfds710_queue_unbounded_manyproducer_manyconsumer_enqueue.c
1 /***** includes *****/
2 #include "lfds710_queue_unbounded_manyproducer_manyconsumer_internal.h"
3
4
5
6
7
8 /****************************************************************************/
/* Enqueue the element qumme onto the queue described by qumms.

   qumms : queue state; must not be NULL (asserted)
   qumme : element to add; must not be NULL, and the address of its next[]
           array must be a multiple of LFDS710_PAL_ALIGN_DOUBLE_POINTER
           (asserted)

   Returns nothing.

   Outline, as visible in this function:
     1. prepare qumme: next[POINTER] is set to NULL and next[COUNTER] is
        filled in by the atomic add on qumms->aba_counter
     2. loop until the DWCAS that installs { qumme, counter } into the next[]
        pair of the element referenced by qumms->enqueue reports success;
        when that element already has a non-NULL next pointer, the loop
        instead tries to move qumms->enqueue forward to that next element
        and goes round again
     3. after the loop, make one attempt to move qumms->enqueue to qumme

   NOTE(review): the LFDS710_PAL_*, LFDS710_MISC_* and LFDS710_BACKOFF_* macros,
   and the POINTER / COUNTER / PAC_SIZE constants, are defined outside this
   file; the comments below describe how they are used here, not what they
   expand to - confirm against their definitions.
*/
9 void lfds710_queue_umm_enqueue( struct lfds710_queue_umm_state *qumms,
10                                 struct lfds710_queue_umm_element *qumme )
11 {
     // written by the DWCAS macro calls; below, 1 is treated as success and 0 triggers a backoff
12   char unsigned
13     result = 0;
14
     // raised only when the DWCAS on enqueue[POINTER]->next reports success; ends the main loop
15   enum lfds710_misc_flag
16     finished_flag = LFDS710_MISC_FLAG_LOWERED;
17
     // passed to the backoff macros on every failed attempt and to the autotune macro at the end
18   lfds710_pal_uint_t
19     backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE;
20
     /* local pointer-and-counter pairs, each indexed with POINTER and COUNTER
          enqueue     : snapshot of qumms->enqueue
          new_enqueue : the pair describing qumme which we try to swap in
          next        : snapshot of the next[] pair of the element qumms->enqueue refers to
     */
21   struct lfds710_queue_umm_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER)
22     *volatile enqueue[PAC_SIZE],
23     *new_enqueue[PAC_SIZE],
24     *volatile next[PAC_SIZE];
25
26   LFDS710_PAL_ASSERT( qumms != NULL );
27   LFDS710_PAL_ASSERT( qumme != NULL );
28   LFDS710_PAL_ASSERT( (lfds710_pal_uint_t) qumme->next % LFDS710_PAL_ALIGN_DOUBLE_POINTER == 0 );
29
     // prepare the new element: it has no successor, and its counter comes from the atomic add on qumms->aba_counter
     // NOTE(review): presumably the macro stores its result in qumme->next[COUNTER], cast to the type given as the last argument - confirm
30   qumme->next[POINTER] = NULL;
31   LFDS710_PAL_ATOMIC_ADD( &qumms->aba_counter, 1, qumme->next[COUNTER], struct lfds710_queue_umm_element * );
32   LFDS710_MISC_BARRIER_STORE;
33
     // pointer half of the pair to swap in; the counter half is set immediately before each DWCAS that uses new_enqueue
34   new_enqueue[POINTER] = qumme;
35
36   LFDS710_MISC_BARRIER_LOAD;
37
38   do
39   {
40     /* TRD : note here the deviation from the white paper
41              in the white paper, next is loaded from enqueue, not from qumms->enqueue
42              what concerns me is that between the load of enqueue and the load of
43              enqueue->next, the element can be dequeued by another thread *and freed*
44
45              by ordering the loads (load barriers), and loading both from qumms,
46              the following if(), which checks enqueue is still the same as qumms->enqueue
47              still continues to ensure next belongs to enqueue, while avoiding the
48              problem with free
49     */
50
       // snapshot the queue's enqueue pair
51     enqueue[COUNTER] = qumms->enqueue[COUNTER];
52     enqueue[POINTER] = qumms->enqueue[POINTER];
53
54     LFDS710_MISC_BARRIER_LOAD;
55
       // snapshot the next[] pair, reading through qumms->enqueue[POINTER] again rather than through the local snapshot (see the note above)
56     next[COUNTER] = qumms->enqueue[POINTER]->next[COUNTER];
57     next[POINTER] = qumms->enqueue[POINTER]->next[POINTER];
58
59     LFDS710_MISC_BARRIER_LOAD;
60
       // only act when qumms->enqueue still matches the snapshot taken above
61     if( qumms->enqueue[COUNTER] == enqueue[COUNTER] and qumms->enqueue[POINTER] == enqueue[POINTER] )
62     {
63       if( next[POINTER] == NULL )
64       {
           // no successor recorded: try to replace the next[] pair of the snapshotted element with { qumme, next[COUNTER] + 1 }
65         new_enqueue[COUNTER] = next[COUNTER] + 1;
66         LFDS710_PAL_ATOMIC_DWCAS( enqueue[POINTER]->next, next, new_enqueue, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
67         if( result == 1 )
68           finished_flag = LFDS710_MISC_FLAG_RAISED;
69       }
70       else
71       {
           // a successor is already present: try to move qumms->enqueue on to it, using { next[POINTER], enqueue[COUNTER] + 1 }
           // finished_flag is not touched on this path, so the main loop always runs again afterwards
72         next[COUNTER] = enqueue[COUNTER] + 1;
73         // TRD : strictly, this is a weak CAS, but we do an extra iteration of the main loop on a fake failure, so we set it to be strong
74         LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, next, LFDS710_MISC_CAS_STRENGTH_STRONG, result );
75       }
76     }
77     else
         // qumms->enqueue changed between the snapshot and the re-check; count this as a failed attempt so the backoff below runs
78       result = 0;
79
80     if( result == 0 )
81       LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->enqueue_backoff, backoff_iteration );
82   }
83   while( finished_flag == LFDS710_MISC_FLAG_LOWERED );
84
     // single attempt (no retry loop) to swap qumms->enqueue from the snapshot to { qumme, enqueue[COUNTER] + 1 }
85   // TRD : move enqueue along; only a weak CAS as the dequeue will solve this if it's out of place
86   new_enqueue[COUNTER] = enqueue[COUNTER] + 1;
87   LFDS710_PAL_ATOMIC_DWCAS( qumms->enqueue, enqueue, new_enqueue, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
88
     // a failure here is not retried; it only triggers one more backoff
89   if( result == 0 )
90     LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( qumms->enqueue_backoff, backoff_iteration );
91
     // hand the backoff state and the iteration value used by this call to the autotune macro
92   LFDS710_BACKOFF_AUTOTUNE( qumms->enqueue_backoff, backoff_iteration );
93
94   return;
95 }
96