Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 12:27
Оценка:
По событию создается долгоживущая задача:

Task.Run(()=> ...


Таких задач может одновременно быть много. Нужно: завершить все задачи и дождаться пока они все заврешат работу (внутри задачи могут быть не асинхронные вызовы, которые отменить с помощью CancellationToken нельзя).

Вот тестовый код:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            // Вот тут нужно отменить все и дождаться (вернуть Task) когда все задачи будут завершены
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                var task = Task.Run(async () =>
                {
                    Console.WriteLine("Begin " + taskIdCopy);
                    Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                    Console.WriteLine("End " + taskIdCopy);

                    await Task.Delay(4000, cancellationToken); // Задержка
                }, cancellationToken);
            }
        }
    }
}


Решение:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                var task = Task.Run(async () =>
                {
                    Console.WriteLine("Begin " + taskIdCopy);
                    Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                    Console.WriteLine("End " + taskIdCopy);

                    await Task.Delay(4000, cancellationToken); // Задержка

                    Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                }, cancellationToken);

                Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание
            }
        }
    }
}


Смущает что контроль хода программы организован через Exception. Как лучше?
Re: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 12:34
Оценка: 1 (1)
Здравствуйте, Shmj, Вы писали:

Не уверен, что прав, а не проще ли taskcancelexp ловить в каждой таске самостоятельно? А потом по заверщении всех тасок проверять была ли отмена?
Кодом людям нужно помогать!
Re[2]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 12:51
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Не уверен, что прав, а не проще ли taskcancelexp ловить в каждой таске самостоятельно? А потом по заверщении всех тасок проверять была ли отмена?


Как бы так:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        try
                        {
                            await Task.Delay(4000, cancellationToken); // Задержка
                        }
                        catch (TaskCanceledException)
                        {
                            Console.WriteLine("!-1");
                        }

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine("!-2");
                    }
                });
            }
        }
    }
}


И каждая задача в итоге будет просто Completed — исключение отловим только внутри. Но опять таки — добавится больше кода и управление ходом программы будет основано на иключениях.
Re[3]: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 12:58
Оценка:
Здравствуйте, Shmj, Вы писали:

S>И каждая задача в итоге будет просто Completed — исключение отловим только внутри. Но опять таки — добавится больше кода и управление ходом программы будет основано на иключениях.


А если не ловить ислкючения в тасках?
Кодом людям нужно помогать!
Re[4]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 13:04
Оценка:
Здравствуйте, Sharov, Вы писали:

S>А если не ловить ислкючения в тасках?


Тогда исключение TaskCanceledException возникает в точке await Task.WhenAll а задачи имеют статус Canceled.
Re[5]: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 13:18
Оценка:
Здравствуйте, Shmj, Вы писали:


S>Тогда исключение TaskCanceledException возникает в точке await Task.WhenAll а задачи имеют статус Canceled.


Я не проверял, но мне казалось что WhenAll отработает нормально, а дальше можно пройтись по таскам и проверить их статус.
Тогда я
Кодом людям нужно помогать!
Re[6]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 13:57
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Я не проверял, но мне казалось что WhenAll отработает нормально, а дальше можно пройтись по таскам и проверить их статус.

S>Тогда я

Это если только верхний Task отловить, вот так:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                var tasks = Tasks.Values.ToArray();

                await Task.WhenAll(tasks);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }

            foreach (var tasksValue in Tasks.Values)
            {
                Console.WriteLine(tasksValue.Status);
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 15 задач
            for (int taskId = 1; taskId <= 15; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken);

                        Tasks.TryRemove(taskIdCopy,
                            out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine(taskIdCopy + " canceled");
                    }
                });
            }
        }
    }
}
Re: Дождаться пока все завершат работу...
От: RushDevion Россия  
Дата: 12.03.20 14:23
Оценка: 8 (1) +1
Чета вы мудрите
var t = Task.WhenAll(allMyTasks);
cts.Cancel(); // Отменяем
t.Wait(); // Стоим ждем, пока все таски завершатся
if (t.IsFaulted)   { ... } // По крайней мере одна из тасков крашнулась, анализируем ошибку если нужно
if (t.IsCancelled) { ... } // Никто не крашнулся, но по крайней мере одна таска была отменена, анализируем эту ситуацию, если нужно
// Если мы тут, все завершилось благополучно
Отредактировано 12.03.2020 15:22 RushDevion . Предыдущая версия .
Re[2]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 17:25
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Чета вы мудрите


