НеUIёвая асинхронная очередь + красивый троттлинг
От: UberPsychoSvin  
Дата: 06.02.17 14:52
Оценка: 25 (1)
Никак не въеду в неUIшное использование асунков. Вот к примеру, как написать аналог параллельной обработки некоторых данных на асунках.

ПРИМЕР 1.
Вот, такая штуковиночка. Не совсем то же самое, что и второй пример, т.к. нет отдельного треда для получения данных, но зато однострочник.
Parallel.ForEach(ProduceInput(), new ParallelOptions() { MaxDegreeOfParallelism = 4 }, Process);


ПРИМЕР 2.
Полноценный пример на System.Threading.Tasks.Dataflow, тут уже один тред получает данные, а два обрабатывают.
var queue = new BufferBlock<int>(new DataflowBlockOptions(){ BoundedCapacity = 4 })

var produce = Task.Run(() => ProduceInput());

var opts = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 };
var consume = new ActionBlock<string>(input => Process(input), opts);
queue.LinkTo(consume, new DataflowLinkOptions(){ PropagateCompletion = true });

Task.WaitAll(produce, consume.Completion);


Попытался нарисовать это на асунках. Вроде всё хорошо, но оно отжирает гигабайты памяти, в связи с тем, что количество обработчиков не ограничено.
Погуглил, похоже для лечения прожорливости этого дела предназначен класс SemaphoreSlim.

Но вот как его вкорячить таким образом, что бы оно
* Работало.
* Было удобно.
* Студия не ругалась на извращения в коде.

