This is the mail archive of the pthreads-win32@sources.redhat.com mailing list for the pthreas-win32 project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

Possible violation of synchronization in pthread_cond_timedwait()


Hello guys:

    When I am using the pthread-win32 in a program which will be portable on windows and Linux,
 I find a problem of pthread_cond_timedwait(). 
        
    It is a very simple scenario. There is one message queue, one thread popping messages from 
it and another one pushing messages into it. When there is none message in the queue, the 
popping thread will wait for a condition. On the other side, the pushing thread will wait for 
the same condition when the queue is full. I use the pthread_cond_timedwait because I want the 
waiting thread not to block for ever. However, I found that the popping thread sometimes 
executed the code to pop message when the queue was empty! The test code is below: 
(Please pay attention to the CAudioMessageQueue::queuePop() and CAudioMessageQueue::queuePush())

// TestWaitTimeOut.cpp
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include "sched.h"
#include "pthread.h"
#include "CAudioMessage.h"
#include "CAudioMessageQueue.h"

bool isShutdown = false;
CAudioMessageQueue *pMessageQueue = NULL;

extern "C"{
	void*  __cdecl push(void* param)
	{
		CAudioMessage msg;
		char *pStr;
		int i = 0;
		while(true)  {
			if(isShutdown) {
				break;
			}
			msg.setData(NULL);
			msg.setDataSize(0);
			pStr = (char *)malloc(100*sizeof(char));
			memset(pStr, 0, 100);
			//printf("in MessagePusher, address = %x\n", (long)pStr);
			sprintf(pStr, "%d", i); 
			msg.setData(pStr);
			msg.setDataSize(100);
			if(!pMessageQueue->queuePush(msg)) {
				free(pStr);
			}
			i++;
			Sleep(20);
		}
		return NULL;
	}
}

extern "C"{
	void*  __cdecl pop(void* param)
	{
		CAudioMessage msg;
		int i = 0;
		while(true) {
			if(isShutdown) {
				break;
			}
			msg.setData(NULL);
			msg.setDataSize(0);
			if(!pMessageQueue->queuePop(msg)) {
				continue;
			}
			//printf("in MessagePoper, address = %x\n", (long)msg.getData());
			free(msg.getData());
			i++;
			//Sleep(1);
		}
		return NULL;
	}
}

int main(int argc, char *argv[]) {
	pthread_t tidPusher, tidPoper;
	pMessageQueue = new CAudioMessageQueue(10);
	
	if (0 != pthread_create(&tidPusher, NULL, &push, NULL)) {
		printf("fail to start pusher\n");
		return 1;
	}
	if (0 != pthread_create(&tidPoper, NULL, &pop, NULL)) {
		printf("fail to start poper\n");
		return 1;
	}

	Sleep(200000);
	isShutdown = true;
	printf("wait for the threads to stop\n");
	Sleep(5000);

	return 0;
}

// CAudioMessageQueue.h

#ifndef CAUDIOMESSAGEQUEUE_H
#define CAUDIOMESSAGEQUEUE_H
#include "pthread.h"
#include "CAudioMessage.h"
 
class CAudioMessageQueue 
{
public:
    // constructor
    // the destructor will release the storage pointed by these two arguments
    CAudioMessageQueue();
    CAudioMessageQueue(int maxSize);
    // destructor
    virtual ~CAudioMessageQueue();
    // Get queue max size.
    int getMaxSize(void);
    // Set queue size.
    void setMaxSize(int size);
    // Get queue current size.
    int getCurrentSize(void);
    // Push message to query tail, this queue will keep its own copy of msg
    bool queuePush(const CAudioMessage& msg);
    bool queuePush(const CAudioMessage* msg);
    // Pop message from query head, this queue will release storage of its own copy
    bool queuePop(CAudioMessage& msg);
    bool queuePop(CAudioMessage* msg);
    // Is queue full?
    bool isFull(void);
    // Is queue empty?
    bool isEmpty(void);
    struct timespec getTimeOut();

private:
    CAudioMessage* m_head;
    CAudioMessage* m_tail;
    int m_maxSize;
    int m_curSize;
    pthread_cond_t m_cond;
    pthread_mutex_t m_mutex;
};

