Сори может несколько сумбурно изложу проблему но все же
Есть некий глобальный воркер (.net core) который слушает очередь и на каждый новое сообщение в очереди должен делать следующее:
1) Проверить что сообщение (некий int TenantID приходящий в сообщении) еще не обрабатывается ни каким внутренним воркером
сейчас это просто
IDictionary<int,Task> allTenantsWorkers;
2) Если этот идентификатор (TenantID) еще не был обработан то добавить его в allTenantsWorkers и запустить некий обработчик (общий для всех) —
var tenantWorker = new TenantWoker(TenantId);
await tenantWorker.Run();
3) Когда TenantWorker::Run отработал то это нужно как то отследить и удалить тот TenantID который обработался из списка allTenantsWorkers но я не могу понять как это отследить
Как корректно отследить то что поток уже завершился и нужно сделать некий пост процессинг???
Dictionary<int> allTenantsWorkers=...;
while(!cancelled)
{
// новые сообщения могут появляться время от времени- они попадают в очередь через внешний, по отношению к этому, сервисvar tenantId = _rabbitMQ.GetNextFromQueue();
if( tenantid==null) continue;
if( allTenantsWorkers.ContainsKey(tenantid.value)) continue; // избежать запуска дубликата тенант воркераvar tenantWorker = WorkerFactory.CreateWorkerFor(tenantId);
// запустить воркер асинхронно и как либо дождаться его var workerTask = Task.Run(async()=> await tenantWorker.Run()) // может выполняться часа 2-3 или более
// код ниже подходит для того что бы корректно сделать пост процессинг и удалить воркер из глобального списка??? или я туплю крепко
.ContunueWith(async (parentThread)=>
{
await parentThread; // дождаться завершения родителя ???
allTenantWorkers.Remove(tenantid); // удалить из списка выполняющихся воркеров когда он завершит работу
},TaskContinuationOptions.RunContinuationsAsynchronously);
allTenantsWorkers.Add(tenantId,workerTask); // добавить воркера и тенант который он обрабатывает в список для исключения
}
// Если по каким-то причинам workerTask у Вас отработает очень быстро и будет почти сразу Completed,
// то ContinueWith может быть вызван также почти сразу.
// Операция в нем - очистка словаря.
// Т.к. мы можем прийти к ней почти сразу, то получается, что удаление из словаря может быть выполнено до добавления.
// В случае реализации это сценария tenantId останется в словаре навечно.var workerTask = Task.Run(async()=> await tenantWorker.Run()) // может выполняться часа 2-3 или более
// код ниже подходит для того что бы корректно сделать пост процессинг и удалить воркер из глобального списка??? или я туплю крепко
.ContunueWith(async (parentThread)=>
{
await parentThread; // дождаться завершения родителя ???
allTenantWorkers.Remove(tenantid); // удалить из списка выполняющихся воркеров когда он завершит работу
},TaskContinuationOptions.RunContinuationsAsynchronously);
allTenantsWorkers.Add(tenantId,workerTask); // добавить воркера и тенант который он обрабатывает в список для исключения
Здравствуйте, RushDevion, Вы писали:
RD>Если я правильно понял, то тебе нужно что-то такое:
Да, спасибо. Очень похоже на концепт того что мне нужно.
Троттлинг у меня есть это просто несущественная деталь которую я опустил в своем вопросе что бы не засорять код ненужными подробностями.
Огромное спасибо еще раз!
Здравствуйте, StatujaLeha, Вы писали:
SL>Здравствуйте, Jericho113, Вы писали:
SL>Этот код не взлетит: SL>
SL> // Вы передаете в Task.Run async-функцию, она завершится на первом await, tenantWorker запустится, но ожидания не будет.
SL> // Т.е. workerTask у Вас будет сразу Completed, никаких 2-3 часов не пройдет.
SL> // Дальше сразу вызовется ContinueWith, Ваш await в нем смысла не имеет: Вы ждете на уже завершившейся таске.
SL>
Спасибо за анализ кода..
А можно подробнее (со ссылкаками на статьи/доки если можно) почему
такой код
Завершится сразу на await и перейдет в ContinueWith ?
Я не совсем понимаю почему это произойдет
Ведь await по идее должен ждать завершения метода Run и только потом завершить поток выполнения и перейти на ContinueWith
У меня большой пробел в таких ньюансах работы многопоточности в .NET поэтому буду очень благодарен за пояснения со ссылками на релевантные статьи/ блоги /видео..