Пул потоков
От: Аноним  
Дата: 04.11.05 13:30
Оценка:
ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.


void Server::OnRead ()
{
    if (command == "get")
    {
       Data* data = new Data;
       data->id = ...
       data->data = ...

       AfxBeginThread (Thread, (LPVOID) data, ...)
    }
}



Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.

Покажите пожалуйста немного кода.

спасибо


04.11.05 22:04: Перенесено модератором из 'C/C++' — Павел Кузнецов
Re: Пул потоков
От: srggal Украина  
Дата: 04.11.05 13:47
Оценка:
Здравствуйте, <Аноним>, Вы писали:

А>ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.



А>
А>void Server::OnRead ()
А>{
А>    if (command == "get")
А>    {
А>       Data* data = new Data;
       data->>id = ...
       data->>data = ...

А>       AfxBeginThread (Thread, (LPVOID) data, ...)
А>    }
А>}
А>



А>Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.


В принципе можно использовать порты завершения, тут что-то было про это Запросы не попадают в Completion Port
Автор: Detsel
Дата: 24.10.05


А>спасибо
... << RSDN@Home 1.1.4 stable rev. 510>>
Re: Пул потоков
От: korzhik Россия  
Дата: 04.11.05 13:57
Оценка:
Здравствуйте, Аноним, Вы писали:

А>ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.



А>
А>void Server::OnRead ()
А>{
А>    if (command == "get")
А>    {
А>       Data* data = new Data;
       data->>id = ...
       data->>data = ...

А>       AfxBeginThread (Thread, (LPVOID) data, ...)
А>    }
А>}
А>



А>Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.


А>Покажите пожалуйста немного кода.


А>спасибо


Рихтер, глава 11
Re: Пул потоков
От: Angler Россия  
Дата: 04.11.05 14:08
Оценка:
Здравствуйте, Аноним, Вы писали:


А>Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.


уже подсказали, см. выше. Хочу добавить, так как соединение довольно скоротечное, будет удобно складывать запросы в очередь и в отдельном потоке вычитывать запрос, ждать освободившийся/незанятый поток из пула и им обрабатывать запрос.

по реализации конкретно:
— ATL::CThreadPool,
— std::queue<boost::shared_ptr<Data> >
Re[2]: Пул потоков
От: Аноним  
Дата: 04.11.05 15:03
Оценка: 3 (1)
Здравствуйте, Angler, Вы писали:

A>Здравствуйте, Аноним, Вы писали:



A>по реализации конкретно:

A>- ATL::CThreadPool,
A>- std::queue<boost::shared_ptr<Data> >

то есть насколько я понял все что мне надо это просто заменить AfxBeginThread () на QueueUserWorkItem () ?
Re[3]: Пул потоков
От: Angler Россия  
Дата: 04.11.05 15:18
Оценка:
Здравствуйте, Аноним, Вы писали:

А>то есть насколько я понял все что мне надо это просто заменить AfxBeginThread () на QueueUserWorkItem () ?


я первый раз этот метод вижу
но судя по описанию это почти то что тебе нужно, почему почти? потому как все скорее всего прийдется складывать запросы в свою очередь.
Re[4]: Пул потоков
От: Аноним  
Дата: 04.11.05 15:24
Оценка:
Здравствуйте, Angler, Вы писали:

A>Здравствуйте, Аноним, Вы писали:


А>>то есть насколько я понял все что мне надо это просто заменить AfxBeginThread () на QueueUserWorkItem () ?


A>я первый раз этот метод вижу

A>но судя по описанию это почти то что тебе нужно, почему почти? потому как все скорее всего прийдется складывать запросы в свою очередь.

Да судя по всему новая функция, для нее качаю последний SDK Что значит складывать запросы в очередь? Разве система сама не распределит какой поток из системного пула будет выполнять тот или иной запрос?
Re[5]: Пул потоков
От: Angler Россия  
Дата: 04.11.05 15:43
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Да судя по всему новая функция, для нее качаю последний SDK Что значит складывать запросы в очередь? Разве система сама не распределит какой поток из системного пула будет выполнять тот или иной запрос?


