Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 15.02.19 11:58
Оценка:
Здравствуйте.

Имеется группа файлов разного размера, от 100кб до 100мб. Я обрабатываю эти файлы параллельно, но у меня могут возникнуть проблемы с памятью, если я буду обрабатывать слишком много файлов одновроменно или несколько больших файлов (~30Мб). В случае если файл большой (по меркам задачи, разумеется), то обработка 30Мб может оттяпать под 1Гб ОЗУ. Далее, я разбиваю файлы на три группы: 1-ая все файлы <= 10Мб, 2-ая от 10-30Мб, 3-ая от 30Мб.
Я куждую группу запускаю в Parallel.Foreach с MaxDegreeOfParallelism = -1 для 1-ой группы, 4 для 2-ой и 1 для 3-й группы. Т.е. 4 файла >30Мб будут обрабатываться друг за дружкой, по одному в любой момент времени.

Т.е. как-то так

 foreach (var processingStrategy in strategy) //strategy -- файлы и соотв. MaxDegreeOfParallelism 
 {

  Parallel.ForEach(processingStrategyю.Files, new ParallelOptions {MaxDegreeOfParallelism = processingStrategy .ParallelismDegree}
 ...
}


Думал использовать Partitioner, но енто не совсем то, он просто разбивает входной диапазон, а мне нужна
скорее группировка (GroupBy) по размеру файла.

Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?

ЗЫ: Все цифры выше предварительны, т.к. обработка на всех ядкрах (8 на моей машине) 10Мб файлов может вполен вырубить приложение по памяти, а может и нет (надо экспериментировать). Может стоит 10Мб уменьшить до 5Мб. И т.д.
Кодом людям нужно помогать!
Re: Нужен совет и MaxDegreeOfParallelism
От: RushDevion Россия  
Дата: 15.02.19 13:06
Оценка: 4 (1)
Если цель — максимизировать throughput, то можно попробовать другой подход.
При старте создаем пул буферов, с примерно таким интерфейсом:
IMemoryPool
  MemoryBuffer Acquire(long bufferSize);
  void Release(Buffer buffer);

Реализацию пока оставлю за скобками, т.к. возможны разные варианты как с предварительной аллокацией, так и динамическим отслеживанием размера свободной памяти.
Далее у нас есть очередь файлов для обработки.
Алгоритм будет таким:
1. Берем очередной файл из очереди
2. Запрашиваем из пула буфер достаточного размера для чтения/обработки файла
3.1 Если получили буфер — стартуем новую таску для обработки. По окончанию обработки таска возвращает буфер в пул.
3.2 Если буфера нет — помещаем файл в конец очереди
4. Опционально. Засыпаем на какое-то время или до получения сигнала об окончании обработки очередного файла.
5. Повторяем пока очередь не закончится.
Re[2]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 15.02.19 13:58
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Если цель — максимизировать throughput, то можно попробовать другой подход.


Parallel для ентого и используется, но цель сейчас скорее чтобы работало стабильно и не падало при нехватки памяти. Не более.

RD>При старте создаем пул буферов, с примерно таким интерфейсом:


RD>IMemoryPool

RD> MemoryBuffer Acquire(long bufferSize);
RD> void Release(Buffer buffer);

.net 4.5.2


RD>Реализацию пока оставлю за скобками, т.к. возможны разные варианты как с предварительной аллокацией, так и динамическим отслеживанием размера свободной памяти.

RD>Далее у нас есть очередь файлов для обработки.
RD>Алгоритм будет таким:
RD>1. Берем очередной файл из очереди
RD>2. Запрашиваем из пула буфер достаточного размера для чтения/обработки файла

Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки.
Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит.

RD>3.1 Если получили буфер — стартуем новую таску для обработки. По окончанию обработки таска возвращает буфер в пул.

RD>3.2 Если буфера нет — помещаем файл в конец очереди
RD>4. Опционально. Засыпаем на какое-то время или до получения сигнала об окончании обработки очередного файла.
RD>5. Повторяем пока очередь не закончится.

Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать.
Кодом людям нужно помогать!
Re[3]: Нужен совет и MaxDegreeOfParallelism
От: RushDevion Россия  
Дата: 15.02.19 14:25
Оценка: 8 (1)
S>.net 4.5.2
Ну заменить MemoryBuffer на MemoryStream.

S>Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки.

S>Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит.
Ну все равно я бы думал в сторону контролёра памяти, который следил бы за объемом доступной памяти и разрешал-запрещал операцию с файлом.

S>Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать.

Так а зачем тебе Parallel для MaxDegreeOfParallelism=1? Просто запускай отдельный поток/таску и перебирай в цикле.
Отредактировано 15.02.2019 14:41 RushDevion . Предыдущая версия .
Re[4]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 15.02.19 15:13
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>.net 4.5.2

RD>Ну заменить MemoryBuffer на MemoryStream.

S>>Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки.

S>>Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит.
RD>Ну все равно я бы думал в сторону контролёра памяти, который следил бы за объемом доступной памяти и разрешал-запрещал операцию с файлом.

Интересная идея, единственная проблема в том, что если криво реализовать, файл может вообще никогда не обработаться. Гипотетически, если нагурзка большая. Но это не мой случай.


S>>Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать.

RD>Так а зачем тебе Parallel для MaxDegreeOfParallelism=1? Просто запускай отдельный поток/таску и перебирай в цикле.

А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую. К тому же при большом кол-ве маленьких файлов число
потоков может быть большим => еще нагрузка на память, а так я отдаю все на откуп среде. Таски мне тут вообще не помогут, а кода для работы с ними больше.
Кодом людям нужно помогать!
Re[5]: Нужен совет и MaxDegreeOfParallelism
От: RushDevion Россия  
Дата: 15.02.19 18:26
Оценка:
S>А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую. К тому же при большом кол-ве маленьких файлов число
S>потоков может быть большим => еще нагрузка на память, а так я отдаю все на откуп среде. Таски мне тут вообще не помогут, а кода для работы с ними больше.

Я имею ввиду, что DoP=1 — это не параллельность, это один поток: либо тот, который вызвал Parallel.For/ForEach, либо отдельный в виде LongRunning-таски.
Более того, я подозреваю, что в Parallel.For/Each такая оптимизация уже сделана, т.е. если задать DoP=1, он ничего параллелить не станет, а просто прокрутит обычный цикл в том же потоке.
Отредактировано 15.02.2019 18:38 RushDevion . Предыдущая версия . Еще …
Отредактировано 15.02.2019 18:33 RushDevion . Предыдущая версия .
Re: Нужен совет и MaxDegreeOfParallelism
От: artelk  
Дата: 15.02.19 19:54
Оценка:
Здравствуйте, Sharov, Вы писали:

S> В случае если файл большой (по меркам задачи, разумеется), то обработка 30Мб может оттяпать под 1Гб ОЗУ.

Я правильно понимаю, обработка файла — это некая байтомолотилка и исновное время тратится CPU, а не на чтение из файла, вызовов сервисов и т.п., и задача в том, чтобы уменьшить суммарное время обработки, так? В таком случае, думаю, параллелизм должен быть равен числу ядер.

S>ЗЫ: Все цифры выше предварительны, т.к. обработка на всех ядкрах (8 на моей машине) 10Мб файлов может вполен вырубить приложение по памяти, а может и нет (надо экспериментировать). Может стоит 10Мб уменьшить до 5Мб. И т.д.

Можено управлять порядком обработки файлов, чтобы суммарный размер файлов, обрабатываемый одновременно, не превышал некую величину.
Отредактировано 15.02.2019 19:59 artelk . Предыдущая версия .
Re: Нужен совет и MaxDegreeOfParallelism
От: TK Лес кывт.рф
Дата: 15.02.19 20:15
Оценка: 9 (2)
Здравствуйте, Sharov, Вы писали:

S>Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?


Используйте очередь. Например:

var largeBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20, BoundedCapacity = 20 });
var mediumBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 });
var smallBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

