Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 12:27
Оценка:
По событию создается долгоживущая задача:

Task.Run(()=> ...


Таких задач может одновременно быть много. Нужно: завершить все задачи и дождаться пока они все заврешат работу (внутри задачи могут быть не асинхронные вызовы, которые отменить с помощью CancellationToken нельзя).

Вот тестовый код:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            // Вот тут нужно отменить все и дождаться (вернуть Task) когда все задачи будут завершены
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                var task = Task.Run(async () =>
                {
                    Console.WriteLine("Begin " + taskIdCopy);
                    Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                    Console.WriteLine("End " + taskIdCopy);

                    await Task.Delay(4000, cancellationToken); // Задержка
                }, cancellationToken);
            }
        }
    }
}


Решение:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                var task = Task.Run(async () =>
                {
                    Console.WriteLine("Begin " + taskIdCopy);
                    Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                    Console.WriteLine("End " + taskIdCopy);

                    await Task.Delay(4000, cancellationToken); // Задержка

                    Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                }, cancellationToken);

                Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание
            }
        }
    }
}


Смущает что контроль хода программы организован через Exception. Как лучше?
Re: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 12:34
Оценка: 1 (1)
Здравствуйте, Shmj, Вы писали:

Не уверен, что прав, а не проще ли taskcancelexp ловить в каждой таске самостоятельно? А потом по заверщении всех тасок проверять была ли отмена?
Кодом людям нужно помогать!
Re[2]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 12:51
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Не уверен, что прав, а не проще ли taskcancelexp ловить в каждой таске самостоятельно? А потом по заверщении всех тасок проверять была ли отмена?


Как бы так:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        try
                        {
                            await Task.Delay(4000, cancellationToken); // Задержка
                        }
                        catch (TaskCanceledException)
                        {
                            Console.WriteLine("!-1");
                        }

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine("!-2");
                    }
                });
            }
        }
    }
}


И каждая задача в итоге будет просто Completed — исключение отловим только внутри. Но опять таки — добавится больше кода и управление ходом программы будет основано на иключениях.
Re[3]: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 12:58
Оценка:
Здравствуйте, Shmj, Вы писали:

S>И каждая задача в итоге будет просто Completed — исключение отловим только внутри. Но опять таки — добавится больше кода и управление ходом программы будет основано на иключениях.


А если не ловить ислкючения в тасках?
Кодом людям нужно помогать!
Re[4]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 13:04
Оценка:
Здравствуйте, Sharov, Вы писали:

S>А если не ловить ислкючения в тасках?


Тогда исключение TaskCanceledException возникает в точке await Task.WhenAll а задачи имеют статус Canceled.
Re[5]: Дождаться пока все завершат работу...
От: Sharov Россия  
Дата: 12.03.20 13:18
Оценка:
Здравствуйте, Shmj, Вы писали:


S>Тогда исключение TaskCanceledException возникает в точке await Task.WhenAll а задачи имеют статус Canceled.


Я не проверял, но мне казалось что WhenAll отработает нормально, а дальше можно пройтись по таскам и проверить их статус.
Тогда я
Кодом людям нужно помогать!
Re[6]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 13:57
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Я не проверял, но мне казалось что WhenAll отработает нормально, а дальше можно пройтись по таскам и проверить их статус.

S>Тогда я

Это если только верхний Task отловить, вот так:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                var tasks = Tasks.Values.ToArray();

                await Task.WhenAll(tasks);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }

            foreach (var tasksValue in Tasks.Values)
            {
                Console.WriteLine(tasksValue.Status);
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 15 задач
            for (int taskId = 1; taskId <= 15; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken);

                        Tasks.TryRemove(taskIdCopy,
                            out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine(taskIdCopy + " canceled");
                    }
                });
            }
        }
    }
}
Re: Дождаться пока все завершат работу...
От: RushDevion Россия  
Дата: 12.03.20 14:23
Оценка: 8 (1) +1
Чета вы мудрите
var t = Task.WhenAll(allMyTasks);
cts.Cancel(); // Отменяем
t.Wait(); // Стоим ждем, пока все таски завершатся
if (t.IsFaulted)   { ... } // По крайней мере одна из тасков крашнулась, анализируем ошибку если нужно
if (t.IsCancelled) { ... } // Никто не крашнулся, но по крайней мере одна таска была отменена, анализируем эту ситуацию, если нужно
// Если мы тут, все завершилось благополучно
Отредактировано 12.03.2020 15:22 RushDevion . Предыдущая версия .
Re[2]: Дождаться пока все завершат работу...
От: Shmj Ниоткуда  
Дата: 12.03.20 17:25
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Чета вы мудрите


