Здравствуйте, alexsoff, Вы писали:
A> Приветствую. A>Вопрос по архитектуре и грамотному использованию ресурсов. A>В общем схема такая – необходимо собирать операции (предположим из базы или очереди) и дальше их исполнять. Одновременно можно исполнить не более 50-100 операций (больше нельзя) A>Есть два варианта. A>1)Создать поток для сбора операций new Thread и после собранные операции исполнять (обычно 2-5 сек) добавляя их в ThreadPool A>2) Создать 50-100 потоков и пускай постоянно слушают базу или очередь и исполняют по одной операции. A>Что с точки зрения системы будет правильней и меньше требовательно к ресурсам (дешевле)?
Потоки — это легаси в .NET. Начиная с того, что имеем дело с так называемым managed thread вместо реального потока исполнения, и заканчивая современными Фреймворками, где даже айдишник треда узнать — и то не всегда разрешено.
Так что, Task — наше всё (ContinueWith, async/await и прочее). Таски можно создавать вложенной связкой главный-дочерние, можно легко канселить, не надо думать о том как планировщик кванты тредам раздаст.
A>Вот именно, я всегда считал чем меньше потоков тем у нас быстрее работает система в целом т.к. большое количество потоков очень дорого обслуживать. Разве это не так?
Разумеется работа планировщика потоков ОС вносит какой-то оверхед (поддержка очереди потоков, переключение контекстов и т.п.).
Но типичный поток — это не только вычисления, но еще и операции ввода-вывода.
Поток ждущий ввода-вывода не тратит процессорное время, а, значит, планировщик может отдать его другим потокам.
Таким образом снижается латентность выполнения каждой конкретной операции.
И возвращаясь к задаче.
Я бы сделал так.
Один поток, который зачитывает данные, раздает задачи исполнителям и контролирует количество in-progress задач.
Что-то типа такого:
private readonly ManualResetEvent m_StopEvent = new ManualResetEvent(false);
private int m_PendingWorkItems;
private const int MAX_WORK_ITEMS = 100;
public void WorkerRoutine()
{
while (m_StopEvent.WaitOne(TimeSpan.FromSeconds(5))) // Sleep 5 seconds
{
var pendingWorkItems = Volatile.Read(ref m_PendingWorkItems);
if (pendingWorkItems > MAX_WORK_ITEMS) continue;
var workItems = m_DataProducer.GetWorkItems(MAX_WORK_ITEMS - pendingWorkItems);
foreach (var workItem in workItems)
{
var item = workItem;
Interlocked.Increment(ref m_PendingWorkItems);
Task.Factory.StartNew(() =>
{
try
{
ProcessWorkItem(item);
}
finally
{
Interlocked.Decrement(ref m_PendingWorkItems)
}
});
}
}
MD>>Так что, Task — наше всё (ContinueWith, async/await и прочее). Таски можно создавать вложенной связкой главный-дочерние, можно легко канселить, не надо думать о том как планировщик кванты тредам раздаст. A>нуу, Task с временем жизни приложения это разве не изврат? не правильней ли создать поток отдельный?
Да, в общем-то, однофигственно.
Для таски с флагом LongRunning дефолтный шедулер все равно создаст отдельный поток.
Здравствуйте, alexsoff, Вы писали:
A> Приветствую. A>Вопрос по архитектуре и грамотному использованию ресурсов.
Я делаю как можно проще и смотрю как оно работает: http://rsdn.org/forum/dotnet/6436611.1
смотри AsyncProducerConsumerCollection и метод Add
Что бы обойтись без объектов синхронизации можно сделать 2 очереди
1. Очередь задач на выполнение, где можно регулировать в зависимости от максимальной длины очереди
2. Очередь свободны задач.
for(var i=1; i<MaxCountTask; i++)
queueFreeTask.Add(1);
while(true)
{
Action act= await queue1.Take(); // возьмем метод на выполнение из первой очереди
/// Посмотрим есть ли свободные задачиvar i= await queueFreeTask.Take();
Task.Run(() => act(); queueFreeTask.Add(1); ); // выполнимм задачу и добавим признак свободной задачи в очередь
}
Писал без студии
и солнце б утром не вставало, когда бы не было меня
A>Although this is undocumented, if you start a Task with TaskCreationOptions.LongRunning then a new Thread will be started to run the Task.
A>так что смысла в таске нет и в недрах все равно создается выделенный поток new Thread.
Смысл есть в том, что бы не смешивать потоки и задачи. Обычно LongRunning необходим для майнеров и прочих использующие цпу на 100%
В большинстве же случаев внутри LongRunning ты так же можешь использовать таски асинхронно.
и солнце б утром не вставало, когда бы не было меня
Здравствуйте, alexsoff, Вы писали:
A> Приветствую. A>Вопрос по архитектуре и грамотному использованию ресурсов. A>В общем схема такая – необходимо собирать операции (предположим из базы или очереди) и дальше их исполнять. Одновременно можно исполнить не более 50-100 операций (больше нельзя) A>Есть два варианта. A>1)Создать поток для сбора операций new Thread и после собранные операции исполнять (обычно 2-5 сек) добавляя их в ThreadPool A>2) Создать 50-100 потоков и пускай постоянно слушают базу или очередь и исполняют по одной операции. A>Что с точки зрения системы будет правильней и меньше требовательно к ресурсам (дешевле)?
Я бы копал в сторону TPL (ActionBlock какой-нибудь, например)
100 потоков больше времени между собой за процессор драться будут, нежели полезную работу выполнять.
Здравствуйте, alexsoff, Вы писали:
A>Здравствуйте, Mr.Delphist, Вы писали:
MD>>Так что, Task — наше всё (ContinueWith, async/await и прочее). Таски можно создавать вложенной связкой главный-дочерние, можно легко канселить, не надо думать о том как планировщик кванты тредам раздаст. A>нуу, Task с временем жизни приложения это разве не изврат? не правильней ли создать поток отдельный?
для этого существует TaskCreationOptions.LongRunning http://qaru.site/questions/120236/taskcreationoptionslongrunning-option-and-threadpool
и солнце б утром не вставало, когда бы не было меня
RD>>Например, если данных для обработки пока что нет — наши 50-100 потоков тупо ждут на мониторе. A>Эм почему? Так если потоки из ThreadPool (Task.Factory.StartNew) не используются большое количество времени, тогда он вроде их освобождает?
Откуда такая информация?
Тред-пул устроен как.
Есть work queue, куда QueueUserWorkItem помещает задачу. Это, кстати, managed штука, т.е. она на стороне CLR живет.
И есть (условно) массив предсозданных потоков. Он уже внутрях виртуальной машины .NET.
Поток берет задачу из очереди и начинает ее выполнять.
Доходит до semaphore.Wait() и встает в состояние WaitSleepJoin.
В этом состоянии он так и будет торчать, пока semaphore не просигналит.
Приветствую.
Вопрос по архитектуре и грамотному использованию ресурсов.
В общем схема такая – необходимо собирать операции (предположим из базы или очереди) и дальше их исполнять. Одновременно можно исполнить не более 50-100 операций (больше нельзя)
Есть два варианта.
1)Создать поток для сбора операций new Thread и после собранные операции исполнять (обычно 2-5 сек) добавляя их в ThreadPool
2) Создать 50-100 потоков и пускай постоянно слушают базу или очередь и исполняют по одной операции.
Что с точки зрения системы будет правильней и меньше требовательно к ресурсам (дешевле)?
Во-первых "база или очередь" — это серьезное различие.
50-100 потоков (не важно честных или Task'ов) могут сравнительно дешево слушать очередь (e.g. тот же RabbitMQ), а вот если они будут полить базу — это уже нехилая нагрузка получается.
Далее, многое зависит от интерфейса продьюсера данных.
Может ли он отдавать данные пачкой, или только поштучно?
Есть ли связь по данным (e.g. результат обработки первой операции нужен для следущей) или все операции независимые?
Есть ли блокировки при чтении (т.е. блокирует ли читающий consumer остальных) и насколько они серьезные?
Нужно ли репортить/квитировать результат обработки?
Время обработки порции данных примерно одинаково или может прыгать в ощутимых пределах?
Если прыгает, то важно ли соблюсти равномерность загрузки consumer'ов, т.е. если один закончил работу раньше остальных — его надо
нагружать сразу или можно подождать пока закончат остальные?
50-100 операций — это что, лимит producer'a (т.е. источник может выдавать "не более чем" в единицу времени)
или consumer'ов (т.е. не должно быть более 50-100 одновременно обрабатываемых задач)?
Здравствуйте, RushDevion, Вы писали:
RD>Во-первых "база или очередь" — это серьезное различие.
В основном очередь, иногда база. RD>50-100 потоков (не важно честных или Task'ов) могут сравнительно дешево слушать очередь (e.g. тот же RabbitMQ), а вот если они будут полить базу — это уже нехилая нагрузка получается.
Вот именно, я всегда считал чем меньше потоков тем у нас быстрее работает система в целом т.к. большое количество потоков очень дорого обслуживать. Разве это не так?
RD>Далее, многое зависит от интерфейса продьюсера данных. RD>Может ли он отдавать данные пачкой, или только поштучно?
Логически поштучно, а так в реализации есть Prefetch (судя по исходникам провайдера к очереди), так что за 2-3 сообщениям я иду за несколько тактов процессора (замерял StopWatch).
RD>Есть ли связь по данным (e.g. результат обработки первой операции нужен для следущей) или все операции независимые?
независимые (логически зависимые, но после обработки идут в другие задания в базу/очередь) RD>Есть ли блокировки при чтении (т.е. блокирует ли читающий consumer остальных) и насколько они серьезные?
Конечно. RD>Нужно ли репортить/квитировать результат обработки?
Нет. RD>Время обработки порции данных примерно одинаково или может прыгать в ощутимых пределах?
99% время 2-5 секунд. RD>Если прыгает, то важно ли соблюсти равномерность загрузки consumer'ов, т.е. если один закончил работу раньше остальных — его надо RD>нагружать сразу или можно подождать пока закончат остальные?
Равномерность загрузки достигается ограничением максимального количества на процесс обрабатываемых сообщений. RD>50-100 операций — это что, лимит producer'a (т.е. источник может выдавать "не более чем" в единицу времени) RD>или consumer'ов (т.е. не должно быть более 50-100 одновременно обрабатываемых задач)?
Одновременно обрабатываемых задач.
MD>Так что, Task — наше всё (ContinueWith, async/await и прочее). Таски можно создавать вложенной связкой главный-дочерние, можно легко канселить, не надо думать о том как планировщик кванты тредам раздаст.
нуу, Task с временем жизни приложения это разве не изврат? не правильней ли создать поток отдельный?
Здравствуйте, RushDevion, Вы писали:
RD>И возвращаясь к задаче. RD>Я бы сделал так. RD>[skip]
отлично, хоть с кем-то мысли сходятся, я сделал примерно такое же, правда на семафорах у них там есть счетчики — очень удобно. т.к. операций не больше 100 и длительностью 2-5 секунд, потерями на вход в режим ядра (ИМХО) можно пренебречь.
Реализацию бы, конечно, посмотреть.
Я так понимаю, либо для каждого work item'a ранится отдельный поток (или таска), который пытается захватить семафор перед началом обработки.
Либо есть 50 потоков + 1 producer + общая очередь готовых к обработке задач.
Других вариантов чот в голову не приходит.
Тогда в первом случае может получить напрасно простаивающие потоки, если продьюсер генерит данные быстрее, чем их успевают обработать.
А во втором случае получаем оверхед на синхранизацию доступа к очереди.
Здравствуйте, alexsoff, Вы писали:
A>нуу, Task с временем жизни приложения это разве не изврат? не правильней ли создать поток отдельный?
Понятие "правильно" тут не совсем корректно. Вас же не беспокоит, как именно делается обнуление переменной: копированием нуля в регистр или XOR AX,AX? Хотя на асме пишучи, я ставил xor потому что по тактам быстрее выходило.
Ну и это, кто мешает вместо таска-долгожителя сделать поколениями, когда короткоживущий таск перед смертью шедулит новую задачу на исполнение.
Здравствуйте, RushDevion, Вы писали:
RD>Реализацию бы, конечно, посмотреть. RD>Я так понимаю, либо для каждого work item'a ранится отдельный поток (или таска), который пытается захватить семафор перед началом обработки.
Да, этот вариант верный.
RD> в первом случае может получить напрасно простаивающие потоки, если продьюсер генерит данные быстрее, чем их успевают обработать.
около 90% случаев данные обрабатываются быстрее, чем их генерят.
RD>>Реализацию бы, конечно, посмотреть. RD>>Я так понимаю, либо для каждого work item'a ранится отдельный поток (или таска), который пытается захватить семафор перед началом обработки. A>Да, этот вариант верный. RD>> в первом случае может получить напрасно простаивающие потоки, если продьюсер генерит данные быстрее, чем их успевают обработать. A>около 90% случаев данные обрабатываются быстрее, чем их генерят.
Тоже рабочий вариант, но как говориться, есть нюансы
Во-первых, рабочие потоки какое-то время могут простаивать.
Например, если данных для обработки пока что нет — наши 50-100 потоков тупо ждут на мониторе.
А могли бы делать что-то полезное
Во-вторых, если говорить об эффективности, то оверхед на захват монитора — это ~ 250ms. А Interlocked-операция ~10ns.
Здравствуйте, RushDevion, Вы писали: RD>Например, если данных для обработки пока что нет — наши 50-100 потоков тупо ждут на мониторе.
Эм почему? Так если потоки из ThreadPool (Task.Factory.StartNew) не используются большое количество времени, тогда он вроде их освобождает?
RD>Тред-пул устроен как. RD>Доходит до semaphore.Wait() и встает в состояние WaitSleepJoin.
Я о случае когда задач на обработку нет и в semaphore.Wait не вызывается.
Т.е. все созданные задачи через StartNew завершились и в очереди нет новых задач. Логично же предположить, что сработает механизм "сборки" долговременно не активных потоков.
Вот нашел подтверждение своих слов с msdn
Beginning with the net_v40_short, the thread pool creates and destroys worker threads in order to optimize throughput, which is defined as the number of tasks that complete per unit of time
RD>>Доходит до semaphore.Wait() и встает в состояние WaitSleepJoin. A>Я о случае когда задач на обработку нет и в semaphore.Wait не вызывается. A>Т.е. все созданные задачи через StartNew завершились и в очереди нет новых задач. Логично же предположить, что сработает механизм "сборки" долговременно не активных потоков.
Так и есть. Если случился переизбыток потоков, то ThreadPool подчищает лишние (по крайней мере в .NET 4 и старше).
Но для этого поток должен быть неактивен (Suspended).
А в твоем случае каждый из 50-100 потоков по циклу либо процесит очередной work item (Running), либо ждет на семафоре (WaitSleepJoin).
Ну либо я не вполне уловил детали твоей реализации
Здравствуйте, RushDevion, Вы писали:
RD>Ну либо я не вполне уловил детали твоей реализации
Одним потоком через new Thread живущий на протяжении работы приложения бегаем по базе/очереди и получаем данные для дальнейшей обработки.
Далее через Task.Factory.StartNew запускаем таски для выполнения полезных действий.
Лимитируем все это семафором
A>Одним потоком через new Thread живущий на протяжении работы приложения бегаем по базе/очереди и получаем данные для дальнейшей обработки. A>Далее через Task.Factory.StartNew запускаем таски для выполнения полезных действий. A>Лимитируем все это семафором
Если да, то скорость генерации данных как-то лимитируется?
Потому что если нет, есть риск получить классический over-subscription:
данных поступает все больше и больше (при фиксированной скорости потребления).
ThredPool начинает наращивать количество тредов. А данных все больше и больше
Пул растет как бешеный. В итоге все время тратится на переключение контекстов вместо реальной работы.
плюс еще счетчик и буферизация(x*2) заданий. Если счетчик превысит 2x, читающий/создающий новые задачи поток блокируется, до того, как будет буфер опустошен до половины.
RD>Если да, то скорость генерации данных как-то лимитируется?
лимитируется как раз скорость потребления.
Здравствуйте, alexsoff, Вы писали:
A>Здравствуйте, RushDevion, Вы писали:
RD>>Ну либо я не вполне уловил детали твоей реализации A>Одним потоком через new Thread живущий на протяжении работы приложения бегаем по базе/очереди и получаем данные для дальнейшей обработки. A>Далее через Task.Factory.StartNew запускаем таски для выполнения полезных действий. A>Лимитируем все это семафором
А почему не используется готовый ActionBlock из пакета Tpl.Dataflow?
Тот, кто бегает по очереди/базе просто создаёт ActionBlock с соответствующими MaxDegreeOfParallelism и BoundedCapacity и помещает элементы в очередь.
Какой-то недостаток имеется или просто самому тасками рулить привычнее?
Здравствуйте, AndrewVK, Вы писали: AVK>А зачем тебе такой таск? БД IOCP не поддерживает?
Операции/данные из разных источников — одни поддерживают, другие нет. Увы, мир не идеален
Здравствуйте, alexsoff, Вы писали:
AVK>>А зачем тебе такой таск? БД IOCP не поддерживает? A>Операции/данные из разных источников — одни поддерживают, другие нет. Увы, мир не идеален
Все источники внутри компа в итоге сводятся к внешним интерфейсам — сетевые карты и носители в основном. А они все умеют IOCP.
... << RSDN@Home 1.0.0 alpha 5 rev. 0 on Windows 8 6.2.9200.0>>