Покритикуйте реализацию producer-consumer
От: OdesitVadim Украина  
Дата: 05.12.12 14:41
Оценка:
Hi

Достался мне код на доработку. Основная часть выглядит очень просто

while (need_run) {
  char * data = getData(); // получить данные с устройства
  char * pr_data = processData(data); // обработка
  sendData(pr_data); // по сети отправить потребителю.
}


getData — получает данные с устройства, обычно выполняется за постоянное время, порядка 40 мс. processData — от 5 до 100 мс, но обычно также в районе 40-50мс. ( в зависимости от данных). data — несколько мегабайт, pr_data — пара десяток килобайт. По факту, data — глобальный массив. sendData выполняется быстро.

Так как getData и processData абсолютно независимы, захотелось запустить их в отдельных потоках, тем самым поднять в раза полтора скорость. Родился такой код.


#include "stdafx.h"
#include <system_error>
#include <iostream>
#include <windows.h>
#include <process.h>

const int MSIZE = 1024*1024 * 16;
// два буфера, применяются по очереди
char shareData1[MSIZE];
char shareData2[MSIZE];

uintptr_t thr; // хендл потока для получения данных
HANDLE hEvent1, hEvent2;  // для двух event'ов
volatile bool run = true; // для остановки потока
char * thrData = NULL; // адрес одного из буферов, куда поток будет вставлять свои данные

// процедура потока
void thrProc(void* args) {
    while (run) {
        //код, симулирующий получение данных
        for (int i = 0; i < MSIZE; i++) 
            thrData[i] = i % 256;
        //signal1
        SetEvent(hEvent1);
        // wait2
        WaitForSingleObject( hEvent2, INFINITE );
    }
}

// запуск потока получения данных
void startThread() {
    hEvent1 = CreateEvent( NULL, FALSE, TRUE, NULL );
    hEvent2 = CreateEvent( NULL, FALSE, FALSE, NULL );
        thrData = shareData1;
    thr = _beginthread(thrProc, 0, NULL);
}

// функция получения данных
char * getDataNew()
{
        // если запросили данные первый раз - запускаем поток
    if (thr == NULL) {
        startThread();
    }
    // wait 1
    WaitForSingleObject( hEvent1, INFINITE );
        // меняем буфера местами
    char * data = thrData;
    thrData = (thrData == shareData2)?shareData1:shareData2;
    // signal 2
    SetEvent( hEvent2 );
    return data;
}

// функция обработки данных
int processData( char * data ) 
{
    int s = 0;
        // симулируем обработку данных
    for (int i = 0; i < MSIZE; i++)
        s += data[i];  // то, что здесь переполнение, нас мало интересует
    return s;
}

// остановка потока получения данных
void stopThread() 
{
    run = false;
    SetEvent(hEvent2); // for stop
    WaitForSingleObject( (HANDLE)thr, INFINITE );
}

int _tmain(int argc, _TCHAR* argv[])
{
    for (int i = 0; i < 100; i++) { // 100 для приличия, не делать же бесконечный цикл
        char * data = getDataNew();
        processData(data);
    }
    stopThread();
    return 0;
}

Данный код тестировался примитивным профайлером и показал некоторое ускорение. Правда если уменьшать время получения/обработки данных (параметр MSIZE), то event'ы съедают всю производительность

Покритикуйте пожалуйста. Может можно посмотреть где то на более красивое решение. Цикл в main хочется оставить без изменений (иначе нужно будет много переделывать).
vs2010 producer-consumer
Re: Покритикуйте реализацию producer-consumer
От: okman Беларусь https://searchinform.ru/
Дата: 05.12.12 15:43
Оценка:
Здравствуйте, OdesitVadim.

Навскидку:

1. Event1 нужно создавать в сброшенном состоянии. Иначе WaitForSingleObject в getDataNew
может завершиться раньше, чем SetEvent в thrProc и получим гонку.

2. thrData, да и другие разделяемые переменные, следует сделать volatile.
Естественно, должно быть "volatile-указатель", а не "указатель на volatile".

3. Вместо _beginthread лучше использовать _beginthreadex. У первой функции есть некоторые ограничения.

4. Попробуйте замерить, насколько часто потоки погружаются в сон (WaitForSingleObject).
Если не часто, синхронизацию на событиях можно заменить спинлоками или даже критическими
секциями. Но в любом случае полученный эффект надо замерять профайлером.
Re: Покритикуйте реализацию producer-consumer
От: mssmax  
Дата: 05.12.12 15:52
Оценка: 1 (1)
Здравствуйте, OdesitVadim, Вы писали:

Из собственного опыта могу посоветовать Completion Ports. Очень удобно, ибо вся синхронизация делается самим ядром, заморачиваться со всякими эвентами не надо. Producer сует в порт очередной айтем с помощью PostQueuedCompletionStatus, а соответсвенно Consumer выгребает это дело с помощью GetQueueCompletionStatus. На Висте и выше можно заюзать GetQueuedCompletionStatusEx которая позволяет не один айтем, а сразу кучу выгрести за один вызов. Получается очень "чистенько" в плане кода, и насколько можно производительно.

