Здравствуйте,
Тут вот какая проблема. Есть консольное приложение для обработки данных.
Часть важных данных, порядок следования которых критичен, обрабатывается в отдельном потоке
через LimitedConcurrencyLevelTaskScheduler (ссылка https://msdn.microsoft.com/en-us/library/ee789351%28v=vs.100%29.aspx)
private static readonly TaskScheduler key1ts =
new LimitedConcurrencyLevelTaskScheduler(1);
private static void NewData(string data)
{
Data d = new Data();
d.line = data;
if (data.StartsWith("key1"))
{
Task.Factory.StartNew(
GetKey1Handler,
d,
CancellationToken.None,
TaskCreationOptions.None,
key1ts);
return;
}
}
С другой стороны есть несколько алгоритмов обработки этих данных (экземпляров класса Algo)
Управление в эти алгоритмы передается так
private static void GetKey1Handler(object o)
{
Data d = (Data)o;
tD.par1 = d.line;
tD.par2 = d.line;
int ii = 0;
for (int i = 0; i < 10; i++)
{
ii++;
foreach (Algo a in aList)
{
a.GetHandler();
}
sw.WriteLine(ii);
sw.Flush();
}
}
Очень странно, но при некоторых неблагоприятных условиях (долгая обработка в a.GetHandler) в файл sw попадют не все значения ii.
Я это понимаю так, что новая порция данных сбивает обработку, а сама куда-то исчезает.
Все мои попытки воспроизвести ситуацию на упрощенном примере ни к чему не приводят: в файле sw последовательное перечисление индекса.
Но боевой вариант точно где-то сбоит: в логе обрыв на полуслове обработки в отдельном хэндлере, а в файле не все значения ii.
Есть какие-то мысли почему так происходит?
A>Есть какие-то мысли почему так происходит?
Что-то пошло не так, большего по вопросу не поймёшь.
Начнём с самого простого:
1. зачем вам вообще таски, если вы ограничиваете конкурентность до одного потока?
2. Как вы проверяете, что таски действительно выполняются в одном потоке?
3. Как вы гарантируете, что с sw работает только один поток?
4. В каком окружении выполняется код? отдельный сервис, iis, ещё что-то?
5. Вызывает ли ваш код Thread.Abort()?
Здравствуйте, Sinix, Вы писали: S>1. зачем вам вообще таски, если вы ограничиваете конкурентность до одного потока?
На входе разная информация, разные ключи, думал для быстродействия разделить обработку, тем более, что она впрямую не связана S>2. Как вы проверяете, что таски действительно выполняются в одном потоке?
Такой проверки нету, верю микрософту и его примеру S>3. Как вы гарантируете, что с sw работает только один поток?
Я (далеко не самый знающий человек) понимаю так: в данный обработчик попадают только данные с ключом key1, причем уровень параллельности 1, т.е. новый пакет данных ждет, пока не закончится обработка старого. Значит, я думаю, что с файлом работает только один поток. S>4. В каком окружении выполняется код? отдельный сервис, iis, ещё что-то?
Самый сложный вопрос. Это десктоповское приложение... S>5. Вызывает ли ваш код Thread.Abort()?
Нет, впрямую такую конструкцию не использую
Здравствуйте, abb269, Вы писали:
A>На входе разная информация, разные ключи, думал для быстродействия разделить обработку, тем более, что она впрямую не связана
А, дошло. В теории должно работать, если нет косяков в самом шедулере. При беглом просмотре их не наблюдается, остаётся только случай "ваш код бросает исключение".
Как вариант, можно пока заменить шедулер на что-то типа такого. Если проблема исчезнет — будет понятно, куда копать дальше. Если не исчезнет — тем более понятно.
Ну и отладочный логгинг исключений ч/з AppDomain.FirstChanceException event сделать, если ещё не.
A>Значит, я думаю, что с файлом работает только один поток.
Ну а другие участки кода к sw доступа точно не имеют? Если имеют — проверить, что они не пытаются записать что-то параллельно.
Для контроля в начале GetKey1Handler:
var check = Interlocked.Increment(ref concurrencyCount); // static int field
в конце, в finally,
Interlocked.Decrement(ref concurrencyCount);
если check > 1 — у вас проблемы c шедулингом.
S>>4. В каком окружении выполняется код? отдельный сервис, iis, ещё что-то? A>Самый сложный вопрос. Это десктоповское приложение...
Угу, тогда половина потенциальных проблем отпадает.
Sinix, Большое спасибо, буду пробовать.
И еще вопрос. Хочу попробовать для увеличения быстродействия конструкции просто распараллелить обработку в алгоритмах. Можно ли просто в лоб вместо просто foreach
Parallel.ForEach(aList, a => { a.GetHandler(); });
Здравствуйте, abb269, Вы писали:
A>И еще вопрос. Хочу попробовать для увеличения быстродействия конструкции просто распараллелить обработку в алгоритмах. Можно ли просто в лоб вместо просто foreach
A>Parallel.ForEach(aList, a => { a.GetHandler(); });
Зависит от того, что делают GetHandler'ы. Если они не модифицируют разделяемое состояние, то можно так параллелить.