Velocity Reviews - Computer Hardware Reviews


Blocking queue race condition?

 
 
Andrew Tomazos
01-05-2012
I'm trying to implement a high performance blocking queue backed by a
circular buffer on top of pthreads, semaphore.h and gcc atomic
builtins. The queue needs to handle multiple simultaneous readers
and writers from different threads.

I've isolated some sort of race condition, and I'm not sure if it's a
faulty assumption about the behavior of some of the atomic operations
and semaphores, or whether my design is fundamentally flawed.

I've extracted and simplified it to the below standalone example. I
would expect that this program never returns. It does however return
after a few hundred thousand iterations with corruption detected in
the queue (at least on my machine).

In the example below (for exposition), the queue doesn't actually store
anything; it just sets a cell that would hold the actual data to 1, and
uses 0 to represent an empty cell. There is a counting semaphore
(vacancies) representing the number of vacant cells, and another
counting semaphore (occupants) representing the number of occupied
cells.

Writers do the following:
(1) decrement vacancies
(2) atomically get next head position
(3) write to it
(4) increment occupants

Readers do the opposite:
(1) decrement occupants
(2) atomically get next tail position
(3) read from it
(4) increment vacancies

I would expect that given the above, precisely one thread can be
reading or writing any given cell at one time. This seems to not be
the case though.

Any ideas about why it doesn't work, or suggestions for debugging
strategies, would be appreciated. Code and output below...

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    // cell[x] == 1 if cell x is occupied, cell[x] == 0 if cell x is vacant
    int cell[QUEUE_CAPACITY];

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // __sync_fetch_and_add(&head, 1) is an atomic post-increment, i.e. head++
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // __sync_fetch_and_add(&tail, 1) is an atomic post-increment, i.e. tail++
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // __sync_bool_compare_and_swap is gcc's atomic compare-and-swap:
        // change 0 to 1, or die
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // __sync_bool_compare_and_swap is gcc's atomic compare-and-swap:
        // change 1 to 0, or die
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    {
        // never posted, so only the first thread to detect corruption prints
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
}

int main()
{
    pthread_t ids[2 * NUM_THREADS];

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&ids[i], NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&ids[NUM_THREADS + i], NULL, &Sink, 0))
            abort();

    // block forever; the process only ends when corrupt() calls exit()
    pthread_join(ids[0], NULL);
}

Compile the above as follows:
$ g++ -pthread AboveCode.cpp
$ ./a.out

The output is different every time, but here is one example:

corruption detected
action = get
i = 6
head = 122685
tail = 122685
cell[0] = 0
cell[1] = 0
cell[2] = 1
cell[3] = 0
cell[4] = 1
cell[5] = 0
cell[6] = 1
cell[7] = 1

My system is Ubuntu 11.10 on Intel Core 2:
$ uname -a
Linux 3.0.0-14-generic #23-Ubuntu SMP Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ cat /proc/cpuinfo | grep Intel
model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
$ g++ --version
g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks,
Andrew.
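For readers who don't use the gcc `__sync` builtins, the sketch below (not from the thread) pins down the two single-threaded semantics the code above relies on, next to the C++11 `std::atomic` calls commonly used in their place. The function name `check_builtin_semantics` is made up for illustration. It says nothing about ordering between threads, which is where the actual bug turns out to be.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: each assert documents one property the queue relies on.
// Results are stored before asserting so the operations still run under NDEBUG.
void check_builtin_semantics()
{
    // gcc: __sync_fetch_and_add returns the value *before* the addition,
    // i.e. it behaves like an atomic head++.
    unsigned int head = 7;
    unsigned int ticket = __sync_fetch_and_add(&head, 1);
    assert(ticket == 7 && head == 8);

    // gcc: __sync_bool_compare_and_swap(p, oldval, newval) stores newval only
    // if *p == oldval, and returns whether the store happened.
    int cell = 0;
    bool first = __sync_bool_compare_and_swap(&cell, 0, 1);   // 0 -> 1
    bool second = __sync_bool_compare_and_swap(&cell, 0, 1);  // cell is 1: no store
    assert(first && !second && cell == 1);

    // C++11 equivalents; both default to memory_order_seq_cst.
    std::atomic<unsigned int> ahead(7);
    unsigned int aticket = ahead.fetch_add(1);
    assert(aticket == 7 && ahead.load() == 8);

    std::atomic<int> acell(0);
    int expected = 0;
    bool afirst = acell.compare_exchange_strong(expected, 1);
    expected = 0;
    bool asecond = acell.compare_exchange_strong(expected, 1);
    // on failure, compare_exchange_* writes the value it found into `expected`
    assert(afirst && !asecond && expected == 1 && acell.load() == 1);
}
```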
 
Andrew Tomazos
01-05-2012
On Jan 5, 8:31 am, Paavo Helde <(E-Mail Removed)> wrote:
> Andrew Tomazos <(E-Mail Removed)> wrote in news:(E-Mail Removed):
>
> > I'm trying to implement a high performance blocking queue backed by a
> > circular buffer on top of pthreads, semaphore.h and gcc atomic
> > builtins. The queue needs to handle multiple simultaneous readers
> > and writers from different threads.

>
> It appears you are using 4 different memory locations for synchronization
> (head, tail, occupants, vacancies). However, updates to these locations
> are not strictly synchronized with each other so most probably their
> states will become inconsistent at some point. Moreover, there is a
> possibility that using 4 memory locations for synchronization may
> actually reduce the performance when compared to e.g. a simple mutex.


What do you mean by synchronized with each other? sem_wait, sem_post, and
__sync_fetch_and_add compile to a LOCK XADD, so they have an implicit
full memory barrier. gcc says this includes anything that is

> > I've isolated some sort of race condition, and I'm not sure if it's a
> > faulty assumption about the behavior of some of the atomic operations
> > and semaphores, or whether my design is fundamentally flawed.
> >      void put()
> >      {
> >           vacancies.wait();

>
> >           // __sync_fetch_and_add(&head,1) is an atomic post increment, ie head++
> >           set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

>
> >           occupants.post();
> >      }

>
> Here is an example of race condition. You wait until there are some free
> cells (vacancies.wait()), then proceed to write some data somewhere.
> However, these two operations are not performed atomically, meaning that
> other threads can meanwhile post any number of items so that the queue
> becomes full again when you get to __sync_fetch_and_add and will thus
> overwrite some already occupied slot. Or have I overlooked something?


The initial value of vacancies is the capacity of the queue and the
initial value of occupants is 0. One of these two semaphores is
always decremented at the start of the function (either put or take)
and the other is incremented at the end. They both have implicit full
memory barriers, so their values are globally visible after changing.
Therefore, in between (during execution of put or take), the total
(vacancies + occupants) can be at most (capacity - 1).

Further, for every other thread concurrently inside put or take this
maximum is decreased by 1, so at most (capacity) threads can be
putting or taking a total of (capacity) cells. Because readers
atomically read/increment the tail count on entering the take
function, they must receive sequentially indexed cells. The same goes
for writers and the head count.

I'm sure there is a flaw in my reasoning (as the program doesn't
work), but I think your statement that "other threads can meanwhile
post any number of items" is false as they have to get past the
counting semaphores.

Regards,
Andrew.
 
Melissa
01-05-2012
On Thu, 5 Jan 2012 00:15:00 -0800 (PST)
Andrew Tomazos <(E-Mail Removed)> wrote:

>
> I'm sure there is a flaw in my reasoning (as the program doesn't
> work), but I think your statement that "other threads can meanwhile
> post any number of items" is false as they have to get past the
> counting semaphores.
>


The problem is that set and get operations are not serialized, so the
order of sets and gets can be arbitrary.
That means that set(1) can trigger get(0) while
set(0) has not yet been performed.



 
Andrew Tomazos
01-05-2012
On Jan 5, 6:05 pm, "io_x" <(E-Mail Removed)> wrote:
> "Andrew Tomazos" <(E-Mail Removed)> wrote in message news:(E-Mail Removed)...
>
> > I'm trying to implement a high performance blocking queue backed by a

>
> the code does not compile;
> it gives 51 errors


If you post the errors maybe I can tell you what the problem is.

> I prefer my asm queue implementation, which allows multithreading,
> to the whole STL library, exception handling, <> definitions
> and const definitions, all counted together.
>
> But I like the constructors and destructors of C++ objects,
> and references too; names for functions that the compiler sees,
> like the function name strcat(), and names of parameters, I like them too.
>
> But perhaps I'm too stupid to fully understand them,
> or I don't like that path.


The real implementation uses things like:

typename aligned_storage<sizeof(T), alignment_of<T>::value>::type
storage[capacity];

and emplace and move constructors. I've just reduced it to this
simple example for debugging the race condition.
-Andrew.
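A minimal sketch of what a slot built on that `aligned_storage` declaration might look like, assuming the producer constructs the value in place (emplace) and the consumer moves it out and destroys it. Only the `aligned_storage` line comes from the post; the class name `Slot` and its members are hypothetical. The `full_` flag plays the role of the demo's 0/1 stub and is only checked with `assert`; the surrounding queue still has to guarantee that one thread at a time touches a slot. Note that `std::aligned_storage` is deprecated as of C++23, where `alignas(T) unsigned char storage[sizeof(T)]` is the usual replacement.

```cpp
#include <cassert>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical slot: raw, suitably aligned storage for one T.
template <typename T>
class Slot
{
public:
    Slot() : full_(false) {}
    Slot(const Slot&) = delete;             // raw storage must not be copied bytewise
    Slot& operator=(const Slot&) = delete;
    ~Slot() { if (full_) ptr()->~T(); }     // destroy a value left behind

    // producer side: construct the value in place (stub 0 -> 1)
    template <typename... Args>
    void emplace(Args&&... args)
    {
        assert(!full_);
        ::new (static_cast<void*>(&storage_)) T(std::forward<Args>(args)...);
        full_ = true;
    }

    // consumer side: move the value out and destroy it (stub 1 -> 0)
    T take()
    {
        assert(full_);
        T value(std::move(*ptr()));
        ptr()->~T();
        full_ = false;
        return value;
    }

private:
    // C++17 code would wrap this cast in std::launder.
    T* ptr() { return reinterpret_cast<T*>(&storage_); }

    typename std::aligned_storage<sizeof(T), std::alignment_of<T>::value>::type storage_;
    bool full_;
};
```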
 
Chris M. Thomasson
01-07-2012
"Andrew Tomazos" wrote in message
news:(E-Mail Removed)...

[...]

> // set cell i
> void set(unsigned int i)
> {
> // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
> // swap 1 for 0 or die
> if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
> {
> corrupt("set", i);
> exit(-1);
> }
> }


> // get cell i
> void get(unsigned int i)
> {
> // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
> // swap 0 for 1 or die
> if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
> {
> corrupt("get", i);
> exit(-1);
> }
> }


Why would the simple failure condition of the CAS mean "everything" is
corrupted? AFAICT, the state is simply not finished being fully produced
yet...


2 threads
_________________________________________________
thread 1 gets past vacancies
thread 2 gets past vacancies
thread 1 XADD head gets 0 index
thread 2 XADD head gets 1 index
thread 2 success CAS 0-to-1 in index[1]
thread 2 incs occupants
thread 2 gets past occupants [consumer now]
thread 2 XADD tail gets 0 index
thread 2 fails CAS 1-to-0 in index[0]; test failed ????
thread 1 success CAS 0-to-1 in index[0]; humm...
_________________________________________________
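The interleaving above can be replayed deterministically in a single thread, which reproduces the failure without waiting hundreds of thousands of iterations. This sketch is not from the thread: the semaphores are reduced to plain counters, and the names `replay_trace` and `ReplayResult` are made up. It shows the consumer getting past `occupants` because of the item in cell[1], while its `tail` ticket points at cell[0], which thread 1 has not filled yet.

```cpp
#include <cassert>

struct ReplayResult
{
    bool consumer_cas_ok;  // did the consumer's 1 -> 0 CAS on its cell succeed?
    int cell0;             // final value of cell[0]
    int cell1;             // final value of cell[1]
};

// Each statement is one step taken by one "thread" in the trace above;
// semaphores are modelled as plain counters (a simplification).
ReplayResult replay_trace()
{
    const unsigned int capacity = 8;
    int cell[capacity] = {0};
    unsigned int head = 0, tail = 0;
    int vacancies = capacity, occupants = 0;

    --vacancies;  // thread 1 gets past vacancies
    --vacancies;  // thread 2 gets past vacancies
    unsigned int p1 = __sync_fetch_and_add(&head, 1) % capacity;  // thread 1: index 0
    unsigned int p2 = __sync_fetch_and_add(&head, 1) % capacity;  // thread 2: index 1
    bool set1 = __sync_bool_compare_and_swap(&cell[p2], 0, 1);    // thread 2 fills cell[1]
    ++occupants;  // thread 2 posts occupants
    assert(occupants > 0);  // so a consumer is allowed through
    --occupants;  // thread 2, now a consumer, gets past occupants
    unsigned int c2 = __sync_fetch_and_add(&tail, 1) % capacity;  // consumer: index 0
    bool got = __sync_bool_compare_and_swap(&cell[c2], 1, 0);     // cell[0] is still 0: fails
    bool set0 = __sync_bool_compare_and_swap(&cell[p1], 0, 1);    // thread 1 fills cell[0] late
    assert(set1 && set0 && vacancies == 6);

    ReplayResult r = { got, cell[0], cell[1] };
    return r;
}
```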



You basically have to use CAS _loops_ here; wrt this design anyway....

<GET>
while (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
{
CLEVER_BACKOFF(); // ;^)
}

and vice versa for <SET>.


This should solve the race condition.
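The CAS-loop suggestion, spelled out as a runnable sketch. This is not code from the thread: it uses C++11 `std::atomic` and `std::thread` instead of the gcc builtins and pthreads, a hypothetical `Semaphore` class (mutex plus condition variable) in place of `sem_t`, and `std::this_thread::yield()` in place of `CLEVER_BACKOFF()`. The harness `run_stub_queue` is also made up. A failed CAS now means "the other side hasn't finished yet" rather than corruption. As far as I can tell this keeps each cell's 0/1 handoff consistent, but it does not by itself give strict FIFO order: a producer holding ticket t + capacity can fill cell t % capacity before the producer holding ticket t does.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the post's sem_t wrapper, using C++11 primitives.
class Semaphore
{
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void post()
    {
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// The thread's stub queue, with each CAS retried until it succeeds.
template <unsigned int Capacity>
class StubQueue
{
    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2");
public:
    StubQueue() : head_(0), tail_(0), vacancies_(Capacity), occupants_(0)
    {
        for (unsigned int i = 0; i < Capacity; ++i) cell_[i].store(0);
    }
    void put()
    {
        vacancies_.wait();
        unsigned int i = head_.fetch_add(1) % Capacity;
        int expected = 0;
        // wait until cell i has been emptied by a consumer
        while (!cell_[i].compare_exchange_weak(expected, 1)) { expected = 0; std::this_thread::yield(); }
        occupants_.post();
    }
    void take()
    {
        occupants_.wait();
        unsigned int i = tail_.fetch_add(1) % Capacity;
        int expected = 1;
        // wait until a producer has filled cell i
        while (!cell_[i].compare_exchange_weak(expected, 0)) { expected = 1; std::this_thread::yield(); }
        vacancies_.post();
    }
    int occupied() const
    {
        int n = 0;
        for (unsigned int i = 0; i < Capacity; ++i) n += cell_[i].load();
        return n;
    }
private:
    std::atomic<unsigned int> head_, tail_;
    Semaphore vacancies_, occupants_;
    std::atomic<int> cell_[Capacity];
};

// `pairs` producer/consumer pairs each do n operations; returns the number
// of cells still occupied afterwards (0 if every put was matched by a take).
int run_stub_queue(int pairs, int n)
{
    assert(pairs > 0 && n >= 0);
    StubQueue<8> q;
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&q, n] { for (int k = 0; k < n; ++k) q.put(); });
        threads.emplace_back([&q, n] { for (int k = 0; k < n; ++k) q.take(); });
    }
    for (auto& t : threads) t.join();
    return q.occupied();
}
```

Compared with the original, the only change is that neither side gives up on a failed CAS; the semaphores still bound how far producers and consumers can run ahead of each other.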




BTW, check these mpmc bounded queues out:

http://www.1024cores.net/home/lock-f...ded-mpmc-queue


and a very simple and still experimental one from me:

http://groups.google.com/group/comp....4a30cba0623bf4




My algorithm implicitly blocks on queue-full/empty and cell-full/empty
conditions and acts accordingly. Also, it's in dire need of a distributed
conditional blocking mechanism. Eventcounts come to mind... ;^)

You can compile the following crude example impl with recent MSVC++ for
x86-32 and the pthread-win32 library:

http://pastebin.com/ZVX7dL4g
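For context on the first link: as far as I know, 1024cores is Dmitry Vyukov's site and its bounded MPMC queue replaces the 0/1 stub with a per-cell sequence number that also records which lap of the ring the cell is on, so a consumer can tell "not produced yet" apart from "already taken". The sketch below is written from memory in that style, not copied from the linked page; the names `SeqQueue`, `try_push`, `try_pop` and `run_seq_queue` are my own. It is non-blocking (full/empty return false), so a blocking version would still need something like the eventcount mentioned above. Padding against false sharing is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

template <typename T, std::size_t Capacity>
class SeqQueue
{
    static_assert(Capacity >= 2 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of 2, at least 2");
    struct Cell { std::atomic<std::size_t> seq; T data; };
public:
    SeqQueue() : enqueue_pos_(0), dequeue_pos_(0)
    {
        for (std::size_t i = 0; i < Capacity; ++i)  // cell i awaits producer ticket i
            buffer_[i].seq.store(i, std::memory_order_relaxed);
    }
    bool try_push(const T& value)  // false if full
    {
        std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& cell = buffer_[pos & (Capacity - 1)];
            std::size_t seq = cell.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) {  // cell free for this lap: try to claim ticket pos
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    cell.data = value;
                    cell.seq.store(pos + 1, std::memory_order_release);  // publish
                    return true;
                }  // on failure compare_exchange_weak reloaded pos
            } else if (diff < 0) {
                return false;  // last lap's item not consumed yet
            } else {
                pos = enqueue_pos_.load(std::memory_order_relaxed);
            }
        }
    }
    bool try_pop(T& out)  // false if empty
    {
        std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& cell = buffer_[pos & (Capacity - 1)];
            std::size_t seq = cell.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0) {  // item for ticket pos is published
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    out = cell.data;
                    cell.seq.store(pos + Capacity, std::memory_order_release);  // free for next lap
                    return true;
                }
            } else if (diff < 0) {
                return false;  // producer for this ticket hasn't finished
            } else {
                pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
        }
    }
private:
    Cell buffer_[Capacity];
    std::atomic<std::size_t> enqueue_pos_, dequeue_pos_;
};

// Usage: `pairs` producers push 1..n; `pairs` consumers pop n items each,
// yielding on full/empty. Returns the sum of all popped values.
long long run_seq_queue(int pairs, int n)
{
    assert(pairs > 0 && n >= 0);
    SeqQueue<int, 8> q;
    std::atomic<long long> sum(0);
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&q, n] {
            for (int k = 1; k <= n; ++k) while (!q.try_push(k)) std::this_thread::yield();
        });
        threads.emplace_back([&q, &sum, n] {
            int v;
            for (int k = 0; k < n; ++k) { while (!q.try_pop(v)) std::this_thread::yield(); sum += v; }
        });
    }
    for (auto& t : threads) t.join();
    return sum.load();
}
```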

 
Chris M. Thomasson
01-07-2012
"Chris M. Thomasson" wrote in message
newsl4Oq.9195$(E-Mail Removed)...
[...]
"Andrew Tomazos" wrote in message
[...]

> You can compile the following crude example impl with recent MSVC++ for
> x86-32 and the pthread-win32 library:


FWIW, you can read the following for further context:

http://groups.google.com/group/comp....6e1cc6fcbe77c8

http://groups.google.com/group/comp....ac3ad36c4d1069

http://groups.google.com/group/comp....b568c5d41d45a7
(read all my posts in this thread...)

 
Andrew Tomazos
01-08-2012
On Jan 8, 12:13 am, "Chris M. Thomasson" <(E-Mail Removed)> wrote:
> "Andrew Tomazos" wrote in message
> news:(E-Mail Removed)...
>
> [...]
>
> > // set cell i
> > void set(unsigned int i)
> > {
> > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
> > // swap 1 for 0 or die
> > if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
> > {
> > corrupt("set", i);
> > exit(-1);
> > }
> > }
> > // get cell i
> > void get(unsigned int i)
> > {
> > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
> > // swap 0 for 1 or die
> > if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
> > {
> > corrupt("get", i);
> > exit(-1);
> > }
> > }

>
> Why would the simple failure condition of the CAS mean "everything" is
> corrupted? AFAICT, the state is simply not finished being fully produced
> yet...


The state of the cell in the demo is a "stub" to represent whether the
content is constructed there or not (in the real implementation). If
the design was correct (it isn't), normally instead of the CAS there
would be a (guaranteed) uncontended constructor call of the value_type
(stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).
The CAS is just to catch corruption in the demo.
-Andrew.
 
Chris M. Thomasson
01-08-2012
"Andrew Tomazos" <(E-Mail Removed)> wrote in message
news:(E-Mail Removed)...
On Jan 8, 12:13 am, "Chris M. Thomasson" <(E-Mail Removed)> wrote:
> "Andrew Tomazos" wrote in message

[...]

> > Why would the simple failure condition of the CAS mean "everything" is
> > corrupted? AFAICT, the state is simply not finished being fully produced
> > yet...


> The state of the cell in the demo is a "stub" to represent whether the
> content is constructed there or not (in the real implementation). If
> the design was correct (it isn't), normally instead of the CAS there
> would be a (guaranteed) uncontended constructor call of the value_type
> (stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).


Ugggg....


> The CAS is just to catch corruption in the demo.


Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
a failure condition at all. You need to spin on the CAS simply because just
incrementing the `occupants' does NOT mean that there is a guaranteed item
in the "cell you expect". You have a race-condition that I laid out for you;
it can trip the condition even with 2 threads:

2 threads
_________________________________________________
thread 1 gets past vacancies
thread 2 gets past vacancies
thread 1 XADD head gets 0 index
thread 2 XADD head gets 1 index
thread 2 success CAS 0-to-1 in index[1]
thread 2 incs occupants
thread 2 gets past occupants [consumer now]
thread 2 XADD tail gets 0 index
thread 2 fails CAS 1-to-0 in index[0]; test failed ????
thread 1 success CAS 0-to-1 in index[0]; humm...
_________________________________________________


Think about it for a moment.... Thread 2 needs to WAIT for thread 1 to
finish its CAS. If you don't do it, the queue is quite busted and totally
unusable.

It's as simple as that.


 
Chris M. Thomasson
01-08-2012
"Andrew Tomazos" <(E-Mail Removed)> wrote in message
news:(E-Mail Removed)...
> I'm trying to implement a high performance blocking queue backed by a
> circular buffer on top of pthreads, semaphore.h and gcc atomic
> builtins. The queue needs to handle multiple simulataneous readers
> and writers from different threads.


[...]

"High Performance" queue? I am counting 1 sem_post, 1 sem_wait, 2 XADDs and
1 CAS for a push operation, and the exact same overhead for a pop operation.
BTW, those calls to `sem_post/wait' are not exactly fast and probably
contain an atomic RMW.

So, assuming that `sem_post/wait' have at least 1 atomic RMW each, I am
counting:

5 atomic RMW for push and 5 atomic RMW for pop... Memory barriers aside for
a moment...

Even in the presence of ZERO contention, how can this possibly perform
better than a simple condvar/mutex setup?

The condvar/mutex will be using far fewer atomic RMWs and memory barriers.
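For comparison, a minimal sketch of the kind of "simple condvar/mutex setup" being referred to, as I read it: one mutex guarding a ring buffer, plus one condition variable for "not full" and one for "not empty". The names `MutexQueue` and `run_mutex_queue` are hypothetical, and nothing here claims it is faster than the other designs in the thread; that would need measuring on the target machine. In the uncontended case each put or take is roughly one lock, one unlock and one notify.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class MutexQueue
{
public:
    explicit MutexQueue(std::size_t capacity) : buffer_(capacity), head_(0), count_(0)
    {
        assert(capacity > 0);
    }
    void put(T value)  // blocks while full
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return count_ < buffer_.size(); });
        buffer_[(head_ + count_) % buffer_.size()] = std::move(value);
        ++count_;
        lock.unlock();
        not_empty_.notify_one();
    }
    T take()  // blocks while empty
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T value = std::move(buffer_[head_]);
        head_ = (head_ + 1) % buffer_.size();
        --count_;
        lock.unlock();
        not_full_.notify_one();
        return value;
    }
private:
    std::mutex mutex_;
    std::condition_variable not_full_, not_empty_;
    std::vector<T> buffer_;  // requires T to be default-constructible
    std::size_t head_, count_;
};

// Usage: `pairs` producers put 1..n, `pairs` consumers take n items each.
// Returns the sum of all taken values.
long long run_mutex_queue(int pairs, int n)
{
    assert(pairs > 0 && n >= 0);
    MutexQueue<int> q(8);
    std::atomic<long long> sum(0);
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&q, n] { for (int k = 1; k <= n; ++k) q.put(k); });
        threads.emplace_back([&q, &sum, n] { for (int k = 0; k < n; ++k) sum += q.take(); });
    }
    for (auto& t : threads) t.join();
    return sum.load();
}
```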


 
Andrew Tomazos
01-09-2012
On Jan 8, 10:56 pm, "Chris M. Thomasson" <(E-Mail Removed)> wrote:
> Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
> a failure condition at all.


From my point of view it does, because I expected (incorrectly) that
there would be no contention at that point in the code.

I understand the race condition now and have since fixed it in the
design.
-Andrew.
 