thread pool и борьба с race condition
От: RedApe Беларусь  
Дата: 01.02.15 07:16
Оценка:
Есть пул потоков thread_pool и есть классы задач some_task, которые проводят обработку запуская работу в потоках.
Типа так, очень упрощенно:

// структура, которая хранит данные для параллельной обработки
struct process  
{
    task* self;
    some_data config;
    some_data1 source;
    some_data2 result;
};

// открытый наружу класс, при помощи которого конфигурируется задача
// запускается и получается результат
class some_task
{
public:
    ~some_task()
    {
        thread_pool::detach_task(this);
    }

    void run()
    {
        some_data cfg;

        // указатель на some_task при работе потока не 
        // используется, и нужен только чтобы идентифицировать задание
        process p {this, cfg, ...}
        thread_pool::start( p );
    }

    some_result get_result()
    {
        return thread_pool::wait_and_take_result(this);
    }
}

// хранит список задач и запускает потоки
class thread_pool; 

void thread_pool::detach_task(task* self)
{
    boost::mutex::scoped_lock lock(mutex_); 
    tasks_.remove(self);
    ...
}

some_result thread_pool::wait_and_take_result(task* self)
{
    boost::mutex::scoped_lock lock(mutex_); 

    some_result* r = 0;
    while(1) 
    {
        r = result_ready(self);
        if (!r) some_pool_has_finished.wait(lock);
        else break;
    }

    return *r;    
}

int main()
{
   shared_ptr<task> t( new task );
   t->run();
   some_result = t->get_result();
}


Я стремился сделать так, чтобы параллельные потоки были отвязаны от класса task, т.е. чтобы при нескольких запусках task::run, выполнение не ломалось, а просто результат устаревал. А через get_result я получал бы самый актуальный результат. Проблема теперь в том, что я хочу пробрасывать сообщения из работчего потока в основной, например для обновления окна, типа так

thread_pool::start( ... )
{
    // основную работу делаем без блокировки мьютекса
    ...

    // а когда задание выполнено:

    boost::mutex::scoped_lock lock(mutex_); 
    // убеждаемся что задание не удалено
    if (tasks_.contains(self))
    {
#if _PREFER_DEADOCK

        // если делать так, то любой вызов метода thread_pool
        // приведет к дедлоку, например я не могу сделать
        // в обработчике delete this
        self->on_process_finished();

#else // _PREFER_RACE_CONDITION

        // а если так, то имеет место race_condition
        lock.unlock();

        // возможно в этот момент задание и будет удалено
        self->on_process_finished(); 
#endif
    }
}


В общем существующий код значительно сложнее и худо-бедно работает, но меня не покидает ощущение, мой код не корректен. Я пытаюсь его улучшить и нахожу такие вот концептуальные косяки. Как вообще такие вещи делаются? Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?
--
RedApe
Re: thread pool и борьба с race condition
От: os24ever
Дата: 05.02.15 18:56
Оценка:
RA>Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?

Joe Armstrong, "Programming Erlang: Software for a Concurrent World".
Re: thread pool и борьба с race condition
От: kaa.python Ниоткуда РСДН профессионально мёртв и завален ватой.
Дата: 07.02.15 00:18
Оценка:
Здравствуйте, RedApe, Вы писали:

RA>В общем существующий код значительно сложнее и худо-бедно работает, но меня не покидает ощущение, мой код не корректен. Я пытаюсь его улучшить и нахожу такие вот концептуальные косяки. Как вообще такие вещи делаются? Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?


Я бы на твоем месте с автором SObjectizer поговорил http://eao197.blogspot.ru
Re: thread pool и борьба с race condition
От: vdimas Россия  
Дата: 19.02.15 08:10
Оценка:
Здравствуйте, RedApe, Вы писали:

Наверно, нужен подход "получил результат и забыл" (об исходном объекте-задаче). В этом случае, thread_pool::detach_task становится не нужен, пусть задача живет ровно столько, сколько она нужна потоку из пула, т.е. ровно столько, сколько производятся вычисления. В этом случае "задача" может быть произвольным функтором/лямбдой и тогда достаточно будет сосредоточиться только на сигналинге готовности и на разделяемом владении данными, оформив это дело в стиле С++:

// пишу прямо в браузер

