Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 01.07.09 19:48
Оценка:
Потребовалось сделать велосипед со следующим поведением:
* Класс олицетворяет собой очередь сообщений.
* В класс из нескольких потоков поступают сообщения, а так же есть подписчики на эти сообщения.
* Класс должен поочерёдно извлекать сообщения из очереди и отправлять их подписчикам события.

Так же предъявляются следующие требования:
* Между отправками сообщений должен соблюдаться заданный временной интервал.
* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно проверяющего очередь потока).
* В очереди может быть достаточно много сообщений, поэтому реализация очереди не должна плодить потоки на каждое.
* Должна быть возможность приостановить/возобновить отправки.

На скорую руку вот что получилось:
public sealed class MessageSender
{
    private readonly Queue<string> msgQueue;
    private readonly object queueSync;
    private volatile bool senderAlive;
    private volatile bool senderPaused;
    private int timeout;

    public MessageSender()
    {
        this.msgQueue = new Queue<string>(32);
        this.queueSync = new object();
        this.timeout = 1000;
    }

    // Таймаут задержки между посылками события
    public int Timeout
    {
        get { return this.timeout; }
        set
        {
            if(value < 0) value = 100;
            this.timeout = value;
        }
    }

    public event Action<string> MessageReceived;

    // Возбуждение события
    private void RaiseReceived(string message)
    {
        Action<string> received = this.MessageReceived;
        if (received != null)
        {
            received(message);
        }
    }

    // Метод для потока-отправителя
    private void SenderJob(object state)
    {
        while(this.msgQueue.Count > 0
          && !this.senderPaused)
        {
            string message;
            lock(this.queueSync)
            {
                message = this.msgQueue.Dequeue();
            }

            RaiseReceived(message);
            Thread.Sleep(this.timeout);
        }

        this.senderAlive = false;
    }

