Re: ConcurrentQueue и синхронизация
От: RushDevion Россия  
Дата: 18.02.20 10:13
Оценка: 3 (1) +1
Описано, конечно сумбурно.
Но похоже на классический producer/consumer.
Это проще на BlockingCollection сделать.
Типа такого:
class Processor<T>    
{    
    private BlockingCollection<Job> jobs = new BlockingCollection<Job>
    
    private class Job {
        public readonly T Data; 
        public readonly Action Callback;
        public Job(T data, Action callback = null) { ... }
    }
        
    public void AddJob(T data) {
        jobs.Add(new Job(data));
    }

    public void AddJob(T data, Action onCompleted) {
        jobs.Add(new Job(data, onCompleted));
    }

    public void Start(CancellationToken ct = default) {
        foreach(var job in jobs.GetConsumingEnumerable(ct)) {
            SaveToDb(item.Data);
            if (item.Callback !== null) {
                Task.Run(() => item.Callback());
            }
        }
    }
    
    public void Stop() {
        jobs.CompleteAdding();
    }
    
    private void SaveToDb() { ... }
}


// Обычный добавлятель
processor.AddJob(new Data());

// Добавлятель, которому надо подождать
var data = new Data(); 
processor.AddJob(data, () => {
    // Data saved, keep processing
});


Или можно на TaskCompletionSource переписать, т.е. будет что-то типа Task AddJobAndWait(T data),
под которым лежит TaskCompletionSource и processor после сохранения будет делать tcs.SetResult(true).
Тогда добавлятель сможет ждать или await'ить на таске.
Re: ConcurrentQueue и синхронизация
От: karbofos42 Россия  
Дата: 18.02.20 10:39
Оценка: 3 (1)
Здравствуйте, vvv848165@ya.ru, Вы писали:

VYR>ConcurrentQueue вещь хорошая но как правильно прикрутить ManualResetEvent или чтото подобное примеров нет и я немогу сообразить...


VYR>ситуация такая много потоков добавляют данные в ConcurrentQueue

VYR>один читает из него и записывает в базу

VYR>не могу добавить:

VYR>один из добавляющих потоков получает особые данные и надо подождать покудо читающий поток добавит в его в базу...

VYR>как такое можно сделать???

VYR>1) возвращатся к мутексам
VYR>2) или заводить ManualResetEvent для каждого добавляющего потока
VYR>или ещё чтото?

Из готового можно взять: TPL.ActionBlock
https://docs.microsoft.com/ru-ru/dotnet/api/system.threading.tasks.dataflow.actionblock-1?view=netcore-3.1
ConcurrentQueue и синхронизация
От: vvv848165@ya.ru  
Дата: 18.02.20 07:28
Оценка:
ConcurrentQueue вещь хорошая но как правильно прикрутить ManualResetEvent или чтото подобное примеров нет и я немогу сообразить...

ситуация такая много потоков добавляют данные в ConcurrentQueue
один читает из него и записывает в базу

не могу добавить:
один из добавляющих потоков получает особые данные и надо подождать покудо читающий поток добавит в его в базу...

как такое можно сделать???
1) возвращатся к мутексам
2) или заводить ManualResetEvent для каждого добавляющего потока
или ещё чтото?
Re: ConcurrentQueue и синхронизация
От: vensub Украина  
Дата: 18.02.20 18:39
Оценка:
System.Threading.Channels
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.