class Program
{
    static void Main(string[] args)
    {
        try
        {
            //var throttle = new SemaphoreSlim(1, 4);
            
            for (; ; )
            {
                Process();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    private static async void Process()
    {
        var input = await Receive();
        var result = await Compute(input);
        await Log(input, result);
    }

    private static async Task<int> Receive()
    {
        var r = new Random();
        await Task.Delay(100);
        return r.Next(0,100);
    }

    private static async Task<int> Compute(int a)
    {
        await Task.Delay(700);
        return 17*a;
    }

    private static Task Log(int a, int r)
    {
        return Task.Run(() => Console.WriteLine("{0,4} | {1} => {2}", Thread.CurrentThread.ManagedThreadId, a, r));
    }
}
Отредактировано 06.02.2017 20:34 AndrewVK . Предыдущая версия .
async
Re: НеUIёвая асинхронная очередь + красивый троттлинг
От: Sinix  
Дата: 06.02.17 16:10
Оценка: 3 (1)
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Никак не въеду в неUIшное использование асунков. Вот к примеру, как написать аналог параллельной обработки некоторых данных на асунках.


Но зачем? Чем Parallel.ForEach плох-то?

Не, если хочется неизведанного — вариантов полно. Async streams будут только в восьмом шарпе, но можно поискать костыльные решения аля https://github.com/tyrotoxin/AsyncEnumerable , можно использовать Rx, можно помучать TPL DataFlow, можно просто группировать последовательность пачками по несколько сотен элементов.
Re: НеUIёвая асинхронная очередь + красивый троттлинг
От: seregaa Ниоткуда http://blogtani.ru
Дата: 06.02.17 16:33
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

В простейшем случае так:

class Program
{
        static SemaphoreSlim throttle = new SemaphoreSlim(1, 4);

    static void Main(string[] args)
    {
        try
        {
            for (; ; )
            {
                throttle.Wait();
                Process();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    private static async void Process()
    {
        var input = await Receive();
        throttle.Release(); // Если нужно троттлить только Receive, то можно сделать Release() здесь, ... 
        var result = await Compute(input);
        await Log(input, result);
        // throttle.Release(); // ... а можно здесь - если нужно троттлить всю цепочку
    }

    private static async Task<int> Receive()
    {
        var r = new Random();
        await Task.Delay(100);
        return r.Next(0,100);
    }

    private static async Task<int> Compute(int a)
    {
        await Task.Delay(700);
        return 17* a;
    }

    private static Task Log(int a, int r)
    {
        return Task.Run(() => Console.WriteLine("{0,4} | {1} => {2}", Thread.CurrentThread.ManagedThreadId, a, r));
    }
}
Мобильная версия сайта RSDN — http://rsdn.org/forum/rsdn/6938747
Автор: sergeya
Дата: 19.10.17
Re[2]: НеUIёвая асинхронная очередь + красивый троттлинг
От: UberPsychoSvin  
Дата: 07.02.17 06:46
Оценка:
Здравствуйте, Sinix, Вы писали:
S>Но зачем?
Пока не очень понял. Хочу в фишки асинков въехать, не зря же их выдумали.
Вроде с асинками тредов ждунов меньше должно быть, если правильно всё реализовано на низком уровне.

Здравствуйте, seregaa, Вы писали:
S>В простейшем случае так:
Так не работает. Не пашет параллельная обработка.
Re[3]: НеUIёвая асинхронная очередь + красивый троттлинг
От: Sinix  
Дата: 07.02.17 06:52
Оценка: 3 (1)
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Пока не очень понял. Хочу в фишки асинков въехать, не зря же их выдумали.

Их для другого выдумали — для возможности прерывать (и возобновлять) выполнение метода без блокирующих ожиданий

https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/
и
https://blogs.msdn.microsoft.com/pfxteam/2012/04/12/asyncawait-faq/

в помощь

UPS>Вроде с асинками тредов ждунов меньше должно быть, если правильно всё реализовано на низком уровне.

Ну а в Parallel.For где ждуны-то? Куда уж меньше
Re[4]: НеUIёвая асинхронная очередь + красивый троттлинг
От: UberPsychoSvin  
Дата: 07.02.17 07:21
Оценка:
Здравствуйте, Sinix, Вы писали:
S>Их для другого выдумали — для возможности прерывать (и возобновлять) выполнение метода без блокирующих ожиданий
Да про UI всё ясно. Просто меньше писать Dispatcher.BeginInvoke.
Но их же не чисто ради него придумали.

S>Ну а в Parallel.For где ждуны-то? Куда уж меньше

Если Receive — синхронный, то у меня есть тред который ждёт пока Receive что-то не вернёт.
Т.е. в случае с Parallel.ForEach, ждуном является тред обработчик, который запрашивает следующий элемент в IEnumerable<Input>
IEnumerable<Input> EnumerateInput()
{
    for(;;)
        yield return Receive() // Тред ждун ждёт когда дёргает вот это
}

Parallel.ForEach(EnumerateInput, Process);


А если Receive честный асинхронный, то никакого треда не создаётся на момент ожидания данных. Когда появятся данные, тогда появится и тред, насколько я понял.
http://blog.stephencleary.com/2013/11/there-is-no-thread.html
Re[5]: НеUIёвая асинхронная очередь + красивый троттлинг
От: Sinix  
Дата: 07.02.17 07:47
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Да про UI всё ясно. Просто меньше писать Dispatcher.BeginInvoke.

UPS>Но их же не чисто ради него придумали.

Плюс IO. Плюс возможность предоставить поток другому коду на время ожидания какого-либо ресурса.



S>>Ну а в Parallel.For где ждуны-то? Куда уж меньше

UPS>Если Receive — синхронный, то у меня есть тред который ждёт пока Receive что-то не вернёт.
А. Ну тогда RX / TPL DataFlow в помощь. Оба — далеко не идеал по удобству, но лучше ничего на сегодня нет. Ну, или троттлинг поверх SemaphoreSlim.

Вообще, рекомендую шпаргалку Concurrency in C# Cookbook от Stephen Cleary.
Re[2]: НеUIёвая асинхронная очередь + красивый троттлинг
От: v6  
Дата: 17.02.17 08:14
Оценка:
Здравствуйте, Sinix, Вы писали:


S>Не, если хочется неизведанного — вариантов полно. Async streams будут только в восьмом шарпе, но можно поискать костыльные решения аля https://github.com/tyrotoxin/AsyncEnumerable , можно использовать Rx, можно помучать TPL DataFlow, можно просто группировать последовательность пачками по несколько сотен элементов.


А откуда информация про стримы в восьмом шарпе? Роадмапа не нашел.

По сабжу можно еще Akka Streams .Net посмотреть, там backpressure по уму вроде сделано. Но это только если не смущает, что порт Akka Streams на .net пока в бете.
Re: ОФФ
От: Mihas  
Дата: 17.02.17 08:23
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Попытался нарисовать это на асунках.

UPS>
UPS>    ... async ...
UPS>


Эх... Конфиг.сус... Детство...
Re[3]: НеUIёвая асинхронная очередь + красивый троттлинг
От: Sinix  
Дата: 17.02.17 08:33
Оценка: 1 (1)
Здравствуйте, v6, Вы писали:

v6>А откуда информация про стримы в восьмом шарпе? Роадмапа не нашел.

https://github.com/dotnet/csharplang/issues/43
https://github.com/dotnet/roslyn/issues/261
Спецификация будет вот тут
https://github.com/dotnet/csharplang/blob/master/proposals/async-streams.md

у них там очередная маленькая революция с переездом спек по шарпу в свой репозиторий, в процессе наполнения.
Роадмапа пока нет + всегда есть шанс, что фича переедет на релиз-другой.


v6>По сабжу можно еще Akka Streams .Net посмотреть, там backpressure по уму вроде сделано. Но это только если не смущает, что порт Akka Streams на .net пока в бете.

Akka тут совсем перебор, как мне кажется. DataFlow / RX — езй куда ни шло.
Re: НеUIёвая асинхронная очередь + красивый троттлинг
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 17.02.17 09:34
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:
Посмотри http://rsdn.org/forum/dotnet/6568604.1
Автор: Serginio1
Дата: 30.09.16
и солнце б утром не вставало, когда бы не было меня
Re[2]: ОФФ
От: UberPsychoSvin  
Дата: 20.02.17 08:02
Оценка:
Здравствуйте, Mihas, Вы писали:

M>Здравствуйте, UberPsychoSvin, Вы писали:


UPS>>Попытался нарисовать это на асунках.

UPS>>
UPS>>    ... async ...
UPS>>


M>Эх... Конфиг.сус... Детство...

Эээ?
Re[3]: ОФФ
От: Mihas  
Дата: 20.02.17 08:25
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

M>>Эх... Конфиг.сус... Детство...

UPS>Эээ?
В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС
Re[4]: ОФФ
От: UberPsychoSvin  
Дата: 20.02.17 11:40
Оценка:
Здравствуйте, Mihas, Вы писали:

M>Здравствуйте, UberPsychoSvin, Вы писали:


M>>>Эх... Конфиг.сус... Детство...

UPS>>Эээ?
M>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС
А асунки тут при чём? Дос у меня только в колледже на информатике был. Мы учились создавать в нём папки и файлы. Получалось не у всех.
Re[5]: ОФФ
От: Sinix  
Дата: 20.02.17 12:35
Оценка: :)
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Мы учились создавать в нём папки и файлы. Получалось не у всех.


Логично, папки в dos могут создать не только лишь все.
  спойлер!!!
У них там всё немного не так. Ну, те же папки, только они называют их "le directory"(с).
Re[5]: ОФФ
От: Mr.Delphist  
Дата: 20.02.17 13:27
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>А асунки тут при чём?


Потому что они "асинки"
Re[6]: ОФФ
От: Sinix  
Дата: 20.02.17 13:38
Оценка:
Здравствуйте, Mr.Delphist, Вы писали:

MD>>>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС

UPS>>А асунки тут при чём?
MD>Потому что они "асинки"

На этом вечер психоделики предлагаю закрыть
…а кожура еще больше.
Re[7]: НЕОФФ
От: UberPsychoSvin  
Дата: 20.02.17 14:08
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Mr.Delphist, Вы писали:


MD>>>>В MS DOS был конфигурационный файл CONFIG.SYS. На русслише произносился как КОНФИГ.СУС

UPS>>>А асунки тут при чём?
MD>>Потому что они "асинки"

S>На этом вечер психоделики предлагаю закрыть

Ааа, вы про мой акцент. А я думал про асинхронные вызовы под MSDOS.

Так то асинки и до шарпа появились. Я так понял, что в шарпе обёртка над винапишными интерфейсами. Наверное уже давно можно было писать асинхронные программы. Но походу эта возможность не очень широко применялась, т.к. я нормальных HOWTO'шек не нашёл.
Re[8]: НЕОФФ
От: Sinix  
Дата: 20.02.17 14:28
Оценка:
Здравствуйте, UberPsychoSvin, Вы писали:

UPS>Я так понял, что в шарпе обёртка над винапишными интерфейсами.

Неа, async-await в шарпе — это способ добавить программирование на продолжениях (continuation-passing style) в язык.

чтобы вместо
DownloadFile().AndThen(
    file => DoSomethingElseCalcSize(file).AndThen(
        size => AlertMe(file, size)
    )
);


можно было написать
var file = await DownloadFileAsync();
var size = await DoSomethingElseCalcSizeAsync(file);
await AlertMeAsync(file, size);
Re[7]: ОФФ
От: romangr Россия  
Дата: 20.02.17 14:56
Оценка: :)
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Mr.Delphist, Вы писа


S>На этом вечер психоделики предлагаю закрыть


Ща расскажу историю — и закончим.

Где-то 1992-1993 год, сидим мы с товарищем, делаем какую-то лабу,
редактируем CONFIG.SYS, т.к. памяти как-то не хватило.
Заменяем DEVICE на DEVICEHIGH, и тут сзади голос какой-то девушки
— а что это за такое — девицехич?

с той поры мы ее так и звали, девица хич
... << RSDN@Home (RF) 1.2.0 alpha 5 rev. 67>>
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.