Никак не въеду в неUIшное использование асунков. Вот к примеру, как написать аналог параллельной обработки некоторых данных на асунках.
ПРИМЕР 1.
Вот, такая штуковиночка. Не совсем то же самое, что и второй пример, т.к. нет отдельного треда для получения данных, но зато однострочник.
Parallel.ForEach(ProduceInput(), new ParallelOptions() { MaxDegreeOfParallelism = 4 }, Process);
ПРИМЕР 2.
Полноценный пример на System.Threading.Tasks.Dataflow, тут уже один тред получает данные, а два обрабатывают.
var queue = new BufferBlock<int>(new DataflowBlockOptions(){ BoundedCapacity = 4 })
var produce = Task.Run(() => ProduceInput());
var opts = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 };
var consume = new ActionBlock<string>(input => Process(input), opts);
queue.LinkTo(consume, new DataflowLinkOptions(){ PropagateCompletion = true });
Task.WaitAll(produce, consume.Completion);
Попытался нарисовать это на асунках. Вроде всё хорошо, но оно отжирает гигабайты памяти, в связи с тем, что количество обработчиков не ограничено.
Погуглил, похоже для лечения прожорливости этого дела предназначен класс SemaphoreSlim.
Но вот как его вкорячить таким образом, что бы оно * Работало. * Было удобно. * Студия не ругалась на извращения в коде.
class Program
{
static void Main(string[] args)
{
try
{
//var throttle = new SemaphoreSlim(1, 4);for (; ; )
{
Process();
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.ReadKey();
}
private static async void Process()
{
var input = await Receive();
var result = await Compute(input);
await Log(input, result);
}
private static async Task<int> Receive()
{
var r = new Random();
await Task.Delay(100);
return r.Next(0,100);
}
private static async Task<int> Compute(int a)
{
await Task.Delay(700);
return 17*a;
}
private static Task Log(int a, int r)
{
return Task.Run(() => Console.WriteLine("{0,4} | {1} => {2}", Thread.CurrentThread.ManagedThreadId, a, r));
}
}
Здравствуйте, UberPsychoSvin, Вы писали:
UPS>Никак не въеду в неUIшное использование асунков. Вот к примеру, как написать аналог параллельной обработки некоторых данных на асунках.
Но зачем? Чем Parallel.ForEach плох-то?
Не, если хочется неизведанного — вариантов полно. Async streams будут только в восьмом шарпе, но можно поискать костыльные решения аля https://github.com/tyrotoxin/AsyncEnumerable , можно использовать Rx, можно помучать TPL DataFlow, можно просто группировать последовательность пачками по несколько сотен элементов.
Re: НеUIёвая асинхронная очередь + красивый троттлинг
class Program
{
static SemaphoreSlim throttle = new SemaphoreSlim(1, 4);static void Main(string[] args)
{
try
{
for (; ; )
{
throttle.Wait();
Process();
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.ReadKey();
}
private static async void Process()
{
var input = await Receive();
throttle.Release(); // Если нужно троттлить только Receive, то можно сделать Release() здесь, ... var result = await Compute(input);
await Log(input, result);
// throttle.Release(); // ... а можно здесь - если нужно троттлить всю цепочку
}
private static async Task<int> Receive()
{
var r = new Random();
await Task.Delay(100);
return r.Next(0,100);
}
private static async Task<int> Compute(int a)
{
await Task.Delay(700);
return 17* a;
}
private static Task Log(int a, int r)
{
return Task.Run(() => Console.WriteLine("{0,4} | {1} => {2}", Thread.CurrentThread.ManagedThreadId, a, r));
}
}
Здравствуйте, Sinix, Вы писали: S>Но зачем?
Пока не очень понял. Хочу в фишки асинков въехать, не зря же их выдумали.
Вроде с асинками тредов ждунов меньше должно быть, если правильно всё реализовано на низком уровне.
Здравствуйте, seregaa, Вы писали: S>В простейшем случае так:
Так не работает. Не пашет параллельная обработка.
Re[3]: НеUIёвая асинхронная очередь + красивый троттлинг
Здравствуйте, UberPsychoSvin, Вы писали:
UPS>Пока не очень понял. Хочу в фишки асинков въехать, не зря же их выдумали.
Их для другого выдумали — для возможности прерывать (и возобновлять) выполнение метода без блокирующих ожиданий
в помощь
UPS>Вроде с асинками тредов ждунов меньше должно быть, если правильно всё реализовано на низком уровне.
Ну а в Parallel.For где ждуны-то? Куда уж меньше
Re[4]: НеUIёвая асинхронная очередь + красивый троттлинг
Здравствуйте, Sinix, Вы писали: S>Их для другого выдумали — для возможности прерывать (и возобновлять) выполнение метода без блокирующих ожиданий
Да про UI всё ясно. Просто меньше писать Dispatcher.BeginInvoke.
Но их же не чисто ради него придумали.
S>Ну а в Parallel.For где ждуны-то? Куда уж меньше
Если Receive — синхронный, то у меня есть тред который ждёт пока Receive что-то не вернёт.
Т.е. в случае с Parallel.ForEach, ждуном является тред обработчик, который запрашивает следующий элемент в IEnumerable<Input>
IEnumerable<Input> EnumerateInput()
{
for(;;)
yield return Receive() // Тред ждун ждёт когда дёргает вот это
}
Parallel.ForEach(EnumerateInput, Process);
Здравствуйте, UberPsychoSvin, Вы писали:
UPS>Да про UI всё ясно. Просто меньше писать Dispatcher.BeginInvoke. UPS>Но их же не чисто ради него придумали.
Плюс IO. Плюс возможность предоставить поток другому коду на время ожидания какого-либо ресурса.
S>>Ну а в Parallel.For где ждуны-то? Куда уж меньше UPS>Если Receive — синхронный, то у меня есть тред который ждёт пока Receive что-то не вернёт.
А. Ну тогда RX / TPL DataFlow в помощь. Оба — далеко не идеал по удобству, но лучше ничего на сегодня нет. Ну, или троттлинг поверх SemaphoreSlim.
S>Не, если хочется неизведанного — вариантов полно. Async streams будут только в восьмом шарпе, но можно поискать костыльные решения аля https://github.com/tyrotoxin/AsyncEnumerable , можно использовать Rx, можно помучать TPL DataFlow, можно просто группировать последовательность пачками по несколько сотен элементов.
А откуда информация про стримы в восьмом шарпе? Роадмапа не нашел.
По сабжу можно еще Akka Streams .Net посмотреть, там backpressure по уму вроде сделано. Но это только если не смущает, что порт Akka Streams на .net пока в бете.
у них там очередная маленькая революция с переездом спек по шарпу в свой репозиторий, в процессе наполнения.
Роадмапа пока нет + всегда есть шанс, что фича переедет на релиз-другой.
v6>По сабжу можно еще Akka Streams .Net посмотреть, там backpressure по уму вроде сделано. Но это только если не смущает, что порт Akka Streams на .net пока в бете.
Akka тут совсем перебор, как мне кажется. DataFlow / RX — езй куда ни шло.
Re: НеUIёвая асинхронная очередь + красивый троттлинг
Здравствуйте, UberPsychoSvin, Вы писали:
M>>Эх... Конфиг.сус... Детство... UPS>Эээ?
В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС
Здравствуйте, Mihas, Вы писали:
M>Здравствуйте, UberPsychoSvin, Вы писали:
M>>>Эх... Конфиг.сус... Детство... UPS>>Эээ? M>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС
А асунки тут при чём? Дос у меня только в колледже на информатике был. Мы учились создавать в нём папки и файлы. Получалось не у всех.
Здравствуйте, Mr.Delphist, Вы писали:
MD>>>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС UPS>>А асунки тут при чём? MD>Потому что они "асинки"
Здравствуйте, Sinix, Вы писали:
S>Здравствуйте, Mr.Delphist, Вы писали:
MD>>>>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС UPS>>>А асунки тут при чём? MD>>Потому что они "асинки"
S>На этом вечер психоделики предлагаю закрыть
Ааа, вы про мой акцент. А я думал про асинхронные вызовы под MSDOS.
Так то асинки и до шарпа появились. Я так понял, что в шарпе обёртка над винапишными интерфейсами. Наверное уже давно можно было писать асинхронные программы. Но походу эта возможность не очень широко применялась, т.к. я нормальных HOWTO'шек не нашёл.
Здравствуйте, UberPsychoSvin, Вы писали:
UPS>Я так понял, что в шарпе обёртка над винапишными интерфейсами.
Неа, async-await в шарпе — это способ добавить программирование на продолжениях (continuation-passing style) в язык.
Здравствуйте, Sinix, Вы писали:
S>Здравствуйте, Mr.Delphist, Вы писа
S>На этом вечер психоделики предлагаю закрыть
Ща расскажу историю — и закончим.
Где-то 1992-1993 год, сидим мы с товарищем, делаем какую-то лабу,
редактируем CONFIG.SYS, т.к. памяти как-то не хватило.
Заменяем DEVICE на DEVICEHIGH, и тут сзади голос какой-то девушки
— а что это за такое — девицехич?