Проблема в том, что нужно на каждом асинхронном методе перехватывать TaskCanceledException. Может я что ни так делаю:

Ну ок, перехватили — а дальше что?

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                var tasks = Tasks.Values.ToArray();

                var t = Task.WhenAll(tasks);

                await t;

                Console.WriteLine("IsCanceled=" + t.IsCanceled);
                Console.WriteLine("IsFaulted=" + t.IsFaulted);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }

            foreach (var tasksValue in Tasks.Values)
            {
                Console.WriteLine(tasksValue.Status);
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 15 задач
            for (int taskId = 1; taskId <= 15; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        try
                        {
                            await Task.Delay(4000, cancellationToken);
                        }
                        catch (TaskCanceledException)
                        {
                            throw; // Вот тут возникает и уходит в самый верх!
                        }
                        
                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        Console.WriteLine(taskIdCopy + " canceled");
                    }
                });
            }
        }
    }
}
Re[3]: Дождаться пока все завершат работу...
От: RushDevion Россия  
Дата: 12.03.20 18:29
Оценка:
S>Проблема в том, что нужно на каждом асинхронном методе перехватывать TaskCanceledException.
Начальная постановка задачи у вас какая? Запустить отмену и дождаться, когда все закончится. Так?
Вот и не нужно ничего перехватывать.
Ну вывалится из дочерней таски TaskCancelledException наружу, ну зафейлит вашу таску.
Task.WhenAll().Wait() все равно дождется общего окончания.
Дальше нужно просто пробежаться по списку тасков и потрогать Exception у всех Faulted, чтобы UnobservedTaskException не выстрелило.

Более того TaskCancelledException — это наследник OperationCancelledException.
Значит, если первичная ваша таска (1) создавалась с тем же cancellation token, что вы передаете в await Task.Delay(),
то вылетевший в последнем TaskCancelledException (даже не будучи перехваченным) будет трактоваться (1) как отмена и она перейдет не в Faulted, а в Cancelled state.
Отредактировано 12.03.2020 18:57 RushDevion . Предыдущая версия . Еще …
Отредактировано 12.03.2020 18:49 RushDevion . Предыдущая версия .
Re[4]: пару вопросов
От: Sharov Россия  
Дата: 13.03.20 12:59
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Начальная постановка задачи у вас какая? Запустить отмену и дождаться, когда все закончится. Так?

RD>Вот и не нужно ничего перехватывать.
RD>Ну вывалится из дочерней таски TaskCancelledException наружу, ну зафейлит вашу таску.
RD>Task.WhenAll().Wait() все равно дождется общего окончания.
RD>Дальше нужно просто пробежаться по списку тасков и потрогать Exception у всех Faulted, чтобы UnobservedTaskException не выстрелило.

Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность
получателю отдельно проверить состояние каждого таска?

И еще вопрос:

почему при ловле TaskCanceledException
  _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken);

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

         
                    try
                    {
                        await task;
                    }
                    catch(TaskCanceledException e)
                    {
                     
                    }

                });


оно все равно простреливает?
Кодом людям нужно помогать!
Re[5]: пару вопросов
От: RushDevion Россия  
Дата: 13.03.20 13:58
Оценка: 4 (1)
S>Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность
S>получателю отдельно проверить состояние каждого таска?

Потому что WhenAll возвращает Task.
А поведение Task таково, что возникшее в ней исключение перевыбрасывается при task.Wait().
И это консистентно и ожидаемо. Было бы странным, если для таски из WhenAll это поведение изменили.

Что касается возможности анализа ошибок в индивидуальных тасках никто ее у вас не отнимал — перехватываем/игнорим исключение и вперед.

var task = Task.WhenAll(myTasks)
  .ContinueWith(t => t.Exception?.Handle(_ => true), TaskContinuationOptions.NotOnRanToCompletion);
task.Wait(); // Все, можно не беспокоиться об exceptions
var faultedTasks = myTasks.Where(t=>t.IsFaulted); // Анализируем ошибки



S>оно все равно простреливает?

Что и куда простреливает?
Re[6]: пару вопросов
От: Sharov Россия  
Дата: 13.03.20 16:12
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Судя по WhenAllPromise, он обозревает все исключения, поэтому не выстрелит. Вообще странно, зачем бросать исключения, а не дать возможность

S>>получателю отдельно проверить состояние каждого таска?

