yet another win32 condvar implementation

Hi all,

I would like, if possible, a review of this approach to win32 condvars.
It is based on the ACE implementation
(http://www.cs.wustl.edu/~schmidt/win32-cv-1.html), but I tried to get
rid of most concurrent accesses to the "number of waiters" variable and
of the critical section protecting it.  To achieve this I made the
following changes:

* I force the mutex to be held during signal and broadcast.

* I make signal synchronous (so, more expensive) as a tradeoff for
making broadcast cheaper.

Together, these two conditions ensure that the thread holding the mutex
protects the signaled thread(s) against other threads.

Any observation is welcome!

#include <windows.h>
#include <limits.h>
#include <string.h>   /* memset */

typedef CRITICAL_SECTION QemuMutex;
static inline void qemu_mutex_lock(QemuMutex *mutex) {
    EnterCriticalSection(mutex);
}
static inline void qemu_mutex_unlock(QemuMutex *mutex) {
    LeaveCriticalSection(mutex);
}

typedef struct QemuCond QemuCond;
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
void qemu_cond_signal(QemuCond *cond);
void qemu_cond_broadcast(QemuCond *cond);
void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);

struct QemuCond {
    LONG waiters, target;
    HANDLE sema;
    HANDLE continue_event;
};

void qemu_cond_init(QemuCond *cond)
{
    memset(cond, 0, sizeof(*cond));

    cond->sema = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
    cond->continue_event = CreateEvent(NULL,    /* security */
                                       FALSE,   /* auto-reset */
                                       FALSE,   /* not signaled */
                                       NULL);   /* name */
}

void qemu_cond_signal(QemuCond *cond)
{
    DWORD result;

    /*
     * Signal only when there are waiters.  cond->waiters is
     * incremented by qemu_cond_wait under the external lock,
     * so reading it here, with the lock held, is safe.
     */
    if (cond->waiters == 0) {
        return;
    }

    /*
     * Waiting threads decrement it outside the external lock,
     * but only if another thread is signaling/broadcasting and
     * has the mutex.  So, it also cannot be decremented
     * concurrently with this particular access.
     */
    cond->target = cond->waiters - 1;
    result = SignalObjectAndWait(cond->sema, cond->continue_event,
                                 INFINITE, FALSE);
}

void qemu_cond_broadcast(QemuCond *cond)
{
    BOOL result;    /* ReleaseSemaphore returns BOOL, not BOOLEAN */
    if (cond->waiters == 0) {
        return;
    }

    cond->target = 0;
    result = ReleaseSemaphore(cond->sema, cond->waiters, NULL);
    WaitForSingleObject(cond->continue_event, INFINITE);
}

void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
{
    /*
     * This access is protected under the mutex.
     */
    cond->waiters++;
    qemu_mutex_unlock(mutex);
    WaitForSingleObject(cond->sema, INFINITE);

    /*
     * Decrease waiters count.  The mutex is not taken, so we have
     * to do this atomically.
     *
     * All waiters contend for the mutex at the end of this function
     * until the signaling thread relinquishes it.  To ensure
     * each waiter consumes exactly one slice of the semaphore,
     * the signaling thread stops until it is told by the last
     * waiter that it can go on.
     */
    if (InterlockedDecrement(&cond->waiters) == cond->target) {
        SetEvent(cond->continue_event);
    }

    qemu_mutex_lock(mutex);
}
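As a rough illustration of the waiters/target handshake described in the comments above, here is a single-threaded model that only tracks the counters. It is a sketch, not part of the posted implementation: `CondModel`, the `model_*` helpers and `wakeups_until_continue` are hypothetical names, the semaphore is reduced to a plain counter, and it assumes no new waiter arrives while the signaler is blocked (which is what holding the mutex across signal/broadcast is meant to guarantee).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical counters-only model of QemuCond; no real threads involved. */
typedef struct {
    long waiters;        /* threads currently inside qemu_cond_wait */
    long target;         /* waiter count at which the signaler may resume */
    long sema;           /* outstanding semaphore count */
    bool continue_event; /* set by the waiter that reaches the target */
} CondModel;

/* First half of qemu_cond_wait: register as a waiter (mutex held). */
static void model_wait_enter(CondModel *c)
{
    c->waiters++;
}

/* Second half of qemu_cond_wait: one waiter consumes one semaphore count. */
static void model_wait_wake(CondModel *c)
{
    assert(c->sema > 0);
    c->sema--;
    if (--c->waiters == c->target) {
        c->continue_event = true;
    }
}

/* qemu_cond_signal, up to the point where it blocks on continue_event. */
static void model_signal(CondModel *c)
{
    if (c->waiters == 0) {
        return;
    }
    c->target = c->waiters - 1;
    c->sema += 1;
}

/* qemu_cond_broadcast, up to the point where it blocks on continue_event. */
static void model_broadcast(CondModel *c)
{
    if (c->waiters == 0) {
        return;
    }
    c->target = 0;
    c->sema += c->waiters;
}

/*
 * Number of waiters that wake up before continue_event is set,
 * or -1 if the event is never set.
 */
long wakeups_until_continue(long nwaiters, bool broadcast)
{
    CondModel c = {0, 0, 0, false};
    long woken = 0;

    for (long i = 0; i < nwaiters; i++) {
        model_wait_enter(&c);
    }
    if (broadcast) {
        model_broadcast(&c);
    } else {
        model_signal(&c);
    }
    while (c.sema > 0 && !c.continue_event) {
        model_wait_wake(&c);
        woken++;
    }
    return c.continue_event ? woken : -1;
}
```

With three registered waiters, `wakeups_until_continue(3, false)` models a signal and `wakeups_until_continue(3, true)` models a broadcast; in the model the signaler is released after one wake-up in the first case and only after all three in the second.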
bonzini (40)
2/15/2011 1:29:55 PM
comp.programming.threads

Paolo Bonzini <bonzini@gnu.org> writes:

> I would like if possible a review of this approach to win32 condvars.

> void qemu_cond_signal(QemuCond *cond)
> {
>     DWORD result;
>     if (cond->waiters == 0) {
>         return;
>     }
>     cond->target = cond->waiters - 1;
>     result = SignalObjectAndWait(cond->sema, cond->continue_event,
>                                  INFINITE, FALSE);
> }
>
> void qemu_cond_broadcast(QemuCond *cond)
> {
>     BOOLEAN result;
>     if (cond->waiters == 0) {
>         return;
>     }
>
>     cond->target = 0;
>     result = ReleaseSemaphore(cond->sema, cond->waiters, NULL);
>     WaitForSingleObject(cond->continue_event, INFINITE);
> }
>
> void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
> {
>     cond->waiters++;
>     qemu_mutex_unlock(mutex);
>     WaitForSingleObject(cond->sema, INFINITE);
>
>     if (InterlockedDecrement(&cond->waiters) == cond->target) {
>         SetEvent(cond->continue_event);
>     }
>
>     qemu_mutex_lock(mutex);
> }

Consider:

1. Thread A calls qemu_cond_wait, sets cond->waiters==1 and blocks in
WaitForSingleObject

2. Thread B calls qemu_cond_signal, sees cond->waiters==1, sets
cond->target==0, triggers the semaphore and waits for
cond->continue_event

3. Thread C calls qemu_cond_wait, sets cond->waiters==2 and blocks in
WaitForSingleObject

Now we have a problem. There are two possible outcomes here, both of
which are problematic.

4a. Thread C is now woken, since the semaphore is signalled [Stolen
wakeup]

4b. Thread C stays sleeping, but thread A is woken as expected. Thread A
decrements cond->waiters to 1, but sees cond->target==0, so does not
signal cond->continue_event [Signalling thread does not return]

Why not just use the implementation from boost?

Anthony
-- 
Author of C++ Concurrency in Action     http://www.stdthread.co.uk/book/
just::thread C++0x thread library             http://www.stdthread.co.uk
Just Software Solutions Ltd       http://www.justsoftwaresolutions.co.uk
15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976
Anthony
2/15/2011 3:51:19 PM
Anthony Williams <anthony.ajw@gmail.com> writes:

> Consider:
>
> 1. Thread A calls qemu_cond_wait, sets cond->waiters==1 and blocks in
> WaitForSingleObject
>
> 2. Thread B calls qemu_cond_signal, sees cond->waiters==1, sets
> cond->target==0, triggers the semaphore and waits for
> cond->continue_event
>
> 3. Thread C calls qemu_cond_wait, sets cond->waiters==2 and blocks in
> WaitForSingleObject

This can't happen if the mutex is held across the signal call, as per
Paolo's original post. 
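
To make the difference concrete, the interleaving can be replayed on a counters-only model. This is a sketch under stated assumptions, not code from the thread: `Replay`, `try_wait_enter`, `waiter_wakes` and `signaler_released` are hypothetical names, there are no real threads, and only the branch where thread A takes the wake-up is modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model, not from the thread: counters only, no real threads. */
typedef struct {
    long waiters;
    long target;
    long sema;
    bool continue_event;
    bool mutex_held;     /* true while the signaler owns the external mutex */
} Replay;

/*
 * A thread tries to enter qemu_cond_wait.  It must own the mutex to
 * increment waiters, so it cannot proceed while the signaler holds it.
 * Returns true if the thread registered as a waiter.
 */
static bool try_wait_enter(Replay *r)
{
    if (r->mutex_held) {
        return false;            /* blocked on the mutex */
    }
    r->waiters++;                /* lock, increment, unlock */
    return true;
}

/* A sleeping waiter consumes one semaphore count and checks the target. */
static void waiter_wakes(Replay *r)
{
    assert(r->sema > 0);
    r->sema--;
    if (--r->waiters == r->target) {
        r->continue_event = true;
    }
}

/*
 * Replays steps 1-3, then lets thread A consume the wake-up.
 * Returns true if thread B (the signaler) is released.
 */
bool signaler_released(bool hold_mutex_across_signal)
{
    Replay r = {0, 0, 0, false, false};

    try_wait_enter(&r);                       /* 1: A waits, waiters == 1 */

    r.mutex_held = hold_mutex_across_signal;  /* 2: B signals ...         */
    r.target = r.waiters - 1;                 /*    ... target == 0       */
    r.sema++;                                 /*    ... semaphore posted  */

    try_wait_enter(&r);                       /* 3: C tries to wait       */

    waiter_wakes(&r);                         /* A consumes the wake-up   */
    return r.continue_event;
}
```

In this model `signaler_released(false)` is false, because C bumps waiters to 2 and A's decrement never reaches the target, while `signaler_released(true)` is true, because C cannot register until B returns from the signal call.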

Anthony
Anthony
2/16/2011 12:43:59 PM