Пытаюсь разобраться в работе Task применительно к своей задаче (в целом она уже сделана на ThreadPool, но хочется с Task разобраться для самообразования).
Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов. Так же, поскольку процесс не особо быстрый, нужно время от времени выводить на пользовательский интерфейс уведомления о ходе процесса. По окончанию всех (именно всех) запросов есть некие финальные действия.
Сейчас мне интересно лишь как это все делается силами Task.
Набросал, как мне это видится, использовав вместо реальных запросов и аргументов — иммитации, и не 15 000, а всего 30 — просто для понимания как вообще код писать с Task.
static async void TastWrite()
{
object objLockAdd = new object(); // Объект для блокировкиtry
{
Action<object> action = (p) =>
{
Console.WriteLine(p);
Random random = new Random();
Thread.Sleep(random.Next(700, 1500));
}; // Код для выполнения в задаче
List<string> baslList = new List<string>();// Множество данных (аргументов) необходимых к обработкеfor (int i = 1; i < 30; i++)
{
baslList.Add(i.ToString());
}
List<Task> tasks = new List<Task>();// Список задач, за выполнением которых будем следитьwhile (baslList.Count > 0)// Пока есть данные
{
if (tasks.Count > 0)
{
Task finischTask = await Task.WhenAny(tasks); // Если задачи есть, то ждем завершения хотя бы одной
// Тут должен быть код промежуточной реакции, вывод в пользовательский интерфейс прогресса
tasks.Remove(finischTask); // Убираем из списка выполненну задачу
}
while (tasks.Count < 4 && baslList.Count > 0) // Контролируем, что задач не слишком много и данные еще имеются
{
lock (objLockAdd)
{
tasks.Add(new TaskFactory().StartNew(action, baslList[baslList.Count - 1])); // Добавляем новую задачу и сразу ее запускаем, чтобы не возникло конфликта с параметром
baslList.RemoveAt(baslList.Count - 1); // Удаляем обработанное данное из списка
}
}
}
await Task.WhenAll(tasks); // Ожидаем когда все задачи будут выполнены
Console.WriteLine("Finisch"); // Выводим уведомление, когда абсолютно все завершено
}
catch (Exception ex) { }
}
Или я в принципе не так понимаю работу с Task и писать надо иначе?
Здравствуйте, CyberRussia, Вы писали:
CR>Или я в принципе не так понимаю работу с Task и писать надо иначе?
Ну, например, lock зачем написан? Что он должен синхронизировать?
Нужный функционал реализован в TPL Dataflow в классе ActionBlock.
Сомневаюсь правда, что его исходники помогут в понимании.
Здравствуйте, karbofos42, Вы писали:
K>Здравствуйте, CyberRussia, Вы писали:
CR>>Или я в принципе не так понимаю работу с Task и писать надо иначе?
K>Ну, например, lock зачем написан? Что он должен синхронизировать?
Для меня, пока, не очень понятно, доступен этот код только одному потоку — который все и запускает. Или может дергаться разными по мере завершения тасок.
К>Сомневаюсь правда, что его исходники помогут в понимании.
Раз не помогут, то и не надо. Мне не конкретную задачу решить, а понять как с тасками работать.
Здравствуйте, CyberRussia, Вы писали:
CR>Привет!
CR>Пытаюсь разобраться в работе Task применительно к своей задаче (в целом она уже сделана на ThreadPool, но хочется с Task разобраться для самообразования). CR>Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов.
Ну можно как-то так сделать:
class Program
{
static async Task Main(string[] args)
{
await DoWork(100, 10);
}
static async Task DoWork(int totalCount, int maxConcurrentCount)
{
var random = new Random();
var semaphore = new SemaphoreSlim(maxConcurrentCount);
var list = new List<Task>();
for (int i = 0; i < totalCount; i++)
{
var n = i;
await semaphore.WaitAsync();
var task = Task.Run(async () =>
{
Console.WriteLine($"Task {n} started.");
await Task.Delay(random.Next(500, 1000));
Console.WriteLine($"Task {n} completed.");
semaphore.Release();
});
list.Add(task);
}
Console.WriteLine("All tasks started");
await Task.WhenAll(list);
Console.WriteLine("All tasks completed");
}
}
Здравствуйте, CyberRussia, Вы писали:
CR>Привет! CR>Сама задача — нужно запросить сторонний сервис для получения данных, отправив в общей сложности примерно 15 000 запросов с различными уникальными аргументами. Если попробовать отправить сразу или почти сразу все 15 000, то сервис посчитает, что его ddos'ят и на время закроется от запросов приложения. Поэтому нужно ограничивать количество единовременных запросов. Так же, поскольку процесс не особо быстрый, нужно время от времени выводить на пользовательский интерфейс уведомления о ходе процесса. По окончанию всех (именно всех) запросов есть некие финальные действия. CR>Сейчас мне интересно лишь как это все делается силами Task.
Схематично — так:
— Создаёте task scheduler с нужным ограничением по количеству потоков.Можно использовать свойство MaximumConcurrencyLevel или найти любой пример по клюечвым словам "limited concurrency task scheduler".
Канонический пример реализации тут
— Создаёте 15000 тасков и сразу запускаете их в этом шедулере (через TaskFactory.StartNew с явным указанием шедулера, и, заодно, если есть желание красиво останавливать процесс, передать CancellationToken)
— Делаете WaitAll всех тасков (предварительно сохранённых где-нибудь после создания) в ожидании пока всё закончится.
— Если нужно уведомлять UI о количестве отработаннвх задач, то можно отдельно предусмотреть, наприер счётчик и инкрементировать его из самих тасков.
Толко учтите, что счётчик должен поддерживать многопоточность, поэтому не count++, а Interlocked.Increment(ref count).
А переменную эту периодически по таймеру читать в UI потоке. Так для быстрых тасков лучше, иначе большое количество вызовов будет сильно загружать UI thread.
Если вызовы долгие, то можно делать callback в UI thread,например, вызовом отдельной таски для этого, но запускаемой обязательно в контексте UI (для чего есть специальный scheduler).
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var taskCount = 0;
var completedTasks = new ConcurrentQueue<Task<T>>();
var completedSemaphore = new SemaphoreSlim(0);
foreach (var task in tasks)
{
taskCount++;
task.ContinueWith(completed =>
{
completedTasks.Enqueue(completed);
completedSemaphore.Release();
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return Enumerable.Repeat(true, taskCount).Select(async _ =>
{
await completedSemaphore.WaitAsync().ConfigureAwait(false);
completedTasks.TryDequeue(out var task);
return await task;
});
}
В итоге await-имся на самих исходных тасках, не нужно вручную переносить результаты в TaskCompletionSource-ы, не требуется коллекция TaskCompletionSource-ов, лежащая в памяти до самого конца работы.
I> public static async Task ForEachAsync< T >( this IEnumerable< T > seq
I> , SemaphoreSlim semaphore
I> , AsyncWaitEvent finitaEvent
I> , Func< T, Task > seqItemFunc )
I>
I>[/cut]
Семафор — это degreeOfParallelismSemaphore, seqItemFunc нужен, чтоб создание тасков в параллельных потоках было — вдруг это медленная операция, а иначе можно было бы сразу принимать готовый IEnumerable<Task> seq.
Но зачем finitaEvent снаружи приходит? Чтобы уметь прерывать процесс? Для этого обычно CancellationToken используют.
Здравствуйте, artelk, Вы писали:
A>Здравствуйте, Igorxz, Вы писали:
I>>
I>> public static async Task ForEachAsync< T >( this IEnumerable< T > seq
I>> , SemaphoreSlim semaphore
I>> , AsyncWaitEvent finitaEvent
I>> , Func< T, Task > seqItemFunc )
I>>
I>>[/cut] A>Семафор — это degreeOfParallelismSemaphore, seqItemFunc нужен, чтоб создание тасков в параллельных потоках было — вдруг это медленная операция, а иначе можно было бы сразу принимать готовый IEnumerable<Task> seq. A>Но зачем finitaEvent снаружи приходит? Чтобы уметь прерывать процесс? Для этого обычно CancellationToken используют.
действительно, можно и не снаружи (равно как и семафор — можно degreeOfParallelism числом передавать),
в той реализации, где это использовалось finitaEvent брался откуда-то из пула. поэтому снаружи.
вообще реализация подобных штук сильно зависит от внешних условий. здесь например прерывания нет — не было нужно.