namespace details {

typedef some_mutex mutex;
typedef some_lock guard;
typedef some_condition condition;

template<typename T>
struct last_value : boost::noncopyable
{
  optional<T> value_;
  mutex mutex_;
  condition condition_;

  T get() 
  {
    guard g(mutex);

    while(!value_)
      condition_.wait(g);
    
    return value_.value();
  }

  T get_then_reset() 
  {
    guard g(mutex);

    while(!value_)
      condition_.wait(g);
    
    T tmp = std::move(value_.value());
    value_ = std::nullopt;
    return tmp;
  }

  void set_value(const T & value) 
  {
    guard g(mutex_);
    value_ = value;
    condition_.notify_all();
  }

  void wait() 
  {
    guard g(mutex_);

    while(!value_)
      condition_.wait(g);
  }

  future_status wait_until(const time_point & abs_time) 
  {
    guard g(mutex_);

    while(!value_) {
      condition_.wait_until(g, abs_time);
      if(now() > abs_time) break;
    }

    return value_ ? future_status::ready 
                  : future_status::timeout;
  }
};

} // namespace details

template<typename T>
class last_value_promise
{
  typedef shared_ptr<details::last_value<T>> shared_state_ptr;

public:
  last_value_promise(shared_state_ptr shared_state)
    : shared_state_(shared_state)
  {}

  void set_value(const T & value) {
    shared_state_->set_value(value);
  }

private:
  shared_state_ptr shared_state_;
};

template<typename T>
class last_value_future
{
  typedef shared_ptr<details::last_value<T>> shared_state_ptr;

public:
  last_value_future() 
    : shared_state_(make_shared<details::last_value<T>>()) 
  {}

  last_value_promise<T> get_promise() {
    return last_value_promise<T>(shared_state_);
  }

  T get() {
    return shared_state_->get();
  }

  T get_then_reset() {
    return shared_state_->get_then_reset();
  }

  void wait() {
    return shared_state_->wait();
  }

  future_status wait_until(const time_point & abs_time) {
    return shared_state_->wait_until(abs_time);
  }

  future_status wait_for(const duration & timeout) {
    return shared_state_->wait_until(now() + timeout);
  }

private:
  shared_state_ptr shared_state_;
};


Для полноценного соответствия С++ надо дописать еще метод last_value_promise::set_exception и допилить методы get, поможет в этом следующая ссылка:
http://en.cppreference.com/w/cpp/error/exception_ptr

Далее пишем еще один простой хелпер:
template<typename T, typename F>
function<void()> last_value_aggregator(last_value_promise<T> p, F/*&&*/ f)
{
  return [] {
    try {
      p.set_value(f());
    } catch(...) {
      p.set_exception(current_exception());
    }
  };
}

Затем берем произвольную реализацию пула потоков (из буста или например: http://habrahabr.ru/post/188234/)

Используем все вместе:
size_t task(size_t param) 
{
  this_thread::sleep_for(random_delay());
  return param;
}

void printer(last_value_future<size_t> fut)
{
  while(true) {
    size_t last_value = fut.get_then_reset();
    cout << " last value: " << last_value << endl;
  }
}

void main()
{
  thread_pool pool;

  last_value_future<size_t> fut;

  async(pool, printer, fut);

  last_value_promise<size_t> p = fut.get_promise();

  for(size_t i = 0; i < 42; i++)
    async(pool, last_value_aggregator(p, task), i);

  // press CTRL+C to exit...
}


Примечание.
Обрати внимание на get и get_then_reset. Первый вариант не сбрасывает значение, поэтому последующий вызов get/get_then_reset не будет ожидать обновления значения, а просто вернет текущее (оно же последнее). Второй вариант сбрасывает всю систему в начальное состояние.

Дедлоков нет by design.
Отредактировано 20.02.2015 6:44 vdimas . Предыдущая версия . Еще …
Отредактировано 20.02.2015 6:43 vdimas . Предыдущая версия .
Отредактировано 19.02.2015 14:14 vdimas . Предыдущая версия .
Отредактировано 19.02.2015 8:15 vdimas . Предыдущая версия .
Отредактировано 19.02.2015 8:13 vdimas . Предыдущая версия .
Отредактировано 19.02.2015 8:11 vdimas . Предыдущая версия .
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.