Ты ведь сам писал, что "Соединение довольно скоротечное". Представь себе ситуацию, когда в пуле нет свободного потока, а к серверу приходит запрос от клиента. В этои случае сервер сможет отдать управление клиенту только в случае появления свободного потока в пуле. Чтобы не ждать, можно организовать на сервере еще и очередь клиентских запросов:


class Server
{
//...
  void OnRead()
  {
    //создаем запрос
    boost::share_ptr<QueryData> pQueryData(new QueryData(...));

    //блокировка очереди
    AutoCriticalSection lockQueue(cs);
    
    //сохраняем запрос в очереди
    querys.push_back(pQueryData);
  }

  void Loop()
  {
    while(!doExit)
    { 
       //блокировка очереди 
       AutoCriticalSection lockQueue(cs);

       //есть задания в очереди?
       if(!querys.empty())
       {
          //извлекаем запрос
          boost::share_ptr<QueryData> pQueryData = querys.front();
          querys.pop();
         
          //убираем блокировку
          lockQueue.release();

          //тут используешь QueueUserWorkItem
          //...

       }       
       ::Sleep(timeToSleep);
    }
  }

//...
private:
  //очередь запросов
  std::queue<boost::share_ptr<QueryData> > querys;
  //для блокировки очереди запросов
  CriticalSection cs;
};
Re[6]: Пул потоков
От: Аноним  
Дата: 04.11.05 20:31
Оценка:
Здравствуйте, Angler, Вы писали:

A>Здравствуйте, Аноним, Вы писали:


A>Ты ведь сам писал, что "Соединение довольно скоротечное". Представь себе ситуацию, когда в пуле нет свободного потока, а к серверу приходит запрос от клиента. В этои случае сервер сможет отдать управление клиенту только в случае появления свободного потока в пуле. Чтобы не ждать, можно организовать на сервере еще и очередь клиентских запросов:


А сколько потоков может быть в пуле? ведь это так называемый системный пул так? им сама система управляет?
Re[7]: Пул потоков
От: gear nuke  
Дата: 05.11.05 01:24
Оценка:
Здравствуйте, <Аноним>, Вы писали:

А>А сколько потоков может быть в пуле? ведь это так называемый системный пул так? им сама система управляет?


В MSDN всё есть в ремарках к QueueUserWorkItem:

By default, the thread pool has a maximum of 500 threads. To raise this limit, use the WT_SET_MAX_THREADPOOL_THREAD macro defined in Winnt.h.

#define WT_SET_MAX_THREADPOOL_THREADS(Flags,Limit) ((Flags)|=(Limit)<<16)

Use this macro when specifying the Flags parameter. The macro parameters are the desired flags and the new limit (up to (2<<16)-1 threads). However, note that your application can improve its performance by keeping the number of worker threads low.

People who are more than casually interested in computers should have at least some idea of what the underlying hardware is like. Otherwise the programs they write will be pretty weird (c) D.Knuth
Re: Пул потоков
От: yslag Украина  
Дата: 07.11.05 12:12
Оценка: 2 (1)
Здравствуйте, Аноним, Вы писали:


А>Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.


А>Покажите пожалуйста немного кода.


А>спасибо



здесь
Автор(ы): Алексей Ширшов
Дата: 03.08.2003
Статья посвящена системным механизмам, организующим (или помогающим организовать) пул потоков. Рассматриваются базовые, универсальные сервисы, с помощью которых можно реализовывать серверы для любых доступных механизмов взаимодействия сервера и клиента: сокеты, именованные каналы (named pipes), почтовые ящики (mailslots) и проч.
смотрели?
Re: Пул потоков
От: Аноним  
Дата: 11.11.05 17:59
Оценка:
Здравствуйте, Аноним, Вы писали:

А>ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.



