Многопоточная обработка потока данных
От: Aniskin  
Дата: 05.05.18 13:29
Оценка:
Прошу совета, как улучшить алгоритм синхронизации.

Задача – обработать входной поток неким алгоритмом и сохранить результат в выходном потоке. Есть пул потоков, каждый поток выполняет одинаковую задачу, состоящую из следующих пунктов:
0) Захват ReadMutex
1) Получение ID блока (просто число начиная от 1, увеличивается с каждым блоком) и чтение блока данных.
2) Освобождение ReadMutex
3) Обработка блока данных
4) Захват WriteMutex
5) Если ID последнего записанного блока равно текущему ID-1, то запись блока в выходной поток, освобождение WriteMutex, goto 0, иначе 6
6) освобождение WriteMutex, Sleep(100), goto 4.

Как видно из схемы работы, если поток B обработал блок данных N раньше, чем поток A обработал блок данных N-1, то возникает цикл с использованием Sleep. Но мне ну очень не нравиться этот цикл (жру понапрасну такты процессора, тем самым увеличивая энтропию вселенной и приближаю ее тепловую смерть). Хочется заменить на какой-нибудь WaitForSingleObject, но не могу придумать, что же конкретно мне ждать.
Re: Многопоточная обработка потока данных
От: Sharowarsheg  
Дата: 05.05.18 14:24
Оценка:
Здравствуйте, Aniskin, Вы писали:

Насколько большие блоки и постоянной ли они длины? Нельзя ли записывать блоки в произвольном порядке (особенно, если они постоянной длины)? Нельзя ли сделать очередь на запись, и пусть каждый поток кладет свои обработанные блоки в эту очередь, а еще один поток пусть выбирает из очереди и упорядоченно записывает. Нужно, конечно, будет принять меры, чтобы очередь не распухла, но тем не менее.
Re: Многопоточная обработка потока данных
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 05.05.18 15:58
Оценка:
Здравствуйте, Aniskin, Вы писали:

A>Прошу совета, как улучшить алгоритм синхронизации.


A>Как видно из схемы работы, если поток B обработал блок данных N раньше, чем поток A обработал блок данных N-1, то возникает цикл с использованием Sleep. Но мне ну очень не нравиться этот цикл (жру понапрасну такты процессора, тем самым увеличивая энтропию вселенной и приближаю ее тепловую смерть). Хочется заменить на какой-нибудь WaitForSingleObject, но не могу придумать, что же конкретно мне ждать.



На каждый поток сделать Event и использовать WaitForMultipleObjects? И заменить Mutex на CriticalSection.
Re[2]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 01:12
Оценка:
Здравствуйте, Sharowarsheg, Вы писали:

S>Здравствуйте, Aniskin, Вы писали:


S>Насколько большие блоки и постоянной ли они длины?

В общем случае блоки большие и переменной длины.

S>Нельзя ли записывать блоки в произвольном порядке?

Нет. Я произвожу операцию упаковки/распаковки в многопоточном режиме, блоки должны следовать последовательно.

S>Нельзя ли сделать очередь на запись, и пусть каждый поток кладет свои обработанные блоки в эту очередь, а еще один поток пусть выбирает из очереди и упорядоченно записывает. Нужно, конечно, будет принять меры, чтобы очередь не распухла, но тем не менее.

Можно, конечно, изменить алгоритм на предложенный, но в этом случает при двух потоках первый будет читать и обрабатывать, а второй только писать, imho не очень рационально. В целом, хотелось бы остаться в рамках исходного алгоритма.
Re[2]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 01:15
Оценка:
Здравствуйте, Nuzhny, Вы писали:

N>На каждый поток сделать Event и использовать WaitForMultipleObjects?

Я уже думал об Event-ах, но не могу придумать, как мне корректно добавить их в алгоритм.

N>И заменить Mutex на CriticalSection.

Разумно.
Re[3]: Многопоточная обработка потока данных
От: Sharowarsheg  
Дата: 06.05.18 04:28
Оценка:
Здравствуйте, Aniskin, Вы писали:


S>>Нельзя ли сделать очередь на запись, и пусть каждый поток кладет свои обработанные блоки в эту очередь, а еще один поток пусть выбирает из очереди и упорядоченно записывает. Нужно, конечно, будет принять меры, чтобы очередь не распухла, но тем не менее.

A>Можно, конечно, изменить алгоритм на предложенный, но в этом случает при двух потоках первый будет читать и обрабатывать, а второй только писать, imho не очень рационально. В целом, хотелось бы остаться в рамках исходного алгоритма.

откуда вообще два потока? для N процессоров N потоков и три очереди — чтения, готовых, и записи. очередь готовых будет списком, а чтения и записи — очередями FIFO.

читатель -> рабочие -> писатель

пусть читетель не жрёт процессор (а жрёт диск или сеть).
тогда если писатель тоже не жрёт процессор, то рабочих N штук, а если писатель жрёт процессор, то рабочих N-1 штук, но минимум 1

