Всем привет. У меня есть два вопроса:
1)
Мне нужен алгоритм с Task'ами который будет выбирать объекты из массива и для каждого объекта в отдельном потоке выполнять некоторые действия. Количество одновременно работающих потоков при этом должно быть ограничено.
Написал такой вариант:
class TestObject
{
public TestObject(string value)
{
Value = value;
}
public string Value { get; set; }
}
class Program
{
const int maxTasksCount = 5;
static void Main()
{
CancellationTokenSource abortToken = new CancellationTokenSource();
List<TestObject> testObjects = new List<TestObject>();
for (int i = 0; i < 20; i++)
{
testObjects.Add(new TestObject(i.ToString()));
}
var task = Task.Run(() =>
{
List<Task> tasks = new List<Task>();
lock (testObjects)
{
foreach (var obj in testObjects)
{
if (abortToken.IsCancellationRequested)
return;
if (tasks.Count >= maxTasksCount)
{
var index = Task.WaitAny(tasks.ToArray());
tasks.RemoveAt(index);
}
tasks.Add(Task.Run(() => Work(obj, abortToken)));
}
}
Task.WaitAll(tasks.ToArray());
});
task.Wait();
foreach (var obj in testObjects)
Console.Write(obj.Value);
Console.ReadLine();
}
static void Work(TestObject obj, CancellationTokenSource abortToken)
{
lock (obj)
{
Thread.Sleep(1000);
obj.Value = "!"; // Типа полезная работаif (abortToken.IsCancellationRequested)
return;
}
}
}
Алгоритм работает, но есть сомнения. Подскажите на сколько правильно я всё сделал и что нужно поправить.
2)
На сколько верно в подобных задачах для приостановки выполнения Task'a использовать ManualResetEvent? Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.
Здравствуйте, Cynic, Вы писали:
C>Количество одновременно работающих потоков при этом должно быть ограничено. google://.net limited concurrency scheduler
Пусть знатоки ответят, лично я бы постарался как можно дальше держаться от ручных event'ов.
C>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.
Да, это верно. Task не является thread и может исполняться разными тредами.
Здравствуйте, Слава, Вы писали:
C>>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.
С>Да, это верно. Task не является thread и может исполняться разными тредами
Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?
Здравствуйте, Cynic, Вы писали:
C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?
Таск же может ожидать на каком-то объекте синхронизации. И начать ожидание в одном треде, а быть разбужен — в другом.
Здравствуйте, Cynic, Вы писали:
C>Здравствуйте, Слава, Вы писали:
C>>>Я тут где то вычитал мнение, что Task != Thread и в общем случает один Thread может выполнять несколько Task'ов и один Task может быть размазан по нескольким Thread'ам. На сколько это верно. Под Task'ом я здесь понимаю, элементарный Task без вложенных задач.
С>>Да, это верно. Task не является thread и может исполняться разными тредами
C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?
Таск не разрывается на части. А для задачи гляньте на SemaphoreSlim, должно помочь, по идее, для ограничения количества одновременных тасков
Здравствуйте, Cynic, Вы писали:
C>Честно говоря я с трудом себе это представляю. Как мне кажется, таск может выполняться несколькими тредами только когда имеет вложенные таски. Не понятно как и кто будет одиночный таск "разрывать на части" для выполнения разными потоками?
Шарповские асинки тем и отличаются от обычного кода, что "разрезаны" по всем местам, где использован await. И дефолтный шедулер не гарантирует, что до и после любого await будет один и тот же поток.
... << RSDN@Home 1.0.0 alpha 5 rev. 0 on Windows 8 6.2.9200.0>>
Здравствуйте, AndrewVK, Вы писали:
AVK>Шарповские асинки тем и отличаются от обычного кода, что "разрезаны" по всем местам, где использован await. И дефолтный шедулер не гарантирует, что до и после любого await будет один и тот же поток.
Ну т.е. в любой конкретный момент времени в потоке всё равно выполняется только один таск и использование примтивов синхронизации или конструкций типа Thread.SpinWait или Thread.Sleep не возбраняется?
Здравствуйте, Слава, Вы писали:
С>Здравствуйте, Cynic, Вы писали:
C>>Количество одновременно работающих потоков при этом должно быть ограничено. С>google://.net limited concurrency scheduler
С>Задача настолько типовая, что даже в MSDN есть https://msdn.microsoft.com/ru-ru/library/ee789351(v=vs.95).aspx
Честно говоря создавать custom'ный scheduler для такой задачи это как-то громоздко на мой взгляд. По сути какие преимущества он дает если сравнивать с моим подходом?
Здравствуйте, Cynic, Вы писали:
C>Ну т.е. в любой конкретный момент времени в потоке всё равно выполняется только один таск
Естественно.
C> и использование примтивов синхронизации или конструкций типа Thread.SpinWait или Thread.Sleep не возбраняется?
Только если ты очень хорошо понимаешь что происходит. Потому что неверным применением легко поставить шедулер на колени — многозадачность то у тасков отчасти кооперативная.
... << RSDN@Home 1.0.0 alpha 5 rev. 0 on Windows 8 6.2.9200.0>>
Нашел ещё несколько подходов к решению описанной задачи средствами .Net:
1) Использовать Parallel.ForEach которому передать ParallelOptions с установленным свойством MaxDegreeOfParallelism:
private Task ParallelForEachVersion<T>(IEnumerable<T> collection, Action<T> action, int maxDegreeOfParallelism, CancellationToken cancelToken)
{
var options = new ParallelOptions();
options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
options.CancellationToken = cancelToken;
// Создаем load-balancing partitioner без него будет использоваться статическое распределение, что приведет к
// не рациональному потреблению ресурсов если продолжительность выполнения задач может существенно отличатьсяvar partitioner = Partitioner.Create(collection.ToArray(), true);
return Task.Run(() =>
{
try
{
Parallel.ForEach(partitioner, options, action);
}
catch(OperationCanceledException exc)
{
if (!cancelToken.IsCancellationRequested)
{
...
}
}
});
}
Минуса три:
Умеет прерывать работу только между вложенными задачами, т.к. action имеет тип Action<T>, а collection IEnumerable<T> (можно конечно сделать специальный тип <Т> и передать через него, но это извращение);
При прерывании генерирует исключение OperationCanceledException, приходится его обрабатывать;
Имеет не значительно более низкую производительность среди испробованных подходов. Не понял почему;
2) Использовать ActionBlock из System.Threading.Tasks.Dataflow которому передать ExecutionDataflowBlockOptions с установленным свойством MaxDegreeOfParallelism:
private Task DataFlowVersion<T>(IEnumerable<T> collection, Action<ActionData<T>> action, int maxDegreeOfParallelism, CancellationToken cancelToken)
{
var dataFlowOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
CancellationToken = cancelToken
};
var actionBlock = new ActionBlock<ActionData<T>>(action, dataFlowOptions);
return Task.Run(async () =>
{
try
{
foreach (var item in collection)
{
actionBlock.Post(new ActionData<T>(item, cancelToken));
}
// без Complete() await ни чего не дождётся, т.к. actionBlock будет ожидать, что ему могут дать еще сообщение (в терминах DataFlow)
actionBlock.Complete();
await actionBlock.Completion;
}
catch (TaskCanceledException exc)
{
if (!cancelToken.IsCancellationRequested)
{
...
}
}
});
}
class ActionData<T>
{
public ActionData(T data, CancellationToken cancelToken)
{
Data = data;
CancelToken = cancelToken;
}
public T Data { get; set; }
public CancellationToken CancelToken { get; set; }
}
Минуса два:
Нативно умеет прерывать работу только между вложенными задачами. Чтобы прерывать работу во вложенной задаче пришлось городить тип ActionData<T> через который передавать CancellationToken внутрь Action<T>;
При прерывании генерирует исключение TaskCanceledException, приходится его обрабатывать;
Во всех случаях производительность была примерно одинакова, так что я пришел к выводу, что можно использовать алгоритм предложенный в первом посте ни чем не хуже.