job waiter/workers [boost based]
От: nen777w  
Дата: 22.08.12 14:00
Оценка: -1
С недавнего времени обзавелся вот таким самописным лисапедом, делюсь может кому то пригодится:
Суть такая:
— нужно было много рабочих потоков которые обрабатывают данные для основного
— функции рабочих потоков могут иметь разные сигнатуры, так как входные данные тоже разные
— в один момент времени основной поток может обработать данные только одного рабочего потока
— основной поток получает нотификацию от рабочего когда данные для него готовы, остальные становятся в очередь

  "Пример использования:"
//функция рабочего потока получает интерфейс и (jobID) через который уведомляет что работа закончилась.
//может вернуть true, если требуется ещё одна итерация или false если рабочий поток больше не нужен. 
bool  job_thread(JOB::I_job_pool_worker* p, unsigned int jobID)
{
    //это рабочий поток здесь готовим данные (что то грузим с диска/динамически генерируем и т.д.)
    //и сообщаем основному потоку что данные готовы
    void *pData = ....
    p->notify( pData ); //можно просто p->notify() - тогда в основном потоке ориентируемся на ID
    return true;
}

//функция основного потока ожидающая данные от любого рабочего потока, 
//и нотифицирующая его (по ID) что обработка данных закончена и можно продолжать работу.
void  job_waiter(JOB::I_job_pool_waiter* p)
{
    unsigned int id = -1;
    void* pData = NULL;
    
    while(p->wait(&id, &pData)) 
    //или просто while(p->wait(&id)) - если ориентируемся по ID (в id будет записан идентификатор рабочего потока от которого пришло сообщение)
    {
        //основной поток что то делает с данными
        //и после сообщает рабочему потоку что его данные успешно обработаны
        p->processed();
    }
}

int main() {
    
    //создаем pool на 45 рабочих потоков, не беда если подпишемся не на все
    const unsigned int total_job = 45; 
    JOB::job_pool    pool(total_job);
    
    for( unsigned int n = 0; n < total_job-2; ++n )
    {
      pool.subscribe_job_thread(n, boost::bind(&job_thread, _1, _2));
    }

    //функции рабочих потоков могут иметь разные сигнатуры
    int n = ...;
    pool.subscribe_job_thread(n, boost::bind(&job_thread_2, _1, _2, 10, boost::ref(n)));
    
    //запускаем наш пул
    pool.run(boost::bind(&job_waiter, _1));   

    return 0;
}


  "Код класса:"
#ifndef __job_pool_h__
#define __job_pool_h__

//boost
#include "boost/function.hpp"
#include "boost/thread.hpp"
#include "boost/thread/locks.hpp"
#include "boost/function.hpp"
#include "boost/bind.hpp"
#include "boost/shared_array.hpp"
//std
#include <map>

namespace JOB {
//........................................................................
  class  I_job_pool_waiter
  {
  public:
    virtual bool  wait( unsigned int* jobID, void **user_data = NULL ) = 0;
    virtual void  processed() = 0;
  };
//........................................................................
  class  I_job_pool_worker
  {
  public:
    virtual void  notify( void* user_data = NULL ) = 0;
  };
//........................................................................
//........................................................................
  class  job_pool : public I_job_pool_waiter, public I_job_pool_worker
  {
    //interface I_job_pool_waiter
  private:
    //call when waiter ready for process job
    virtual bool  wait( unsigned int* jobID, void **user_data = NULL )
    {
      //wait for notify about data
      boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);
      if( active_jobs() ) 
      { //if !finish
        if(-1 == m_job_done_ID)
          m_cond_main.wait(m_mtx_waiter);

        if( -1 == m_job_done_ID ) return false;

        assert(-1 != m_job_done_ID);
        *jobID = m_job_done_ID;
        m_job_done_ID = -1;
        if(NULL != user_data) { *user_data = m_user_data; m_user_data = NULL; }

        return true;
      }

      //no job
      return false;
    }

    //call when waiter processed job
    virtual void  processed()
    {
      m_cond_main.notify_one();
    }

    //interface I_job_pool_worker
  private:
    //call when work thread want notify that job is done
    virtual void  notify( void* user_data = NULL )
    {
      //only one job thread can enter to here
      boost::lock_guard<boost::mutex> lock_guard(m_mtx_notify);

      //no any other job
      assert(-1 == m_job_done_ID && NULL == m_user_data);
      boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);  

      //set jobID and data
      m_job_done_ID = threadID_to_jobID(boost::this_thread::get_id());
      m_user_data = user_data;
      
