Re[3]: непонятки с condition variable
От: nen777w  
Дата: 05.07.16 08:39
Оценка: +1
Всем спасибо!
Проблему решил.

Нужно была гарантия того что notofy_one() будет вызвано уже для ожидающей cv у thread_worker
Если кому интересно будет, вот:
  "Код правок"
void finalize()
{
    boost::unique_lock<boost::mutex> lock(m_mtx_data_ready);
    m_state = s_Finalize;
    m_cv_data_ready.notify_all();
}

void push()
{
    boost::unique_lock<boost::mutex> lock(m_mtx_data_ready);
    m_state = s_DataReady;
    m_cv_data_ready.notify_all();
}
Re[5]: непонятки с condition variable
От: uncommon Ниоткуда  
Дата: 05.07.16 15:50
Оценка: +1
Здравствуйте, Кодт, Вы писали:

К>Здравствуйте, nen777w, Вы писали:


N>>Нужно была гарантия того что notofy_one() будет вызвано уже для ожидающей cv у thread_worker


К>Нужна была гарантия того, что у тебя не возникает гонка на доступ в переменную m_state.


Скорее, это не гонка на m_state, а missed wake (https://www.justsoftwaresolutions.co.uk//files/concurrent_thinking.pdf, slide 28).
непонятки с condition variable
От: nen777w  
Дата: 04.07.16 18:42
Оценка:
Есть condition_variable, есть mutex на котором она ожидает с проверкой условия от ложного срабатывания.
Другой поток сперва устанавливает это условие а потом делает notify_one на этой переменной.
в результате.. condition variable продолжаеn висеть на этом мютексе как буд то бы никакого notify_one и не было...
Есть идеи?
Re: непонятки с condition variable
От: b0r3d0m  
Дата: 04.07.16 19:02
Оценка:
Re[2]: непонятки с condition variable
От: nen777w  
Дата: 04.07.16 19:29
Оценка:
B>
  • Если notify был вызван до wait'а, то нет никакой гарантии, что ожидающий поток будет когда-либо пробуждён.
    гм... похоже это мой случай.

    У меня дело в том что несколько рабочих потоков и у каждого свой cv.
    И основной (менеджер) тоже со своей CV который менеджит их и готовит задания.
    Задумка была в том что посредством notify_one на ожидающей CV менеджера, тот что выполняет задание сообщает менеджеру что он его выполнил и готов принять еще (а сам в этом время ждет на своей cv), а менеджер в свою очередь загружает новое задание и в его cv сообщает что пора начинать работу.

    Вот куско кода рабочего потока:
    do 
    {
        notify_cb.fn_notify_ready(shared_from_this()); //<-- сообщаем менеджеру что готовы принять работу или выполнили ее
    // <-- похоже тут менеджер успевает установить задачу и сообщить - выполняй
        boost::unique_lock<boost::mutex> lock(m_mtx_data_ready); //<-- ждем пуша от менеджера
        m_cv_data_ready.wait(lock, [this]() -> bool { return s_DataReady == m_state || s_Finalize == m_state; });
    
        if (s_DataReady == m_state)
        {
            do_job();
            m_state = s_DataProcessed;
        }
    
    } while (s_Finalize != m_state);
  • Re[3]: непонятки с condition variable
    От: b0r3d0m  
    Дата: 04.07.16 19:37
    Оценка:
    N>гм... похоже это мой случай.
    Тогда добавляйте отладочную печать и вперде.

    N>Задумка была в том что посредством notify_one на ожидающей CV менеджера, тот что выполняет задание сообщает менеджеру что он его выполнил и готов принять еще (а сам в этом время ждет на своей cv), а менеджер в свою очередь загружает новое задание и в его cv сообщает что пора начинать работу.

    Какой-то самописный task pool получается. Почему бы не использовать готовые решения?
    Re: непонятки с condition variable
    От: Кодт Россия  
    Дата: 04.07.16 20:09
    Оценка:
    Здравствуйте, nen777w, Вы писали:

    N>Есть condition_variable, есть mutex на котором она ожидает с проверкой условия от ложного срабатывания.

    N>Другой поток сперва устанавливает это условие а потом делает notify_one на этой переменной.
    N>в результате.. condition variable продолжаеn висеть на этом мютексе как буд то бы никакого notify_one и не было...
    N>Есть идеи?

    Я бы напихал туда логов, чисто чтобы убедиться, что пробуждения, хотя бы и ложные, случаются.
    // producer
    
    {
      unique_lock lock(m_mutex);
      LOG << "producer: locked " << (void*)this;
      m_state = SMTH;
      LOG << "producer: set state = " << m_state;
      m_cv.notify_one();
      LOG << "producer: triggered cv";
    }
    
    // consumer
    
    {
      unique_lock lock(m_mutex);
      LOG << "consumer: locked " << (void*)this;
      LOG << "consumer: waiting cv";
      m_cv.wait(lock, [this]->bool {
        LOG << "consumer: awaited with state = " << m_state;
        return m_state == GOOD || m_state == NOTBAD;
      });
      LOG << "consumer: achieved state = " << m_state;
    }


    Самое простое, что может случиться, — это ты не тот объект кормишь! Для этого и надо последить за this.
    Ну, или более детально — последить за соответствием членов — мьютекса и кондишена.

    Второе, — это детская ошибка, когда кондишен дёргается до изменения условия.
    Кроме того, у тебя условие составное: s_DataReady == m_state || s_Finalize == m_state, — значит, надо дёргать кондишен на присваивании этой переменной m_state обоих значений.
    В конце концов, можно пожертвовать ложными пробуждениями и дёргать кондишен на присваивании любого значения, а не только полезного.

    На десерт — всякие расстрелы памяти, из-за чего кондишен сломался под корень.
    Перекуём баги на фичи!
    Re[2]: непонятки с condition variable
    От: nen777w  
    Дата: 04.07.16 20:54
    Оценка:
    К>Я бы напихал туда логов, чисто чтобы убедиться, что пробуждения, хотя бы и ложные, случаются.

    Спасибо, да попробую залогировать, поанализировать что получится.

    К>Самое простое, что может случиться, — это ты не тот объект кормишь! Для этого и надо последить за this.

    К>Ну, или более детально — последить за соответствием членов — мьютекса и кондишена.
    М.. да вроде такого не должно случаться.

    К>Второе, — это детская ошибка, когда кондишен дёргается до изменения условия.

    Это вряд ли.

    К>Кроме того, у тебя условие составное: s_DataReady == m_state || s_Finalize == m_state, — значит, надо дёргать кондишен на присваивании этой переменной m_state обоих значений.

    Так и есть.

      "полный класс thread_worker"
    #ifndef __thread_worker_h__
    #define __thread_worker_h__
    
    //std
    #include <assert.h>
    //boost
    #include <boost/thread.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/function.hpp>
    #include <boost/shared_ptr.hpp>
    
    class thread_worker : public boost::enable_shared_from_this<thread_worker>
    {
    public:
        enum eState {
            s_NotReady, s_WaitForTask, s_DataReady, s_DataProcessed, s_Finalize
        };
    
        struct notification_callbacks
        {
            boost::function<void(boost::shared_ptr<thread_worker>)> fn_notify_ready;
        };
    
    
    public:
        virtual ~thread_worker() {
            assert(s_NotReady == m_state || s_Finalize == m_state);
        }
    
    //protected:
    public:
        thread_worker()
            : m_state(s_NotReady)
        {}
    
    public:
        inline eState state() const { 
            return m_state; 
        }
        
        inline void join() { 
            m_thread.join(); 
        }
        
        inline void processed() { 
            m_state = s_WaitForTask; 
        }
    
    public:
        void run(notification_callbacks &notify_cb)
        {
            m_thread = boost::move(boost::thread([this, notify_cb]
                {
                    assert(notify_cb.fn_notify_ready);
    
                    m_state = s_WaitForTask;
    
                    boost::unique_lock<boost::mutex> lock(m_mtx_data_ready);
    
                    do 
                    {
                        notify_cb.fn_notify_ready(shared_from_this());
                        m_cv_data_ready.wait(lock, [this]() -> bool { return s_DataReady == m_state || s_Finalize == m_state; });
    
                        if (s_DataReady == m_state)
                        {
                            do_job();
                            m_state = s_DataProcessed;
                        }
    
                    } while (s_Finalize != m_state);
                }        
            ));
        }
    
        void finalize()
        {
            m_state = s_Finalize;
            m_cv_data_ready.notify_all();
        }
    
    //protected:
    public:
        void push()
        {
            m_state = s_DataReady;
            m_cv_data_ready.notify_all();
        }
    
    public:
            //virtual void do_job() = 0;
        virtual void do_job() {}
    
    private:
        boost::thread m_thread;
        volatile eState m_state;
        boost::mutex m_mtx_data_ready;
        boost::condition_variable m_cv_data_ready;
    };
    
    #endif


      "И его обвязка manager"
    typedef boost::shared_ptr<thread_worker > ptr_thread_worker;
    
    std::set<ptr_thread_worker> v_threads;
    std::queue<ptr_thread_worker>  q_ready_workers;
    
    //data redy sync mutex
    boost::mutex mtx_data_ready;
    boost::condition_variable cv_data_ready;
    
    
    //notification from worker
    paralel_for_each_worker::notification_callbacks callbacks;
    callbacks.fn_notify_ready = [&mtx_data_ready, &q_ready_workers, &cv_data_ready, ](boost::shared_ptr<thread_worker> ptr_worker)
    {
        assert(ptr_worker);
        assert(thread_worker::s_WaitForTask == ptr_worker->state() || thread_worker::s_DataProcessed == ptr_worker->state());
    
        //lock queue
        boost::lock_guard<boost::mutex> lock(mtx_data_ready);
        q_ready_workers.push(ptr_worker);
        
        cv_data_ready.notify_one();
    };
    
    unsigned int active_tasks = 0;
    
    //run threads
    while (threads--)
    {
        ptr_thread_worker worker = boost::make_shared<ptr_thread_worker::element_type>();
        v_threads.insert(worker);
        ++active_tasks;
        worker->run(callbacks);
    }
    
    unsigned int y = 0;
    
    boost::unique_lock<boost::mutex> lock(mtx_data_ready);
    
    do
    {
        //wait for !empty() queue
        cv_data_ready.wait(lock, [&q_ready_workers]() -> bool { return !q_ready_workers.empty(); });
    
        while (!q_ready_workers.empty())
        {
            //pop work thread
            ptr_thread_worker ptr_thread_chunk = q_ready_workers.front();
            q_ready_workers.pop();
    
            ptr_thread_chunk->processed();
    
            assert(thread_worker::s_WaitForTask == ptr_thread_chunk->state());
    
            if (y < 10) {
                ptr_thread_chunk->push();
                ++y;
            }
            else {
                --active_tasks;
                ptr_thread_chunk->finalize();
            }
        }
    
    } while (y < 10 || active_tasks);
    
    //wait for end all threads
    std::for_each(v_threads.begin(), v_threads.end(), [](ptr_thread_worker ptr_worker) { ptr_worker->finalize(); ptr_worker->join(); });
    Re: непонятки с condition variable
    От: chaotic-kotik  
    Дата: 05.07.16 10:31
    Оценка:
    Здравствуйте, nen777w, Вы писали:

    N>в результате.. condition variable продолжаеn висеть на этом мютексе как буд то бы никакого notify_one и не было...

    N>Есть идеи?

    ставлю на то, что notify_one вызывается до wait, это самый частый прокол с condition variables
    Re[4]: непонятки с condition variable
    От: Кодт Россия  
    Дата: 05.07.16 11:09
    Оценка:
    Здравствуйте, nen777w, Вы писали:

    N>Нужно была гарантия того что notofy_one() будет вызвано уже для ожидающей cv у thread_worker


    Нужна была гарантия того, что у тебя не возникает гонка на доступ в переменную m_state.
    Которую ты решил, пропустив поток источника в критическую секцию, под мьютекс. Что, в общем-то, и надо было сделать с самого начала.

    Но кстати, критические секции неспроста названы критическими. Находиться внутри них нужно как можно меньшее время.

    А то у тебя получается следующее:
    — приёмник захватывает мьютекс и входит в цикл
    — приёмник проходит через условие (допустим, с первого раза у него это получилось)
    — заходит внутрь job() и там кукует
    — источник собирается дать ему отмашку и висит перед мьютексом
    . . . . .
    — приёмник выходит из job(), в цикле заходит в условие и отпускает мьютекс
    — источник наконец получает управление, меняет условие и убегает
    — приёмник просыпается, хватает мьютекс, проверяет условие, идёт дальше

    Желательно локализовать доступ к флажку. А если job() требует защиты, — защищать его отдельной секцией с отдельным мьютексом.

    eState m_state;
    mutex m_state_guard;
    condition m_state_cv;
    
    mutex m_job_guard;
    
    void set_state(eState s) {
      unique_lock lock(m_state_guard);
      m_state = s;
      m_state_cv.notify_all();
    }
    
    bool wait_and_set_state(eState expect, eState reset) {
      unique_lock lock(m_state_guard);
      m_state_cv.wait(lock, [this,s]() { return m_state == expect || m_state == s_Finalize; });
      if (m_state == s_Finalize)
        return false;
      m_state = reset;
      return true;
    }
    
    void protected_job() {
      unique_lock lock(m_job_guard);
      job();
    }
    
    void run_consumer() {
      while(wait_and_set_state(s_DataReady, s_DataProcessing)) {
        protected_job();
      }
    }


    Вопрос на засыпку: что должно происходить, если job() ещё выполняется, а новые данные уже пришли? В какое состояние должен перейти объект по окончании job(), — s_DataReady, s_DataProcessed?
    Мы сейчас не рассматриваем проблему гонок, защиты и всего такого. Допустим, с этим всё хорошо. Будем моделировать семантику, а уж потом обвяжем синхрообъектами для достижения этой семантики.

    Логично, что следует разнести состояние объекта на состояние входа (неготово/готово) и выхода (недоделано/доделано).

    Кстати сказать, конвеер можно было бы выразить в терминах очередей.
    Которые выражаются через буфер и обвязку — из пары семафоров, либо из мьютекса, условной переменной и флажка/счётчика.

    А аварийное завершение сделать через thread::interrupt(), вместо подмешивания признака "пора валить" во все очереди и барьеры.
    Перекуём баги на фичи!
     
    Подождите ...
    Wait...
    Пока на собственное сообщение не было ответов, его можно удалить.