var bufferBlock = new BufferBlock<T>();
bufferBlock.LinkTo(smallBlock, x => x.Size < 1000);
bufferBlock.LinkTo(mediumBlock, x => x.Size < 100000);
bufferBlock.LinkTo(largeBlock);

bufferBlock.Post(xxx);

Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re: Нужен совет и MaxDegreeOfParallelism
От: VladCore  
Дата: 15.02.19 21:12
Оценка: +2
Здравствуйте, Sharov, Вы писали:

Юзай producer-consumer очередь и добавляй в очередь файлы, размер которых меньше СВОБОДНОЙ памяти деленной на N. Если в очереди пусто, то добавляй любой файл в очередь.

Иначе на ПК с небольшим объемом памяти будет памяти не хватать. а на ПК с большим объемом памяти будут простаивать цпу/память
Re: Нужен совет и MaxDegreeOfParallelism
От: takTak  
Дата: 15.02.19 22:09
Оценка: 3 (1)
недавно натолкнулся на System.IO.Pipelines
если бы мне надо было бы что-то с файлами делать я бы постарался к этому новому проeкту прикрутить, вот тут какая-то демка для файлов,
https://github.com/dotnet/corefxlab/tree/97ae47716ed5b25fbe3188429d433914d4550c16/src/System.IO.Pipelines.File
правда, это с .NET 4.6