Проблема в том, что нужно на каждом асинхронном методе перехватывать TaskCanceledException. Может я что ни так делаю:

Ну ок, перехватили — а дальше что?

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                var tasks = Tasks.Values.ToArray();

                var t = Task.WhenAll(tasks);

                await t;

                Console.WriteLine("IsCanceled=" + t.IsCanceled);
                Console.WriteLine("IsFaulted=" + t.IsFaulted);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }

            foreach (var tasksValue in Tasks.Values)
            {
                Console.WriteLine(tasksValue.Status);
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 15 задач
            for (int taskId = 1; taskId <= 15; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        try
                        {
                            await Task.Delay(4000, cancellationToken);
                        }
                        catch (TaskCanceledException)
                        {
                            throw; // Вот тут возникает и уходит в самый верх!
                        }
                        
                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine(taskIdCopy + " canceled");
                    }
                });
            }
        }
    }
}
Re[3]: Дождаться пока все завершат работу...
От: RushDevion Россия  
Дата: 12.03.20 18:29
Оценка:
S>Проблема в том, что нужно на каждом асинхронном методе перехватывать TaskCanceledException.
Начальная постановка задачи у вас какая? Запустить отмену и дождаться, когда все закончится. Так?
Вот и не нужно ничего перехватывать.
Ну вывалится из дочерней таски TaskCancelledException наружу, ну зафейлит вашу таску.
Task.WhenAll().Wait() все равно дождется общего окончания.
Дальше нужно просто пробежаться по списку тасков и потрогать Exception у всех Faulted, чтобы UnobservedTaskException не выстрелило.

Более того TaskCancelledException — это наследник OperationCancelledException.
Значит, если первичная ваша таска (1) создавалась с тем же cancellation token, что вы передаете в await Task.Delay(),
то вылетевший в последнем TaskCancelledException (даже не будучи перехваченным) будет трактоваться (1) как отмена и она перейдет не в Faulted, а в Cancelled state.
Отредактировано 12.03.2020 18:57 RushDevion . Предыдущая версия . Еще …
Отредактировано 12.03.2020 18:49 RushDevion . Предыдущая версия .
Re[4]: пару вопросов
От: Sharov Россия  
Дата: 13.03.20 12:59
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Начальная постановка задачи у вас какая? Запустить отмену и дождаться, когда все закончится. Так?

RD>Вот и не нужно ничего перехватывать.
RD>Ну вывалится из дочерней таски TaskCancelledException наружу, ну зафейлит вашу таску.
RD>Task.WhenAll().Wait() все равно дождется общего окончания.
RD>Дальше нужно просто пробежаться по списку тасков и потрогать Exception у всех Faulted, чтобы UnobservedTaskException не выстрелило.

Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность
получателю отдельно проверить состояние каждого таска?

И еще вопрос:

почему при ловле TaskCanceledException
  _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken);

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

         
                    try
                    {
                        await task;
                    }
                    catch(TaskCanceledException e)
                    {
                     
                    }

                });


оно все равно простреливает?
Кодом людям нужно помогать!
Re[5]: пару вопросов
От: RushDevion Россия  
Дата: 13.03.20 13:58
Оценка: 4 (1)
S>Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность
S>получателю отдельно проверить состояние каждого таска?

Потому что WhenAll возвращает Task.
А поведение Task таково, что возникшее в ней исключение перевыбрасывается при task.Wait().
И это консистентно и ожидаемо. Было бы странным, если для таски из WhenAll это поведение изменили.

Что касается возможности анализа ошибок в индивидуальных тасках никто ее у вас не отнимал — перехватываем/игнорим исключение и вперед.