RD>
RD>var task = Task.WhenAll(myTasks)
RD>  .ContinueWith(t => t.Exception?.Handle(_ => true), TaskContinuationOptions.NotOnRanToCompletion);
RD>task.Wait(); // Все, можно не беспокоиться об exceptions
RD>var faultedTasks = myTasks.Where(t=>t.IsFaulted); // Анализируем ошибки
RD>


Понял, а есть какая-то разница в поведение между await task и task.Wait() в данном примере. Я понимаю, что task.Wait() блокирующий и т.д.
Просто любопытно.

S>>оно все равно простреливает?

RD>Что и куда простреливает?

Исключение TaskCanceledExp.
Кодом людям нужно помогать!
Re[7]: пару вопросов
От: RushDevion Россия  
Дата: 14.03.20 10:40
Оценка: 4 (1)
S>Понял, а есть какая-то разница в поведение между await task и task.Wait() в данном примере. Я понимаю, что task.Wait() блокирующий и т.д.
S>Просто любопытно.

-Async/await компилируется в класс, реализующий машину состояний, а task.Wait — просто в вызов метода
-Если возникнет исключение, то с task.Wait() оно вылетит как AggregateException, а await перевыбросит оригинальное

S>>>оно все равно простреливает?

RD>>Что и куда простреливает?
S>Исключение TaskCanceledExp.
В какой момент?
Если напишешь воспроизводимый тест и ткнешь пальцем в непонятное, я, наверное, смогу объяснить, почему оно так работает.
Re[8]: пару вопросов
От: Shmj Ниоткуда  
Дата: 15.03.20 09:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>В какой момент?

RD>Если напишешь воспроизводимый тест и ткнешь пальцем в непонятное, я, наверное, смогу объяснить, почему оно так работает.

Вот же:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                    }
                });
            }
        }
    }
}


Хотя, в принципе, просто путает вложенность. Срабатывает ожидание для задачи из словаря раньше, чем то ожидание, которое в try/catch. Если переписать так, то ок:

  Скрытый текст
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken);

            Thread.Sleep(1000);

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy);

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                    }, cancellationToken);

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                    }

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание
                });
            }
        }
    }
}
Отредактировано 15.03.2020 9:22 Shmj . Предыдущая версия .
Re[9]: пару вопросов
От: RushDevion Россия  
Дата: 15.03.20 12:05
Оценка:
Откоментил по тексту
  Скрытый текст
namespace ConsoleApp45
{
    class Program
    {
        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource();
        private static readonly ConcurrentDictionary<int, Task> Tasks = new ConcurrentDictionary<int, Task>();

        static async Task Main(string[] args)
        {
            var cancellationToken = TokenSource.Token;

            AddTasksPeriodically(cancellationToken); 
            // В этой точке у тебя есть 10 запущенных задач (точнее 20, считая parent таски)
            // Каждая из них завершится не раньше, чем Thread.Sleep(2000) + await Task.Delay(4000) = 6 секунд

            // Через секунду - ты все отменяшь
            Thread.Sleep(1000);
    
            // Ни одна из задач к этому моменту еще не будет завершена, т.е. отменятся все
            // А если хоть одна из задач отменена, Task.WhenAll ожидаемо выкинет TaskCancelledException

            // Перенос же Tasks.TryAdd(taskIdCopy, task) в конец parent-таски приводит к тому, что
            // Tasks.Values к моменту отмены будет пустым (Count=0), т.к. запись в него произойдет лишь спустя 6 секунд.
            // Соответственно, никакого ожидания (а, значит, и TaskCancelledException) не будет

            await StopAllTasksAndWait();
            Console.WriteLine("StopAllTasksAndWait");

            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }

        static async Task StopAllTasksAndWait()
        {
            Console.WriteLine("Cancel");
            TokenSource.Cancel();

            try
            {
                Console.WriteLine($"Stopping {Tasks.Values.Count} tasks"); // !!! А сколько собственно задач мы отменяем?
                await Task.WhenAll(Tasks.Values);
            }
            catch (TaskCanceledException taskCanceledException)
            {
                Console.WriteLine("TaskCanceledException");
            }
            catch (AggregateException aggregateException)
            {
                Console.WriteLine("AggregateException (cnt=" + aggregateException.InnerExceptions.Count + ")");
            }
        }

        static void AddTasksPeriodically(CancellationToken cancellationToken)
        {
            // Одновременно стартуем 10 задач
            for (int taskId = 1; taskId <= 10; taskId++)
            {
                int taskIdCopy = taskId;

                if (cancellationToken.IsCancellationRequested)
                    break;

                _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy); // !!! Это не End taskIdCopy

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)
                     