что же касается твоей проблемы, то вот тебе пара идей подумать :

1) ограничение параллелизма с помощью SemaphoreSlim
https://github.com/danielmarbach/Await.HeadExplosion/blob/4a418d2fa34a7cdedf0ad392109b73816ce449ce/Presentation/ConcurrencyLimit.cs
2) использование ValueTask вместо Task, где это возможно; для ранних версий .NET nuget packages должны быть


а вот тут хороший курс: https://www.pluralsight.com/courses/async-parallel-app-design
Re: Нужен совет и MaxDegreeOfParallelism
От: Ночной Смотрящий Россия  
Дата: 16.02.19 16:02
Оценка:
Здравствуйте, Sharov, Вы писали:

У тебя тут смешение понятий. Распараллеливание нужно для использования ресурсов CPU, а ты делишь на пулы по расходу памяти. В результате в твоей схеме и от OOM не застрахован, и CPU может простаивать.
Тебе надо просто перед выделением больших кусков памяти проверять сколько ее доступно, и если понятно что ее может не хватить — тормозить конкретную задачу до тех пор пока память не появится.
Главное тут — придумать логику, страхующую от дедлоков.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[5]: Нужен совет и MaxDegreeOfParallelism
От: Sharowarsheg  
Дата: 16.02.19 16:20
Оценка:
Здравствуйте, Sharov, Вы писали:


S>А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую.


Где нужна производительность, не получится ничего делегировать. Всё придется рано или поздно сделать самому, и со всем вручную возиться. Так что можно начинать сразу, если действительно нужна производительность. А так, вот хороший ход, правда с дедлоками — http://rsdn.org/forum/dotnet/7375950
Автор: Sharov
Дата: 15.02.19
Re[2]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 11:58
Оценка:
Здравствуйте, artelk, Вы писали:

A>Я правильно понимаю, обработка файла — это некая байтомолотилка и исновное время тратится CPU, а не на чтение из файла, вызовов сервисов и т.п., и задача в том, чтобы уменьшить суммарное время обработки, так? В таком случае, думаю, параллелизм должен быть равен числу ядер.


Абсолютно. Грубо говоря идет расчет по МНК (матрицы оборачиваются и т.д.). Время обработки уменьшается за счет Parallel.ForEach. Но проблемв в том, что некотоыре файлы могут отъесть >1Гб и свалить процесс. Таки файлы хочется запускать отдельно (по одному или двум) от остальных. Для небольших файлы параллелизм числу ядер и равен -- MaxDegree = -1.

A>Можено управлять порядком обработки файлов, чтобы суммарный размер файлов, обрабатываемый одновременно, не превышал некую величину.


Я сейчас так и делаю -- разбил файлы на 3 группы (см. исходное сообщение).
Кодом людям нужно помогать!
Re[2]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 12:02
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, Sharov, Вы писали:


S>>Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?


TK>Используйте очередь. Например:


TK>var largeBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20, BoundedCapacity = 20 });

TK>var mediumBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 });
TK>var smallBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

TK>var bufferBlock = new BufferBlock<T>();

TK>bufferBlock.LinkTo(smallBlock, x => x.Size < 1000);
TK>bufferBlock.LinkTo(mediumBlock, x => x.Size < 100000);
TK>bufferBlock.LinkTo(largeBlock);