    // Добавление сообщений в очередь
    public void EnqueueMessage(string message)
    {
        lock(this.queueSync)
        {
            this.msgQueue.Enqueue(message);

            // это осознанно внутри lock:
            if (!this.senderPaused
             && !this.senderAlive)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Приостановка отправок
    public void Pause()
    {
        this.senderPaused = true;
    }

    // Возобновление отправок
    public void Resume()
    {
        if (!this.senderPaused) return;

        lock(this.queueSync)
        {
            this.senderPaused = false;

            if(this.msgQueue.Count > 0)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Очистка очереди
    public void Abort()
    {
        lock(this.queueSync)
        {
            this.msgQueue.Clear();
        }
    }
}


Подскажите, пожалуйста, скользкие моменты, нужны ли volatile и вообще правильно ли...
Сильно не пинайте, очень прошу
Re: Thread-safe specialized queue
От: desco США http://v2matveev.blogspot.com
Дата: 01.07.09 21:17
Оценка: 11 (2)
Здравствуйте, Пельмешко, Вы писали:

стремный сценарий:
1. Поток 1 делает Enqueue
2. Поток 2 (из пула), начавщий выполнять метод SenderJob проходит проверку в while и доходит до места А, не войдя в lock.

            while (this.msgQueue.Count > 0
              && !this.senderPaused)
            {
                string message;
                // A 
                lock (this.queueSync)
                {
                    message = this.msgQueue.Dequeue();
                }

                RaiseReceived(message);
                Thread.Sleep(this.timeout);
            }


3. Поток 1 последовательно вызывает Pause и Resume. В итоге посредством магических пассов (выделенный код) в пуле появляется еще один поток (3), выполняющий SenderJob.

        // Возобновление отправок
        public void Resume()
        {
            if (!this.senderPaused) return;

            lock (this.queueSync)
            {
                this.senderPaused = false;

                if (this.msgQueue.Count > 0)
                {

                    // все проверки выполняются, попадаем сюда
                    this.senderAlive = true;
                    ThreadPool.QueueUserWorkItem(SenderJob);
                }
            }
        }


4. Поток 3 успешно проходит метод SenderJob, вытащив из очереди единственный элемент.
5. Просыпается поток 2, заходит под lock ... и успешно обламывается на попытке извлечь элемент из уже пустой очереди. Вуаля.
Re: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 03:53
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно проверяющего очередь потока).


Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.

Также можешь глянуть http://files.rsdn.ru/12051/TaskPulling.rar класс LabeledThread, там решалась задача принципиально схожая с твоей.
Re[2]: Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 02.07.09 04:35
Оценка:
Здравствуйте, desco, Вы писали:

D>стремный сценарий:

D>1. Поток 1 делает Enqueue
D>2. Поток 2 (из пула), начавщий выполнять метод SenderJob проходит проверку в while и доходит до места А, не войдя в lock.
D>3. Поток 1 последовательно вызывает Pause и Resume. В итоге посредством магических пассов (выделенный код) в пуле появляется еще один поток (3), выполняющий SenderJob.
D>4. Поток 3 успешно проходит метод SenderJob, вытащив из очереди единственный элемент.
D>5. Просыпается поток 2, заходит под lock ... и успешно обламывается на попытке извлечь элемент из уже пустой очереди. Вуаля.

Спасибо! Вчера голову сломал во время поиска подобного
Я так понимаю это вылечится одной дополнительной проверкой:

public void Resume()
{
    if (!this.senderPaused) return;

    lock(this.queueSync)
    {
        this.senderPaused = false;

        if (this.msgQueue.Count > 0 && !this.senderAlive)
        {
            this.senderAlive = true;
            ThreadPool.QueueUserWorkItem(SenderJob);
        }
    }
}

U>Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.
А ивентами не сильно дороже это будет? Объект ядра как-никак...

U>Также можешь глянуть http://files.rsdn.ru/12051/TaskPulling.rar класс LabeledThread, там решалась задача принципиально схожая с твоей.

Спасибо, поглядимс.
Re[3]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 04:48
Оценка: 10 (1)
Здравствуйте, Пельмешко, Вы писали:

U>>Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.

П>А ивентами не сильно дороже это будет? Объект ядра как-никак...

Это точно лучше, чем поток периодически будить, причем непонятно с какой частотой.
Re: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 08:02
Оценка:
Здравствуйте, Пельмешко, Вы писали:

Не знаю тему или не в тему, но работая с очередью определенного размера очень удобно использовать семафоры.
Во всяком случае для твоей задач не блокируеющее чтение.
и солнце б утром не вставало, когда бы не было меня
Re: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 08:33
Оценка:
Здравствуйте, Пельмешко, Вы писали:

А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику

    public sealed class MessageManager: IDisposable
    {
        private readonly Queue<string> msgQueue;
        private readonly object queueSync;
        private volatile int timeout;

        private readonly Semaphore _semaphore;
        private readonly EventWaitHandle 
            _MonitorEvent, _PauseEvent;
        private readonly Thread _QueueMonitor;
        private bool _terminate = false;
        private List<Action<string>> _actions;

        public void Dispose()
        {
            Abort();
            _terminate = true;
            _PauseEvent.Set();
            _MonitorEvent.Set();
            _semaphore.Release(1);
        }

        public MessageManager()
        {
            this.msgQueue = new Queue<string>(32);
            this.queueSync = new object();
            this.timeout = 1000;
            _actions = new List<Action<string>>();
            _semaphore = new Semaphore(0, int.MaxValue);
            _MonitorEvent = new EventWaitHandle(false, EventResetMode.ManualReset);
            _PauseEvent = new EventWaitHandle(true, EventResetMode.ManualReset);
            _QueueMonitor = new Thread(MonitorRoutine);
            _QueueMonitor.Start();
        }

        internal class ActionWrapper
        {
            private readonly Action<string> _Act;
            private readonly string _ActionData;
            public void Callback(object State)
            {
                _Act(_ActionData);
            }

            public ActionWrapper(string Data, Action<string> Act)
            {
                _Act = Act;
                _ActionData = Data;
            }
        }

        private void MonitorRoutine()
        {
            do
            {
                _PauseEvent.WaitOne();
                if (_terminate) break;
                _semaphore.WaitOne();
                if (_terminate) break;

                string message = "";
                lock (queueSync)
                {
                    if (msgQueue.Count != 0)
                    { 
                        message = msgQueue.Dequeue();
                    }
                }

                if (message.Length > 0)
                {
                    lock (_actions)
                    {
                        foreach (Action<string> A in _actions)
                        {
                            if (_terminate) break;
                            ActionWrapper Act = new ActionWrapper(message, A);
                            ThreadPool.QueueUserWorkItem(Act.Callback, Act);
                        }
                    }
                }
                if (_terminate) break;
                _MonitorEvent.WaitOne(timeout, true);
            } 
            while (true);
        }

        // Таймаут задержки между посылками события
        public int Timeout
        {
            get { return this.timeout; }
            set
            {
                if (value < 0) value = 100;
                this.timeout = value;
            }
        }

        public void AddReceiver(Action<string> Receiver)
        {
            lock (_actions)
            {
                _actions.Add(Receiver);
            }
        }

        public void RemoveReceiver(Action<string> Receiver)
        {
            lock (_actions)
            {
                _actions.Remove(Receiver);
            }
        }


        // Добавление сообщений в очередь
        public void EnqueueMessage(string message)
        {
            lock (this.queueSync)
            {
                msgQueue.Enqueue(message);
            }
            _semaphore.Release(1);
        }

        // Приостановка отправок
        public void Pause()
        {
            this._PauseEvent.Reset();
        }

        // Возобновление отправок
        public void Resume()
        {
            _PauseEvent.Set();
        }

        // Очистка очереди
        public void Abort()
        {
            lock (this.queueSync)
            {
                this.msgQueue.Clear();
            }
        }
    }

    class Program
    {
        private static void Action1(string message)
        {
            Console.WriteLine("Action1: {0}", message);
        }

        private static void Action2(string message)
        {
            Console.WriteLine("Action2: {0}", message);
        }

        static void Main(string[] args)
        {

            //using( MessageManager MS = new MessageManager())
            MessageManager MS = new MessageManager();
            {
                MS.AddReceiver(Action1);
                MS.AddReceiver(Action2);
                MS.Timeout = 1000;
                MS.EnqueueMessage("Message 1");
                MS.EnqueueMessage("Message 2");
                MS.EnqueueMessage("Message 3");
                Thread.Sleep(1000);
                MS.RemoveReceiver(Action2);
                MS.EnqueueMessage("Message 1");
                MS.EnqueueMessage("Message 2");
                MS.EnqueueMessage("Message 3");
                Thread.Sleep(3000);
                Console.WriteLine("Complete");
                Console.ReadLine();
                MS.Dispose();
            }
        } 
    }
Re[2]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 09:26
Оценка: +1
Здравствуйте, Аноним, Вы писали:

А>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


Зря лочишься на несколько объектов. Если нужен потокобезопасный объект, то лучше создать в нем readonly object lockObj = new object() и везде лочиться на него. Это практически гарантирует от любых проблем с потокобезопасностью. Используя же свой лок на каждую коллекцию легко что-нибудь напутать, скажем начать использовать какую-нибудь переменную класса одновременно из под лока на первую коллекцию и из под лока на вторую коллекцию.
Re[2]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 09:44
Оценка:
Здравствуйте, Аноним, Вы писали:

А>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


Не понял зачем тебе WainHandle (аж три)? Вроде достаточно одного для остановки потока при отсутствии сообщений, вместо PauseEvent хватит флажка bool isPaused, который выставляем в Pause() и останавливаем поток, если этот флажок выставлен при добавлении нового сообщений не будим поток. Зачем нужен MonitorEvent я вообще не понял.

Также строго говоря terminate нужно сделать volatile. Непонятно в чем смысл вызова waitHandle.Set в Dispose.
Re[3]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 09:53
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


U>Зря лочишься на несколько объектов. Если нужен потокобезопасный объект, то лучше создать в нем readonly object lockObj = new object() и везде лочиться на него. Это практически гарантирует от любых проблем с потокобезопасностью. Используя же свой лок на каждую коллекцию легко что-нибудь напутать, скажем начать использовать какую-нибудь переменную класса одновременно из под лока на первую коллекцию и из под лока на вторую коллекцию.


Спасибо за замечание, но я так не считаю. Лочить нужно только тот ресурс, к которому реально обращаемся, и на мимально возможное время. Это значительно снизит вероятность "пересечения" потоков на залоченных ресурсах, что благотворно сказывается на производительности. Чрезмерное же злоупотребление блокировками способно вообще свести на нет преимущества многопоточности. Вообще-то это аксиома многопоточного программинга В частности, здесь нет ни малейшего смысла лочить относительно редко изменяемый список подписчиков при добавлении или извлечении сообщения. Ну а по поводу "легко напутать" — ну что же делать, многопоточность требует особой внимательности и аккуратности, такова плата. Я вот не путаю

Зря я не уточнил — хотелось бы услышать прежде всего критику не логики (с ней всё в порядке в рамках сформулированной автором задачи), а реализации этой логики в рамках NET и C#, в чём я откровенно слаб.

Например, несколько странное желание автора темы иметь таймаут между рассылками приводит к необходимости заводить дополнительный эвент, но может можно обеспечить этот таймаут по-другому, без лишнего объекта? Только умоляю — про Suspend — не упоминать

Далее, насколько оправдано ручное управление списком подписчиков? Может есть стандартный механизм, позволяющий распараллелить генерацию событий по нескольким потокам?

Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET? И что здесь лучше — volatile или использование Interlocked.Exchange?

Может есть что-то ещё, неэффективно реализованное с точки зрения NET?

Спасибо.
Re[3]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 10:11
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


U>Не понял зачем тебе WainHandle (аж три)? Вроде достаточно одного для остановки потока при отсутствии сообщений, вместо PauseEvent хватит флажка bool isPaused, который выставляем в Pause() и останавливаем поток, если этот флажок выставлен при добавлении нового сообщений не будим поток. Зачем нужен MonitorEvent я вообще не понял.


Три объекта:
1) Семафор — счётчик сообщений.
2) _MonitorEvent — обеспечивает таймаут
3) _PauseEvent — обеспечивает приостановку по команде "Пауза"

Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо. Ещё одно правило "Никогда не используйте Suspend для управления потоком". Рано или поздно, но Вы обязательно огребёте из-за него неприятностей.

