С недавнего времени обзавелся вот таким самописным лисапедом, делюсь может кому то пригодится:
Суть такая:
— нужно было много рабочих потоков которые обрабатывают данные для основного
— функции рабочих потоков могут иметь разные сигнатуры, так как входные данные тоже разные
— в один момент времени основной поток может обработать данные только одного рабочего потока
— основной поток получает нотификацию от рабочего когда данные для него готовы, остальные становятся в очередь
| "Пример использования:" |
| //функция рабочего потока получает интерфейс и (jobID) через который уведомляет что работа закончилась.
//может вернуть true, если требуется ещё одна итерация или false если рабочий поток больше не нужен.
bool job_thread(JOB::I_job_pool_worker* p, unsigned int jobID)
{
//это рабочий поток здесь готовим данные (что то грузим с диска/динамически генерируем и т.д.)
//и сообщаем основному потоку что данные готовы
void *pData = ....
p->notify( pData ); //можно просто p->notify() - тогда в основном потоке ориентируемся на ID
return true;
}
//функция основного потока ожидающая данные от любого рабочего потока,
//и нотифицирующая его (по ID) что обработка данных закончена и можно продолжать работу.
void job_waiter(JOB::I_job_pool_waiter* p)
{
unsigned int id = -1;
void* pData = NULL;
while(p->wait(&id, &pData))
//или просто while(p->wait(&id)) - если ориентируемся по ID (в id будет записан идентификатор рабочего потока от которого пришло сообщение)
{
//основной поток что то делает с данными
//и после сообщает рабочему потоку что его данные успешно обработаны
p->processed();
}
}
int main() {
//создаем pool на 45 рабочих потоков, не беда если подпишемся не на все
const unsigned int total_job = 45;
JOB::job_pool pool(total_job);
for( unsigned int n = 0; n < total_job-2; ++n )
{
pool.subscribe_job_thread(n, boost::bind(&job_thread, _1, _2));
}
//функции рабочих потоков могут иметь разные сигнатуры
int n = ...;
pool.subscribe_job_thread(n, boost::bind(&job_thread_2, _1, _2, 10, boost::ref(n)));
//запускаем наш пул
pool.run(boost::bind(&job_waiter, _1));
return 0;
}
|
| |
| "Код класса:" |
| #ifndef __job_pool_h__
#define __job_pool_h__
//boost
#include "boost/function.hpp"
#include "boost/thread.hpp"
#include "boost/thread/locks.hpp"
#include "boost/function.hpp"
#include "boost/bind.hpp"
#include "boost/shared_array.hpp"
//std
#include <map>
namespace JOB {
//........................................................................
class I_job_pool_waiter
{
public:
virtual bool wait( unsigned int* jobID, void **user_data = NULL ) = 0;
virtual void processed() = 0;
};
//........................................................................
class I_job_pool_worker
{
public:
virtual void notify( void* user_data = NULL ) = 0;
};
//........................................................................
//........................................................................
class job_pool : public I_job_pool_waiter, public I_job_pool_worker
{
//interface I_job_pool_waiter
private:
//call when waiter ready for process job
virtual bool wait( unsigned int* jobID, void **user_data = NULL )
{
//wait for notify about data
boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);
if( active_jobs() )
{ //if !finish
if(-1 == m_job_done_ID)
m_cond_main.wait(m_mtx_waiter);
if( -1 == m_job_done_ID ) return false;
assert(-1 != m_job_done_ID);
*jobID = m_job_done_ID;
m_job_done_ID = -1;
if(NULL != user_data) { *user_data = m_user_data; m_user_data = NULL; }
return true;
}
//no job
return false;
}
//call when waiter processed job
virtual void processed()
{
m_cond_main.notify_one();
}
//interface I_job_pool_worker
private:
//call when work thread want notify that job is done
virtual void notify( void* user_data = NULL )
{
//only one job thread can enter to here
boost::lock_guard<boost::mutex> lock_guard(m_mtx_notify);
//no any other job
assert(-1 == m_job_done_ID && NULL == m_user_data);
boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);
//set jobID and data
m_job_done_ID = threadID_to_jobID(boost::this_thread::get_id());
m_user_data = user_data;
//notify to main thread that job is ready for processed and wait
m_cond_main.notify_all();
m_cond_main.wait(m_mtx_waiter);
}
public:
job_pool(unsigned int threads_count)
: m_connection(new connection_info[threads_count])
, m_total_job_threads(threads_count)
, m_job_counter(0)
, m_job_done_ID(-1)
, m_user_data(NULL)
{}
template< typename F >
bool subscribe_job_thread(unsigned int jobID, F work_thread_funk)
{
if( !is_exist_jobID(jobID) && m_job_counter > m_total_job_threads ) return false;
unsigned int jobIDX = 0;
if( is_exist_jobID(jobID) ) {
//job already exist
jobIDX = jobID_to_jobIDX(jobID);
} else {
//new job
jobIDX = m_job_counter++;
}
//subscribe
m_connection[jobIDX].m_thread_fn = work_thread_funk;
m_map_jobID_con_info_IDX[jobID] = jobIDX;
return true;
}
template< typename F >
void run(F main_thread_funk)
{
create_job_threads();
main_thread_funk((I_job_pool_waiter*)this);
//wait while all threads done
m_tg.join_all();
}
private:
void job_thread(unsigned int jobID)
{
register_threadID(boost::this_thread::get_id(), jobID);
bool bContinue = true;
while( bContinue )
{
bContinue = m_connection[jobID_to_jobIDX(jobID)].m_thread_fn((I_job_pool_worker*)this, jobID);
}
if(0 == dec_job_counter()) {
boost::unique_lock<boost::mutex> lock_waiter(m_mtx_waiter);
m_cond_main.notify_all();
}
}
void create_job_threads()
{
for( map_uiui::const_iterator it = m_map_jobID_con_info_IDX.begin(), itE = m_map_jobID_con_info_IDX.end(); it != itE; ++it)
{
m_tg.create_thread(boost::bind(&job_pool::job_thread, this, (*it).first));
}
}
private:
bool is_exist_jobID(unsigned int jobID) const
{
return m_map_jobID_con_info_IDX.end() != m_map_jobID_con_info_IDX.find(jobID);
}
unsigned int jobID_to_jobIDX(unsigned int jobID) const
{
map_uiui::const_iterator it = m_map_jobID_con_info_IDX.find(jobID);
assert( m_map_jobID_con_info_IDX.end() != it );
return (*it).second;
}
unsigned int threadID_to_jobID(boost::thread::id threadID) const
{
map_thID_ui::const_iterator it = m_map_ThreadID_to_jobID.find(threadID);
assert( m_map_ThreadID_to_jobID.end() != it );
return (*it).second;
}
void register_threadID(boost::thread::id threadID, unsigned int jobID)
{
boost::lock_guard<boost::mutex> lock(m_mtx_rc);
m_map_ThreadID_to_jobID[threadID] = jobID;
}
unsigned int active_jobs() {
boost::lock_guard<boost::mutex> lock(m_mtx_counter);
return m_job_counter;
}
unsigned int dec_job_counter() {
boost::lock_guard<boost::mutex> lock(m_mtx_counter);
return --m_job_counter;
}
private:
typedef boost::function<bool(I_job_pool_worker*, unsigned int)> job_worker_fn;
struct connection_info
{
job_worker_fn m_thread_fn;
};
typedef boost::shared_array<connection_info> sh_a_connections;
typedef std::map<unsigned int, unsigned int> map_uiui;
typedef std::map<boost::thread::id, unsigned int> map_thID_ui;
map_uiui m_map_jobID_con_info_IDX;
map_thID_ui m_map_ThreadID_to_jobID;
sh_a_connections m_connection;
private:
const unsigned int m_total_job_threads;
volatile unsigned m_job_counter;
volatile unsigned int m_job_done_ID;
void* m_user_data;
boost::thread_group m_tg;
boost::condition_variable_any m_cond_main;
boost::mutex m_mtx_waiter;
boost::mutex m_mtx_counter;
boost::mutex m_mtx_notify;
boost::mutex m_mtx_rc;
};
} //JOB ns
#endif
|
| |