Async worker with cleanup on worker exit
От: Jericho113 Украина  
Дата: 28.02.20 17:54
Оценка:
Всем привет,

Сори может несколько сумбурно изложу проблему но все же

Есть некий глобальный воркер (.net core) который слушает очередь и на каждый новое сообщение в очереди должен делать следующее:
1) Проверить что сообщение (некий int TenantID приходящий в сообщении) еще не обрабатывается ни каким внутренним воркером
сейчас это просто
IDictionary<int,Task> allTenantsWorkers;


2) Если этот идентификатор (TenantID) еще не был обработан то добавить его в allTenantsWorkers и запустить некий обработчик (общий для всех) —
 var tenantWorker = new TenantWoker(TenantId);
 await tenantWorker.Run();

3) Когда TenantWorker::Run отработал то это нужно как то отследить и удалить тот TenantID который обработался из списка allTenantsWorkers но я не могу понять как это отследить

Как корректно отследить то что поток уже завершился и нужно сделать некий пост процессинг???



Dictionary<int> allTenantsWorkers=...;


while(!cancelled)
{
   // новые сообщения могут появляться время от времени- они попадают в очередь через внешний, по отношению к этому, сервис
   var tenantId = _rabbitMQ.GetNextFromQueue(); 
   if( tenantid==null) continue;
   if( allTenantsWorkers.ContainsKey(tenantid.value)) continue; // избежать запуска дубликата тенант воркера

   var tenantWorker = WorkerFactory.CreateWorkerFor(tenantId);    

   // запустить воркер асинхронно и как либо дождаться его 
   var workerTask = Task.Run(async()=> await tenantWorker.Run()) // может выполняться часа 2-3 или более
// код ниже подходит для того что бы корректно сделать пост процессинг и удалить воркер из глобального списка??? или я туплю крепко 
            .ContunueWith(async (parentThread)=>
                {
                                  await parentThread;  // дождаться завершения родителя ???
                                  allTenantWorkers.Remove(tenantid);  // удалить из списка выполняющихся воркеров когда он завершит работу
                                 },TaskContinuationOptions.RunContinuationsAsynchronously);

   allTenantsWorkers.Add(tenantId,workerTask);   // добавить воркера и тенант который он обрабатывает в список для исключения 
}


Сори за некую сумбурность изложения
NetDigitally yours ....
Re: Async worker with cleanup on worker exit
От: StatujaLeha на правах ИМХО
Дата: 28.02.20 21:50
Оценка:
Здравствуйте, Jericho113, Вы писали:

Этот код не взлетит:
   // Если по каким-то причинам workerTask у Вас отработает очень быстро и будет почти сразу Completed,
   // то ContinueWith может быть вызван также почти сразу.
   // Операция в нем - очистка словаря. 
   // Т.к. мы можем прийти к ней почти сразу, то получается, что удаление из словаря может быть выполнено до добавления. 
   // В случае реализации это сценария tenantId останется в словаре навечно.
   var workerTask = Task.Run(async()=> await tenantWorker.Run()) // может выполняться часа 2-3 или более
// код ниже подходит для того что бы корректно сделать пост процессинг и удалить воркер из глобального списка??? или я туплю крепко 
            .ContunueWith(async (parentThread)=>
                {
                                  await parentThread;  // дождаться завершения родителя ???
                                  allTenantWorkers.Remove(tenantid);  // удалить из списка выполняющихся воркеров когда он завершит работу
                                 },TaskContinuationOptions.RunContinuationsAsynchronously);

   allTenantsWorkers.Add(tenantId,workerTask);   // добавить воркера и тенант который он обрабатывает в список для исключения
Отредактировано 29.02.2020 21:31 StatujaLeha . Предыдущая версия .
Re: Async worker with cleanup on worker exit
От: RushDevion Россия  
Дата: 29.02.20 12:11
Оценка: 6 (1)
Если я правильно понял, то тебе нужно что-то такое:
public interface IQueueListener
{
    int? GetNextMessage();
}

