This is the mail archive of the
pthreads-win32@sources.redhat.com
mailing list for the pthreas-win32 project.
Possible violation of synchronization in pthread_cond_timedwait()
- From: qybupt at 263 dot net
- To: pthreads-win32 at sources dot redhat dot com
- Date: Thu, 18 Nov 2004 11:38:13 +0800
- Subject: Possible violation of synchronization in pthread_cond_timedwait()
Hello guys:
When I am using the pthread-win32 in a program which will be portable on windows and Linux,
I find a problem of pthread_cond_timedwait().
It is a very simple scenario. There is one message queue, one thread popping messages from
it and another one pushing messages into it. When there is none message in the queue, the
popping thread will wait for a condition. On the other side, the pushing thread will wait for
the same condition when the queue is full. I use the pthread_cond_timedwait because I want the
waiting thread not to block for ever. However, I found that the popping thread sometimes
executed the code to pop message when the queue was empty! The test code is below:
(Please pay attention to the CAudioMessageQueue::queuePop() and CAudioMessageQueue::queuePush())
// TestWaitTimeOut.cpp
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include "sched.h"
#include "pthread.h"
#include "CAudioMessage.h"
#include "CAudioMessageQueue.h"
bool isShutdown = false;
CAudioMessageQueue *pMessageQueue = NULL;
extern "C"{
void* __cdecl push(void* param)
{
CAudioMessage msg;
char *pStr;
int i = 0;
while(true) {
if(isShutdown) {
break;
}
msg.setData(NULL);
msg.setDataSize(0);
pStr = (char *)malloc(100*sizeof(char));
memset(pStr, 0, 100);
//printf("in MessagePusher, address = %x\n", (long)pStr);
sprintf(pStr, "%d", i);
msg.setData(pStr);
msg.setDataSize(100);
if(!pMessageQueue->queuePush(msg)) {
free(pStr);
}
i++;
Sleep(20);
}
return NULL;
}
}
extern "C"{
void* __cdecl pop(void* param)
{
CAudioMessage msg;
int i = 0;
while(true) {
if(isShutdown) {
break;
}
msg.setData(NULL);
msg.setDataSize(0);
if(!pMessageQueue->queuePop(msg)) {
continue;
}
//printf("in MessagePoper, address = %x\n", (long)msg.getData());
free(msg.getData());
i++;
//Sleep(1);
}
return NULL;
}
}
int main(int argc, char *argv[]) {
pthread_t tidPusher, tidPoper;
pMessageQueue = new CAudioMessageQueue(10);
if (0 != pthread_create(&tidPusher, NULL, &push, NULL)) {
printf("fail to start pusher\n");
return 1;
}
if (0 != pthread_create(&tidPoper, NULL, &pop, NULL)) {
printf("fail to start poper\n");
return 1;
}
Sleep(200000);
isShutdown = true;
printf("wait for the threads to stop\n");
Sleep(5000);
return 0;
}
// CAudioMessageQueue.h
#ifndef CAUDIOMESSAGEQUEUE_H
#define CAUDIOMESSAGEQUEUE_H
#include "pthread.h"
#include "CAudioMessage.h"
class CAudioMessageQueue
{
public:
// constructor
// the destructor will release the storage pointed by these two arguments
CAudioMessageQueue();
CAudioMessageQueue(int maxSize);
// destructor
virtual ~CAudioMessageQueue();
// Get queue max size.
int getMaxSize(void);
// Set queue size.
void setMaxSize(int size);
// Get queue current size.
int getCurrentSize(void);
// Push message to query tail, this queue will keep its own copy of msg
bool queuePush(const CAudioMessage& msg);
bool queuePush(const CAudioMessage* msg);
// Pop message from query head, this queue will release storage of its own copy
bool queuePop(CAudioMessage& msg);
bool queuePop(CAudioMessage* msg);
// Is queue full?
bool isFull(void);
// Is queue empty?
bool isEmpty(void);
struct timespec getTimeOut();
private:
CAudioMessage* m_head;
CAudioMessage* m_tail;
int m_maxSize;
int m_curSize;
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
};
#endif// CAUDIOMESSAGEQUEUE_H
// CAudioMessageQueue.cpp
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <math.h>
#include "sched.h"
#include "pthread.h"
#include "CAudioMessageQueue.h"
#define COND_TIMEOUT 0.01
CAudioMessageQueue::CAudioMessageQueue()
{
m_head = NULL;
m_tail = NULL;
m_maxSize = 0;
m_curSize = 0;
pthread_mutex_init(&m_mutex, NULL);
pthread_cond_init(&m_cond, NULL);
}
CAudioMessageQueue::CAudioMessageQueue(int maxSize)
{
//CAudioMessageQueue();
m_head = NULL;
m_tail = NULL;
m_maxSize = 0;
m_curSize = 0;
pthread_mutex_init(&m_mutex, NULL);
pthread_cond_init(&m_cond, NULL);
m_maxSize = maxSize;
}
// destructor
// Attention: the CAudioMessage objects in this queue will be popped out, but
// their storage will not be free, it is the client's responsibility.
CAudioMessageQueue::~CAudioMessageQueue()
{
CAudioMessage msg;
while(m_curSize > 0) {
queuePop(msg);
}
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
// Get queue max size.
int CAudioMessageQueue::getMaxSize(void)
{
int size;
pthread_mutex_lock(&m_mutex);
size = m_maxSize;
pthread_mutex_unlock(&m_mutex);
return size;
}
// Set queue size.
void CAudioMessageQueue::setMaxSize(int size)
{
pthread_mutex_lock(&m_mutex);
m_maxSize = size;
pthread_mutex_unlock(&m_mutex);
}
// Get queue current size
int CAudioMessageQueue::getCurrentSize(void)
{
int size;
pthread_mutex_lock(&m_mutex);
size = m_curSize;
pthread_mutex_unlock(&m_mutex);
return size;
}
// Push message to query tail.
bool CAudioMessageQueue::queuePush(const CAudioMessage& msg)
{
CAudioMessage* message = new CAudioMessage(msg);
// create AudioMessage fail
if (NULL == message) return false;
pthread_mutex_lock(&m_mutex);
if(m_curSize == m_maxSize) {
struct timespec timeout = getTimeOut();
if(0 != pthread_cond_timedwait(&m_cond, &m_mutex, &timeout)) {
//if(0 != pthread_cond_wait(&m_cond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
return false;
}
}
// add message to queue.
if (NULL == m_head) {//queue is empty
m_head = message;
m_tail = message;
} else { // queue is not empty, add after tail.
m_tail->m_next = message;
m_tail = message;
}
message->m_next = NULL;
m_curSize ++;
//printf("pushed message %s into the queue\n", message->getData());
//printf("the current size of queue is %d\n", m_curSize);
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
bool CAudioMessageQueue::queuePush(const CAudioMessage* msg)
{
return CAudioMessageQueue::queuePush(*msg);
}
// Pop message from query head.
bool CAudioMessageQueue::queuePop(CAudioMessage& msg)
{
// empty queue
//if (NULL == m_head) return false;
pthread_mutex_lock(&m_mutex);
if(m_curSize == 0) {
struct timespec timeout = getTimeOut();
if(0 != pthread_cond_timedwait(&m_cond, &m_mutex, &timeout)) {
//if(0 != pthread_cond_wait(&m_cond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
return false;
}
}
// delete message from queue.
msg = *m_head;
delete m_head;
m_head = msg.m_next; // header changed
msg.m_next = NULL; // next is illegal
if (NULL == m_head) m_tail = NULL;
m_curSize --;
//printf("poped message %s from the queue\n", msg.getData());
//printf("the current size of queue is %d\n", m_curSize);
// unlock mutex if synchronized.
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
bool CAudioMessageQueue::queuePop(CAudioMessage* msg)
{
return CAudioMessageQueue::queuePop(*msg);
}
// Is queue full?
bool CAudioMessageQueue::isFull(void)
{
bool full;
// lock mutex if synchronized.
pthread_mutex_lock(&m_mutex);
full = (m_curSize == m_maxSize) ? true : false;
// unlock mutex if synchronized.
pthread_mutex_unlock(&m_mutex);
return full;
}
// Is queue empty?
bool CAudioMessageQueue::isEmpty(void)
{
bool empty;
// lock mutex if synchronized.
pthread_mutex_lock(&m_mutex);
empty = (m_curSize == 0) ? true : false;
// unlock mutex if synchronized.
pthread_mutex_unlock(&m_mutex);
return empty;
}
timespec CAudioMessageQueue::getTimeOut() {
struct _timeb timebuffer;
struct timespec timeout;
_ftime(&timebuffer);
timeout.tv_sec = timebuffer.time + floor(COND_TIMEOUT);
timeout.tv_nsec = (timebuffer.millitm +
(COND_TIMEOUT - floor(COND_TIMEOUT)) * 1000) * 1000000;
return timeout;
}
// CAudioMessage.h
#ifndef CAUDIOMESSAGE_H
#define CAUDIOMESSAGE_H
//#include "CAudioStdinc.h"
#define NULL 0
class CAudioMessageQueue;
class CAudioMessage
{
friend class CAudioMessageQueue;
public:
CAudioMessage();
CAudioMessage(const CAudioMessage& msg);
virtual ~CAudioMessage();
void setData(void* data);
// Get data.
void* getData(void) const;
// Set data size.
void setDataSize(int size);
// Get data size.
int getDataSize(void) const;
// Operator =
CAudioMessage& operator=(const CAudioMessage& right);
protected:
CAudioMessage *m_next;
private:
void* m_data;
int m_dataSize;
};
#endif// CAUDIOMESSAGE_H
// CAudioMessage.cpp
#include "CAudioMessage.h"
//#include <stdlib.h>
//#include <string.h>
CAudioMessage::CAudioMessage()
{
m_data = NULL;
m_dataSize = 0;
}
CAudioMessage::CAudioMessage(const CAudioMessage& msg)
{
m_data = msg.m_data;
m_dataSize = msg.m_dataSize;
}
CAudioMessage::~CAudioMessage()
{
m_data = NULL;
m_dataSize = 0;
}
// Get data.
void* CAudioMessage::getData(void) const
{
return m_data;
}
// Set data.
void CAudioMessage::setData(void* data)
{
m_data = data;
}
// Get data size.
int CAudioMessage::getDataSize(void) const
{
return m_dataSize;
}
// Set data size.
void CAudioMessage::setDataSize(int size)
{
m_dataSize = size;
}
// Operator =
CAudioMessage& CAudioMessage::operator=(const CAudioMessage& right)
{
if (this == &right) return *this;
m_next = right.m_next;
m_data = right.m_data;
m_dataSize = right.m_dataSize;
return * this;
}
I run it in the VC6.0 and found that in CAudioMessageQueue::queuePop() the thread sometimes
excuted the "msg = *m_head;" when m_head was equal to NULL! The pthread_cond_timedwait's normal
return indicated that the condition waited had been signaled and the queue had at least one
message, but there was none in it.
I searched the archives of pthread-win32's discussion list and got none report about this
problem. I hope it is because of my mistake in the source code, but I have checked it many
times. Does anyone have suggestion? Thank you for your patience. :)
My test enviroment:
hardware: cpu - P4 2.8GHz * 2 memeory - 1G
software: windows xp profession, Visual C++ 6.0
==========================
263电子邮件-信赖邮自专业