Single Reader, Single Writer FIFO

  • Follow


I am looking for a Single-Reader, Single-Writer FIFO in C/C++. Here is
my solution for win32, based on the http://www.drdobbs.com/cpp/210604448
article. But I am not sure if it is really thread-safe. Well, this is
not a re-view site, but if there is something wrong please tell me.

Best regards, Friedrich

#ifndef QUEUE_HPP_INCLUDED
#define QUEUE_HPP_INCLUDED

#include <Windows.h>

/// @brief A single reader, single writer queue
template <typename T>
class LockFreeQueue {
private:
    /// @brief Node of the queue
    struct Node {
        Node( T* val ) : value(val), next(0) { }
        T* value;
        Node* next;
    };

    Node* first; // for producer only
    Node* divider; // shared
    Node* last; // shared

    // no copy
    LockFreeQueue& operator=(const LockFreeQueue&);
    LockFreeQueue(const LockFreeQueue&);
public:
    /// @brief Constructor
    LockFreeQueue()
    :   first(new Node(0)),
        divider(first),
        last(first)
    {
    }

    /// @brief Destructor
    ~LockFreeQueue()
    {
        while( first != 0 )
        {
            // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    /// @brief Pushes to the end of the queue
    /// @warning Must only be called from the producer
    void push_back(T* t)
    {
        last->next = new Node(t);   // add the new item
        // publish it
        InterlockedExchangePointer(&last, last->next); // last = last-
>next;
        while( first != divider )
        {   // trim unused nodes
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }

    /// @brief Pop an element from the front
    /// @warning Must only be called from the consumer
    /// @return true If a node was popped
    /// @return false If queue is empty
    bool pop_front(T* result )
    {
        if(divider != last)
        {
            // if queue is nonempty
            result = divider->next->value;  // C: copy it back
            // D: publish that we took it
            InterlockedExchangePointer(&divider, divider->next); //
divider = divider->next;
            return true; // and report success
        }
        return false; // else report empty
    }

    /// @brief Points to the element at the front
    /// @warning Must only be called from the consumer
    /// @return 0 if queue is empty
    /// @return Pointer to the first node
    T* front()
    {
        T* t = 0;
        if(divider != last)
        {
            t = divider->next->value;
        }
        return t;
    }
};

#endif // QUEUE_HPP_INCLUDE
0
Reply Friedrich.Schick (3) 12/30/2010 9:58:49 AM

On Dec 30, 12:58=A0pm, "Friedrich.Sch...@googlemail.com"
<friedrich.sch...@googlemail.com> wrote:
> I am looking for a Single-Reader, Single-Writer FIFO in C/C++. Here is
> my solution for win32, based on thehttp://www.drdobbs.com/cpp/210604448
> article. But I am not sure if it is really thread-safe. Well, this is
> not a re-view site, but if there is something wrong please tell me.
>
> Best regards, Friedrich
>
> #ifndef QUEUE_HPP_INCLUDED
> #define QUEUE_HPP_INCLUDED
>
> #include <Windows.h>
>
> /// @brief A single reader, single writer queue
> template <typename T>
> class LockFreeQueue {
> private:
> =A0 =A0 /// @brief Node of the queue
> =A0 =A0 struct Node {
> =A0 =A0 =A0 =A0 Node( T* val ) : value(val), next(0) { }
> =A0 =A0 =A0 =A0 T* value;
> =A0 =A0 =A0 =A0 Node* next;
> =A0 =A0 };
>
> =A0 =A0 Node* first; // for producer only
> =A0 =A0 Node* divider; // shared
> =A0 =A0 Node* last; // shared


Oh boy, you introduced false-sharing, God must be killed a start-up
for that :)
Seriously, false-sharing is one of the most serious mistakes in
synchronization algorithms. Here are some graphs regarding negative
effects of sharing:
http://www.1024cores.net/home/lock-free-algorithms/first-things-first
And here are my thoughts on false sharing:
http://www.1024cores.net/home/lock-free-algorithms/false-sharing---false

Also, you need to declare the variables as 'volatile'. Read this:
http://www.1024cores.net/home/lock-free-algorithms/so-what-is-a-memory-mode=
l-and-how-to-cook-it/ordering


> =A0 =A0 // no copy
> =A0 =A0 LockFreeQueue& operator=3D(const LockFreeQueue&);
> =A0 =A0 LockFreeQueue(const LockFreeQueue&);
> public:
> =A0 =A0 /// @brief Constructor
> =A0 =A0 LockFreeQueue()
> =A0 =A0 : =A0 first(new Node(0)),
> =A0 =A0 =A0 =A0 divider(first),
> =A0 =A0 =A0 =A0 last(first)
> =A0 =A0 {
> =A0 =A0 }
>
> =A0 =A0 /// @brief Destructor
> =A0 =A0 ~LockFreeQueue()
> =A0 =A0 {
> =A0 =A0 =A0 =A0 while( first !=3D 0 )
> =A0 =A0 =A0 =A0 {
> =A0 =A0 =A0 =A0 =A0 =A0 // release the list
> =A0 =A0 =A0 =A0 =A0 =A0 Node* tmp =3D first;
> =A0 =A0 =A0 =A0 =A0 =A0 first =3D tmp->next;
> =A0 =A0 =A0 =A0 =A0 =A0 delete tmp;
> =A0 =A0 =A0 =A0 }
> =A0 =A0 }
>
> =A0 =A0 /// @brief Pushes to the end of the queue
> =A0 =A0 /// @warning Must only be called from the producer
> =A0 =A0 void push_back(T* t)
> =A0 =A0 {
> =A0 =A0 =A0 =A0 last->next =3D new Node(t); =A0 // add the new item
> =A0 =A0 =A0 =A0 // publish it
> =A0 =A0 =A0 =A0 InterlockedExchangePointer(&last, last->next); // last =
=3D last->next;


What for you put InterlockedExchangePointer() here? You need plain
atomic store-release here:
http://www.1024cores.net/home/lock-free-algorithms/so-what-is-a-memory-mode=
l-and-how-to-cook-it



> =A0 =A0 =A0 =A0 while( first !=3D divider )
> =A0 =A0 =A0 =A0 { =A0 // trim unused nodes
> =A0 =A0 =A0 =A0 =A0 =A0 Node* tmp =3D first;
> =A0 =A0 =A0 =A0 =A0 =A0 first =3D first->next;
> =A0 =A0 =A0 =A0 =A0 =A0 delete tmp;
> =A0 =A0 =A0 =A0 }
> =A0 =A0 }
>
> =A0 =A0 /// @brief Pop an element from the front
> =A0 =A0 /// @warning Must only be called from the consumer
> =A0 =A0 /// @return true If a node was popped
> =A0 =A0 /// @return false If queue is empty
> =A0 =A0 bool pop_front(T* result )
> =A0 =A0 {
> =A0 =A0 =A0 =A0 if(divider !=3D last)
> =A0 =A0 =A0 =A0 {
> =A0 =A0 =A0 =A0 =A0 =A0 // if queue is nonempty
> =A0 =A0 =A0 =A0 =A0 =A0 result =3D divider->next->value; =A0// C: copy it=
 back
> =A0 =A0 =A0 =A0 =A0 =A0 // D: publish that we took it
> =A0 =A0 =A0 =A0 =A0 =A0 InterlockedExchangePointer(&divider, divider->nex=
t); //
> divider =3D divider->next;


No need for InterlockedExchangePointer() as well.


> =A0 =A0 =A0 =A0 =A0 =A0 return true; // and report success
> =A0 =A0 =A0 =A0 }
> =A0 =A0 =A0 =A0 return false; // else report empty
> =A0 =A0 }
>
> =A0 =A0 /// @brief Points to the element at the front
> =A0 =A0 /// @warning Must only be called from the consumer
> =A0 =A0 /// @return 0 if queue is empty
> =A0 =A0 /// @return Pointer to the first node
> =A0 =A0 T* front()
> =A0 =A0 {
> =A0 =A0 =A0 =A0 T* t =3D 0;
> =A0 =A0 =A0 =A0 if(divider !=3D last)
> =A0 =A0 =A0 =A0 {
> =A0 =A0 =A0 =A0 =A0 =A0 t =3D divider->next->value;
> =A0 =A0 =A0 =A0 }
> =A0 =A0 =A0 =A0 return t;
> =A0 =A0 }
>
> };
>
> #endif // QUEUE_HPP_INCLUDE


I would recommend to look at the following queue implementation:
http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-qu=
eue
It has paddings in place, uses optimal primitives and more aggressive
node caching.

--
Dmitriy V'jukov
0
Reply Dmitriy 12/30/2010 5:00:22 PM


1 Replies
786 Views

(page loaded in 0.005 seconds)

Similiar Articles:













7/21/2012 3:49:39 AM


Reply: