Есть WPF приложение, которое обменивается данными в сети используя класс System.Net.Sockets.TcpClient, получая от него данные асинхронно в отдельнок потоке. Так же есть некоторое количество таймеров System.Threading.Timer которые генярят события (судя по документации все они работают последовательно в неком отдельном потоке). Отрисовка GUI тоже работает отдельно в своем потоке.
Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?
Здравствуйте, RomikB, Вы писали:
RB>Есть WPF приложение, которое обменивается данными в сети используя класс System.Net.Sockets.TcpClient, получая от него данные асинхронно в отдельнок потоке. Так же есть некоторое количество таймеров System.Threading.Timer которые генярят события (судя по документации все они работают последовательно в неком отдельном потоке). Отрисовка GUI тоже работает отдельно в своем потоке.
RB>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?
В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:
Здравствуйте, RomikB, Вы писали:
RB>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?
Можно своё — фоновый поток, в нём цикл, который выбирает записи из BlockingCollection.GetConsumingEnumerable(). Теоретически можно наваять всю сборку/обработку на RX, но я не уверен что оно здесь действительно нужно.
Здравствуйте, Gremlin2, Вы писали:
G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:
Только while (!cancellationToken.IsCancellationRequested).
Здравствуйте, Gremlin2, Вы писали:
RB>>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?
G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:
Спасибо. Очень похоже на то что нужно.
А есть что то для .NET 3.5 (BlockingCollection в нем отсуствует)?
Здравствуйте, Sinix, Вы писали:
S>Здравствуйте, Gremlin2, Вы писали:
G>>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task: S>Только while (!cancellationToken.IsCancellationRequested).
Можно, но тогда уж и Take(cancellationToken). Но может работать и так: вызов CompleteAdding должен прерывать Take с InvalidOperationException.
Здравствуйте, RomikB, Вы писали:
RB>Здравствуйте, Gremlin2, Вы писали:
RB>Спасибо. Очень похоже на то что нужно. RB>А есть что то для .NET 3.5 (BlockingCollection в нем отсуствует)?
Здравствуйте, Gremlin2, Вы писали:
G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task: G>
А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.
Вопрос наверно сводится к проблеме реализации динамического пула потоков.
Google подсказывает только вроде поделок типа Smart pool, но тащить такое в продакшен... вообщем не могу решиться.
SHE>А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.
Если есть возможность — не лазить под капот и оставить на откуп дефолтному шедулеру. Для Parallel.For/PLINQ можно указать ParallelOptions.MaxDegreesOfParallelism. Не подходит — любой из вариантов отсюда (кроме SetMaxThreads конечно).
Если совсем припёрло — можно глянуть здесь и здесь (source code — ParallelExtensionExtras/TaskSchedulers).
Здравствуйте, Sinix, Вы писали:
SHE>>А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.
S>Если есть возможность — не лазить под капот и оставить на откуп дефолтному шедулеру. Для Parallel.For/PLINQ можно указать ParallelOptions.MaxDegreesOfParallelism. Не подходит — любой из вариантов отсюда (кроме SetMaxThreads конечно).
S>Если совсем припёрло — можно глянуть здесь и здесь (source code — ParallelExtensionExtras/TaskSchedulers).
Спасибо за наводку, посмотрел все.
С использованием BlockingCollection и Parallel.For/PLINQ не все так гладко, если вкратце то проблема в том что и тот и другой ориентированы на многопоточность и оба используют локи и синхронизацию внутри, не зная друг о друге, что во-первых оверхед, а во-вторых вылезает в виде глюков, например когда в очереди остаются необработанные таски. Там выходит какая-то накладка с локами из-за дефолтного поведения Partitioner-а, который обрабатывает данные порциями, и нужно писать свой. Детальное описание проблемы и решение здесь, в упомянутой вами ParallelExtensionExtras библиотеке.
Другое возможное решение — использование новой опции (.Net 4.5) EnumerablePartitionerOptions.NoBuffering, нашел здесь.
Там же наткнулся на описание TPL DataFlow — похоже то что мне нужно, по описанию настоящая бомба