      //notify to main thread that job is ready for processed and wait
      m_cond_main.notify_all();
      m_cond_main.wait(m_mtx_waiter);
    }

  public:
    job_pool(unsigned int threads_count)
      : m_connection(new connection_info[threads_count])
      , m_total_job_threads(threads_count)
      , m_job_counter(0)
      , m_job_done_ID(-1)
      , m_user_data(NULL)
    {}

    template< typename F >
    bool  subscribe_job_thread(unsigned int jobID, F work_thread_funk)
    {
      if( !is_exist_jobID(jobID) && m_job_counter > m_total_job_threads ) return false;

      unsigned int jobIDX = 0;
      if( is_exist_jobID(jobID) ) {
        //job already exist
        jobIDX = jobID_to_jobIDX(jobID);
      } else {
        //new job
        jobIDX = m_job_counter++;
      }
      
      //subscribe
      m_connection[jobIDX].m_thread_fn = work_thread_funk;
      m_map_jobID_con_info_IDX[jobID] = jobIDX;
    
      return true;
    }

    template< typename F >
    void  run(F main_thread_funk) 
    {
      create_job_threads();
      main_thread_funk((I_job_pool_waiter*)this);
      
      //wait while all threads done
      m_tg.join_all();
    }

  private:
    void  job_thread(unsigned int jobID)
    {
      register_threadID(boost::this_thread::get_id(), jobID);

      bool bContinue = true;
      while( bContinue ) 
      {
        bContinue = m_connection[jobID_to_jobIDX(jobID)].m_thread_fn((I_job_pool_worker*)this, jobID);
      }
      
      if(0 == dec_job_counter()) {
        boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);    
        m_cond_main.notify_all();
      }
    }

    void  create_job_threads()
    {
      for( map_uiui::const_iterator it = m_map_jobID_con_info_IDX.begin(), itE = m_map_jobID_con_info_IDX.end(); it != itE; ++it)
      {
        m_tg.create_thread(boost::bind(&job_pool::job_thread, this, (*it).first));
      }
    }

  private:
    bool  is_exist_jobID(unsigned int jobID) const
    {
      return m_map_jobID_con_info_IDX.end() != m_map_jobID_con_info_IDX.find(jobID);
    }

    unsigned int jobID_to_jobIDX(unsigned int jobID) const
    {
      map_uiui::const_iterator it = m_map_jobID_con_info_IDX.find(jobID);
      assert( m_map_jobID_con_info_IDX.end() != it );
      return (*it).second;
    }

    unsigned int threadID_to_jobID(boost::thread::id threadID) const
    {
      map_thID_ui::const_iterator it = m_map_ThreadID_to_jobID.find(threadID);
      assert( m_map_ThreadID_to_jobID.end() != it );
      return (*it).second;
    }

    void  register_threadID(boost::thread::id threadID, unsigned int jobID)
    {
      boost::lock_guard<boost::mutex> lock(m_mtx_rc);
      m_map_ThreadID_to_jobID[threadID] = jobID;
    }

    unsigned int active_jobs() {
      boost::lock_guard<boost::mutex> lock(m_mtx_counter);
      return m_job_counter;
    }

    unsigned int dec_job_counter() {
      boost::lock_guard<boost::mutex> lock(m_mtx_counter);
      return --m_job_counter;
    }

  private:
    typedef boost::function<bool(I_job_pool_worker*, unsigned int)> job_worker_fn;

    struct  connection_info 
    {
      job_worker_fn      m_thread_fn;
    };

    typedef boost::shared_array<connection_info> sh_a_connections;
    typedef std::map<unsigned int, unsigned int> map_uiui;
    typedef std::map<boost::thread::id, unsigned int>   map_thID_ui;

    map_uiui      m_map_jobID_con_info_IDX;
    map_thID_ui      m_map_ThreadID_to_jobID;
    sh_a_connections  m_connection;

  private:
    const unsigned int    m_total_job_threads;
    volatile unsigned    m_job_counter;
    volatile unsigned int  m_job_done_ID;
    void*          m_user_data;

    boost::thread_group        m_tg;
    boost::condition_variable_any  m_cond_main;
    boost::mutex          m_mtx_waiter;
    boost::mutex          m_mtx_counter;
    boost::mutex          m_mtx_notify;
    boost::mutex          m_mtx_rc;
  };
} //JOB ns

#endif
Re: job waiter/workers [boost based]
От: ioj Ниоткуда  
Дата: 05.10.12 17:02
Оценка:
Здравствуйте, nen777w, Вы писали:

а чем boost::asio не угодил?
нормально делай — нормально будет
Re[2]: job waiter/workers [boost based]
От: nen777w  
Дата: 05.10.12 21:49
Оценка:
ioj>а чем boost::asio не угодил?
Я с boost::asio только недавно начал знакомиться. Когда читал ловил себя на мысли что можно было и его заюзать.
Я просто раньше думал что он исключительно для сетевых нужд.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.