читатель читает блоки с диска и ставит их в очередь чтения. если она слишком длинная, то читатель пропускает ход, делая sleep(сколько-то). когда поставил очередной блок, сигналит событие R.

рабочие ждут событие R, и когда оно возникает, просыпаются и смотрят в очередь готовых. если очередь готовых слишком длинная, то они делают sleep() и потом снова смотрят в очередь готовых (потому что события R следующего может и не быть, если был считан последний блок).

после того, как установлено, что очередь готовых не забита, рабочие смотрят очередь чтения. если она не пуста, то выбирают себе по элементу и начинают обработку. Когда рабочий обработал блок, он ставит этот блок в очередь готовых и сигналит событие W. после чего рабочий засыпает снова до события R.

писатель ждет события W.
когда оно возникает, он блокирует очередь готовых, сортирует ее, и переносит все блоки, которые по порядку можно взять, из очереди готовых в очередь записи. После этого писатель отпускает блокировку очереди готовых, и начинает записывать блоки из очереди записи. После того как все блоки записаны, писатель засыпает снова до события W (если за время записи успел появиться новый готовый блок, то событие W будет уже установлено).

такая схема дает максимальную производительность, если читатель и писатель ограничены в основном диском, а рабочие в основом процессором, и если рабочие не зависят от порядка блоков.
Отредактировано 06.05.2018 4:30 Sharowarsheg . Предыдущая версия . Еще …
Отредактировано 06.05.2018 4:29 Sharowarsheg . Предыдущая версия .
Re[4]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 04:58
Оценка:
Здравствуйте, Sharowarsheg, Вы писали:

S>такая схема дает максимальную производительность


Спасибо за идею, но imho алгоритм переусложняется. В моей ситуации обработка блока занимает 99.9% времени, а время чтения и записи практически ничтожны по сравнению со временем обработки. И мне важно, что бы все потоки занимались обработкой по максимуму. И в предложенном варианте два Sleep вместо одного, от которого я и стараюсь уйти. И в исходном варианте я могу легко запустить однопоточную версию без изменения кода, могу динамически уменьшить кол-во потоков до одного без изменения кода.
Re: Многопоточная обработка потока данных
От: Pavel Dvorkin Россия  
Дата: 06.05.18 05:11
Оценка:
Здравствуйте, Aniskin, Вы писали:

А что за входной и выходной потоки ? Куда они направлены, на файлы ? Если да — я бы вообще отказался от stream, обрабатывал по механизму random access file или file mapping. Пусть каждый поток читает и пишет, не обращая внимания на остальные, когда все закончится — выходной файл будет в порядке.
With best regards
Pavel Dvorkin
Re[2]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 05:44
Оценка:
Здравствуйте, Pavel Dvorkin, Вы писали:

PD>А что за входной и выходной потоки?


Потоки — это именно абстрактные потоки. И размер выходного потока не известен, он зависит от того, как произойдет сжатие/распаковка данных.
Re[3]: Многопоточная обработка потока данных
От: Pavel Dvorkin Россия  
Дата: 06.05.18 06:13
Оценка: 13 (2)
Здравствуйте, Aniskin, Вы писали:

A>Потоки — это именно абстрактные потоки. И размер выходного потока не известен, он зависит от того, как произойдет сжатие/распаковка данных.


Ясно.

Если я правильно тебя понял, тебе надо, чтобы N+1 блок писался не раньше, чем N-й, так ?
При этом запись N+1 блока производит поток, условно говоря, N+1 (условно, потому что у тебя пул, а не ручной запуск), а N-го блока — N-й поток

Тогда предлагаю следующий алгоритм

При передаче задания потоку N ему передается ивент N-1 и ивент N. Он перед записью должен ждать ивент N -1, а после записи устанавливать ивент N.
0-й поток получит NULL вместо ивент N-1 — ему ждать нечего.

Установка ивента N означает — поток N свое дело сделал, следующему потоку можно писать.

Чтобы не плодить ивенты, можно сделать самописный пул ивентов, то есть поток после своего выполнения возвращает ивент в пул, откуда его потом возьмут и опять отдадут.
With best regards
Pavel Dvorkin
Re[4]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 06:45
Оценка:
Здравствуйте, Pavel Dvorkin, Вы писали:

PD>Тогда предлагаю следующий алгоритм


Вроде подходит под мои пожелания, буду пробовать. Спасибо.

PD>Чтобы не плодить ивенты, можно сделать самописный пул ивентов


Проще, наверное, просто иметь по ивенту на поток. Поток при чтении данных читает из переменной CurrentEvent ивент "родительского" потока и пишет туда же свой ивент.
Re[5]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 06.05.18 10:59
Оценка:
PD>>Чтобы не плодить ивенты, можно сделать самописный пул ивентов

A>Проще, наверное, просто иметь по ивенту на поток. Поток при чтении данных читает из переменной CurrentEvent ивент "родительского" потока и пишет туда же свой ивент.


