НеUIёвая асинхронная очередь + красивый троттлинг
От: UberPsychoSvin  
Дата: 06.02.17 14:52
Оценка: 25 (1)
Никак не въеду в неUIшное использование асунков. Вот к примеру, как написать аналог параллельной обработки некоторых данных на асунках.

ПРИМЕР 1.
Вот, такая штуковиночка. Не совсем то же самое, что и второй пример, т.к. нет отдельного треда для получения данных, но зато однострочник.
Parallel.ForEach(ProduceInput(), new ParallelOptions() { MaxDegreeOfParallelism = 4 }, Process);


ПРИМЕР 2.
Полноценный пример на System.Threading.Tasks.Dataflow, тут уже один тред получает данные, а два обрабатывают.
var queue = new BufferBlock<int>(new DataflowBlockOptions(){ BoundedCapacity = 4 })

var produce = Task.Run(() => ProduceInput());

var opts = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 };
var consume = new ActionBlock<string>(input => Process(input), opts);
queue.LinkTo(consume, new DataflowLinkOptions(){ PropagateCompletion = true });

Task.WaitAll(produce, consume.Completion);


Попытался нарисовать это на асунках. Вроде всё хорошо, но оно отжирает гигабайты памяти, в связи с тем, что количество обработчиков не ограничено.
Погуглил, похоже для лечения прожорливости этого дела предназначен класс SemaphoreSlim.

Но вот как его вкорячить таким образом, что бы оно
* Работало.
* Было удобно.
* Студия не ругалась на извращения в коде.

class Program
{
    static void Main(string[] args)
    {
        try
        {
            //var throttle = new SemaphoreSlim(1, 4);
            
            for (; ; )
            {
                Process();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    private static async void Process()
    {
        var input = await Receive();
        var result = await Compute(input);
        await Log(input, result);
    }

    private static async Task<int> Receive()
    {
        var r = new Random();
        await Task.Delay(100);
        return r.Next(0,100);
    }

    private static async Task<int> Compute(int a)
    {
        await Task.Delay(700);
        return 17*a;
    }

    private static Task Log(int a, int r)
    {
        return Task.Run(() => Console.WriteLine("{0,4} | {1} => {2}", Thread.CurrentThread.ManagedThreadId, a, r));
    }
}
Отредактировано 06.02.2017 20:34 AndrewVK . Предыдущая версия .
async
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.