                        // !!! Вот здесь End taskIdCopy
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                    }
                });
            }
        }
    }
}
Re[10]: пару вопросов
От: Sharov Россия  
Дата: 16.03.20 12:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Откоментил по тексту


Поясните, пожалуйста, почему имея сл. код:
  _ = Task.Run(async () =>
                {
                    var task = Task.Run(async () =>
                    {
                        Console.WriteLine("Begin " + taskIdCopy);
                        Thread.Sleep(2000); // Вызываем синхронный метод без возможности отмены
                        Console.WriteLine("End " + taskIdCopy); // !!! Это не End taskIdCopy

                        await Task.Delay(4000, cancellationToken); // Задержка

                        Tasks.TryRemove(taskIdCopy, out _); // Удаляем из списка (уже исполнена, нет смысла ждать завершения)

                        // !!! Вот здесь End taskIdCopy
                    }, cancellationToken);

                    Tasks.TryAdd(taskIdCopy, task); // Добавляем в список на ожидание

                    try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                        Console.WriteLine("TaskCanceledException2");
                    }
                });


я обозреваю все TaskCanceledException. Но WhenAll все равно бросает TaskCanceledException? Смысл выделенного фрагмента кода?

Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?
Кодом людям нужно помогать!
Re[11]: пару вопросов
От: RushDevion Россия  
Дата: 16.03.20 12:56
Оценка: 8 (1)
S>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?

Да, я уже писал, что race condition возможен.
Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
Получается следущее:
1. Стартует parent task _
2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
3. Но в словарь добавляется child task
4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

А вот если в список задач на ожидание класть parent таски (которые _), то все заработает.

Т.е. TaskCancelledException для отмененной таски выкидвается каждый раз, когда кто-то делает на ней Wait/await.
Вот для лучшего понимания:
[Test]
public void CancelTaskTest()
{
    var cancelledTask = Task.FromCanceled(new CancellationToken(canceled: true)); // Одна отмененная таска
    var task1 = Task.Run(async () =>
    {
        try
        {
            await cancelledTask;
        }
        catch (TaskCanceledException e)
        {
            Console.WriteLine(e); // Здесь вылетит exception
        }
    });
    var task2 = Task.Run(async () =>
    {
        try
        {
            await cancelledTask;
        }
        catch (TaskCanceledException e)
        {
            Console.WriteLine(e); // И здесь тоже вылетит
        }

    });

    Task.WhenAll(task2, task1).Wait();  // А тут не вылетит, т.к. мы все перехватили в task1, task2
}
Re[12]: пару вопросов
От: Sharov Россия  
Дата: 16.03.20 13:33
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?


RD>Да, я уже писал, что race condition возможен.

RD>Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
RD>Получается следущее:
RD>1. Стартует parent task _
RD>2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
RD>3. Но в словарь добавляется child task
RD>4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

Не согласен. Whenall ждет правильных тасков. Судя по всему, именно в rc дело: у нас происходит отмена и кидает Delay, такска закончена, состояние Canceled, исключение не поймано.
Далее, между try/catch {await task } влезает whenall и у него вылетает соотв. исключение. Т.е. изначально не предполагалось родительский поток _ добавлять в словарь, но если добавлять
родительский, то по идее все будет действительно корректно. Но вот хотелось понять почему так работает в конкретном случае. Дело в rc.
Кодом людям нужно помогать!
Re[12]: пару вопросов
От: Sharov Россия  
Дата: 17.03.20 11:16
Оценка:
Здравствуйте, RushDevion, Вы писали:

S>>Упд: Кажется я понял в чем дело... Я добавил в catch Tasks.TryRemove(taskIdCopy, out _), но это все равно не помогло, т.е. whenall бросал tsckcancekexp. И тут дошло, что у нас race condition, т.е. у нас whenall может увидеть состояние задачи раньше, чем это сделает выделенный блок try\catch. Я прав?


RD>Да, я уже писал, что race condition возможен.

RD>Но главная проблема не в нем, а в том, что WhenAll ждет на неправильных тасках
RD>Получается следущее:
RD>1. Стартует parent task _
RD>2. Она запускает child task, ждет ее завершения (await task) и перехватывает TaskCancelledException
RD>3. Но в словарь добавляется child task
RD>4. Main-поток await'ит на child task, видит что она отменена, выкидывает TaskCancelledException, которое в этом потоке уже никто не перехватывает.