А>
А>void Server::OnRead ()
А>{
А>    if (command == "get")
А>    {
А>       Data* data = new Data;
       data->>id = ...
       data->>data = ...

А>       AfxBeginThread (Thread, (LPVOID) data, ...)
А>    }
А>}
А>



А>Вообщем для каждого треда создается структура как параметр. Подскажите, можно ли как нибудь иметь пул потоков для этого дела и при получении очередного запроса передавать данные уже в имеющийся поток? Все таки создавать поток для каждого соединений как то по жлобски.


А>Покажите пожалуйста немного кода.


А>спасибо




// ThreadController.h: interface for the ThreadController class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_THREADCONTROLLER_H__BBC6C7A9_6CB8_48C6_861E_ABE432605310__INCLUDED_)
#define AFX_THREADCONTROLLER_H__BBC6C7A9_6CB8_48C6_861E_ABE432605310__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <map>
#include <vector>

#define THREADS_COUNT 5
#define MAX_REQUEST 5
#define MAX_START_THREAD_WAIT 1000
#define MAX_STOP_THREAD_WAIT 10000

struct ThreadEvents
{
    HANDLE hStartEvent;
    HANDLE hStopEvent;
};

enum TC_ExceptionType
{
    UNKNOWN_EXCEPTION,
    MEMORY_EXCEPTION,
    SYSTEM_EXCEPTION
};

struct TC_Exception
{
    TC_ExceptionType eType;
    TC_Exception(TC_ExceptionType tc_eType)
    {
        eType = tc_eType;
    }
};

class ThreadController  
{
public:
    bool SendRequest(DWORD dwParam);
    void CallAPCFunction(DWORD dwParam);
    void SetAPCFunction(PAPCFUNC pfnUserAPC);
    void Unlock();
    void Lock();
    void Decrement(DWORD dwThreadId);
    void Increment(DWORD dwThreadId);
    ThreadController();
    virtual ~ThreadController();
    static ThreadController* pController;
private:
    PAPCFUNC pfnAPC;
    std::map<int, int> m_mapCounters;
    std::map<int, HANDLE> m_mapThreads;
    int iThreadsCount;
    int iMaxRequests;
    LPCRITICAL_SECTION cs;
    HANDLE hStopThreadsEvent;
};

#endif // !defined(AFX_THREADCONTROLLER_H__BBC6C7A9_6CB8_48C6_861E_ABE432605310__INCLUDED_)



// ThreadController.cpp: implementation of the ThreadController class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "ThreadController.h"

DWORD WINAPI ThreadFunc(LPVOID);
void WINAPI APCFunc(DWORD);

ThreadController* ThreadController::pController = NULL;

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

ThreadController::ThreadController()
{
    ThreadController::pController = this;

    HANDLE hStartEvent = NULL;
    HANDLE hStopEvent = NULL;

    HANDLE hThread = NULL;
    DWORD dwThreadId = 0;

    this->iMaxRequests = MAX_REQUEST;
    this->iThreadsCount = THREADS_COUNT;
    this->hStopThreadsEvent = NULL;
    this->pfnAPC = NULL;

    this->cs = new CRITICAL_SECTION;
    InitializeCriticalSection(this->cs);

    if(this->cs == NULL)
    {
        throw TC_Exception(MEMORY_EXCEPTION);
    }

    hStartEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if(hStartEvent == NULL)
    {
        throw TC_Exception(SYSTEM_EXCEPTION);
    }

    hStopEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if(hStopEvent == NULL)
    {
        throw TC_Exception(SYSTEM_EXCEPTION);
    }

    ThreadEvents *te = new ThreadEvents;
    if(te == NULL)
    {
        throw TC_Exception(MEMORY_EXCEPTION);
    }
    te->hStartEvent = hStartEvent;
    te->hStopEvent = hStopEvent;

    for(int i = 0; i < this->iThreadsCount; i++)
    {
        hThread = CreateThread(NULL, 0, ThreadFunc, te, 0, &dwThreadId);
        if(hThread == NULL)
        {
            throw TC_Exception(SYSTEM_EXCEPTION);
        }
        this->m_mapThreads[dwThreadId] = hThread;
        this->m_mapCounters[dwThreadId] = 0;
        DWORD dwResult = WaitForSingleObject(hStartEvent, MAX_START_THREAD_WAIT);
        if(dwResult == WAIT_TIMEOUT)
        {
            throw TC_Exception(SYSTEM_EXCEPTION);
        }
    }

    delete te;

    if(hStartEvent != NULL)
    {
        CloseHandle(hStartEvent);
    }

    if(hStopEvent != NULL)
    {
        this->hStopThreadsEvent = hStopEvent;
    }
}