Также строго говоря terminate нужно сделать volatile.
Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?

U>Непонятно в чем смысл вызова waitHandle.Set в Dispose.


Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.
Re[4]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 10:14
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Спасибо за замечание, но я так не считаю. Лочить нужно только тот ресурс, к которому реально обращаемся, и на мимально возможное время. Это значительно снизит вероятность "пересечения" потоков на залоченных ресурсах, что благотворно сказывается на производительности. Чрезмерное же злоупотребление блокировками способно вообще свести на нет преимущества многопоточности. Вообще-то это аксиома многопоточного программинга


На практике эта аксиома приводит к дидлокам и всевозможным глюкам связанным с многопоточностью. Усложнять код (а использование локов на несколько переменных это серьезное усложнение) ради производительности стоит только после того как профайлер показал, что именно здесь узкое место.

А>Например, несколько странное желание автора темы иметь таймаут между рассылками приводит к необходимости заводить дополнительный эвент, но может можно обеспечить этот таймаут по-другому, без лишнего объекта? Только умоляю — про Suspend — не упоминать


Разумеется можно. Добавляем на уровень класса время последней отправки, если поток пробудился, то если разность между текущим временем и временем последней отправки больше таймаута, то отправляем сообщения, если нет — спим дальше. Т.е. в классе достаточно одного WaitHandle, зачем там три я не понимаю.

