Нужно что бы в момент создания объекта (для простоты. Конечно можно было бы ввести дополнительные методы для запуска процесса и его останова) запускался поток, который по мере посупления ассинхронных сообщений в очередь обрабатывал их синхронно. После остановки процесса (в моем случае это метод Dispose) все сообщения в очереди должны быть обработанны. Я думаю распространненая задача.
Вот гляньте на код плз. Укажите на потенциальные проблеммы. Лично меня смущает, что isDisposed не лежит внутри блокировки. Но если я чтение и запись этой переменной помещаю внутрь блока lock иногда программа виснет (deadlock?).
class CMyClass
{
Queue<String> fifo;
Thread worker;
EventWaitHandle waitHandle = new AutoResetEvent(false);
Object lockObject = new object();
bool isDisposed = false;
// interne members
// ...
public CMyClass()
{
// create fifo
fifo = new Queue<String>();
// initialize members
// ...
// start thread
worker = new Thread(processQueue);
worker.Start();
}
public void Write(String msq)
{
if (isDisposed) // lock?
return; // new Exception?
lock (lockObject)
{
// message in fifo
fifo.Enqueue(msq);
}
// wake the thread
waitHandle.Set();
}
public void Dispose()
{
// set flag
isDisposed = true;
// null to interrupt the thread
fifo.Enqueue(null);
// wake the thread
waitHandle.Set();
// sync
worker.Join();
// release wait handle
waitHandle.Close();
}
private void processQueue()
{
while (true)
{
String entry = null;
// get entry
lock (lockObject)
{
if (fifo.Count > 0)
entry = fifo.Dequeue();
}
if (entry != null)
{
// Здесь данные обрабатываем
// ...
}
else
{
if (isDisposed) { return; }
waitHandle.WaitOne();
}
}
}
}
Переделал код с использованием Monitor.Pulse/Wait. Основу взял из статьи не это сайте. Сейчас я более менее уверен в предсказуемости всех шагов в коде. Есть ли неувиденные мной проблемы? Предпочтительнее ли он чем предыдущий с исползовнием WaitHandle?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ConsoleApplication17
{
public class TaskQueue : IDisposable
{
bool isDisposed = false;
object locker = new object();
Thread worker;
Queue<string> taskQ = new Queue<string>();
public TaskQueue()
{
worker = new Thread(Consume);
worker.Start();
}
public void Dispose()
{
lock (locker)
{
if (isDisposed) { return; }
isDisposed = true;
taskQ.Enqueue(null);
if (taskQ.Count == 1)
Monitor.Pulse(locker);
}
worker.Join();
}
public void EnqueueTask(string task)
{
lock (locker)
{
if (isDisposed) { return; }
taskQ.Enqueue(task);
if (taskQ.Count == 1)
Monitor.Pulse(locker);
}
}
void Consume()
{
while (true)
{
string task = null;
lock (locker)
{
if (taskQ.Count != 0)
task = taskQ.Dequeue();
if (task == null)
{
if (isDisposed) { return; }
else { Monitor.Wait(locker); }
}
}
if (task != null)
{
Console.Write(task);
Thread.Sleep(100); // Имитация длительной работы
}
}
}
}
class Program
{
static void Main(string[] args)
{
TaskQueue queue = new TaskQueue();
for (int i = 0; i < 100; i++)
{
string str = i.ToString("00 ");
Thread thr = new Thread(delegate()
{ queue.EnqueueTask(str); });
thr.Start();
}
queue.Dispose();
Console.WriteLine("\r\nEnd!");
Console.ReadLine();
}
}
}