Идея с ивентом на поток оказалась ошибочной (имею периодические дедлоки), а с пулом ивентов правильной. Для моего алгоритма требуемое количество ивентов равно количеству потоков + 1.
Re: Многопоточная обработка потока данных
От: CEMb  
Дата: 07.05.18 02:30
Оценка:
Здравствуйте, Aniskin, Вы писали:

A> Хочется заменить на какой-нибудь WaitForSingleObject, но не могу придумать, что же конкретно мне ждать.


Если у тебя потоки дохнут после того, как обработали блок, то WaitForSingleObject(hThread) — вернётся после того, как поток будет закрыт
Re[2]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 07.05.18 03:36
Оценка:
Здравствуйте, CEMb, Вы писали:

CEM>Если у тебя потоки дохнут после того, как обработали блок


Нет, они не дохнут, а переходят к обработке следующего блока.
Re[2]: Многопоточная обработка потока данных
От: ononim  
Дата: 07.05.18 16:11
Оценка:
CEM>Если у тебя потоки дохнут после того, как обработали блок, то WaitForSingleObject(hThread) — вернётся после того, как поток будет закрыт
WaitForSingleObject(hThread) вернется когда поток завершит исполнение. Если под _закрыт_ подразумевалось CloseHandle(hThread) в то время как
работает WaitForSingleObject(hThread) — то это русская рулетка, только в ногу.
Как много веселых ребят, и все делают велосипед...
Re[3]: Многопоточная обработка потока данных
От: CEMb  
Дата: 08.05.18 02:33
Оценка:
Здравствуйте, ononim, Вы писали:

CEM>>Если у тебя потоки дохнут после того, как обработали блок, то WaitForSingleObject(hThread) — вернётся после того, как поток будет закрыт

O>WaitForSingleObject(hThread) вернется когда поток завершит исполнение. Если под _закрыт_ подразумевалось CloseHandle(hThread) в то время как
O>работает WaitForSingleObject(hThread) — то это русская рулетка, только в ногу.

О, кстати, я _endthreadex вообще не пользуюсь, просто жду, когда сам выйдет
Насколько помню, любая попытка закрыть таким образом поток, даже без WaitForSingleObject(hThread), приводила к падению.
Когда можно/нужно использовать _endthreadex?
Re: Многопоточная обработка потока данных
От: VVV Россия  
Дата: 08.05.18 08:03
Оценка:
Здравствуйте, Aniskin, Вы писали:

A>4) Захват WriteMutex

A>5) Если ID последнего записанного блока равно текущему ID-1, то запись блока в выходной поток, освобождение WriteMutex, goto 0, иначе 6
A>6) освобождение WriteMutex, Sleep(100), goto 4.

Хочется заменить на какой-нибудь WaitForSingleObject, но не могу придумать, что же конкретно мне ждать.

Завести Event типа idChanged.

Тогда на шаге 5), после записи блока данных выставлять этот Event, на шаге 6) вместо Sleep(100) ждать WaitXXX(idChanged...).
Re[4]: Многопоточная обработка потока данных
От: ononim  
Дата: 08.05.18 13:54
Оценка:
CEM>О, кстати, я _endthreadex вообще не пользуюсь, просто жду, когда сам выйдет
CEM>Насколько помню, любая попытка закрыть таким образом поток, даже без WaitForSingleObject(hThread), приводила к падению.
CEM>Когда можно/нужно использовать _endthreadex?
разделите понятия закрыть и завершить
Поток можно завершить.
Закрыть можно хэндл. На что угодно. На мутекс, ивент, файл, поток.
Закрытие хэндл на поток и завершение потоков — два разных ортогональных друг другу действия.
_endthreadex завершает выполнение текущего потока. Но с ньюансами. Чтобы завершить выполнение текущего потока без ньюансов — надо просто выйти из его головной процедуры.
Как много веселых ребят, и все делают велосипед...
Отредактировано 08.05.2018 13:54 ononim . Предыдущая версия .
Re[3]: Многопоточная обработка потока данных
От: uuuser  
Дата: 13.05.18 00:06
Оценка:
Здравствуйте, Aniskin, Вы писали:

N>>И заменить Mutex на CriticalSection.

A>Разумно.

а лучше на https://msdn.microsoft.com/en-us/library/windows/desktop/aa904937(v=vs.85).aspx
CS прошлый век.
Re[4]: Многопоточная обработка потока данных
От: Aniskin  
Дата: 13.05.18 00:15
Оценка:
Здравствуйте, uuuser, Вы писали:

U>Здравствуйте, Aniskin, Вы писали:


N>>>И заменить Mutex на CriticalSection.

A>>Разумно.

U>а лучше на https://msdn.microsoft.com/en-us/library/windows/desktop/aa904937(v=vs.85).aspx

U>CS прошлый век.

Может быть зря конечно, но я еще просто XP поддерживаю. И в моей ситуации скорость секции не играет никакого значения.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.