А>Далее, насколько оправдано ручное управление списком подписчиков? Может есть стандартный механизм, позволяющий распараллелить генерацию событий по нескольким потокам?


Вроде бы там даже в идеале особо проще записать не получится.

А>Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET?


Volatile можно использовать без блокировок.

А>И что здесь лучше — volatile или использование Interlocked.Exchange?


Лучше volatile, не знаю как по скорости, но по лаконичности и безопасности лучше намного.
Re[2]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 10:19
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Пельмешко, Вы писали:


S>Не знаю тему или не в тему, но работая с очередью определенного размера очень удобно использовать семафоры.

S>Во всяком случае для твоей задач не блокируеющее чтение.
Посмотрел на очередь это кольцевой буфер, и если определить размер и использовать семафоры,
то при одном читателе можно использовать неблокирующее чтение.
и солнце б утром не вставало, когда бы не было меня
Re[4]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 10:26
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Три объекта:

А>1) Семафор — счётчик сообщений.

А зачем нужен счетчик сообщений?

А>2) _MonitorEvent — обеспечивает таймаут


Обеспечивается хранением времени последней отправки сообщений.

А>3) _PauseEvent — обеспечивает приостановку по команде "Пауза"


Обеспечивается хранением флажка isPaused.

Т.е. достаточно одного EventWaitHandle, одного DateTime и одного bool.