var task = Task.WhenAll(myTasks)
  .ContinueWith(t => t.Exception?.Handle(_ => true), TaskContinuationOptions.NotOnRanToCompletion);
task.Wait(); // Все, можно не беспокоиться об exceptions
var faultedTasks = myTasks.Where(t=>t.IsFaulted); // Анализируем ошибки



S>оно все равно простреливает?

Что и куда простреливает?
Re[6]: пару вопросов
От: Sharov Россия  
Дата: 13.03.20 16:12
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность

S>>получателю отдельно проверить состояние каждого таска?

RD>
RD>var task = Task.WhenAll(myTasks)
RD>  .ContinueWith(t => t.Exception?.Handle(_ => true), TaskContinuationOptions.NotOnRanToCompletion);
RD>task.Wait(); // Все, можно не беспокоиться об exceptions
RD>var faultedTasks = myTasks.Where(t=>t.IsFaulted); // Анализируем ошибки
RD>


Понял, а есть какая-то разница в поведение между await task и task.Wait() в данном примере. Я понимаю, что task.Wait() блокирующий и т.д.
Просто любопытно.

S>>оно все равно простреливает?

RD>Что и куда простреливает?

Исключение TaskCanceledExp.
Кодом людям нужно помогать!
Re[7]: пару вопросов
От: RushDevion Россия  
Дата: 14.03.20 10:40
Оценка: 4 (1)
S>Понял, а есть какая-то разница в поведение между await task и task.Wait() в данном примере. Я понимаю, что task.Wait() блокирующий и т.д.
S>Просто любопытно.

-Async/await компилируется в класс, реализующий машину состояний, а task.Wait — просто в вызов метода
-Если возникнет исключение, то с task.Wait() оно вылетит как AggregateException, а await перевыбросит оригинальное

S>>>оно все равно простреливает?

RD>>Что и куда простреливает?
S>Исключение TaskCanceledExp.
В какой момент?
Если напишешь воспроизводимый тест и ткнешь пальцем в непонятное, я, наверное, смогу объяснить, почему оно так работает.
Re[8]: пару вопросов
От: Shmj Ниоткуда  
Дата: 15.03.20 09:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>В какой момент?

RD>Если напишешь воспроизводимый тест и ткнешь пальцем в непонятное, я, наверное, смогу объяснить, почему оно так работает.

Вот же:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                    }
                });
            }
        }
    }
}


Хотя, в принципе, просто путает вложенность. Срабатывает ожидание для задачи из словаря раньше, чем то ожидание, которое в try/catch. Если переписать так, то ок:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                    }

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание
                });
            }
        }
    }
}
Отредактировано 15.03.2020 9:22 Shmj . Предыдущая версия .
Re[9]: пару вопросов
От: RushDevion Россия  
Дата: 15.03.20 12:05
Оценка:
Откоментил по тексту
  Скрытый текст
namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken); 
            // В этой точке у тебя есть 10 запущенных задач (точнее 20, считая parent таски)
            // Каждая из них завершится не раньше, чем Thread.Sleep(2000) + await Task.Delay(4000) = 6 секунд

            // Через секунду - ты все отменяшь
            Thread.Sleep(1000);
    
            // Ни одна из задач к этому моменту еще не будет завершена, т.е. отменятся все
            // А если хоть одна из задач отменена, Task.WhenAll ожидаемо выкинет TaskCancelledException

            // Перенос же Tasks.TryAdd(taskIdCopy, task) в конец parent-таски приводит к тому, что
            // Tasks.Values к моменту отмены будет пустым (Count=0), т.к. запись в него произойдет лишь спустя 6 секунд.
            // Соответственно, никакого ожидания (а, значит, и TaskCancelledException) не будет

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                Console.WriteLine($"Stopping {Tasks.Values.Count} tasks"); // !!! А сколько собственно задач мы отменяем?
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy); // !!! Это не End taskIdCopy

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                     
                        // !!! Вот здесь End taskIdCopy
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                    }
                });
            }
        }
    }
}
Re[10]: пару вопросов
От: Sharov Россия  
Дата: 16.03.20 12:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Откоментил по тексту