public class Multiplexor
{
    private readonly IQueueListener m_Queue;
    private readonly Action<int, CancellationToken> m_Worker;
    private readonly HashSet<int> m_TenantsBeingProcessed; // Тенанты, обрабатываемые в данный момент

    public Multiplexor(IQueueListener queue, Action<int, CancellationToken> worker)
    {
        m_Queue = queue;
        m_Worker = worker;
    }

    public void Start(CancellationToken ct = default(CancellationToken))
    {
        while (!ct.IsCancellationRequested)
        {
            int? tenant = m_Queue.GetNextMessage();
            if (!tenant.HasValue)
            {
                // Тротлимся, если нет сообщений, чтобы не перегружать RMQ пустыми запросами (хотя, тут конечно все от имплементации QueueListener зависит)
                Task.Delay(TimeSpan.FromSeconds(30), ct).Wait(ct);
            };

            lock (m_TenantsBeingProcessed)
            {
                if (m_TenantsBeingProcessed.Contains(tenant.Value)) continue; // Тенант уже обрабатывается - игнорируем
                m_TenantsBeingProcessed.Add(tenant.Value);
            }

            Task.Factory.StartNew(() =>
            {
                try
                {
                    m_Worker(tenant.Value, ct);
                }
                finally
                {
                    //  Обработка закончена - уберем тенант из списка обрабатываемых
                    lock (m_TenantsBeingProcessed)
                    {
                        m_TenantsBeingProcessed.Remove(tenant.Value);
                    }
                }
            }, ct);
        }
    }

}
Re[2]: Async worker with cleanup on worker exit
От: Jericho113 Украина  
Дата: 29.02.20 14:10
Оценка:
Здравствуйте, RushDevion, Вы писали:

RD>Если я правильно понял, то тебе нужно что-то такое:

Да, спасибо. Очень похоже на концепт того что мне нужно.
Троттлинг у меня есть это просто несущественная деталь которую я опустил в своем вопросе что бы не засорять код ненужными подробностями.
Огромное спасибо еще раз!
NetDigitally yours ....
Отредактировано 29.02.2020 17:40 Jericho113 . Предыдущая версия .
Re[2]: Async worker with cleanup on worker exit
От: Jericho113 Украина  
Дата: 29.02.20 14:18
Оценка:
Здравствуйте, StatujaLeha, Вы писали:

SL>Здравствуйте, Jericho113, Вы писали:


SL>Этот код не взлетит:

SL>
SL>   // Вы передаете в Task.Run async-функцию, она завершится на первом await, tenantWorker запустится, но ожидания не будет.
SL>   // Т.е. workerTask у Вас будет  сразу Completed, никаких 2-3 часов не пройдет.
SL>   // Дальше сразу вызовется ContinueWith, Ваш await в нем смысла не имеет: Вы ждете на уже завершившейся таске. 
SL>


Спасибо за анализ кода..
А можно подробнее (со ссылкаками на статьи/доки если можно) почему
такой код

Task.Run(async ()=> {await myType.Run();}).ContinueWith(...);

Завершится сразу на await и перейдет в ContinueWith ?
Я не совсем понимаю почему это произойдет
Ведь await по идее должен ждать завершения метода Run и только потом завершить поток выполнения и перейти на ContinueWith

У меня большой пробел в таких ньюансах работы многопоточности в .NET поэтому буду очень благодарен за пояснения со ссылками на релевантные статьи/ блоги /видео..

Заранее благодарен.
NetDigitally yours ....
Re[3]: Async worker with cleanup on worker exit
От: StatujaLeha на правах ИМХО
Дата: 29.02.20 21:50
Оценка: 4 (1)
Здравствуйте, Jericho113, Вы писали:

Извиняюсь, в исходном сообщении был не во всем прав.
Поправил анализ кода.

В принципе, ниже Вам уже написали, как может все это выглядеть.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.