Привет тут.
Есть желание разобраться с сабжевой библиотекой (и с многопоточностью вообще) и появился ряд вопросов (далее будет много корявого кода):
Дано: producer-cunsumers, 1 поток producer и например, 2 cunsumers.
Хранилище данных с интерфейсом доступа:
struct DataHolder
{
DataHolder() : read_done_threads_count (0) {}
void get(unsigned short* dst, unsigned start, unsigned count)
{
if (dst && start + count < DATA_SIZE + 1)
{
boost::mutex::scoped_lock l(access_mutex);
memcpy(dst, &data[start], count*sizeof(unsigned short));
}
}
void set(unsigned short* src, unsigned start, unsigned count)
{
if (src && start + count < DATA_SIZE + 1)
{
boost::mutex::scoped_lock l(access_mutex);
memcpy(&data[start], src, count*sizeof(unsigned short));
}
}
boost::mutex access_mutex; // вообще, если правильно сделать "логический" доступ, то этот мутекс и не нужен?
boost::mutex l_mutex;
boost::condition_variable con_send, con_read;
unsigned short read_done_threads_count;
unsigned short data[DATA_SIZE];
};
Базовый класс потока (вообще нужен он — вопрос):
class Thread
{
public:
Thread()
: stoprequested(false) {}
virtual ~Thread() {}
void start()
{
thr = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Thread::do_work, this)));
}
virtual void stop()
{
stoprequested = true;
thr->join();
}
virtual void in_cycle() = 0;
private:
volatile bool stoprequested;
boost::shared_ptr<boost::thread> thr;
void do_work()
{
while (!stoprequested)
{
in_cycle();
}
}
};
Класс producer:
class SourceThread : public Thread
{
public:
SourceThread(DataHolder* holder_) : holder(holder_) {}
virtual ~SourceThread() {}
private:
void in_cycle()
{
boost::mutex::scoped_lock l(holder->l_mutex);
while (holder->read_done_threads_count != 2)
holder->con_read.wait(l);
for (unsigned i = 0; i < DATA_SIZE; ++i)
data[i] = rand();
holder->set(data, 0, DATA_SIZE);
holder->read_done_threads_count = 0;
l.unlock();
holder->con_send.notify_all();
}
private:
DataHolder* holder;
unsigned short data[DATA_SIZE];
};
И класс cunsumer:
class RecieveThread : public Thread
{
public:
RecieveThread(DataHolder* holder_) : holder(holder_) {}
virtual ~RecieveThread() {}
private:
void in_cycle()
{
boost::mutex::scoped_lock l(holder->l_mutex);
while (holder->read_done_threads_count == 2)
holder->con_send.wait(l);
holder->get(data, 0, DATA_SIZE);
holder->read_done_threads_count++;
l.unlock();
holder->con_read.notify_one();
}
private:
DataHolder* holder;
unsigned short data[DATA_SIZE];
};
Хочется:
1. Дождаться в потоке producer пока 2 cunsumers получат данные и только после этого producer готовит новую порцию (вроде бы в теории вышеприведенный код это обеспечивает?).
2. Как-то корректно обрабатывать завершение потоков. Из кода видно, что если вызвать producer_thread.stop(), то consumer'ы намертво зависнут в ожидании holder->con_send.wait(l); и наоборот.
Если есть желание, поясните общий подход к данной задаче (использование condition vars и т.п.).