ThreadController::~ThreadController()
{
    int Count = this->iThreadsCount;

    HANDLE* Array = new HANDLE[this->iThreadsCount];
    HANDLE* TempArray;

    std::map<int, HANDLE>::const_iterator map_cIter;

    int i = 0;

    for(map_cIter = this->m_mapThreads.begin(); map_cIter != this->m_mapThreads.end(); map_cIter++, i++)
    {
        Array[i] = map_cIter->second;
    }

    for(i = 0; i < this->iThreadsCount; i++)
    {
        SetEvent(this->hStopThreadsEvent);
        DWORD dwResult = WaitForMultipleObjects(Count, Array, false, MAX_STOP_THREAD_WAIT);
        if(dwResult == WAIT_TIMEOUT)
        {
            throw TC_Exception(SYSTEM_EXCEPTION);
        }
        if(i != (this->iThreadsCount - 1))
        {
            TempArray = Array;
            Array = new HANDLE[Count-1];
            for(int j = 0, r = 0 ; j < Count; j++)
            {
                DWORD dwCode = 0;
                if(GetExitCodeThread(TempArray[j], &dwCode))
                {
                    if(dwCode == STILL_ACTIVE)
                    {
                        Array[r] = TempArray[j];
                        r++;
                    }
                }
            }
            delete[] TempArray;
            Count -= 1;
        }
        else
        {
            delete[] Array;
        }
    }

    for(map_cIter = this->m_mapThreads.begin(); map_cIter != this->m_mapThreads.end(); map_cIter++)
    {
        CloseHandle(map_cIter->second);
    }

    CloseHandle(this->hStopThreadsEvent);
    
    DeleteCriticalSection(this->cs);
    delete this->cs;
    
    ThreadController::pController = NULL;
}

void ThreadController::Increment(DWORD dwThreadId)
{
    //std::map<int, int>::iterator map_Iter;
    //map_Iter = m_mapCounters.find(dwThreadId);
    //if(map_Iter != m_mapCounters.end())
    //{
        //map_Iter->second++;
        this->m_mapCounters[dwThreadId]++;
    //}
}

void ThreadController::Decrement(DWORD dwThreadId)
{
    //std::map<int, int>::iterator map_Iter;
    //map_Iter = m_mapCounters.find(dwThreadId);
    //if(map_Iter != m_mapCounters.end())
    //{
        //map_Iter->second--;
        this->m_mapCounters[dwThreadId]--;
    //}
}

void ThreadController::Lock()
{
    EnterCriticalSection(this->cs);
}

void ThreadController::Unlock()
{
    LeaveCriticalSection(this->cs);
}

void ThreadController::SetAPCFunction(PAPCFUNC pfnUserAPC)
{
    this->pfnAPC = pfnUserAPC;
}

void ThreadController::CallAPCFunction(DWORD dwParam)
{
    (this->pfnAPC)(dwParam);
}

