Имеется группа файлов разного размера, от 100кб до 100мб. Я обрабатываю эти файлы параллельно, но у меня могут возникнуть проблемы с памятью, если я буду обрабатывать слишком много файлов одновроменно или несколько больших файлов (~30Мб). В случае если файл большой (по меркам задачи, разумеется), то обработка 30Мб может оттяпать под 1Гб ОЗУ. Далее, я разбиваю файлы на три группы: 1-ая все файлы <= 10Мб, 2-ая от 10-30Мб, 3-ая от 30Мб.
Я куждую группу запускаю в Parallel.Foreach с MaxDegreeOfParallelism = -1 для 1-ой группы, 4 для 2-ой и 1 для 3-й группы. Т.е. 4 файла >30Мб будут обрабатываться друг за дружкой, по одному в любой момент времени.
Т.е. как-то так
foreach (var processingStrategy in strategy) //strategy -- файлы и соотв. MaxDegreeOfParallelism
{
Parallel.ForEach(processingStrategyю.Files, new ParallelOptions {MaxDegreeOfParallelism = processingStrategy .ParallelismDegree}
...
}
Думал использовать Partitioner, но енто не совсем то, он просто разбивает входной диапазон, а мне нужна
скорее группировка (GroupBy) по размеру файла.
Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?
ЗЫ: Все цифры выше предварительны, т.к. обработка на всех ядкрах (8 на моей машине) 10Мб файлов может вполен вырубить приложение по памяти, а может и нет (надо экспериментировать). Может стоит 10Мб уменьшить до 5Мб. И т.д.
Реализацию пока оставлю за скобками, т.к. возможны разные варианты как с предварительной аллокацией, так и динамическим отслеживанием размера свободной памяти.
Далее у нас есть очередь файлов для обработки.
Алгоритм будет таким:
1. Берем очередной файл из очереди
2. Запрашиваем из пула буфер достаточного размера для чтения/обработки файла
3.1 Если получили буфер — стартуем новую таску для обработки. По окончанию обработки таска возвращает буфер в пул.
3.2 Если буфера нет — помещаем файл в конец очереди
4. Опционально. Засыпаем на какое-то время или до получения сигнала об окончании обработки очередного файла.
5. Повторяем пока очередь не закончится.
Здравствуйте, RushDevion, Вы писали:
RD>Если цель — максимизировать throughput, то можно попробовать другой подход.
Parallel для ентого и используется, но цель сейчас скорее чтобы работало стабильно и не падало при нехватки памяти. Не более.
RD>При старте создаем пул буферов, с примерно таким интерфейсом:
RD>IMemoryPool RD> MemoryBuffer Acquire(long bufferSize); RD> void Release(Buffer buffer);
.net 4.5.2
RD>Реализацию пока оставлю за скобками, т.к. возможны разные варианты как с предварительной аллокацией, так и динамическим отслеживанием размера свободной памяти. RD>Далее у нас есть очередь файлов для обработки. RD>Алгоритм будет таким: RD>1. Берем очередной файл из очереди RD>2. Запрашиваем из пула буфер достаточного размера для чтения/обработки файла
Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки.
Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит.
RD>3.1 Если получили буфер — стартуем новую таску для обработки. По окончанию обработки таска возвращает буфер в пул. RD>3.2 Если буфера нет — помещаем файл в конец очереди RD>4. Опционально. Засыпаем на какое-то время или до получения сигнала об окончании обработки очередного файла. RD>5. Повторяем пока очередь не закончится.
Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать.
S>.net 4.5.2
Ну заменить MemoryBuffer на MemoryStream.
S>Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки. S>Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит.
Ну все равно я бы думал в сторону контролёра памяти, который следил бы за объемом доступной памяти и разрешал-запрещал операцию с файлом.
S>Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать.
Так а зачем тебе Parallel для MaxDegreeOfParallelism=1? Просто запускай отдельный поток/таску и перебирай в цикле.
Здравствуйте, RushDevion, Вы писали:
S>>.net 4.5.2 RD>Ну заменить MemoryBuffer на MemoryStream.
S>>Увы, штука сильно динамическая, поскольку в процессе обработки будут использоваться\подгружаться другие файлв в кол-ве от 3 до 8, и все это будет конвертироваться в соотв. базу данных для обработки. S>>Отсюда возможно всплески до 1ГБ и более. Тут трудно что-либо предсказать. Просто есть вероятность, что если файл >30Мб то возможно отъесть до 1Гб ОЗУ. А может и нет. Поэтму не подходит. RD>Ну все равно я бы думал в сторону контролёра памяти, который следил бы за объемом доступной памяти и разрешал-запрещал операцию с файлом.
Интересная идея, единственная проблема в том, что если криво реализовать, файл может вообще никогда не обработаться. Гипотетически, если нагурзка большая. Но это не мой случай.
S>>Меня больше интересуют гарантии, что при MaxDegreeOfParallelism = 1 точно будет один поток, а то выяснится, что это так, пожелание, которое планировщик вполне может игнорировать. RD>Так а зачем тебе Parallel для MaxDegreeOfParallelism=1? Просто запускай отдельный поток/таску и перебирай в цикле.
А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую. К тому же при большом кол-ве маленьких файлов число
потоков может быть большим => еще нагрузка на память, а так я отдаю все на откуп среде. Таски мне тут вообще не помогут, а кода для работы с ними больше.
S>А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую. К тому же при большом кол-ве маленьких файлов число S>потоков может быть большим => еще нагрузка на память, а так я отдаю все на откуп среде. Таски мне тут вообще не помогут, а кода для работы с ними больше.
Я имею ввиду, что DoP=1 — это не параллельность, это один поток: либо тот, который вызвал Parallel.For/ForEach, либо отдельный в виде LongRunning-таски.
Более того, я подозреваю, что в Parallel.For/Each такая оптимизация уже сделана, т.е. если задать DoP=1, он ничего параллелить не станет, а просто прокрутит обычный цикл в том же потоке.
Здравствуйте, Sharov, Вы писали:
S> В случае если файл большой (по меркам задачи, разумеется), то обработка 30Мб может оттяпать под 1Гб ОЗУ.
Я правильно понимаю, обработка файла — это некая байтомолотилка и исновное время тратится CPU, а не на чтение из файла, вызовов сервисов и т.п., и задача в том, чтобы уменьшить суммарное время обработки, так? В таком случае, думаю, параллелизм должен быть равен числу ядер.
S>ЗЫ: Все цифры выше предварительны, т.к. обработка на всех ядкрах (8 на моей машине) 10Мб файлов может вполен вырубить приложение по памяти, а может и нет (надо экспериментировать). Может стоит 10Мб уменьшить до 5Мб. И т.д.
Можено управлять порядком обработки файлов, чтобы суммарный размер файлов, обрабатываемый одновременно, не превышал некую величину.
Здравствуйте, Sharov, Вы писали:
S>Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?
Используйте очередь. Например:
var largeBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20, BoundedCapacity = 20 });
var mediumBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 });
var smallBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
var bufferBlock = new BufferBlock<T>();
bufferBlock.LinkTo(smallBlock, x => x.Size < 1000);
bufferBlock.LinkTo(mediumBlock, x => x.Size < 100000);
bufferBlock.LinkTo(largeBlock);
bufferBlock.Post(xxx);
Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Юзай producer-consumer очередь и добавляй в очередь файлы, размер которых меньше СВОБОДНОЙ памяти деленной на N. Если в очереди пусто, то добавляй любой файл в очередь.
Иначе на ПК с небольшим объемом памяти будет памяти не хватать. а на ПК с большим объемом памяти будут простаивать цпу/память
У тебя тут смешение понятий. Распараллеливание нужно для использования ресурсов CPU, а ты делишь на пулы по расходу памяти. В результате в твоей схеме и от OOM не застрахован, и CPU может простаивать.
Тебе надо просто перед выделением больших кусков памяти проверять сколько ее доступно, и если понятно что ее может не хватить — тормозить конкретную задачу до тех пор пока память не появится.
Главное тут — придумать логику, страхующую от дедлоков.
S>А зачем таски и возиться с ними вручную, когда можно все эти делигировать среде? Ну т.е. гоняю Parallel для каждой группы и в ус не дую.
Где нужна производительность, не получится ничего делегировать. Всё придется рано или поздно сделать самому, и со всем вручную возиться. Так что можно начинать сразу, если действительно нужна производительность. А так, вот хороший ход, правда с дедлоками — http://rsdn.org/forum/dotnet/7375950
Здравствуйте, artelk, Вы писали:
A>Я правильно понимаю, обработка файла — это некая байтомолотилка и исновное время тратится CPU, а не на чтение из файла, вызовов сервисов и т.п., и задача в том, чтобы уменьшить суммарное время обработки, так? В таком случае, думаю, параллелизм должен быть равен числу ядер.
Абсолютно. Грубо говоря идет расчет по МНК (матрицы оборачиваются и т.д.). Время обработки уменьшается за счет Parallel.ForEach. Но проблемв в том, что некотоыре файлы могут отъесть >1Гб и свалить процесс. Таки файлы хочется запускать отдельно (по одному или двум) от остальных. Для небольших файлы параллелизм числу ядер и равен -- MaxDegree = -1.
A>Можено управлять порядком обработки файлов, чтобы суммарный размер файлов, обрабатываемый одновременно, не превышал некую величину.
Я сейчас так и делаю -- разбил файлы на 3 группы (см. исходное сообщение).
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, Sharov, Вы писали:
S>>Может тут можно как-то изящнее поступить и не городить свои классы для группировки файлов для обработки? Стоит ли для этой задачи связываться с TaskScheduler'ом?
TK>Используйте очередь. Например:
TK>var largeBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20, BoundedCapacity = 20 }); TK>var mediumBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 }); TK>var smallBlock = new ActionBlock<T>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
TK>var bufferBlock = new BufferBlock<T>(); TK>bufferBlock.LinkTo(smallBlock, x => x.Size < 1000); TK>bufferBlock.LinkTo(mediumBlock, x => x.Size < 100000); TK>bufferBlock.LinkTo(largeBlock);
TK>bufferBlock.Post(xxx);
TK>Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному
Ровно так и делаю, только без ActionBlock -- сначала запускаю все мелкие файлы(если есть), затем отедльно чуть более крупные (MaxDegreeOfParallelism = 4) и уже совсем отдельно крупные файлы.
Здравствуйте, VladCore, Вы писали:
VC>Здравствуйте, Sharov, Вы писали:
VC>Юзай producer-consumer очередь и добавляй в очередь файлы, размер которых меньше СВОБОДНОЙ памяти деленной на N. Если в очереди пусто, то добавляй любой файл в очередь.
VC>Иначе на ПК с небольшим объемом памяти будет памяти не хватать. а на ПК с большим объемом памяти будут простаивать цпу/память
Согласен, по-хорошему эвристика для запуска файлов должна учитывать объем достуной памяти. Т.е. для крупных файлов MaxDegree это не прсто 1, а некая ф-ия от кол-ва ОЗУ.
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>Здравствуйте, Sharov, Вы писали:
НС>У тебя тут смешение понятий. Распараллеливание нужно для использования ресурсов CPU, а ты делишь на пулы по расходу памяти. В результате в твоей схеме и от OOM не застрахован, и CPU может простаивать.
Дельное замечание, но в моем случае не совсем верное. Я действительно борюсь ООМ, и готов пожертвовать пропускной способностью CPU( хотя и нет). Каждая обработка у файла у меня отдельно параллелится, и потребляем все использующиеся CPU. Как такового простоя CPU в пакетной обработке не будет, а вот OOM ловил регулярно.
НС>Тебе надо просто перед выделением больших кусков памяти проверять сколько ее доступно, и если понятно что ее может не хватить — тормозить конкретную задачу до тех пор пока память не появится.
Это безусловно грамотный подход, но как это реализовать в условиях многопоточности, когда задачи запускается динамически? Ну рассчитал я, что памяти хватит,
а в это время еще пяток задач запускается.. Проще честно запускать по 1-2 задаче отдельно от всех.
Здравствуйте, Sharov, Вы писали:
S>Здравствуйте, Sharowarsheg, Вы писали:
S>> А так, вот хороший ход, правда с дедлоками — http://rsdn.org/forum/dotnet/7375950
Здравствуйте, Sharowarsheg, Вы писали:
S>>Где дедлоки?
S>Когда задач назапускали, исходя из того, что они маленькие, а внезапно они все выросли, и ни одна не может получить больше памяти.
Ну да, гипотетически если запускать задачи в зависимости от наличия памяти. Это не deadlock, это livelock.
Здравствуйте, Sharov, Вы писали:
TK>>bufferBlock.Post(xxx); TK>>Мелкие сообщения будут обрабатываться по 20 за раз, средние по 5 и остальные по одному
S>Ровно так и делаю, только без ActionBlock -- сначала запускаю все мелкие файлы(если есть), затем отедльно чуть более крупные (MaxDegreeOfParallelism = 4) и уже совсем отдельно крупные файлы.
Здесь можно пихать сразу все без предварительной сортировки
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.