Общий поток для всех обработок
От: RomikB  
Дата: 26.10.12 10:04
Оценка:
Есть WPF приложение, которое обменивается данными в сети используя класс System.Net.Sockets.TcpClient, получая от него данные асинхронно в отдельнок потоке. Так же есть некоторое количество таймеров System.Threading.Timer которые генярят события (судя по документации все они работают последовательно в неком отдельном потоке). Отрисовка GUI тоже работает отдельно в своем потоке.

Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?
... << RSDN@Home 1.2.0 alpha 5 rev. 1539>>
Re: Общий поток для всех обработок
От: Gremlin2 http://www.fb2library.net/
Дата: 26.10.12 10:28
Оценка:
Здравствуйте, RomikB, Вы писали:

RB>Есть WPF приложение, которое обменивается данными в сети используя класс System.Net.Sockets.TcpClient, получая от него данные асинхронно в отдельнок потоке. Так же есть некоторое количество таймеров System.Threading.Timer которые генярят события (судя по документации все они работают последовательно в неком отдельном потоке). Отрисовка GUI тоже работает отдельно в своем потоке.


RB>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?


В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:

            BlockingCollection<AsyncMessage> queue = new BlockingCollection<AsyncMessage>();

            Task.Factory.StartNew(() =>
            {
                try
                {
                    while (true)
                    {
                        AsyncMessage message = queue.Take();
                        // TODO: ...
                    }
                }
                catch (InvalidOperationException)
                {
                }
            });

            modalQueue.Add(new AsyncMessage());
            modalQueue.Add(new AsyncMessage());
            modalQueue.Add(new AsyncMessage());
            modalQueue.CompleteAdding();
Re: Общий поток для всех обработок
От: Sinix  
Дата: 26.10.12 10:48
Оценка:
Здравствуйте, RomikB, Вы писали:

RB>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?


Можно своё — фоновый поток, в нём цикл, который выбирает записи из BlockingCollection.GetConsumingEnumerable(). Теоретически можно наваять всю сборку/обработку на RX, но я не уверен что оно здесь действительно нужно.
Re[2]: Общий поток для всех обработок
От: Sinix  
Дата: 26.10.12 10:51
Оценка:
Здравствуйте, Gremlin2, Вы писали:

G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:

Только while (!cancellationToken.IsCancellationRequested).
Re[2]: Общий поток для всех обработок
От: RomikB  
Дата: 26.10.12 10:52
Оценка:
Здравствуйте, Gremlin2, Вы писали:

RB>>Необходимо реализовать отдельный поток в который будут складываться задания по мере срабатывания таймера или прихода данных из сети и их дальнейшая последовательная обработка в этом потоке. Есть ли какой нибудь готовый системный механизм для этого в .NET или прийдется писать что то свое?


G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:


Спасибо. Очень похоже на то что нужно.
А есть что то для .NET 3.5 (BlockingCollection в нем отсуствует)?
... << RSDN@Home 1.2.0 alpha 5 rev. 1539>>
Re[3]: Общий поток для всех обработок
От: Gremlin2 http://www.fb2library.net/
Дата: 26.10.12 11:03
Оценка: +1
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Gremlin2, Вы писали:


G>>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:

S>Только while (!cancellationToken.IsCancellationRequested).
Можно, но тогда уж и Take(cancellationToken). Но может работать и так: вызов CompleteAdding должен прерывать Take с InvalidOperationException.
Re[3]: Общий поток для всех обработок
От: Gremlin2 http://www.fb2library.net/
Дата: 26.10.12 11:10
Оценка: +1
Здравствуйте, RomikB, Вы писали:

RB>Здравствуйте, Gremlin2, Вы писали:


RB>Спасибо. Очень похоже на то что нужно.

RB>А есть что то для .NET 3.5 (BlockingCollection в нем отсуствует)?

Reactive Extensions for .NET 3.5SP1 включает в себя TPL для .NET 3.5
Re[2]: Общий поток для всех обработок
От: SHEMA  
Дата: 26.10.12 18:40
Оценка:
Здравствуйте, Gremlin2, Вы писали:

G>В отдельном потоке или в потоке UI? Если в потоке UI, то есть Dispatcher. Если в отдельном, то элементарно реализуется при помощи BlockingCollection и Task:

G>
G>            BlockingCollection<AsyncMessage> queue = new BlockingCollection<AsyncMessage>();
G>            Task.Factory.StartNew(() =>
G>            {
G>                try
G>                {
G>                    while (true)
G>                    {
G>                        AsyncMessage message = queue.Take();
G>                        // TODO: ...
G>                    }
G>                }
G>                catch (InvalidOperationException)
G>                {
G>                }
G>            });

G>            modalQueue.Add(new AsyncMessage());
G>            modalQueue.Add(new AsyncMessage());
G>            modalQueue.Add(new AsyncMessage());
G>            modalQueue.CompleteAdding();
G>


А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.

Вопрос наверно сводится к проблеме реализации динамического пула потоков.
Google подсказывает только вроде поделок типа Smart pool, но тащить такое в продакшен... вообщем не могу решиться.

Поделитесь опытом, коллеги
Re[3]: Общий поток для всех обработок
От: Sinix  
Дата: 27.10.12 07:21
Оценка: 4 (1) +1
Здравствуйте, SHEMA, Вы писали:


SHE>А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.


Если есть возможность — не лазить под капот и оставить на откуп дефолтному шедулеру. Для Parallel.For/PLINQ можно указать ParallelOptions.MaxDegreesOfParallelism. Не подходит — любой из вариантов отсюда (кроме SetMaxThreads конечно).

Если совсем припёрло — можно глянуть здесь и здесь (source code — ParallelExtensionExtras/TaskSchedulers).
Re[4]: Общий поток для всех обработок
От: SHEMA  
Дата: 27.10.12 12:28
Оценка: 22 (1) +1
Здравствуйте, Sinix, Вы писали:

SHE>>А как бы Вы посоветовали реализовать обработку тасков в несколько потоков с динамической нагрузкой, т.е. когда число потоков-обработчиков должно автоматически увеличиваться или уменьшаться (в пределах конфигурации min / max) в зависимости от числа тасков в очереди.


S>Если есть возможность — не лазить под капот и оставить на откуп дефолтному шедулеру. Для Parallel.For/PLINQ можно указать ParallelOptions.MaxDegreesOfParallelism. Не подходит — любой из вариантов отсюда (кроме SetMaxThreads конечно).


S>Если совсем припёрло — можно глянуть здесь и здесь (source code — ParallelExtensionExtras/TaskSchedulers).


Спасибо за наводку, посмотрел все.

С использованием BlockingCollection и Parallel.For/PLINQ не все так гладко, если вкратце то проблема в том что и тот и другой ориентированы на многопоточность и оба используют локи и синхронизацию внутри, не зная друг о друге, что во-первых оверхед, а во-вторых вылезает в виде глюков, например когда в очереди остаются необработанные таски. Там выходит какая-то накладка с локами из-за дефолтного поведения Partitioner-а, который обрабатывает данные порциями, и нужно писать свой. Детальное описание проблемы и решение здесь, в упомянутой вами ParallelExtensionExtras библиотеке.
Другое возможное решение — использование новой опции (.Net 4.5) EnumerablePartitionerOptions.NoBuffering, нашел здесь.
Там же наткнулся на описание TPL DataFlow — похоже то что мне нужно, по описанию настоящая бомба
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.