Task WhenAny WhenAll
От: CyberRussia  
Дата: 15.11.20 19:25
Оценка:
Привет!

Пытаюсь разобраться в работе Task применительно к своей задаче (в целом она уже сделана на ThreadPool, но хочется с Task разобраться для самообразования).
Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов. Так же, поскольку процесс не особо быстрый, нужно время от времени выводить на пользовательский интерфейс уведомления о ходе процесса. По окончанию всех (именно всех) запросов есть некие финальные действия.
Сейчас мне интересно лишь как это все делается силами Task.
Набросал, как мне это видится, использовав вместо реальных запросов и аргументов — иммитации, и не 15 000, а всего 30 — просто для понимания как вообще код писать с Task.
        static async void TastWrite()
        {
            object objLockAdd = new object(); // Объект для блокировки
            try
            {
                Action<object> action = (p) =>
                {
                    Console.WriteLine(p);
                    Random random = new Random();
                    Thread.Sleep(random.Next(700, 1500));
                }; // Код для выполнения в задаче

                List<string> baslList = new List<string>();// Множество данных (аргументов) необходимых к обработке
                for (int i = 1; i < 30; i++)
                {
                    baslList.Add(i.ToString());
                }
                List<Task> tasks = new List<Task>();// Список задач, за выполнением которых будем следить
                while (baslList.Count > 0)// Пока есть данные
                {
                    if (tasks.Count > 0)
                    {
                        Task finischTask = await Task.WhenAny(tasks); // Если задачи есть, то ждем завершения хотя бы одной
                        // Тут должен быть код промежуточной реакции, вывод в пользовательский интерфейс прогресса
                        tasks.Remove(finischTask); // Убираем из списка выполненну задачу
                    }
                    while (tasks.Count < 4 && baslList.Count > 0) // Контролируем, что задач не слишком много и данные еще имеются
                    {
                        lock (objLockAdd)
                        {
                            tasks.Add(new TaskFactory().StartNew(action, baslList[baslList.Count - 1])); // Добавляем новую задачу и сразу ее запускаем, чтобы не возникло конфликта с параметром
                            baslList.RemoveAt(baslList.Count - 1); // Удаляем обработанное данное из списка
                        }
                    }
                }
                await Task.WhenAll(tasks); // Ожидаем когда все задачи будут выполнены
                Console.WriteLine("Finisch"); // Выводим уведомление, когда абсолютно все завершено
            }
            catch (Exception ex) { }
        }

Или я в принципе не так понимаю работу с Task и писать надо иначе?
Re: Task WhenAny WhenAll
От: karbofos42 Россия  
Дата: 15.11.20 19:44
Оценка:
Здравствуйте, CyberRussia, Вы писали:

CR>Или я в принципе не так понимаю работу с Task и писать надо иначе?


Ну, например, lock зачем написан? Что он должен синхронизировать?
Нужный функционал реализован в TPL Dataflow в классе ActionBlock.
Сомневаюсь правда, что его исходники помогут в понимании.
Re[2]: Task WhenAny WhenAll
От: CyberRussia  
Дата: 15.11.20 19:57
Оценка:
Здравствуйте, karbofos42, Вы писали:

K>Здравствуйте, CyberRussia, Вы писали:


CR>>Или я в принципе не так понимаю работу с Task и писать надо иначе?


K>Ну, например, lock зачем написан? Что он должен синхронизировать?

Для меня, пока, не очень понятно, доступен этот код только одному потоку — который все и запускает. Или может дергаться разными по мере завершения тасок.

К>Сомневаюсь правда, что его исходники помогут в понимании.

Раз не помогут, то и не надо. Мне не конкретную задачу решить, а понять как с тасками работать.
Отредактировано 15.11.2020 19:59 CyberRussia . Предыдущая версия .
Re: Task WhenAny WhenAll
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 15.11.20 20:21
Оценка: 3 (1) +1
Здравствуйте, CyberRussia, Вы писали:

Использование асинхронного шаблона, основанного на задачах

Посмотри там куча примеров в том числе и асинхронные очереди

WaitOneAsync

Операции с чередованием
и солнце б утром не вставало, когда бы не было меня
Отредактировано 16.11.2020 11:13 Serginio1 . Предыдущая версия . Еще …
Отредактировано 15.11.2020 20:23 Serginio1 . Предыдущая версия .
Re: Task WhenAny WhenAll
От: romangr Россия  
Дата: 16.11.20 08:09
Оценка: 6 (1)
Здравствуйте, CyberRussia, Вы писали:

CR>Привет!


CR>Пытаюсь разобраться в работе Task применительно к своей задаче (в целом она уже сделана на ThreadPool, но хочется с Task разобраться для самообразования).

CR>Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов.

Ну можно как-то так сделать:
class Program
{
    static async Task Main(string[] args)
    {
        await DoWork(100, 10);
    }

    static async Task DoWork(int totalCount, int maxConcurrentCount)
    {
        var random = new Random();
        var semaphore = new SemaphoreSlim(maxConcurrentCount);
        var list = new List<Task>();
        for (int i = 0; i < totalCount; i++)
        {
            var n = i;
            await semaphore.WaitAsync();
            var task = Task.Run(async () =>
            {
                Console.WriteLine($"Task {n} started.");
                await Task.Delay(random.Next(500, 1000));
                Console.WriteLine($"Task {n} completed.");
                semaphore.Release();
            });
            list.Add(task);
        }
        Console.WriteLine("All tasks started");
        await Task.WhenAll(list);
        Console.WriteLine("All tasks completed");
    }
}
... << RSDN@Home 1.3.110 alpha 5 rev. 62>>
Re: Task WhenAny WhenAll
От: vmpire Россия  
Дата: 16.11.20 10:53
Оценка: 6 (1)
Здравствуйте, CyberRussia, Вы писали:

CR>Привет!

CR>Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов. Так же, поскольку процесс не особо быстрый, нужно время от времени выводить на пользовательский интерфейс уведомления о ходе процесса. По окончанию всех (именно всех) запросов есть некие финальные действия.
CR>Сейчас мне интересно лишь как это все делается силами Task.
Схематично — так:
— Создаёте task scheduler с нужным ограничением по количеству потоков.Можно использовать свойство MaximumConcurrencyLevel или найти любой пример по клюечвым словам "limited concurrency task scheduler".
Канонический пример реализации тут
— Создаёте 15000 тасков и сразу запускаете их в этом шедулере (через TaskFactory.StartNew с явным указанием шедулера, и, заодно, если есть желание красиво останавливать процесс, передать CancellationToken)
— Делаете WaitAll всех тасков (предварительно сохранённых где-нибудь после создания) в ожидании пока всё закончится.
— Если нужно уведомлять UI о количестве отработаннвх задач, то можно отдельно предусмотреть, наприер счётчик и инкрементировать его из самих тасков.
Толко учтите, что счётчик должен поддерживать многопоточность, поэтому не count++, а Interlocked.Increment(ref count).
А переменную эту периодически по таймеру читать в UI потоке. Так для быстрых тасков лучше, иначе большое количество вызовов будет сильно загружать UI thread.
Если вызовы долгие, то можно делать callback в UI thread,например, вызовом отдельной таски для этого, но запускаемой обязательно в контексте UI (для чего есть специальный scheduler).
Отредактировано 16.11.2020 10:57 vmpire . Предыдущая версия .
Re: Task WhenAny WhenAll
От: MadHuman Россия  
Дата: 16.11.20 12:14
Оценка:
погугли про Parallel.ForEach
позволяет запускать на выполнение много тасков и ограничивать кол-во одновременно выполняющихся тасков.
Re: Task WhenAny WhenAll
От: StatujaLeha на правах ИМХО
Дата: 16.11.20 12:34
Оценка:
Здравствуйте, CyberRussia, Вы писали:

Попробуйте этот пример посмотреть:
https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
Re[2]: Task WhenAny WhenAll
От: artelk  
Дата: 18.11.20 17:53
Оценка: 21 (2)
Здравствуйте, Serginio1, Вы писали:

S>Операции с чередованием


Мне мой больше нравится:
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var taskCount = 0;
    var completedTasks = new ConcurrentQueue<Task<T>>();
    var completedSemaphore = new SemaphoreSlim(0);
    foreach (var task in tasks)
    {
        taskCount++;
        task.ContinueWith(completed =>
        {
            completedTasks.Enqueue(completed);
            completedSemaphore.Release();
        },
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    }

    return Enumerable.Repeat(true, taskCount).Select(async _ =>
    {
        await completedSemaphore.WaitAsync().ConfigureAwait(false);
        completedTasks.TryDequeue(out var task);
        return await task;
    });
}

