Есть пул потоков thread_pool и есть классы задач some_task, которые проводят обработку запуская работу в потоках.
Типа так, очень упрощенно:
// структура, которая хранит данные для параллельной обработки
struct process
{
task* self;
some_data config;
some_data1 source;
some_data2 result;
};
// открытый наружу класс, при помощи которого конфигурируется задача
// запускается и получается результат
class some_task
{
public:
~some_task()
{
thread_pool::detach_task(this);
}
void run()
{
some_data cfg;
// указатель на some_task при работе потока не
// используется, и нужен только чтобы идентифицировать задание
process p {this, cfg, ...}
thread_pool::start( p );
}
some_result get_result()
{
return thread_pool::wait_and_take_result(this);
}
}
// хранит список задач и запускает потоки
class thread_pool;
void thread_pool::detach_task(task* self)
{
boost::mutex::scoped_lock lock(mutex_);
tasks_.remove(self);
...
}
some_result thread_pool::wait_and_take_result(task* self)
{
boost::mutex::scoped_lock lock(mutex_);
some_result* r = 0;
while(1)
{
r = result_ready(self);
if (!r) some_pool_has_finished.wait(lock);
else break;
}
return *r;
}
int main()
{
shared_ptr<task> t( new task );
t->run();
some_result = t->get_result();
}
Я стремился сделать так, чтобы параллельные потоки были отвязаны от класса task, т.е. чтобы при нескольких запусках task::run, выполнение не ломалось, а просто результат устаревал. А через get_result я получал бы самый актуальный результат. Проблема теперь в том, что я хочу пробрасывать сообщения из работчего потока в основной, например для обновления окна, типа так
thread_pool::start( ... )
{
// основную работу делаем без блокировки мьютекса
...
// а когда задание выполнено:
boost::mutex::scoped_lock lock(mutex_);
// убеждаемся что задание не удалено
if (tasks_.contains(self))
{
#if _PREFER_DEADOCK
// если делать так, то любой вызов метода thread_pool
// приведет к дедлоку, например я не могу сделать
// в обработчике delete this
self->on_process_finished();
#else // _PREFER_RACE_CONDITION
// а если так, то имеет место race_condition
lock.unlock();
// возможно в этот момент задание и будет удалено
self->on_process_finished();
#endif
}
}
В общем существующий код значительно сложнее и худо-бедно работает, но меня не покидает ощущение, мой код не корректен. Я пытаюсь его улучшить и нахожу такие вот концептуальные косяки. Как вообще такие вещи делаются? Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?
Здравствуйте, RedApe, Вы писали:
RA>В общем существующий код значительно сложнее и худо-бедно работает, но меня не покидает ощущение, мой код не корректен. Я пытаюсь его улучшить и нахожу такие вот концептуальные косяки. Как вообще такие вещи делаются? Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?
Я бы на твоем месте с автором
SObjectizer поговорил
http://eao197.blogspot.ru
Здравствуйте, RedApe, Вы писали:
Наверно, нужен подход "получил результат и забыл" (об исходном объекте-задаче). В этом случае, thread_pool::detach_task становится не нужен, пусть задача живет ровно столько, сколько она нужна потоку из пула, т.е. ровно столько, сколько производятся вычисления. В этом случае "задача" может быть произвольным функтором/лямбдой и тогда достаточно будет сосредоточиться только на сигналинге готовности и на разделяемом владении данными, оформив это дело в стиле С++:
// пишу прямо в браузер
namespace details {
typedef some_mutex mutex;
typedef some_lock guard;
typedef some_condition condition;
template<typename T>
struct last_value : boost::noncopyable
{
optional<T> value_;
mutex mutex_;
condition condition_;
T get()
{
guard g(mutex);
while(!value_)
condition_.wait(g);
return value_.value();
}
T get_then_reset()
{
guard g(mutex);
while(!value_)
condition_.wait(g);
T tmp = std::move(value_.value());
value_ = std::nullopt;
return tmp;
}
void set_value(const T & value)
{
guard g(mutex_);
value_ = value;
condition_.notify_all();
}
void wait()
{
guard g(mutex_);
while(!value_)
condition_.wait(g);
}
future_status wait_until(const time_point & abs_time)
{
guard g(mutex_);
while(!value_) {
condition_.wait_until(g, abs_time);
if(now() > abs_time) break;
}
return value_ ? future_status::ready
: future_status::timeout;
}
};
} // namespace details
template<typename T>
class last_value_promise
{
typedef shared_ptr<details::last_value<T>> shared_state_ptr;
public:
last_value_promise(shared_state_ptr shared_state)
: shared_state_(shared_state)
{}
void set_value(const T & value) {
shared_state_->set_value(value);
}
private:
shared_state_ptr shared_state_;
};
template<typename T>
class last_value_future
{
typedef shared_ptr<details::last_value<T>> shared_state_ptr;
public:
last_value_future()
: shared_state_(make_shared<details::last_value<T>>())
{}
last_value_promise<T> get_promise() {
return last_value_promise<T>(shared_state_);
}
T get() {
return shared_state_->get();
}
T get_then_reset() {
return shared_state_->get_then_reset();
}
void wait() {
return shared_state_->wait();
}
future_status wait_until(const time_point & abs_time) {
return shared_state_->wait_until(abs_time);
}
future_status wait_for(const duration & timeout) {
return shared_state_->wait_until(now() + timeout);
}
private:
shared_state_ptr shared_state_;
};
Для полноценного соответствия С++ надо дописать еще метод last_value_promise::set_exception и допилить методы get, поможет в этом следующая ссылка:
http://en.cppreference.com/w/cpp/error/exception_ptr
Далее пишем еще один простой хелпер:
template<typename T, typename F>
function<void()> last_value_aggregator(last_value_promise<T> p, F/*&&*/ f)
{
return [] {
try {
p.set_value(f());
} catch(...) {
p.set_exception(current_exception());
}
};
}
Затем берем произвольную реализацию пула потоков (из буста или например:
http://habrahabr.ru/post/188234/)
Используем все вместе:
size_t task(size_t param)
{
this_thread::sleep_for(random_delay());
return param;
}
void printer(last_value_future<size_t> fut)
{
while(true) {
size_t last_value = fut.get_then_reset();
cout << " last value: " << last_value << endl;
}
}
void main()
{
thread_pool pool;
last_value_future<size_t> fut;
async(pool, printer, fut);
last_value_promise<size_t> p = fut.get_promise();
for(size_t i = 0; i < 42; i++)
async(pool, last_value_aggregator(p, task), i);
// press CTRL+C to exit...
}
Примечание.
Обрати внимание на get и get_then_reset. Первый вариант не сбрасывает значение, поэтому последующий вызов get/get_then_reset не будет ожидать обновления значения, а просто вернет текущее (оно же последнее). Второй вариант сбрасывает всю систему в начальное состояние.
Дедлоков нет by design.