#endif// CAUDIOMESSAGEQUEUE_H

// CAudioMessageQueue.cpp
#include <stdio.h>
#include <sys/timeb.h> 
#include <sys/types.h>
#include <math.h> 
#include "sched.h"
#include "pthread.h"   
#include "CAudioMessageQueue.h"
#define COND_TIMEOUT 0.01

CAudioMessageQueue::CAudioMessageQueue()     
{
    m_head = NULL;
    m_tail = NULL;
    m_maxSize = 0;
    m_curSize = 0;
    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_cond, NULL);
}

CAudioMessageQueue::CAudioMessageQueue(int maxSize)
{
    //CAudioMessageQueue();
    m_head = NULL;
    m_tail = NULL;
    m_maxSize = 0;
    m_curSize = 0;
    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_cond, NULL);
    m_maxSize = maxSize;
}

// destructor
// Attention: the CAudioMessage objects in this queue will be popped out, but
// their storage will not be free, it is the client's responsibility.
CAudioMessageQueue::~CAudioMessageQueue()
{
    CAudioMessage msg;
    while(m_curSize > 0) { 
        queuePop(msg);
    }
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_cond);
}

// Get queue max size.
int CAudioMessageQueue::getMaxSize(void)
{
    int size;

    pthread_mutex_lock(&m_mutex);
    size = m_maxSize;
    pthread_mutex_unlock(&m_mutex);

    return size;
}

// Set queue size.
void CAudioMessageQueue::setMaxSize(int size)
{
    pthread_mutex_lock(&m_mutex);
    m_maxSize = size;
    pthread_mutex_unlock(&m_mutex);
}
 
// Get queue current size
int CAudioMessageQueue::getCurrentSize(void) 
{
    int size;

    pthread_mutex_lock(&m_mutex);
    size = m_curSize;
    pthread_mutex_unlock(&m_mutex);
    return size;
}


// Push message to query tail.
bool CAudioMessageQueue::queuePush(const CAudioMessage& msg)
{
    CAudioMessage* message = new CAudioMessage(msg);
    
    // create AudioMessage fail
    if (NULL == message) return false;
     
    pthread_mutex_lock(&m_mutex);
    if(m_curSize == m_maxSize) {  
        struct timespec timeout = getTimeOut();
        if(0 != pthread_cond_timedwait(&m_cond, &m_mutex, &timeout)) {
		//if(0 != pthread_cond_wait(&m_cond, &m_mutex)) {
            pthread_mutex_unlock(&m_mutex);
            return false;
        }
    }
    // add message to queue.
    if (NULL == m_head) {//queue is empty
        m_head = message;
        m_tail = message;
    } else { // queue is not empty, add after tail.
        m_tail->m_next = message;
        m_tail = message;
    }
    message->m_next = NULL;
    m_curSize ++;
    
    //printf("pushed message %s into the queue\n", message->getData());
    //printf("the current size of queue is %d\n", m_curSize);
    
    pthread_cond_signal(&m_cond);
    pthread_mutex_unlock(&m_mutex);
    return true;
}

bool CAudioMessageQueue::queuePush(const CAudioMessage* msg)
{
    return CAudioMessageQueue::queuePush(*msg);
}

// Pop message from query head.
bool CAudioMessageQueue::queuePop(CAudioMessage& msg)
{
    // empty queue
    //if (NULL == m_head) return false;

    pthread_mutex_lock(&m_mutex);
    if(m_curSize == 0) {
        struct timespec timeout = getTimeOut();
        if(0 != pthread_cond_timedwait(&m_cond, &m_mutex, &timeout)) {
		//if(0 != pthread_cond_wait(&m_cond, &m_mutex)) {
            pthread_mutex_unlock(&m_mutex);
            return false;
        }
    }

    // delete message from queue.
    msg = *m_head; 
    delete m_head;
    m_head = msg.m_next; // header changed
    msg.m_next = NULL; // next is illegal
    if (NULL == m_head) m_tail = NULL;
    m_curSize --;

    //printf("poped message %s from the queue\n", msg.getData());
    //printf("the current size of queue is %d\n", m_curSize);

    // unlock mutex if synchronized.
    pthread_cond_signal(&m_cond);
    pthread_mutex_unlock(&m_mutex);
    return true;
}

