Поругать алгоритм с Task'ами
От: Cynic Россия  
Дата: 12.05.17 23:41
Оценка:
Всем привет. У меня есть два вопроса:
1)
Мне нужен алгоритм с Task'ами который будет выбирать объекты из массива и для каждого объекта в отдельном потоке выполнять некоторые действия. Количество одновременно работающих потоков при этом должно быть ограничено.
Написал такой вариант:
    class TestObject
    {
        public TestObject(string value)
        {
            Value = value;
        }

        public string Value { get; set; }
    }

    class Program
    {
        const int maxTasksCount = 5;
        
        static void Main()
        {
            CancellationTokenSource abortToken = new CancellationTokenSource();

            List<TestObject> testObjects = new List<TestObject>();

            for (int i = 0; i < 20; i++)
            {
                testObjects.Add(new TestObject(i.ToString()));
            }

            var task = Task.Run(() =>
            {
                List<Task> tasks = new List<Task>();

                lock (testObjects)
                {
                    foreach (var obj in testObjects)
                    {
                        if (abortToken.IsCancellationRequested)
                            return;

                        if (tasks.Count >= maxTasksCount)
                        {
                            var index = Task.WaitAny(tasks.ToArray());
                            tasks.RemoveAt(index);
                        }

                        tasks.Add(Task.Run(() => Work(obj, abortToken)));
                    }
                }

                Task.WaitAll(tasks.ToArray());
            });

            task.Wait();

            foreach (var obj in testObjects)
                Console.Write(obj.Value);

            Console.ReadLine();
        }

        static void Work(TestObject obj, CancellationTokenSource abortToken)
        {
            lock (obj)
            {
                Thread.Sleep(1000);

                obj.Value = "!";  // Типа полезная работа

                if (abortToken.IsCancellationRequested)
                    return;
            }
        }
    }

Алгоритм работает, но есть сомнения. Подскажите на сколько правильно я всё сделал и что нужно поправить.
2)
На сколько верно в подобных задачах для приостановки выполнения Task'a использовать ManualResetEvent? Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.
:)
Re: Поругать алгоритм с Task'ами
От: Слава  
Дата: 13.05.17 04:50
Оценка:
Здравствуйте, Cynic, Вы писали:

C>Количество одновременно работающих потоков при этом должно быть ограничено.

google://.net limited concurrency scheduler

Задача настолько типовая, что даже в MSDN есть https://msdn.microsoft.com/ru-ru/library/ee789351(v=vs.95).aspx

C>На сколько верно в подобных задачах для приостановки выполнения Task'a использовать ManualResetEvent?


Пусть знатоки ответят, лично я бы постарался как можно дальше держаться от ручных event'ов.

C>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.


Да, это верно. Task не является thread и может исполняться разными тредами.
Re[2]: Поругать алгоритм с Task'ами
От: Cynic Россия  
Дата: 13.05.17 12:40
Оценка:
Здравствуйте, Слава, Вы писали:

C>>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.


С>Да, это верно. Task не является thread и может исполняться разными тредами


Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?
:)
Re[3]: Поругать алгоритм с Task'ами
От: Слава  
Дата: 13.05.17 13:04
Оценка: +1
Здравствуйте, Cynic, Вы писали:

C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?


Таск же может ожидать на каком-то объекте синхронизации. И начать ожидание в одном треде, а быть разбужен — в другом.
Re[3]: Поругать алгоритм с Task'ами
От: Codechanger Россия  
Дата: 13.05.17 15:11
Оценка: +1
Здравствуйте, Cynic, Вы писали:

C>Здравствуйте, Слава, Вы писали:


C>>>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.


С>>Да, это верно. Task не является thread и может исполняться разными тредами


C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?


Таск не разрывается на части. А для задачи гляньте на SemaphoreSlim, должно помочь, по идее, для ограничения количества одновременных тасков
Re[3]: Поругать алгоритм с Task'ами
От: AndrewVK Россия http://blogs.rsdn.org/avk
Дата: 14.05.17 05:49
Оценка: +2
Здравствуйте, Cynic, Вы писали:

C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?


Шарповские асинки тем и отличаются от обычного кода, что "разрезаны" по всем местам, где использован await. И дефолтный шедулер не гарантирует, что до и после любого await будет один и тот же поток.
... << RSDN@Home 1.0.0 alpha 5 rev. 0 on Windows 8 6.2.9200.0>>
AVK Blog
Re[4]: Поругать алгоритм с Task'ами
От: Cynic Россия  
Дата: 14.05.17 12:46
Оценка:
Здравствуйте, AndrewVK, Вы писали:

AVK>Шарповские асинки тем и отличаются от обычного кода, что "разрезаны" по всем местам, где использован await. И дефолтный шедулер не гарантирует, что до и после любого await будет один и тот же поток.


Ну т.е. в любой конкретный момент времени в потоке всё равно выполняется только один таск и использование примтивов синхронизации или конструкций типа Thread.SpinWait или Thread.Sleep не возбраняется?
:)
Re[2]: Поругать алгоритм с Task'ами
От: Cynic Россия  
Дата: 14.05.17 14:42
Оценка:
Здравствуйте, Слава, Вы писали:

С>Здравствуйте, Cynic, Вы писали:


C>>Количество одновременно работающих потоков при этом должно быть ограничено.

С>google://.net limited concurrency scheduler

С>Задача настолько типовая, что даже в MSDN есть https://msdn.microsoft.com/ru-ru/library/ee789351(v=vs.95).aspx


Честно говоря создавать custom'ный scheduler для такой задачи это как-то громоздко на мой взгляд. По сути какие преимущества он дает если сравнивать с моим подходом?
:)
Re[5]: Поругать алгоритм с Task'ами
От: AndrewVK Россия http://blogs.rsdn.org/avk
Дата: 14.05.17 19:44
Оценка: +2
Здравствуйте, Cynic, Вы писали:

C>Ну т.е. в любой конкретный момент времени в потоке всё равно выполняется только один таск


Естественно.

C> и использование примтивов синхронизации или конструкций типа Thread.SpinWait или Thread.Sleep не возбраняется?


Только если ты очень хорошо понимаешь что происходит. Потому что неверным применением легко поставить шедулер на колени — многозадачность то у тасков отчасти кооперативная.
... << RSDN@Home 1.0.0 alpha 5 rev. 0 on Windows 8 6.2.9200.0>>
AVK Blog
Re: Поругать алгоритм с Task'ами
От: Cynic Россия  
Дата: 15.05.17 15:37
Оценка:
Здравствуйте, Cynic, Вы писали:

Нашел ещё несколько подходов к решению описанной задачи средствами .Net:
1) Использовать Parallel.ForEach которому передать ParallelOptions с установленным свойством MaxDegreeOfParallelism:
        private Task ParallelForEachVersion<T>(IEnumerable<T> collection, Action<T> action, int maxDegreeOfParallelism, CancellationToken cancelToken)
        {
            var options = new ParallelOptions();
            options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
            options.CancellationToken = cancelToken;

            // Создаем load-balancing partitioner без него будет использоваться статическое распределение, что приведет к
            // не рациональному потреблению ресурсов если продолжительность выполнения задач может существенно отличаться
            var partitioner = Partitioner.Create(collection.ToArray(), true);

            return Task.Run(() =>
            {
                try
                {
                    Parallel.ForEach(partitioner, options, action);
                }
                catch(OperationCanceledException exc)
                {
                    if (!cancelToken.IsCancellationRequested)
                    {
                        ...
                    }
                }
            });
       }

Минуса три:

2) Использовать ActionBlock из System.Threading.Tasks.Dataflow которому передать ExecutionDataflowBlockOptions с установленным свойством MaxDegreeOfParallelism:
        private Task DataFlowVersion<T>(IEnumerable<T> collection, Action<ActionData<T>> action, int maxDegreeOfParallelism, CancellationToken cancelToken)
        {
            var dataFlowOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                CancellationToken = cancelToken
            };

            var actionBlock = new ActionBlock<ActionData<T>>(action, dataFlowOptions);

            return Task.Run(async () =>
            {
                try
                {
                    foreach (var item in collection)
                    {
                        actionBlock.Post(new ActionData<T>(item, cancelToken));
                    }

                    // без Complete() await ни чего не дождётся, т.к. actionBlock будет ожидать, что ему могут дать еще сообщение (в терминах DataFlow)
                    actionBlock.Complete();
                    await actionBlock.Completion;
                }
                catch (TaskCanceledException exc)
                {
                    if (!cancelToken.IsCancellationRequested)
                    {
                        ...
                    }
                }
            });
        }

    class ActionData<T>
    {
        public ActionData(T data, CancellationToken cancelToken)
        {
            Data = data;
            CancelToken = cancelToken;
        }
    
        public T Data { get; set; }
        public CancellationToken CancelToken { get; set; }
    }

Минуса два:

Во всех случаях производительность была примерно одинакова, так что я пришел к выводу, что можно использовать алгоритм предложенный в первом посте ни чем не хуже.
:)
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.