А>Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо.


waitHandle.WaitOne

А>Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?


Как мне объясняли, если у нас два процессора и на одном мы выставили флажок в true, то в кэше второго процессора флажок может висеть в состоянии false неопределенное время.

А>Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.


В общем случае логика есть, но в данном случае непонятно что полезного может сделать MonitorRoutine пробудившись в связи с Dispose.
Re[5]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 10:58
Оценка:
Здравствуйте, Undying, Вы писали:

U>На практике эта аксиома приводит к дидлокам и всевозможным глюкам связанным с многопоточностью. Усложнять код (а использование локов на несколько переменных это серьезное усложнение) ради производительности стоит только после того как профайлер показал, что именно здесь узкое место.


Давайте сойдёмся на том, что каждый будет поступать по-своему У Вас возникают блокировки — лочьте всё одним объектом, у меня не возникают — я как-нибудь так обойдусь, лады?


U>Разумеется можно. Добавляем на уровень класса время последней отправки, если поток пробудился, то если разность между текущим временем и временем последней отправки больше таймаута, то отправляем сообщения, если нет — спим дальше. Т.е. в классе достаточно одного WaitHandle, зачем там три я не понимаю.


То есть, если я правильно понял, Вы предлагаете периодически будить поток по таймауту, безотносительно наличия сообщений в очереди, так? Вообще-то именно этого я и пытался избежать. А если Вы что-то другое имели в виду, продемонстрируйте пожалуйста этот момент кодом.

А>>Или вот у меня объявлено private volatile int timeout. В нативе я ,ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET?


U>Volatile можно использовать без блокировок.


Я имел в виду, что если поступать по аналогии с нативным кодом, то здесь вообще никакая блокировка не нужна, ни volatile, ни какая-либо другая. Вопрос в том, насколько такая анология уместна применительно к NET. Не то, чтобы это особо важный вопрос, Interlocked jcnfnjxyj 'aatrnbdty/ просто хотелось бы определённости.

А>>И что здесь лучше — volatile или использование Interlocked.Exchange?


U>Лучше volatile, не знаю как по скорости, но по лаконичности и безопасности лучше намного.


Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости

Вообще-то лаконичность кода для меня — дело третье По моей шкале эффективность и читабельность кода имеют куда больший приоритет. А вот что Вы подразумеваете под "лучше по безопасности"? В каком смысле?
Re[6]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 11:10
Оценка:
А>Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости

Помотри http://msdn.microsoft.com/ru-ru/library/x13ttww7.aspx
Мое понимание Смысл при многоядерности не держать значение в кэше или регистах.
и солнце б утром не вставало, когда бы не было меня
Re[6]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 11:16
Оценка:
Здравствуйте, Аноним, Вы писали:

Посмотри http://www.rsdn.ru/article/dotnet/CSThreading2.xml#EKUAE
Автор(ы): Joseph Albahari
Дата: 27.06.2007
Окончание статьи, опубликованной в RSDN Magazine #1-2007. Рассматриваются особенности взаимодействия с апартаментами, потоковые таймеры, пулы потоков, BackgroundWorker, асинхронные методы и делегаты.
В статье использован материал из книги Joseph Albahari, Ben Albahari "C# 3.0 in a Nutshell" — http://www.oreilly.com/catalog/9780596527570/
и солнце б утром не вставало, когда бы не было меня
Re[5]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:19
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>Три объекта:

А>>1) Семафор — счётчик сообщений.

U>А зачем нужен счетчик сообщений?


