Синхронизация очереди
От: DartVader  
Дата: 03.05.07 13:36
Оценка:
Здравствуйте,

Возникла следующая проблема.
Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.

Почитав MSDN написал следующй тестовый код:

    
    class Program
    {
        static void Main(string[] args)
        {
            workerThread = new Thread(WorkerThreadProc);
            workerThread.IsBackground = true;

            workerThread.Start();

            Thread.Sleep(100);     // sleep for some time to make worker thread waiting
            Datafill(20);          // datafill bulk of data
            Thread.Sleep(1000);    // sleep for some time to allow worker to process some data
            
            Console.WriteLine("Woke up");
            for (int k = 0; k < 5; ++k)
            {
                Datafill(1);  
            }

            Console.ReadLine();
        }

        static void Datafill(int count)
        {
            lock (workerLock)
            {
                for (int i = 0; i < count; ++i)
                {
                    queue.Enqueue(counter++);
                }
                Console.WriteLine(" ** {0} value(s) added **", count);
                Monitor.Pulse(workerLock);
            }
        }

        static void WorkerThreadProc()
        {
            for ( ; ; )
            {
                Monitor.Enter(workerLock);
                if (queue.Count > 0)
                {
                    int val = queue.Dequeue();
                    int cnt = queue.Count;
                    Monitor.Exit(workerLock);

                    // Some long-time operation...
                    Thread.Sleep(100);
                    Console.WriteLine("Done: {0}   {1} left", val, cnt);
                }
                else
                {
                    Console.WriteLine("Waiting");
                    Monitor.Wait(workerLock);
                    Monitor.Exit(workerLock);
                }
            }
        }

        private static int counter;
        private static Thread workerThread;
        private static Queue<int> queue = new Queue<int>();
        private static object workerLock = new object();
    }


Вроде работает. Правда немного смущает несколько корявая последовательность Monitor.Wait, Monitor.Exit в WorkerThreadProc.
Кроме того, поскольку в WorkerThreadProc не используется lock конутркция придется там хендлить исключения и руками релизить workerLock.

Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?
Re: Синхронизация очереди
От: _Morpheus_  
Дата: 03.05.07 13:45
Оценка:
Здравствуйте, DartVader, Вы писали:

DV>Здравствуйте,


DV>Возникла следующая проблема.

DV>Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.

...
DV>Вроде работает. Правда немного смущает несколько корявая последовательность Monitor.Wait, Monitor.Exit в WorkerThreadProc.
DV>Кроме того, поскольку в WorkerThreadProc не используется lock конутркция придется там хендлить исключения и руками релизить workerLock.

DV>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?



    private Queue _queue = new Queue();

    public void PostMessage(IMessage msg)    // вызывается из разных потоков, помещает сообщение в очередь
    {
        lock(_queue.SyncRoot)
            _queue.Enqueue(msg);
    }

    private void messageProcessProc()        // поток обработки очереди, выбирает сообщения и обрабатывает их
    {
        while(true)
        {
            IMessage msg = null;
            lock(_queue.SyncRoot)
                if(_queue.Count>0)
                    msg = _queue.Dequeue() as IMessage;

            if(msg==null)
                Thread.Sleep(10);    // спать пока нет сообщений
            else
                processMessage(msg);
        }
    }
    
    private void processMessage(IMessage msg)
    {
        // ...
    }
... << RSDN@Home 1.2.0 alpha rev. 676>>
Re: Синхронизация очереди
От: Lloyd Россия  
Дата: 03.05.07 13:47
Оценка: +1 -1
Здравствуйте, DartVader, Вы писали:

DV>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?


для синхронизации доступа к колекции используй lock.
для спячки/просыпания — event.

по-моему так.
... << RSDN@Home 1.1.4 stable SR1 rev. 568>>
Re[2]: Синхронизация очереди
От: DartVader  
Дата: 03.05.07 14:10
Оценка:
Здравствуйте, _Morpheus_, Вы писали:

DV>>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?



_M_>
_M_>    private Queue _queue = new Queue();

_M_>    public void PostMessage(IMessage msg)    // вызывается из разных потоков, помещает сообщение в очередь
_M_>    {
_M_>        lock(_queue.SyncRoot)
_M_>            _queue.Enqueue(msg);
_M_>    }

_M_>    private void messageProcessProc()        // поток обработки очереди, выбирает сообщения и обрабатывает их
_M_>    {
_M_>        while(true)
_M_>        {
_M_>            IMessage msg = null;
_M_>            lock(_queue.SyncRoot)
_M_>                if(_queue.Count>0)
_M_>                    msg = _queue.Dequeue() as IMessage;

_M_>            if(msg==null)
_M_>                Thread.Sleep(10);    // спать пока нет сообщений
_M_>            else
_M_>                processMessage(msg);
_M_>        }
_M_>    }
    
_M_>    private void processMessage(IMessage msg)
_M_>    {
_M_>        // ...
_M_>    }
_M_>



Спасибо,
А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
Да, на всякий случай уточню, сама очередь запрятана внутри моего класса и добавления могут быть только посредством моего метода.
Re[2]: Синхронизация очереди
От: DartVader  
Дата: 03.05.07 14:14
Оценка:
Здравствуйте, Lloyd, Вы писали:

L>для синхронизации доступа к колекции используй lock.

L>для спячки/просыпания — event.

Это понятно Я так и делаю
Просто реализация получается не такой красивой как хотелось бы
Re: Синхронизация очереди
От: _FRED_ Черногория
Дата: 03.05.07 14:16
Оценка:
Здравствуйте, DartVader, Вы писали:

DV>Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.


ReaderWriterLock Class
Help will always be given at Hogwarts to those who ask for it.
Re[3]: Синхронизация очереди
От: _Morpheus_  
Дата: 03.05.07 14:29
Оценка: 2 (1) +1 -1
Здравствуйте, DartVader, Вы писали:


DV>Спасибо,

для спасибо есть специальная кнопочка

DV>А насколько идеологически правильно использовать периодический Sleep?


это зависит от требований — времени реакции на помещение объекта в очередь.

Если Время не критично (т.е. нет ограничений вроде "реакция на помещение сообщения в очередь не более десяти миллисекунд"), то лучше использовать sleep. При используя Sleep экономятся системные хэндлы, т.к. создание event'а сопряжено с выделением системного хэндла.

Если есть требования к минимальному времени реакции, то вместо Sleep можно использовать event:
    private Queue _queue = new Queue();
        private AutoResetEvent _event = new AutoResetEvent(false);

    public void PostMessage(IMessage msg)    // вызывается из разных потоков, помещает сообщение в очередь
    {
        lock(_queue.SyncRoot)
                    _queue.Enqueue(msg);
                _event.Set();
                
    }

    private void messageProcessProc()        // поток обработки очереди, выбирает сообщения и обрабатывает их
    {
        while(true)
        {
                        IMessage msg = null;
            lock(_queue.SyncRoot)
                if(_queue.Count>0)
                    msg = _queue.Dequeue() as IMessage;

            if(msg==null)
                            _event.WaitOne();
                        else
              processMessage(msg);
        }
    }
    
    private void processMessage(IMessage msg)
    {
        // ...
    }
... << RSDN@Home 1.2.0 alpha rev. 676>>
Re[4]: Синхронизация очереди
От: DartVader  
Дата: 03.05.07 14:47
Оценка:
Здравствуйте, _Morpheus_, Вы писали:

_M_>это зависит от требований — времени реакции на помещение объекта в очередь.

У меня как раз такая ситуация — объекты в очередь должны помещаться как можно быстрее, а пустой она бывает редко.
То есть, делать лучше всего через Sleep — просто и видимо в данном случае наиболее эффективно.
Re: Синхронизация очереди
От: desco США http://v2matveev.blogspot.com
Дата: 03.05.07 14:53
Оценка:
Здравствуйте, DartVader, Вы писали:

можно сделать как-то так: (оригинальный вариант встретил, по-моему, в блоге Джо Даффи)
    abstract class Producer<T>
    {
        private readonly object _queueSyncRoot = new object();
        private readonly Queue<T> _queue = new Queue<T>();

        private volatile bool _finished;
        private Thread _thread;

        public void Start()
        {
            _finished = false;
            _thread = new Thread(Produce);
            _thread.Start();
        }

        public void Stop()
        {
            _finished = true;
            lock(_queueSyncRoot)
            {
                Monitor.PulseAll(_queueSyncRoot);
            }
        }

        public IEnumerable<T> Buffer
        {
            get
            {
                while(!_finished)
                {
                    T item;
                    lock(_queueSyncRoot)
                    {
                        if (_queue.Count == 0)
                        {
                            Monitor.Wait(_queueSyncRoot);
                        }
                        item = _queue.Dequeue();
                    }
                    yield return item;
                }
            }
        }

        protected abstract T Create();

        private void Produce()
        {
            while (!_finished)
            {
                T item = Create();
                lock(_queueSyncRoot)
                {
                    _queue.Enqueue(item);
                    Monitor.Pulse(_queueSyncRoot);
                }
            }
        }
    }

    abstract class Consumer<T>
    {
        private readonly Producer<T> _producer;
        private volatile bool _finished;
        private Thread _thread;

        protected Consumer(Producer<T> _producer)
        {
            this._producer = _producer;
        }

        public void Start()
        {
            _finished = false;
            _thread = new Thread(Consume);
            _thread.Start();
        }

        public void Stop()
        {
            _finished = true;
        }

        protected abstract void Consume(T item);

        private void Consume()
        {
            foreach (T obj in _producer.Buffer)
            {
                Consume(obj);
                if (_finished)
                {
                    break;
                }
            }
        }
    }

   class NumberProducer : Producer<int>
    {
        private int _i;
        protected override int Create()
        {
            return ++_i;
        }
    }

    class NumberConsumer : Consumer<int>
    {
        public NumberConsumer(Producer<int> producer) 
            : base(producer)
        {
        }

        protected override void Consume(int item)
        {
            Console.WriteLine(item);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            NumberProducer p = new NumberProducer();
            NumberConsumer c1 = new NumberConsumer(p);
            NumberConsumer c2 = new NumberConsumer(p);
            NumberConsumer c3 = new NumberConsumer(p);

            p.Start();
            c1.Start();c2.Start();c3.Start();

            Console.ReadKey();

            c1.Stop();c2.Stop();c3.Stop();
            p.Stop();
        }
    }
Re[5]: Синхронизация очереди
От: romangr Россия  
Дата: 03.05.07 16:02
Оценка:
Здравствуйте, DartVader, Вы писали:

DV>Здравствуйте, _Morpheus_, Вы писали:


_M_>>это зависит от требований — времени реакции на помещение объекта в очередь.

DV>У меня как раз такая ситуация — объекты в очередь должны помещаться как можно быстрее, а пустой она бывает редко.
DV>То есть, делать лучше всего через Sleep — просто и видимо в данном случае наиболее эффективно.

Быстрее всего будет работать через Monitor.Enter (lock) — Monitor.Wait — Monitor.Pulse/PulseAll.
примерно как здесь
Автор: desco
Дата: 03.05.07
desco написал.
Можно еще погуглить по словам producer-consumer
... << RSDN@Home 1.2.0 alpha rev. 670>>
Re[2]: Синхронизация очереди
От: DartVader  
Дата: 03.05.07 16:30
Оценка:
Здравствуйте, desco, Вы писали:

>можно сделать как-то так: (оригинальный вариант встретил, по-моему, в блоге Джо Даффи)


Да, очевидно, это правильный вариант той идеи, что использую я.

Мой плохой кусок:
Monitor.Enter(workerLock);
if (queue.Count > 0)
{
    int val = queue.Dequeue();
    Monitor.Exit(workerLock);

    // Some long-time operation...
    Thread.Sleep(100);
    Console.WriteLine("Done: {0}   {1} left", val, cnt);
}
else
{
    Monitor.Wait(workerLock);
    Monitor.Exit(workerLock);
}



Правильная запись:
     lock(_queueSyncRoot) 
     {
         if (_queue.Count == 0)
         {
             Monitor.Wait(_queueSyncRoot);
         }
         item = _queue.Dequeue();
    }
    //Some long-time operation
    Thread.Sleep(100);
    Console.WriteLine("Done: {0}   {1} left", val, cnt);


Все оказалось просто — достаточно поменять местами проверки Но я знал, что как-то так оно и должно быть.
Re[3]: Синхронизация очереди
От: Andrbig  
Дата: 04.05.07 05:28
Оценка: -1
Здравствуйте, DartVader, Вы писали:

DV>Здравствуйте, _Morpheus_, Вы писали:


DV>>>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?


Да, распространенная, и .net имеет специальный механизм под эти задачи — Monitor. Ты правильно начал — с использования lock, осталось только довести дело до конца — выкинуть sleep и правильно организовать Pulse. Почитай книгу страницы где-то в районе 50.

_M_>>
_M_>>    private Queue _queue = new Queue();

_M_>>    public void PostMessage(IMessage msg)    // вызывается из разных потоков, помещает сообщение в очередь
_M_>>    {
_M_>>        lock(_queue.SyncRoot)
_M_>>            _queue.Enqueue(msg);
_M_>>    }

_M_>>    private void messageProcessProc()        // поток обработки очереди, выбирает сообщения и обрабатывает их
_M_>>    {
_M_>>        while(true)
_M_>>        {
_M_>>            IMessage msg = null;
_M_>>            lock(_queue.SyncRoot)
_M_>>                if(_queue.Count>0)
_M_>>                    msg = _queue.Dequeue() as IMessage;

_M_>>            if(msg==null)
_M_>>                Thread.Sleep(10);    // спать пока нет сообщений
_M_>>            else
_M_>>                processMessage(msg);
_M_>>        }
_M_>>    }
    
_M_>>    private void processMessage(IMessage msg)
_M_>>    {
_M_>>        // ...
_M_>>    }
_M_>>



DV>Спасибо,

DV>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?

Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.
Re[4]: Синхронизация очереди
От: Lloyd Россия  
Дата: 04.05.07 07:32
Оценка: +1
Здравствуйте, Andrbig, Вы писали:

DV>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?


A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.


А поподробнее?
Re[4]: Синхронизация очереди
От: stump http://stump-workshop.blogspot.com/
Дата: 04.05.07 09:38
Оценка: +1
Здравствуйте, Andrbig, Вы писали:

A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.

Monitor не использует неуправляемых ресурсов? Вот так новость!
Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде.
Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...
А вот использование WaitHandle вместо Sleep действительно снижает и нагрузку на систему, и латентность, поскольку позволяет избежать в обработчике очереди холостых циклов.
Понедельник начинается в субботу
Re[5]: Синхронизация очереди
От: _Morpheus_  
Дата: 04.05.07 12:39
Оценка:
Здравствуйте, stump, Вы писали:

A>>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.

S>Monitor не использует неуправляемых ресурсов? Вот так новость!
S>Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде.
S>Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...

lock это Monitor и есть

Конструкция:
    lock(_syncObject)
        blabla();


на самом деле скомпилируется в конструкцию:
    Monitor.Enter(_syncObject)
    try
    {
        blabla();
    }
    finally
    {
        Monitor.Exit(_syncObject);
    }


можешь проверить — из обоих примеров получится один и тот-же IL код

S>А вот использование WaitHandle вместо Sleep действительно снижает и нагрузку на систему, и латентность, поскольку позволяет избежать в обработчике очереди холостых циклов.


Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс
... << RSDN@Home 1.2.0 alpha rev. 676>>
Re[6]: Синхронизация очереди
От: stump http://stump-workshop.blogspot.com/
Дата: 04.05.07 12:44
Оценка:
Здравствуйте, _Morpheus_, Вы писали:

_M_>Здравствуйте, stump, Вы писали:


S>>Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде.

S>>Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...

_M_>lock это Monitor и есть


Я об этом и хотел сказать. Может выразился не совсем понятно.
_
_M_>Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс
Все зависит от соотношения количества холостых и рабочих циклов
К тому-же латентность в 10 мс может быть великовата для некоторых задач.
Понедельник начинается в субботу
Re[7]: Синхронизация очереди
От: _Morpheus_  
Дата: 04.05.07 13:17
Оценка:
Здравствуйте, stump, Вы писали:


S>Я об этом и хотел сказать. Может выразился не совсем понятно.

S>_
_M_>>Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс
S>Все зависит от соотношения количества холостых и рабочих циклов
S>К тому-же латентность в 10 мс может быть великовата для некоторых задач.

для задач которым критична реакция в 10 мс многопоточность делать не стоит. Такие задачи нужно решать в одном потоке.
... << RSDN@Home 1.2.0 alpha rev. 676>>
Re[4]: Синхронизация очереди
От: _Morpheus_  
Дата: 04.05.07 13:27
Оценка:
Здравствуйте, Andrbig, Вы писали:

DV>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?


A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) —


можно поподробнее?

на самом деле Sleep более выгоден чем Monitor, т.к. потребляет меньше процессорного времени. Единственный недостаток Sleep — относительно большое время реакции, т.е. поток проснется не сразу, а гдето через полсотни миллисекунд. Однако для задач в которых критично время реакции многопоточность лучше не использовать (за исключением редких случаев когда нет возможности вклинится в код длительной операции выполняемой основным потоком)

A>из-за использования неуправляемых ресурсов.


очевидно ты хотел сказать "использования системных хэндлов"?
... << RSDN@Home 1.2.0 alpha rev. 676>>
Re[5]: Синхронизация очереди
От: Andrbig  
Дата: 04.05.07 14:29
Оценка: +1
Здравствуйте, _Morpheus_, Вы писали:

_M_>Здравствуйте, Andrbig, Вы писали:


DV>>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?


A>>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) —


_M_>можно поподробнее?


Вроде не глупые люди, а дружно такие вопросы задаете да минусы лупите.
Вот из msdn:

It is important to note the distinction between use of Monitor and WaitHandle objects. Monitor objects are purely managed, fully portable, and might be more efficient in terms of operating-system resource requirements. WaitHandle objects represent operating-system waitable objects, are useful for synchronizing between managed and unmanaged code, and expose some advanced operating-system features like the ability to wait on many objects at once.


Потому я и писал про "нежелательно использовать".

A>>из-за использования неуправляемых ресурсов.


_M_>очевидно ты хотел сказать "использования системных хэндлов"?


Открываем тот же msdn и читаем про WaitHandle.Handle: "native operating system handle". Если учесть что WaitHandle имплементит IDisposable, то сложив этот факт с предыдущей цитатой (про отличия) можно понять что строится Event на win32-handle. Мне до сегодняшнего дня казалось что это неуправляемый ресурс, как и прочие хэндлы win32. Или не так?
Re[6]: Синхронизация очереди
От: romangr Россия  
Дата: 04.05.07 14:40
Оценка: 4 (1)
Здравствуйте, Andrbig, Вы писали:

...skip...

Если вы пользуетесь Monitor.Wait, Monitor.Pulse, Monitor.PulseAll вы неявно используете EventWaitHandle,
которые создает фреймворк внутри для каждого потока.

Можно почитать блог Joe Duffy, вот здесь
у него статья по выбору между объектами синхронизации.
... << RSDN@Home 1.2.0 alpha rev. 670>>
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.