Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 01.07.09 19:48
Оценка:
Потребовалось сделать велосипед со следующим поведением:
* Класс олицетворяет собой очередь сообщений.
* В класс из нескольких потоков поступают сообщения, а так же есть подписчики на эти сообщения.
* Класс должен поочерёдно извлекать сообщения из очереди и отправлять их подписчикам события.

Так же предъявляются следующие требования:
* Между отправками сообщений должен соблюдаться заданный временной интервал.
* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно проверяющего очередь потока).
* В очереди может быть достаточно много сообщений, поэтому реализация очереди не должна плодить потоки на каждое.
* Должна быть возможность приостановить/возобновить отправки.

На скорую руку вот что получилось:
public sealed class MessageSender
{
    private readonly Queue<string> msgQueue;
    private readonly object queueSync;
    private volatile bool senderAlive;
    private volatile bool senderPaused;
    private int timeout;

    public MessageSender()
    {
        this.msgQueue = new Queue<string>(32);
        this.queueSync = new object();
        this.timeout = 1000;
    }

    // Таймаут задержки между посылками события
    public int Timeout
    {
        get { return this.timeout; }
        set
        {
            if(value < 0) value = 100;
            this.timeout = value;
        }
    }

    public event Action<string> MessageReceived;

    // Возбуждение события
    private void RaiseReceived(string message)
    {
        Action<string> received = this.MessageReceived;
        if (received != null)
        {
            received(message);
        }
    }

    // Метод для потока-отправителя
    private void SenderJob(object state)
    {
        while(this.msgQueue.Count > 0
          && !this.senderPaused)
        {
            string message;
            lock(this.queueSync)
            {
                message = this.msgQueue.Dequeue();
            }

            RaiseReceived(message);
            Thread.Sleep(this.timeout);
        }

        this.senderAlive = false;
    }

    // Добавление сообщений в очередь
    public void EnqueueMessage(string message)
    {
        lock(this.queueSync)
        {
            this.msgQueue.Enqueue(message);

            // это осознанно внутри lock:
            if (!this.senderPaused
             && !this.senderAlive)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Приостановка отправок
    public void Pause()
    {
        this.senderPaused = true;
    }

    // Возобновление отправок
    public void Resume()
    {
        if (!this.senderPaused) return;

        lock(this.queueSync)
        {
            this.senderPaused = false;

            if(this.msgQueue.Count > 0)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Очистка очереди
    public void Abort()
    {
        lock(this.queueSync)
        {
            this.msgQueue.Clear();
        }
    }
}


Подскажите, пожалуйста, скользкие моменты, нужны ли volatile и вообще правильно ли...
Сильно не пинайте, очень прошу
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.