А>>2) _MonitorEvent — обеспечивает таймаут


U>Обеспечивается хранением времени последней отправки сообщений.


А>>3) _PauseEvent — обеспечивает приостановку по команде "Пауза"


U>Обеспечивается хранением флажка isPaused.



U>Т.е. достаточно одного EventWaitHandle, одного DateTime и одного bool.


А>>Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо.


U>waitHandle.WaitOne


По поводу этого я уже высказался. С таким подходом придётся периодически будить поток только для того, чтобы проверить состояние очереди. Меня это не устраивает. Пробуждение и усыпление потока — весьма дорогостоящая операция, и она будет бесполезно отбирать процессорное време у других потоков, которые, возможно, в этот момент заняты полезным делом. Меня это категорически не устраивает. Но если Вы подразумевали что-то другое, то продемонстрируйт пожалуйста кодом.

А>>Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?


U>Как мне объясняли, если у нас два процессора и на одном мы выставили флажок в true, то в кэше второго процессора флажок может висеть в состоянии false неопределенное время.


Любопытно, кто это Вам сказал? К сожалению, Вам неправильно объяснили. Кэши процессоров имеют специальные механизмы, поддерживающие их в актуальном состоянии, в том числе — в многопроцессорных машишах. И какие-либо блокировки на это никак не влияют. А иначе критические секции, да и другие средства синхронизации, оказались бы в большинстве случаев бесполезным хламом.

А>>Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.


U>В общем случае логика есть, но в данном случае непонятно что полезного может сделать MonitorRoutine пробудившись в связи с Dispose.


То есть как что?! Она завершится, что же ещё? А иначе поток просто останется висеть на одном из этих объектов до самого вызова ExitProcess, после чего будет просто жёстко терминирован. Если до вызова ExitProcess дело вообще дойдёт — кто его знает, этот NET. А то ведь может статься, что только TerminateProcess остановит это безобразие
Re[6]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 11:22
Оценка:
Здравствуйте, Аноним, Вы писали:

А>То есть, если я правильно понял, Вы предлагаете периодически будить поток по таймауту, безотносительно наличия сообщений в очереди, так? Вообще-то именно этого я и пытался избежать. А если Вы что-то другое имели в виду, продемонстрируйте пожалуйста этот момент кодом.


Примерно так (локи опущены):

DateTime lastSendingTime;
void MonitorRoutine()
{
  while (!terminate)
  {
    DateTime curTime = DateTime.UtcNow;
    TimeSpan interval = curTime - lastSendingTime;
    if (interval < timeout)
    {
      waitHandle.WaitOne(timeout - interval, false);
      continue;
    }
    
    if (msgQueue.Count == 0)
    {
      waitHandle.WaitOne();
    }
    else
    {
      //Отправляем сообщение

      lastSendingTime = DateTime.UtcNow;
      waitHandle.WaitOne(timeout, false);
    }
  }
}

        public void EnqueueMessage(string message)
        {
            lock (this.queueSync)
            {
                msgQueue.Enqueue(message);
            }
            if (!isPaused)
              waitHandle.Set();
              
        }

        volatile bool isPaused = false;
        public void Pause()
        {
          isPaused = true;
          waitHandle.WaitOne();
        }

        // Возобновление отправок
        public void Resume()
        {
          isPaused = false;
          waitHandle.Set();
        }


А>Вообще-то лаконичность кода для меня — дело третье По моей шкале эффективность и читабельность кода имеют куда больший приоритет. А вот что Вы подразумеваете под "лучше по безопасности"? В каком смысле?


Безопаснее в том смысле, что код с volatile гораздо сложнее сломать изменениями, чем код с Interlocked.Exchange.
Re[7]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:25
Оценка:
Здравствуйте, Serginio1, Вы писали:


А>>Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости


S>Помотри http://msdn.microsoft.com/ru-ru/library/x13ttww7.aspx

S>Мое понимание Смысл при многоядерности не держать значение в кэше или регистах.

Вот это уже весомо, это — Аргумент Спасибо. И отдельное спасибо за ссылку на статью — проштудирую, как только буду уверен, что ничто не отвлечёт.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.