boost thread подход к синхронизации
От: Аноним  
Дата: 09.06.11 14:11
Оценка:
Привет тут.

Есть желание разобраться с сабжевой библиотекой (и с многопоточностью вообще) и появился ряд вопросов (далее будет много корявого кода):

Дано: producer-cunsumers, 1 поток producer и например, 2 cunsumers.

Хранилище данных с интерфейсом доступа:

struct DataHolder
{
    DataHolder() : read_done_threads_count (0) {}
    void get(unsigned short* dst, unsigned start, unsigned count)
    {
        if (dst && start + count < DATA_SIZE + 1)
        {
            boost::mutex::scoped_lock l(access_mutex);
            memcpy(dst, &data[start], count*sizeof(unsigned short));
        }
    }
    void set(unsigned short* src, unsigned start, unsigned count)
    {
        if (src && start + count < DATA_SIZE + 1)
        {
            boost::mutex::scoped_lock l(access_mutex);
            memcpy(&data[start], src, count*sizeof(unsigned short));
        }
    }
    boost::mutex access_mutex; // вообще, если правильно сделать "логический" доступ, то этот мутекс и не нужен?
    boost::mutex l_mutex;
    boost::condition_variable con_send, con_read;
    unsigned short read_done_threads_count;
    unsigned short data[DATA_SIZE];
};


Базовый класс потока (вообще нужен он — вопрос):
class Thread
{
public:
    Thread()
        : stoprequested(false) {}

    virtual ~Thread() {}

    void start() 
    {
        thr = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Thread::do_work, this)));
    }

    virtual void stop()
    {
        stoprequested = true;
        thr->join();
    }

    virtual void in_cycle() = 0;

private:
    volatile bool stoprequested;
    boost::shared_ptr<boost::thread> thr;

    void do_work()
    {
        while (!stoprequested)
        {
            in_cycle();
        }
    }                    
};


Класс producer:
class SourceThread : public Thread
{
public:
    SourceThread(DataHolder* holder_) : holder(holder_) {}
    virtual ~SourceThread() {}
private:
    void in_cycle()
    {
        boost::mutex::scoped_lock l(holder->l_mutex);
        while (holder->read_done_threads_count != 2)
            holder->con_read.wait(l);
        for (unsigned i = 0; i < DATA_SIZE; ++i)
            data[i] = rand();
        holder->set(data, 0, DATA_SIZE);
        holder->read_done_threads_count = 0;
        l.unlock();
        holder->con_send.notify_all();
    }
private:
    DataHolder* holder;
    unsigned short data[DATA_SIZE];
};


И класс cunsumer:
class RecieveThread : public Thread
{
public:
    RecieveThread(DataHolder* holder_) : holder(holder_) {}
    virtual ~RecieveThread() {}
private:
    void in_cycle()
    {
        boost::mutex::scoped_lock l(holder->l_mutex);
        while (holder->read_done_threads_count == 2)
            holder->con_send.wait(l);
        holder->get(data, 0, DATA_SIZE);
        holder->read_done_threads_count++;
        l.unlock();
        holder->con_read.notify_one();
    }
private:
    DataHolder* holder;
    unsigned short data[DATA_SIZE];
};


Хочется:
1. Дождаться в потоке producer пока 2 cunsumers получат данные и только после этого producer готовит новую порцию (вроде бы в теории вышеприведенный код это обеспечивает?).
2. Как-то корректно обрабатывать завершение потоков. Из кода видно, что если вызвать producer_thread.stop(), то consumer'ы намертво зависнут в ожидании holder->con_send.wait(l); и наоборот.
Если есть желание, поясните общий подход к данной задаче (использование condition vars и т.п.).
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.