bool CAudioMessageQueue::queuePop(CAudioMessage* msg)
{
    return CAudioMessageQueue::queuePop(*msg);
}

// Is queue full?
bool CAudioMessageQueue::isFull(void)
{
    bool full;

    // lock mutex if synchronized.
    pthread_mutex_lock(&m_mutex);

    full = (m_curSize == m_maxSize) ? true : false;

    // unlock mutex if synchronized.
    pthread_mutex_unlock(&m_mutex);
    return full;
}

// Is queue empty?
bool CAudioMessageQueue::isEmpty(void) 
{
    bool empty;

    // lock mutex if synchronized.
    pthread_mutex_lock(&m_mutex);

    empty = (m_curSize == 0) ? true : false;

    // unlock mutex if synchronized.
    pthread_mutex_unlock(&m_mutex);
    return empty;
}

timespec CAudioMessageQueue::getTimeOut() {
    struct _timeb timebuffer;
    struct timespec timeout;
    _ftime(&timebuffer);
    timeout.tv_sec = timebuffer.time + floor(COND_TIMEOUT);
    timeout.tv_nsec = (timebuffer.millitm + 
                        (COND_TIMEOUT - floor(COND_TIMEOUT)) * 1000) * 1000000;
    
    return timeout;
}
    
// CAudioMessage.h
#ifndef CAUDIOMESSAGE_H
#define CAUDIOMESSAGE_H

//#include "CAudioStdinc.h"
#define NULL 0

class CAudioMessageQueue;

class CAudioMessage 
{
    friend class CAudioMessageQueue;
public:
    CAudioMessage();
    CAudioMessage(const CAudioMessage& msg);
    virtual ~CAudioMessage();
    void setData(void* data);
    // Get data.
    void* getData(void) const;
    // Set data size.
    void setDataSize(int size); 
    // Get data size.
    int getDataSize(void) const;
    // Operator =
    CAudioMessage& operator=(const CAudioMessage& right);

protected:
    CAudioMessage *m_next;
private:
    void* m_data;
    int m_dataSize;
};

#endif// CAUDIOMESSAGE_H

// CAudioMessage.cpp
#include "CAudioMessage.h"

//#include <stdlib.h>
//#include <string.h>
 
CAudioMessage::CAudioMessage()
{
    m_data = NULL;
    m_dataSize = 0;  
}

CAudioMessage::CAudioMessage(const CAudioMessage& msg)
{ 
    m_data = msg.m_data;
    m_dataSize = msg.m_dataSize;
}

CAudioMessage::~CAudioMessage()
{
    m_data = NULL;
    m_dataSize = 0;
}


// Get data.
void* CAudioMessage::getData(void) const
{
    return m_data;
}

// Set data.
void CAudioMessage::setData(void* data)
{
    m_data = data;
}

// Get data size.
int CAudioMessage::getDataSize(void) const
{
    return m_dataSize;
}

// Set data size.
void CAudioMessage::setDataSize(int size)
{
    m_dataSize = size;
}


// Operator =
CAudioMessage& CAudioMessage::operator=(const CAudioMessage& right)
{
    if (this == &right) return *this;

    m_next = right.m_next;
    m_data = right.m_data;
    m_dataSize = right.m_dataSize;

    return * this;
}

I run it in the VC6.0 and found that in CAudioMessageQueue::queuePop() the thread sometimes 
excuted the "msg = *m_head;" when m_head was equal to NULL! The pthread_cond_timedwait's normal 
return indicated that the condition waited had been signaled and the queue had at least one 
message, but there was none in it. 

I searched the archives of pthread-win32's discussion list and got none report about this 
problem. I hope it is because of my mistake in the source code, but I have checked it many 
times. Does anyone have suggestion? Thank you for your patience. :)

My test enviroment: 
hardware: cpu - P4 2.8GHz * 2 memeory - 1G
software: windows xp profession, Visual C++ 6.0 












==========================
263电子邮件-信赖邮自专业

Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]