Возникла следующая проблема.
Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.
Почитав MSDN написал следующй тестовый код:
class Program
{
static void Main(string[] args)
{
workerThread = new Thread(WorkerThreadProc);
workerThread.IsBackground = true;
workerThread.Start();
Thread.Sleep(100); // sleep for some time to make worker thread waiting
Datafill(20); // datafill bulk of data
Thread.Sleep(1000); // sleep for some time to allow worker to process some data
Console.WriteLine("Woke up");
for (int k = 0; k < 5; ++k)
{
Datafill(1);
}
Console.ReadLine();
}
static void Datafill(int count)
{
lock (workerLock)
{
for (int i = 0; i < count; ++i)
{
queue.Enqueue(counter++);
}
Console.WriteLine(" ** {0} value(s) added **", count);
Monitor.Pulse(workerLock);
}
}
static void WorkerThreadProc()
{
for ( ; ; )
{
Monitor.Enter(workerLock);
if (queue.Count > 0)
{
int val = queue.Dequeue();
int cnt = queue.Count;
Monitor.Exit(workerLock);
// Some long-time operation...
Thread.Sleep(100);
Console.WriteLine("Done: {0} {1} left", val, cnt);
}
else
{
Console.WriteLine("Waiting");
Monitor.Wait(workerLock);
Monitor.Exit(workerLock);
}
}
}
private static int counter;
private static Thread workerThread;
private static Queue<int> queue = new Queue<int>();
private static object workerLock = new object();
}
Вроде работает. Правда немного смущает несколько корявая последовательность Monitor.Wait, Monitor.Exit в WorkerThreadProc.
Кроме того, поскольку в WorkerThreadProc не используется lock конутркция придется там хендлить исключения и руками релизить workerLock.
Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?
Здравствуйте, DartVader, Вы писали:
DV>Здравствуйте,
DV>Возникла следующая проблема. DV>Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.
... DV>Вроде работает. Правда немного смущает несколько корявая последовательность Monitor.Wait, Monitor.Exit в WorkerThreadProc. DV>Кроме того, поскольку в WorkerThreadProc не используется lock конутркция придется там хендлить исключения и руками релизить workerLock.
DV>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?
private Queue _queue = new Queue();
public void PostMessage(IMessage msg) // вызывается из разных потоков, помещает сообщение в очередь
{
lock(_queue.SyncRoot)
_queue.Enqueue(msg);
}
private void messageProcessProc() // поток обработки очереди, выбирает сообщения и обрабатывает их
{
while(true)
{
IMessage msg = null;
lock(_queue.SyncRoot)
if(_queue.Count>0)
msg = _queue.Dequeue() as IMessage;
if(msg==null)
Thread.Sleep(10); // спать пока нет сообщенийelse
processMessage(msg);
}
}
private void processMessage(IMessage msg)
{
// ...
}
Здравствуйте, _Morpheus_, Вы писали:
DV>>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?
_M_>
_M_> private Queue _queue = new Queue();
_M_> public void PostMessage(IMessage msg) // вызывается из разных потоков, помещает сообщение в очередь
_M_> {
_M_> lock(_queue.SyncRoot)
_M_> _queue.Enqueue(msg);
_M_> }
_M_> private void messageProcessProc() // поток обработки очереди, выбирает сообщения и обрабатывает их
_M_> {
_M_> while(true)
_M_> {
_M_> IMessage msg = null;
_M_> lock(_queue.SyncRoot)
_M_> if(_queue.Count>0)
_M_> msg = _queue.Dequeue() as IMessage;
_M_> if(msg==null)
_M_> Thread.Sleep(10); // спать пока нет сообщений
_M_> else
_M_> processMessage(msg);
_M_> }
_M_> }
_M_> private void processMessage(IMessage msg)
_M_> {
_M_> // ...
_M_> }
_M_>
Спасибо,
А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
Да, на всякий случай уточню, сама очередь запрятана внутри моего класса и добавления могут быть только посредством моего метода.
Здравствуйте, DartVader, Вы писали:
DV>Есть очередь из которой в отдельном потоке постоянно извлекаются элементы и над ними производится довольно длительная операция. Если очередь пустая,то поток долже ждать новых элементов. Также нужно синхронизовать доступ к очереди таким образом, чтобы на время длительной операции над отдельным элементом очередь не блокировалась для добавления новых.
DV>Спасибо,
для спасибо есть специальная кнопочка
DV>А насколько идеологически правильно использовать периодический Sleep?
это зависит от требований — времени реакции на помещение объекта в очередь.
Если Время не критично (т.е. нет ограничений вроде "реакция на помещение сообщения в очередь не более десяти миллисекунд"), то лучше использовать sleep. При используя Sleep экономятся системные хэндлы, т.к. создание event'а сопряжено с выделением системного хэндла.
Если есть требования к минимальному времени реакции, то вместо Sleep можно использовать event:
private Queue _queue = new Queue();
private AutoResetEvent _event = new AutoResetEvent(false);
public void PostMessage(IMessage msg) // вызывается из разных потоков, помещает сообщение в очередь
{
lock(_queue.SyncRoot)
_queue.Enqueue(msg);
_event.Set();
}
private void messageProcessProc() // поток обработки очереди, выбирает сообщения и обрабатывает их
{
while(true)
{
IMessage msg = null;
lock(_queue.SyncRoot)
if(_queue.Count>0)
msg = _queue.Dequeue() as IMessage;
if(msg==null)
_event.WaitOne();
else
processMessage(msg);
}
}
private void processMessage(IMessage msg)
{
// ...
}
Здравствуйте, _Morpheus_, Вы писали:
_M_>это зависит от требований — времени реакции на помещение объекта в очередь.
У меня как раз такая ситуация — объекты в очередь должны помещаться как можно быстрее, а пустой она бывает редко.
То есть, делать лучше всего через Sleep — просто и видимо в данном случае наиболее эффективно.
Здравствуйте, DartVader, Вы писали:
DV>Здравствуйте, _Morpheus_, Вы писали:
_M_>>это зависит от требований — времени реакции на помещение объекта в очередь. DV>У меня как раз такая ситуация — объекты в очередь должны помещаться как можно быстрее, а пустой она бывает редко. DV>То есть, делать лучше всего через Sleep — просто и видимо в данном случае наиболее эффективно.
Быстрее всего будет работать через Monitor.Enter (lock) — Monitor.Wait — Monitor.Pulse/PulseAll.
примерно как здесь
Здравствуйте, DartVader, Вы писали:
DV>Здравствуйте, _Morpheus_, Вы писали:
DV>>>Я думаю эта задача довольно распространенная (это я — полный нуб). Как правильно делать такую синхронизацию?
Да, распространенная, и .net имеет специальный механизм под эти задачи — Monitor. Ты правильно начал — с использования lock, осталось только довести дело до конца — выкинуть sleep и правильно организовать Pulse. Почитай книгу страницы где-то в районе 50.
_M_>>
_M_>> private Queue _queue = new Queue();
_M_>> public void PostMessage(IMessage msg) // вызывается из разных потоков, помещает сообщение в очередь
_M_>> {
_M_>> lock(_queue.SyncRoot)
_M_>> _queue.Enqueue(msg);
_M_>> }
_M_>> private void messageProcessProc() // поток обработки очереди, выбирает сообщения и обрабатывает их
_M_>> {
_M_>> while(true)
_M_>> {
_M_>> IMessage msg = null;
_M_>> lock(_queue.SyncRoot)
_M_>> if(_queue.Count>0)
_M_>> msg = _queue.Dequeue() as IMessage;
_M_>> if(msg==null)
_M_>> Thread.Sleep(10); // спать пока нет сообщений
_M_>> else
_M_>> processMessage(msg);
_M_>> }
_M_>> }
_M_>> private void processMessage(IMessage msg)
_M_>> {
_M_>> // ...
_M_>> }
_M_>>
DV>Спасибо, DV>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.
Здравствуйте, Andrbig, Вы писали:
DV>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.
Здравствуйте, Andrbig, Вы писали:
A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов.
Monitor не использует неуправляемых ресурсов? Вот так новость!
Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде.
Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...
А вот использование WaitHandle вместо Sleep действительно снижает и нагрузку на систему, и латентность, поскольку позволяет избежать в обработчике очереди холостых циклов.
Здравствуйте, stump, Вы писали:
A>>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) — из-за использования неуправляемых ресурсов. S>Monitor не использует неуправляемых ресурсов? Вот так новость! S>Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде. S>Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...
можешь проверить — из обоих примеров получится один и тот-же IL код
S>А вот использование WaitHandle вместо Sleep действительно снижает и нагрузку на систему, и латентность, поскольку позволяет избежать в обработчике очереди холостых циклов.
Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс
Здравствуйте, _Morpheus_, Вы писали:
_M_>Здравствуйте, stump, Вы писали:
S>>Использовать вызовы методов Monitor вместо конструкции lock() — глупое пижонство, тем более в такой простой ситуации, ИМХО. Дополнительная возможность наделать ошибок в коде. S>>Блокировать надо только операции выборки значения из очереди и вставки в очередь, т.е. скобки {} у lock() должны быть как можно короче. И нечего париться с Pulse Wait Enter Exit...
_M_>lock это Monitor и есть
Я об этом и хотел сказать. Может выразился не совсем понятно.
_ _M_>Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс
Все зависит от соотношения количества холостых и рабочих циклов
К тому-же латентность в 10 мс может быть великовата для некоторых задач.
S>Я об этом и хотел сказать. Может выразился не совсем понятно. S>_ _M_>>Не совсем так, использование WaitHandle подразумевает выделение у системы хэндла это раз, будет расходоваться больше процессорного времени на обработку event.Set() и event.WaitOne() это два. Причем затраты процессорного времени на обработку event будут заметно выше чем с холостым циклом раз в 10 мс S>Все зависит от соотношения количества холостых и рабочих циклов S>К тому-же латентность в 10 мс может быть великовата для некоторых задач.
для задач которым критична реакция в 10 мс многопоточность делать не стоит. Такие задачи нужно решать в одном потоке.
Здравствуйте, Andrbig, Вы писали:
DV>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
A>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) —
можно поподробнее?
на самом деле Sleep более выгоден чем Monitor, т.к. потребляет меньше процессорного времени. Единственный недостаток Sleep — относительно большое время реакции, т.е. поток проснется не сразу, а гдето через полсотни миллисекунд. Однако для задач в которых критично время реакции многопоточность лучше не использовать (за исключением редких случаев когда нет возможности вклинится в код длительной операции выполняемой основным потоком)
A>из-за использования неуправляемых ресурсов.
очевидно ты хотел сказать "использования системных хэндлов"?
Здравствуйте, _Morpheus_, Вы писали:
_M_>Здравствуйте, Andrbig, Вы писали:
DV>>>А насколько идеологически правильно использовать периодический Sleep вместо сообщения (Pulse, ...)?
A>>Неправильно. Также нежелательно использовать event вместо того о чем ты говоришь (Pulse) —
_M_>можно поподробнее?
Вроде не глупые люди, а дружно такие вопросы задаете да минусы лупите.
Вот из msdn:
It is important to note the distinction between use of Monitor and WaitHandle objects. Monitor objects are purely managed, fully portable, and might be more efficient in terms of operating-system resource requirements. WaitHandle objects represent operating-system waitable objects, are useful for synchronizing between managed and unmanaged code, and expose some advanced operating-system features like the ability to wait on many objects at once.
Потому я и писал про "нежелательно использовать".
A>>из-за использования неуправляемых ресурсов.
_M_>очевидно ты хотел сказать "использования системных хэндлов"?
Открываем тот же msdn и читаем про WaitHandle.Handle: "native operating system handle". Если учесть что WaitHandle имплементит IDisposable, то сложив этот факт с предыдущей цитатой (про отличия) можно понять что строится Event на win32-handle. Мне до сегодняшнего дня казалось что это неуправляемый ресурс, как и прочие хэндлы win32. Или не так?
Если вы пользуетесь Monitor.Wait, Monitor.Pulse, Monitor.PulseAll вы неявно используете EventWaitHandle,
которые создает фреймворк внутри для каждого потока.
Можно почитать блог Joe Duffy, вот здесь
у него статья по выбору между объектами синхронизации.