bool ThreadController::SendRequest(DWORD dwParam)
{
    bool bResult = false;
    DWORD dwThreadId;
    int Min;
    std::map<int, int>::const_iterator map_cIter;
    this->Lock();
    map_cIter = this->m_mapCounters.begin();
    dwThreadId = map_cIter->first;
    Min = map_cIter->second;
    for(;map_cIter != this->m_mapCounters.end(); map_cIter++)
    {
        if(map_cIter->second < Min)
        {
            dwThreadId = map_cIter->first;
            Min = map_cIter->second;
        }
    }
    this->m_mapCounters[dwThreadId]++;
    if((Min + 1) > MAX_REQUEST)
    {
        bResult = false;
    }
    else
    {
        QueueUserAPC(APCFunc, this->m_mapThreads[dwThreadId], dwParam);
        bResult = true;
    }
    this->Unlock();
    return bResult;
}

DWORD WINAPI ThreadFunc(LPVOID lpvThreadParm)
{
    ThreadEvents *te = (ThreadEvents *) lpvThreadParm;
    
    HANDLE hStartEvent = te->hStartEvent;
    HANDLE hStopEvent = te->hStopEvent;
    
    SetEvent(hStartEvent);

    for(;;)
    {
        DWORD dwResult = WaitForSingleObjectEx(hStopEvent, INFINITE, true);
        if(dwResult == WAIT_OBJECT_0)
        {
            break;
        }
        else
        {
            if(dwResult == WAIT_IO_COMPLETION)
            {
                continue;
            }
            else
            {
                break;
            }
        }
    }

    return 0;
}

void WINAPI APCFunc(DWORD dwParam)
{
    if(ThreadController::pController != NULL)
    {
        ThreadController::pController->CallAPCFunction(dwParam);
        ThreadController::pController->Lock();
        ThreadController::pController->Decrement(GetCurrentThreadId());
        ThreadController::pController->Unlock();
    }
}



        ThreadController* tc;
        try
        {
            tc = new ThreadController();
            tc->SetAPCFunction(pfnAPCControlService);
        }
        catch(TC_Exception)
        {
            ThreadController::pController = NULL;
        }



        try
        {
            delete tc;
            ThreadController::pController = NULL;
        }
        catch(TC_Exception)
        {
            ThreadController::pController = NULL;
        }
Re: Пул потоков
От: c-smile Канада http://terrainformatica.com
Дата: 11.11.05 20:32
Оценка: +1
Здравствуйте, Аноним, Вы писали:

А>ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.


Дурная мысль: а зачем там потоки вообще?

Дело в том что сокеты сами по себе очереди (входящие) и если у тебя
отдаваемая информация под рукой проще отдать её сразу.

Очень большая вероятность что такое решение будет работаь быстрее
чем на потоках. UDP еще для такого типа трафика весьма полезно.
Re[2]: Пул потоков
От: al71  
Дата: 11.11.05 23:32
Оценка:
Здравствуйте, c-smile, Вы писали:

CS>Здравствуйте, Аноним, Вы писали:


А>>ТАкая ситуация. Клиент логинится на сервер и просит его выслать ему информацию. На каждый запрос создается поток. Соединение довольно скоротечное и сокеты блокирующие. Сейчас код выглядит так.


CS>Дурная мысль: а зачем там потоки вообще?


CS>Дело в том что сокеты сами по себе очереди (входящие) и если у тебя

CS>отдаваемая информация под рукой проще отдать её сразу.

CS>Очень большая вероятность что такое решение будет работаь быстрее

CS>чем на потоках. UDP еще для такого типа трафика весьма полезно.

Полностью согласен с c-smile — в большинстве случаев потоки действительно не нужны. Но если уж совсем критично (скорость, надежность), то смотреть лучше не в сторону потоков и даже их пула (хотя всегда есть варианты) — а в сторону организации еще одной очереди, куда вы будете отправлять принятые вами сообщения — очереди на обработку сообщений (ка вариант см. MessageQueue, Service Broker на Yukon, AQ Oracle и даже Names Pipe) — просто на другой стороне очереди (возможно даже на другом железе) организуете нужное вам количество слушающих эту очередь потоков и все.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.