FIFO + Thread
От: Аноним  
Дата: 14.07.09 07:42
Оценка:
Нужно что бы в момент создания объекта (для простоты. Конечно можно было бы ввести дополнительные методы для запуска процесса и его останова) запускался поток, который по мере посупления ассинхронных сообщений в очередь обрабатывал их синхронно. После остановки процесса (в моем случае это метод Dispose) все сообщения в очереди должны быть обработанны. Я думаю распространненая задача.

Вот гляньте на код плз. Укажите на потенциальные проблеммы. Лично меня смущает, что isDisposed не лежит внутри блокировки. Но если я чтение и запись этой переменной помещаю внутрь блока lock иногда программа виснет (deadlock?).


class CMyClass 
   {
        Queue<String> fifo;

        Thread worker;
        EventWaitHandle waitHandle = new AutoResetEvent(false);

        Object lockObject = new object();
        bool isDisposed = false;

        // interne members
        // ...

        public CMyClass()
        {
            // create fifo
            fifo = new Queue<String>();

            // initialize members 
            // ...

            // start thread
            worker = new Thread(processQueue);
            worker.Start();
        }

        public void Write(String msq)
        {

            if (isDisposed) // lock?
                return; // new Exception?

            lock (lockObject)
            {
                // message in fifo
                fifo.Enqueue(msq);
            }

            // wake the thread
            waitHandle.Set();
        }

        public void Dispose()
        {
            // set flag
            isDisposed = true;
            // null to interrupt the thread
            fifo.Enqueue(null);
            // wake the thread 
            waitHandle.Set();
            // sync
            worker.Join();
            // release wait handle
            waitHandle.Close();
        }

        private void processQueue()
        {
            while (true)
            {
                String entry = null;

                // get entry
                lock (lockObject)
                {
                    if (fifo.Count > 0)
                        entry = fifo.Dequeue();
                }

                if (entry != null)
                {
                    // Здесь данные обрабатываем
                    // ...
                }
                else
                {
                    if (isDisposed) { return; }

                    waitHandle.WaitOne(); 
                }
            }
        }

        
    }
Re: FIFO + Thread
От: Аноним  
Дата: 14.07.09 09:43
Оценка:
Переделал код с использованием Monitor.Pulse/Wait. Основу взял из статьи не это сайте. Сейчас я более менее уверен в предсказуемости всех шагов в коде. Есть ли неувиденные мной проблемы? Предпочтительнее ли он чем предыдущий с исползовнием WaitHandle?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Threading;

namespace ConsoleApplication17
{
    public class TaskQueue : IDisposable
    {
        bool isDisposed = false;
        object locker = new object();

        Thread worker;
        Queue<string> taskQ = new Queue<string>();

        public TaskQueue()
        {
            worker = new Thread(Consume);
            worker.Start();
        }

        public void Dispose()
        {
            lock (locker)
            {
                if (isDisposed) { return; }

                isDisposed = true;

                taskQ.Enqueue(null);

                if (taskQ.Count == 1)
                    Monitor.Pulse(locker);
            }

            worker.Join();
        }

        public void EnqueueTask(string task)
        {
            lock (locker)
            {
                if (isDisposed) { return; }

                taskQ.Enqueue(task);

                if (taskQ.Count == 1)
                    Monitor.Pulse(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                string task = null;

                lock (locker)
                {
                    if (taskQ.Count != 0)
                        task = taskQ.Dequeue();

                    if (task == null)
                    {
                        if (isDisposed) { return; }
                        else { Monitor.Wait(locker); }
                    }
                }

                if (task != null)
                {
                    Console.Write(task);
                    Thread.Sleep(100);       // Имитация длительной работы
                }
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            TaskQueue queue = new TaskQueue();

            for (int i = 0; i < 100; i++)
            {
                string str = i.ToString("00 ");

                Thread thr = new Thread(delegate()
                { queue.EnqueueTask(str); });
                thr.Start();
            }

            queue.Dispose();
           
            Console.WriteLine("\r\nEnd!");
            Console.ReadLine();
        }
    }
}
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.