Поясните, пожалуйста, почему имея сл. код:
  _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy); // !!! Это не End taskIdCopy

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)

                        // !!! Вот здесь End taskIdCopy
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                        Console.WriteLine("TaskCanceledException2");
                    }
                });


я обозреваю все TaskCanceledException. Но WhenAll все равно бросает TaskCanceledException? Смысл выделенного фрагмента кода?

Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?
Кодом людям нужно помогать!
Re[11]: пару вопросов
От: RushDevion Россия  
Дата: 16.03.20 12:56
Оценка: 8 (1)
S>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?

Да, я уже писал, что race condition возможен.
Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
Получается следущее:
1. Стартует parent task _
2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
3. Но в словарь добавляется child task
4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

А вот если в список задач на ожидание класть parent таски (которые _), то все заработает.

Т.е. TaskCancelledException для отмененной таски выкидвается каждый раз, когда кто-то делает на ней Wait/await.
Вот для лучшего понимания:
[Test]
public void CancelTaskTest()
{
    var cancelledTask = Task.FromCanceled(new CancellationToken(canceled: true)); // Одна отмененная таска
    var task1 = Task.Run(async () =>
    {
        try
        {
            await cancelledTask;
        }
        catch (TaskCanceledException e)
        {
            Console.WriteLine(e); // Здесь вылетит exception
        }
    });
    var task2 = Task.Run(async () =>
    {
        try
        {
            await cancelledTask;
        }
        catch (TaskCanceledException e)
        {
            Console.WriteLine(e); // И здесь тоже вылетит
        }

    });

    Task.WhenAll(task2, task1).Wait();  // А тут не вылетит, т.к. мы все перехватили в task1, task2
}
Re[12]: пару вопросов
От: Sharov Россия  
Дата: 16.03.20 13:33
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?


RD>Да, я уже писал, что race condition возможен.

RD>Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
RD>Получается следущее:
RD>1. Стартует parent task _
RD>2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
RD>3. Но в словарь добавляется child task
RD>4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

Не согласен. Whenall ждет правильных тасков. Судя по всему, именно в rc дело: у нас происходит отмена и кидает Delay, такска закончена, состояние Canceled, исключение не поймано.
Далее, между try/catch {await task } влезает whenall и у него вылетает соотв. исключение. Т.е. изначально не предполагалось родительский поток _ добавлять в словарь, но если добавлять
родительский, то по идее все будет действительно корректно. Но вот хотелось понять почему так работает в конкретном случае. Дело в rc.
Кодом людям нужно помогать!
Re[12]: пару вопросов
От: Sharov Россия  
Дата: 17.03.20 11:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?


RD>Да, я уже писал, что race condition возможен.

RD>Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
RD>Получается следущее:
RD>1. Стартует parent task _
RD>2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
RD>3. Но в словарь добавляется child task
RD>4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

Похоже, выше я неправ. Нету никакого rc, т.е. он есть, но не в этом суть. Даже если я словил отмену
  try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                        Console.WriteLine("TaskCanceledException2");
                        //Tasks.TryRemove(taskIdCopy, out _);
                    }


whenall все равно бросит TaskCanceledExp, т.к. все таски находятся в состоянии Canceled. Вопрос, по чему так? Я же в порождающем потоке (_ = Task.Run(...)) уже отловил отмену (я специально морозил поток с whenall и ждал обработки искл. для отмененных потоков) и await whenall все равно бросал TaskCanceledException. Почему, какой в этом смысл, разве отмененные потоки и обработанная отмена этих потоков не делают задачу completed?

Упд: Т.е. если мы у отмененной задачи отловили исключение TaskCanceledException, то это стату этой задачи все равно будет Canceled? И поэтому ожидание, в том числе whеnall, у такой задачи всегда бросает (ну это пример теста выше)?
Кодом людям нужно помогать!
Отредактировано 17.03.2020 11:39 Sharov . Предыдущая версия .
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.