MS.
Re[2]: Покритикуйте реализацию producer-consumer
От: OdesitVadim Украина  
Дата: 05.12.12 15:56
Оценка:
Здравствуйте, okman, Вы писали:

O>Здравствуйте, OdesitVadim.


O>Навскидку:


O>1. Event1 нужно создавать в сброшенном состоянии. Иначе WaitForSingleObject в getDataNew

O>может завершиться раньше, чем SetEvent в thrProc и получим гонку.
Тырил с примера на codeproject. Почему то показалось логичным

O>3. Вместо _beginthread лучше использовать _beginthreadex. У первой функции есть некоторые ограничения.


O>4. Попробуйте замерить, насколько часто потоки погружаются в сон (WaitForSingleObject).

а чем лучше поменить? что есть простенькое и бесплатное?
Re[2]: Покритикуйте реализацию producer-consumer
От: OdesitVadim Украина  
Дата: 05.12.12 16:00
Оценка:
Здравствуйте, mssmax, Вы писали:

M>Здравствуйте, OdesitVadim, Вы писали:


M>Из собственного опыта могу посоветовать Completion Ports. [skip] а сразу кучу выгрести за один вызов.[skip]

Вот как раз "кучу" мне не нужно. Во первых, нужно, что бы данные не сильно устаревали. А во вторых — данных может быть несколько десятков мегабайт в будущем.
M>MS.
Re: Покритикуйте реализацию producer-consumer
От: Mazay Россия  
Дата: 05.12.12 17:43
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>
OV>while (need_run) {
OV>  char * data = getData(); // получить данные с устройства
OV>  char * pr_data = processData(data); // обработка
OV>  sendData(pr_data); // по сети отправить потребителю.
OV>}
OV>


OV>getData — получает данные с устройства, обычно выполняется за постоянное время, порядка 40 мс. processData — от 5 до 100 мс, но обычно также в районе 40-50мс. ( в зависимости от данных). data — несколько мегабайт, pr_data — пара десяток килобайт. По факту, data — глобальный массив. sendData выполняется быстро.


OV>Так как getData и processData абсолютно независимы, захотелось запустить их в отдельных потоках, тем самым поднять в раза полтора скорость. Родился такой код.

// ...
OV>Покритикуйте пожалуйста. Может можно посмотреть где то на более красивое решение. Цикл в main хочется оставить без изменений (иначе нужно будет много переделывать).

Я такие вещи делал на конкурентных очередях из Intel TBB. Там есть разные очереди: и блокирующиеся, и неблокирующиеся. Интерфейс у них похожий, так что попробуй разные и померь производительность. Общая структура программы примерно такая:
1) Поток-читатель. Вызывает getData() и скидывает полученный указатель на data в очередь.
2) Поток-обработчик. Забирает данные из очереди и вызывает processData(). Затем скидывает указатель на обработанные данные во вторую очередь.
3) Поток-отправитель. Забирает данные из второй очереди и вызывает sendData().
Потоки запускаются сразу и в произвольном порядке. Если очередь пуста, то поток спит или молотит циклы со Sleep(0). Указатели *data и *pr_data лучше сделать умными, только там с многопоточностью аккуратнее надо. Сейчас наверное можно unique_ptr задействовать. Если хочешь выгадать пару тактов, то можешь сам вручную освобождать память в processData() и sendData(), если ты уверен, что их аргумент (*data и *pr_data соответственно) больше нигде не будут использвоваться.

Это модель из Эрланга, языка заточенного под такие задачи. Просто и надёжно.
Главное гармония ...
Re[3]: Покритикуйте реализацию producer-consumer
От: okman Беларусь https://searchinform.ru/
Дата: 05.12.12 17:56
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Тырил с примера на codeproject. Почему то показалось логичным


Offtopic:
Н-да, разочаровался я в последнее время в Codeproject.
За последний месяц три раза сталкивался с исходниками оттуда родом и
каждый раз находил какой-нибудь баг.
Re[3]: Покритикуйте реализацию producer-consumer
От: mssmax  
Дата: 06.12.12 08:38
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Здравствуйте, mssmax, Вы писали:


M>>Здравствуйте, OdesitVadim, Вы писали:


M>>Из собственного опыта могу посоветовать Completion Ports. [skip] а сразу кучу выгрести за один вызов.[skip]

OV>Вот как раз "кучу" мне не нужно. Во первых, нужно, что бы данные не сильно устаревали. А во вторых — данных может быть несколько десятков мегабайт в будущем.
M>>MS.

Ну, устаревание данных насколько я могу понять зависит от того насколько быстро они обрабатываются, так что тут проблемы я не вижу особой. Насчет размера — это не важно, в оригинальном коде у тебя два массива, пока один заполняется, другой обрабатывается ( насколько я понял ). Просто будешь кидать в порт указатель на массив подлежащий обработке и все. Или я где-то что-то не заметил ?

MS.
Re: Покритикуйте реализацию producer-consumer
От: flamin  
Дата: 07.12.12 16:05
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

Чето мне кажется, ты не то параллелишь. Параллелят обычно обработку данных, а не получение.
Если загнать получение данных в отдельный поток — это подобие асинхронной операции, но никак не распараллеливание.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.