В итоге await-имся на самих исходных тасках, не нужно вручную переносить результаты в TaskCompletionSource-ы, не требуется коллекция TaskCompletionSource-ов, лежащая в памяти до самого конца работы.
Отредактировано 18.11.2020 18:15 artelk . Предыдущая версия .
Re[3]: Task WhenAny WhenAll
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 19.11.20 07:30
Оценка:
Здравствуйте, artelk, Вы писали:

A>Здравствуйте, Serginio1, Вы писали:


S>>Операции с чередованием


Спасибо интересено. Я когда посмотрел этот пример подумал про AsyncProducerConsumerCollection
асинхронную очередь
AsyncProducerConsumerCollection<Task> m_data = …;

task.ContinueWith(completed =>
        {
            m_data.Add(completed);
        }

 while(true)
    {
        Task nextItem = await m_data.Take();

Ну и проверку Task на Error и прочее
    }
и солнце б утром не вставало, когда бы не было меня
Отредактировано 19.11.2020 7:32 Serginio1 . Предыдущая версия .
Re: Task WhenAny WhenAll
От: Igorxz  
Дата: 21.11.20 15:47
Оценка: 4 (1)
  c#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using M = System.Runtime.CompilerServices.MethodImplAttribute;
using O = System.Runtime.CompilerServices.MethodImplOptions;

namespace Async_Experiments
{
    /// <summary>
    /// SemaphoreSlim based
    /// </summary>
    public struct AsyncWaitEvent : IDisposable
    {
        private SemaphoreSlim _Smpr;
        [M(O.AggressiveInlining)] public static AsyncWaitEvent Create() => new AsyncWaitEvent( true );
        [M(O.AggressiveInlining)] public AsyncWaitEvent( bool initialState ) => _Smpr = new SemaphoreSlim( (initialState ? 1 : 0), 1 );
        [M(O.AggressiveInlining)] public void Dispose()
        {
            if ( _Smpr != null )
            {
                _Smpr.Dispose();
                _Smpr = null;
            }
        }

        public bool IsEmpty { [M(O.AggressiveInlining)] get => (_Smpr == null); }

        [M(O.AggressiveInlining)] public void Wait() => _Smpr.Wait();
        [M(O.AggressiveInlining)] public void Wait( CancellationToken ct ) => _Smpr.Wait( ct );
        [M(O.AggressiveInlining)] public bool Wait( TimeSpan timeout ) => _Smpr.Wait( timeout );
        [M(O.AggressiveInlining)] public bool Wait( TimeSpan timeout, CancellationToken ct ) => _Smpr.Wait( timeout, ct );
        [M(O.AggressiveInlining)] public bool Wait( int millisecondsTimeout ) => _Smpr.Wait( millisecondsTimeout );
        [M(O.AggressiveInlining)] public bool Wait( int millisecondsTimeout, CancellationToken ct ) => _Smpr.Wait( millisecondsTimeout, ct );

        [M(O.AggressiveInlining)] public Task WaitAsync() => _Smpr.WaitAsync();
        [M(O.AggressiveInlining)] public Task WaitAsync( CancellationToken ct ) => _Smpr.WaitAsync( ct );
        [M(O.AggressiveInlining)] public Task< bool > WaitAsync( int millisecondsTimeout ) => _Smpr.WaitAsync( millisecondsTimeout );
        [M(O.AggressiveInlining)] public Task< bool > WaitAsync( TimeSpan timeout ) => _Smpr.WaitAsync( timeout );
        [M(O.AggressiveInlining)] public Task< bool > WaitAsync( TimeSpan timeout, CancellationToken ct ) => _Smpr.WaitAsync( timeout, ct );
        [M(O.AggressiveInlining)] public Task< bool > WaitAsync( int millisecondsTimeout, CancellationToken ct ) => _Smpr.WaitAsync( millisecondsTimeout, ct );

        [M(O.AggressiveInlining)] public void Set() => _Smpr.Release();
        [M(O.AggressiveInlining )] public void Release() => _Smpr.Release();
#if DEBUG
        public override string ToString() => (IsEmpty ? "NULL" : _Smpr.CurrentCount.ToString());
#endif
    }

    /// <summary>
    /// 
    /// </summary>
    public static class AsyncWaitEventHelper
    {
        /// <summary>
        /// 
        /// </summary>
        private struct ReleaseHolder : IDisposable
        {
            private AsyncWaitEvent _Awe;
            [M(O.AggressiveInlining)] public ReleaseHolder( AsyncWaitEvent awe ) => _Awe = awe;
            [M(O.AggressiveInlining)] public void Dispose()
            {
                if ( !_Awe.IsEmpty )
                {
                    _Awe.Release();
                    _Awe = default;
                }
            }
        }

        [M(O.AggressiveInlining)] public static async Task< IDisposable > UseWaitAsync( this AsyncWaitEvent awe )
        {
            await awe.WaitAsync().ConfigureAwait( false );
            return (new ReleaseHolder( awe ));
        }
    }

    /// <summary>
    ///
    /// </summary>
    public static class ParallelTaskExtensions
    {
        public static async Task ForEachAsync< T >( this IEnumerable< T > seq
            , SemaphoreSlim   semaphore
            , AsyncWaitEvent  finitaEvent
            , Func< T, Task > seqItemFunc )
        {
            if ( (seq == null) || !seq.Any() )
            {
                return;
            }
            if ( semaphore   == null ) throw (new ArgumentException( nameof(semaphore) ));
            if ( finitaEvent.IsEmpty ) throw (new ArgumentException( nameof(finitaEvent) ));
            if ( seqItemFunc == null ) throw (new ArgumentException( nameof(seqItemFunc) ));
            //-----------------------------------------------------------//

            using ( var e = seq.GetEnumerator() )
            {
                if ( !e.MoveNext() )
                {
                    return;
                }
                
                var seqIsFinished   = false;
                var enqueueSeqCount = 0;
                for ( var t = e.Current; ; t = e.Current )
                {
                    if ( !e.MoveNext() )
                    {
                        seqIsFinished = true;
                    }

                    Interlocked.Increment( ref enqueueSeqCount );
                    await semaphore.WaitAsync().ConfigureAwait( false );                        
#pragma warning disable CS4014
                    //Because this call is not awaited, execution of the current method continues before the call is completed. 
                    //Consider applying the 'await' operator to the result of the call.

                    Task.Run( async () =>
                    {
                        try
                        {
                            await seqItemFunc( t ).ConfigureAwait( false );
                        }
                        finally
                        {
                            semaphore.Release();

                            if ( (Interlocked.Decrement( ref enqueueSeqCount ) == 0) && seqIsFinished )
                            {
                                finitaEvent.Set();
                            }
                        }
                    });
#pragma warning restore CS4014

                    if ( seqIsFinished )
                    {
                        break;
                    }
                }

                await finitaEvent.WaitAsync().ConfigureAwait( false );
            }            
        }
    }
}
Re[2]: Task WhenAny WhenAll
От: artelk  
Дата: 22.11.20 10:34
Оценка:
Здравствуйте, Igorxz, Вы писали:


I>
I>        public static async Task ForEachAsync< T >( this IEnumerable< T > seq
I>            , SemaphoreSlim   semaphore
I>            , AsyncWaitEvent  finitaEvent
I>            , Func< T, Task > seqItemFunc )
I>

I>[/cut]
Семафор — это degreeOfParallelismSemaphore, seqItemFunc нужен, чтоб создание тасков в параллельных потоках было — вдруг это медленная операция, а иначе можно было бы сразу принимать готовый IEnumerable<Task> seq.
Но зачем finitaEvent снаружи приходит? Чтобы уметь прерывать процесс? Для этого обычно CancellationToken используют.
Re[3]: Task WhenAny WhenAll
От: Igorxz  
Дата: 22.11.20 15:08
Оценка:
Здравствуйте, artelk, Вы писали:

A>Здравствуйте, Igorxz, Вы писали:



I>>
I>>        public static async Task ForEachAsync< T >( this IEnumerable< T > seq
I>>            , SemaphoreSlim   semaphore
I>>            , AsyncWaitEvent  finitaEvent
I>>            , Func< T, Task > seqItemFunc )
I>>

I>>[/cut]
A>Семафор — это degreeOfParallelismSemaphore, seqItemFunc нужен, чтоб создание тасков в параллельных потоках было — вдруг это медленная операция, а иначе можно было бы сразу принимать готовый IEnumerable<Task> seq.
A>Но зачем finitaEvent снаружи приходит? Чтобы уметь прерывать процесс? Для этого обычно CancellationToken используют.

действительно, можно и не снаружи (равно как и семафор — можно degreeOfParallelism числом передавать),
в той реализации, где это использовалось finitaEvent брался откуда-то из пула. поэтому снаружи.
вообще реализация подобных штук сильно зависит от внешних условий. здесь например прерывания нет — не было нужно.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.