Похоже, выше я неправ. Нету никакого rc, т.е. он есть, но не в этом суть. Даже если я словил отмену
  try
                    {
                        await task;
                    }
                    catch (TaskCanceledException)
                    {
                        // Не спасает...
                        Console.WriteLine("TaskCanceledException2");
                        //Tasks.TryRemove(taskIdCopy, out _);
                    }


whenall все равно бросит TaskCanceledExp, т.к. все таски находятся в состоянии Canceled. Вопрос, по чему так? Я же в порождающем потоке (_ = Task.Run(...)) уже отловил отмену (я специально морозил поток с whenall и ждал обработки искл. для отмененных потоков) и await whenall все равно бросал TaskCanceledException. Почему, какой в этом смысл, разве отмененные потоки и обработанная отмена этих потоков не делают задачу completed?

Упд: Т.е. если мы у отмененной задачи отловили исключение TaskCanceledException, то это стату этой задачи все равно будет Canceled? И поэтому ожидание, в том числе whеnall, у такой задачи всегда бросает (ну это пример теста выше)?
Кодом людям нужно помогать!
Отредактировано 17.03.2020 11:39 Sharov . Предыдущая версия .
Re[13]: пару вопросов
От: RushDevion Россия  
Дата: 19.03.20 08:56
Оценка: 8 (1) +1
S>whenall все равно бросит TaskCanceledExp, т.к. все таски находятся в состоянии Canceled. Вопрос, по чему так? Я же в порождающем потоке (_ = Task.Run(...)) уже отловил отмену (я специально морозил поток с whenall и ждал обработки искл. для отмененных потоков) и await whenall все равно бросал TaskCanceledException. Почему, какой в этом смысл, разве отмененные потоки и обработанная отмена этих потоков не делают задачу completed?

S>Упд: Т.е. если мы у отмененной задачи отловили исключение TaskCanceledException, то это стату этой задачи все равно будет Canceled? И поэтому ожидание, в том числе whеnall, у такой задачи всегда бросает (ну это пример теста выше)?


Во-первых, нет такой вещи как "отмененный поток".
Поток — это просто механизм исполнения таски.
Т.е. планировщик выбирает очередную таску и отдает ее какому-то потоку на исполнение.
Если в ходе исполнения возникает exception, он перехватывается и сохраняется в таске.
При этом последняя переходит в Faulted или Cancelled state в зависимости от типа exception'а.
Далее, пусть кто-то в другом потоке говорит таске Wait/await.
Если таска была в Faulted state, она перевыбросит сохраненный Exception (обернутый в AggregateException т.к. в общем случае таска может
представлять более одной операции) на вызывающем потоке.
Если таска была в Cancelled state, на выбросит TaskCancelledException на вызывающем потоке.
Если вызывающих потоков несколько и каждый делает Wait/await — exception будет выкинут для каждого из них.
Поэтому бесполезно пытаться проглотить exception, перехватывая его на одном потоке, а потом делая таске Wait/await на другом.
Re[14]: пару вопросов
От: Sharov Россия  
Дата: 19.03.20 10:14
Оценка:
Здравствуйте, RushDevion, Вы писали:


S>>Упд: Т.е. если мы у отмененной задачи отловили исключение TaskCanceledException, то это стату этой задачи все равно будет Canceled? И поэтому ожидание, в том числе whеnall, у такой задачи всегда бросает (ну это пример теста выше)?


RD>Во-первых, нет такой вещи как "отмененный поток".


Ну оговорился, отмененная задача, вестимо.

RD>Поток — это просто механизм исполнения таски.


Благодарю, хорошая формулировка . А то на собесах любят спросить про отличия, я и растекался мыслью про более высокоуровневую абстракцию.

RD>Если в ходе исполнения возникает exception, он перехватывается и сохраняется в таске.

RD>При этом последняя переходит в Faulted или Cancelled state в зависимости от типа exception'а.
RD>Далее, пусть кто-то в другом потоке говорит таске Wait/await.
RD>Если таска была в Faulted state, она перевыбросит сохраненный Exception (обернутый в AggregateException т.к. в общем случае таска может
RD>представлять более одной операции) на вызывающем потоке.

RD>Если таска была в Cancelled state, на выбросит TaskCancelledException на вызывающем потоке.

RD>Если вызывающих потоков несколько и каждый делает Wait/await — exception будет выкинут для каждого из них.
RD>Поэтому бесполезно пытаться проглотить exception, перехватывая его на одном потоке, а потом делая таске Wait/await на другом.

Вот этого я не знал, я думал что перехватив соотв. исключение, бросившая задача помечается как Completed, ну т.е. больше бросать не будет.
Кодом людям нужно помогать!
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.