TK>bufferBlock.Post(xxx);


TK>Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному


Ровно так и делаю, только без ActionBlock -- сначала запускаю все мелкие файлы(если есть), затем отедльно чуть более крупные (MaxDegreeOfParallelism = 4) и уже совсем отдельно крупные файлы.
Кодом людям нужно помогать!
Re[2]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 12:06
Оценка:
Здравствуйте, VladCore, Вы писали:

VC>Здравствуйте, Sharov, Вы писали:


VC>Юзай producer-consumer очередь и добавляй в очередь файлы, размер которых меньше СВОБОДНОЙ памяти деленной на N. Если в очереди пусто, то добавляй любой файл в очередь.


VC>Иначе на ПК с небольшим объемом памяти будет памяти не хватать. а на ПК с большим объемом памяти будут простаивать цпу/память


Согласен, по-хорошему эвристика для запуска файлов должна учитывать объем достуной памяти. Т.е. для крупных файлов MaxDegree это не прсто 1, а некая ф-ия от кол-ва ОЗУ.
Кодом людям нужно помогать!
Re[2]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 12:20
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>Здравствуйте, Sharov, Вы писали:


НС>У тебя тут смешение понятий. Распараллеливание нужно для использования ресурсов CPU, а ты делишь на пулы по расходу памяти. В результате в твоей схеме и от OOM не застрахован, и CPU может простаивать.


Дельное замечание, но в моем случае не совсем верное. Я действительно борюсь ООМ, и готов пожертвовать пропускной способностью CPU( хотя и нет). Каждая обработка у файла у меня отдельно параллелится, и потребляем все использующиеся CPU. Как такового простоя CPU в пакетной обработке не будет, а вот OOM ловил регулярно.

НС>Тебе надо просто перед выделением больших кусков памяти проверять сколько ее доступно, и если понятно что ее может не хватить — тормозить конкретную задачу до тех пор пока память не появится.


Это безусловно грамотный подход, но как это реализовать в условиях многопоточности, когда задачи запускается динамически? Ну рассчитал я, что памяти хватит,
а в это время еще пяток задач запускается.. Проще честно запускать по 1-2 задаче отдельно от всех.
Кодом людям нужно помогать!
Re[6]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 12:21
Оценка:
Здравствуйте, Sharowarsheg, Вы писали:

S> А так, вот хороший ход, правда с дедлоками — http://rsdn.org/forum/dotnet/7375950
Автор: Sharov
Дата: 15.02.19


Где дедлоки?
Кодом людям нужно помогать!
Re[7]: Нужен совет и MaxDegreeOfParallelism
От: Sharowarsheg  
Дата: 17.02.19 14:20
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Здравствуйте, Sharowarsheg, Вы писали:


S>> А так, вот хороший ход, правда с дедлоками — http://rsdn.org/forum/dotnet/7375950
Автор: Sharov
Дата: 15.02.19


S>Где дедлоки?


Когда задач назапускали, исходя из того, что они маленькие, а внезапно они все выросли, и ни одна не может получить больше памяти.
Re[8]: Нужен совет и MaxDegreeOfParallelism
От: Sharov Россия  
Дата: 17.02.19 15:14
Оценка:
Здравствуйте, Sharowarsheg, Вы писали:

S>>Где дедлоки?


S>Когда задач назапускали, исходя из того, что они маленькие, а внезапно они все выросли, и ни одна не может получить больше памяти.


Ну да, гипотетически если запускать задачи в зависимости от наличия памяти. Это не deadlock, это livelock.
Кодом людям нужно помогать!
Re[3]: Нужен совет и MaxDegreeOfParallelism
От: TK Лес кывт.рф
Дата: 17.02.19 17:57
Оценка:
Здравствуйте, Sharov, Вы писали:

TK>>bufferBlock.Post(xxx);

TK>>Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному

S>Ровно так и делаю, только без ActionBlock -- сначала запускаю все мелкие файлы(если есть), затем отедльно чуть более крупные (MaxDegreeOfParallelism = 4) и уже совсем отдельно крупные файлы.


Здесь можно пихать сразу все без предварительной сортировки
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.