Достался мне код на доработку. Основная часть выглядит очень просто
while (need_run) {
char * data = getData(); // получить данные с устройстваchar * pr_data = processData(data); // обработка
sendData(pr_data); // по сети отправить потребителю.
}
getData — получает данные с устройства, обычно выполняется за постоянное время, порядка 40 мс. processData — от 5 до 100 мс, но обычно также в районе 40-50мс. ( в зависимости от данных). data — несколько мегабайт, pr_data — пара десяток килобайт. По факту, data — глобальный массив. sendData выполняется быстро.
Так как getData и processData абсолютно независимы, захотелось запустить их в отдельных потоках, тем самым поднять в раза полтора скорость. Родился такой код.
#include"stdafx.h"#include <system_error>
#include <iostream>
#include <windows.h>
#include <process.h>
const int MSIZE = 1024*1024 * 16;
// два буфера, применяются по очередиchar shareData1[MSIZE];
char shareData2[MSIZE];
uintptr_t thr; // хендл потока для получения данных
HANDLE hEvent1, hEvent2; // для двух event'овvolatile bool run = true; // для остановки потокаchar * thrData = NULL; // адрес одного из буферов, куда поток будет вставлять свои данные
// процедура потокаvoid thrProc(void* args) {
while (run) {
//код, симулирующий получение данныхfor (int i = 0; i < MSIZE; i++)
thrData[i] = i % 256;
//signal1
SetEvent(hEvent1);
// wait2
WaitForSingleObject( hEvent2, INFINITE );
}
}
// запуск потока получения данныхvoid startThread() {
hEvent1 = CreateEvent( NULL, FALSE, TRUE, NULL );
hEvent2 = CreateEvent( NULL, FALSE, FALSE, NULL );
thrData = shareData1;
thr = _beginthread(thrProc, 0, NULL);
}
// функция получения данныхchar * getDataNew()
{
// если запросили данные первый раз - запускаем потокif (thr == NULL) {
startThread();
}
// wait 1
WaitForSingleObject( hEvent1, INFINITE );
// меняем буфера местамиchar * data = thrData;
thrData = (thrData == shareData2)?shareData1:shareData2;
// signal 2
SetEvent( hEvent2 );
return data;
}
// функция обработки данныхint processData( char * data )
{
int s = 0;
// симулируем обработку данныхfor (int i = 0; i < MSIZE; i++)
s += data[i]; // то, что здесь переполнение, нас мало интересуетreturn s;
}
// остановка потока получения данныхvoid stopThread()
{
run = false;
SetEvent(hEvent2); // for stop
WaitForSingleObject( (HANDLE)thr, INFINITE );
}
int _tmain(int argc, _TCHAR* argv[])
{
for (int i = 0; i < 100; i++) { // 100 для приличия, не делать же бесконечный циклchar * data = getDataNew();
processData(data);
}
stopThread();
return 0;
}
Данный код тестировался примитивным профайлером и показал некоторое ускорение. Правда если уменьшать время получения/обработки данных (параметр MSIZE), то event'ы съедают всю производительность
Покритикуйте пожалуйста. Может можно посмотреть где то на более красивое решение. Цикл в main хочется оставить без изменений (иначе нужно будет много переделывать).
1. Event1 нужно создавать в сброшенном состоянии. Иначе WaitForSingleObject в getDataNew
может завершиться раньше, чем SetEvent в thrProc и получим гонку.
2. thrData, да и другие разделяемые переменные, следует сделать volatile.
Естественно, должно быть "volatile-указатель", а не "указатель на volatile".
3. Вместо _beginthread лучше использовать _beginthreadex. У первой функции есть некоторые ограничения.
4. Попробуйте замерить, насколько часто потоки погружаются в сон (WaitForSingleObject).
Если не часто, синхронизацию на событиях можно заменить спинлоками или даже критическими
секциями. Но в любом случае полученный эффект надо замерять профайлером.
Из собственного опыта могу посоветовать Completion Ports. Очень удобно, ибо вся синхронизация делается самим ядром, заморачиваться со всякими эвентами не надо. Producer сует в порт очередной айтем с помощью PostQueuedCompletionStatus, а соответсвенно Consumer выгребает это дело с помощью GetQueueCompletionStatus. На Висте и выше можно заюзать GetQueuedCompletionStatusEx которая позволяет не один айтем, а сразу кучу выгрести за один вызов. Получается очень "чистенько" в плане кода, и насколько можно производительно.
Здравствуйте, okman, Вы писали:
O>Здравствуйте, OdesitVadim.
O>Навскидку:
O>1. Event1 нужно создавать в сброшенном состоянии. Иначе WaitForSingleObject в getDataNew O>может завершиться раньше, чем SetEvent в thrProc и получим гонку.
Тырил с примера на codeproject. Почему то показалось логичным
O>3. Вместо _beginthread лучше использовать _beginthreadex. У первой функции есть некоторые ограничения.
O>4. Попробуйте замерить, насколько часто потоки погружаются в сон (WaitForSingleObject).
а чем лучше поменить? что есть простенькое и бесплатное?
Здравствуйте, mssmax, Вы писали:
M>Здравствуйте, OdesitVadim, Вы писали:
M>Из собственного опыта могу посоветовать Completion Ports. [skip] а сразу кучу выгрести за один вызов.[skip]
Вот как раз "кучу" мне не нужно. Во первых, нужно, что бы данные не сильно устаревали. А во вторых — данных может быть несколько десятков мегабайт в будущем. M>MS.
OV>while (need_run) {
OV> char * data = getData(); // получить данные с устройства
OV> char * pr_data = processData(data); // обработка
OV> sendData(pr_data); // по сети отправить потребителю.
OV>}
OV>
OV>getData — получает данные с устройства, обычно выполняется за постоянное время, порядка 40 мс. processData — от 5 до 100 мс, но обычно также в районе 40-50мс. ( в зависимости от данных). data — несколько мегабайт, pr_data — пара десяток килобайт. По факту, data — глобальный массив. sendData выполняется быстро.
OV>Так как getData и processData абсолютно независимы, захотелось запустить их в отдельных потоках, тем самым поднять в раза полтора скорость. Родился такой код.
// ... OV>Покритикуйте пожалуйста. Может можно посмотреть где то на более красивое решение. Цикл в main хочется оставить без изменений (иначе нужно будет много переделывать).
Я такие вещи делал на конкурентных очередях из Intel TBB. Там есть разные очереди: и блокирующиеся, и неблокирующиеся. Интерфейс у них похожий, так что попробуй разные и померь производительность. Общая структура программы примерно такая:
1) Поток-читатель. Вызывает getData() и скидывает полученный указатель на data в очередь.
2) Поток-обработчик. Забирает данные из очереди и вызывает processData(). Затем скидывает указатель на обработанные данные во вторую очередь.
3) Поток-отправитель. Забирает данные из второй очереди и вызывает sendData().
Потоки запускаются сразу и в произвольном порядке. Если очередь пуста, то поток спит или молотит циклы со Sleep(0). Указатели *data и *pr_data лучше сделать умными, только там с многопоточностью аккуратнее надо. Сейчас наверное можно unique_ptr задействовать. Если хочешь выгадать пару тактов, то можешь сам вручную освобождать память в processData() и sendData(), если ты уверен, что их аргумент (*data и *pr_data соответственно) больше нигде не будут использвоваться.
Это модель из Эрланга, языка заточенного под такие задачи. Просто и надёжно.
Здравствуйте, OdesitVadim, Вы писали:
OV>Тырил с примера на codeproject. Почему то показалось логичным
Offtopic:
Н-да, разочаровался я в последнее время в Codeproject.
За последний месяц три раза сталкивался с исходниками оттуда родом и
каждый раз находил какой-нибудь баг.
Здравствуйте, OdesitVadim, Вы писали:
OV>Здравствуйте, mssmax, Вы писали:
M>>Здравствуйте, OdesitVadim, Вы писали:
M>>Из собственного опыта могу посоветовать Completion Ports. [skip] а сразу кучу выгрести за один вызов.[skip] OV>Вот как раз "кучу" мне не нужно. Во первых, нужно, что бы данные не сильно устаревали. А во вторых — данных может быть несколько десятков мегабайт в будущем. M>>MS.
Ну, устаревание данных насколько я могу понять зависит от того насколько быстро они обрабатываются, так что тут проблемы я не вижу особой. Насчет размера — это не важно, в оригинальном коде у тебя два массива, пока один заполняется, другой обрабатывается ( насколько я понял ). Просто будешь кидать в порт указатель на массив подлежащий обработке и все. Или я где-то что-то не заметил ?
Чето мне кажется, ты не то параллелишь. Параллелят обычно обработку данных, а не получение.
Если загнать получение данных в отдельный поток — это подобие асинхронной операции, но никак не распараллеливание.