Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 01.07.09 19:48
Оценка:
Потребовалось сделать велосипед со следующим поведением:
* Класс олицетворяет собой очередь сообщений.
* В класс из нескольких потоков поступают сообщения, а так же есть подписчики на эти сообщения.
* Класс должен поочерёдно извлекать сообщения из очереди и отправлять их подписчикам события.

Так же предъявляются следующие требования:
* Между отправками сообщений должен соблюдаться заданный временной интервал.
* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно проверяющего очередь потока).
* В очереди может быть достаточно много сообщений, поэтому реализация очереди не должна плодить потоки на каждое.
* Должна быть возможность приостановить/возобновить отправки.

На скорую руку вот что получилось:
public sealed class MessageSender
{
    private readonly Queue<string> msgQueue;
    private readonly object queueSync;
    private volatile bool senderAlive;
    private volatile bool senderPaused;
    private int timeout;

    public MessageSender()
    {
        this.msgQueue = new Queue<string>(32);
        this.queueSync = new object();
        this.timeout = 1000;
    }

    // Таймаут задержки между посылками события
    public int Timeout
    {
        get { return this.timeout; }
        set
        {
            if(value < 0) value = 100;
            this.timeout = value;
        }
    }

    public event Action<string> MessageReceived;

    // Возбуждение события
    private void RaiseReceived(string message)
    {
        Action<string> received = this.MessageReceived;
        if (received != null)
        {
            received(message);
        }
    }

    // Метод для потока-отправителя
    private void SenderJob(object state)
    {
        while(this.msgQueue.Count > 0
          && !this.senderPaused)
        {
            string message;
            lock(this.queueSync)
            {
                message = this.msgQueue.Dequeue();
            }

            RaiseReceived(message);
            Thread.Sleep(this.timeout);
        }

        this.senderAlive = false;
    }

    // Добавление сообщений в очередь
    public void EnqueueMessage(string message)
    {
        lock(this.queueSync)
        {
            this.msgQueue.Enqueue(message);

            // это осознанно внутри lock:
            if (!this.senderPaused
             && !this.senderAlive)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Приостановка отправок
    public void Pause()
    {
        this.senderPaused = true;
    }

    // Возобновление отправок
    public void Resume()
    {
        if (!this.senderPaused) return;

        lock(this.queueSync)
        {
            this.senderPaused = false;

            if(this.msgQueue.Count > 0)
            {
                this.senderAlive = true;
                ThreadPool.QueueUserWorkItem(SenderJob);
            }
        }
    }

    // Очистка очереди
    public void Abort()
    {
        lock(this.queueSync)
        {
            this.msgQueue.Clear();
        }
    }
}


Подскажите, пожалуйста, скользкие моменты, нужны ли volatile и вообще правильно ли...
Сильно не пинайте, очень прошу
Re: Thread-safe specialized queue
От: desco США http://v2matveev.blogspot.com
Дата: 01.07.09 21:17
Оценка: 11 (2)
Здравствуйте, Пельмешко, Вы писали:

стремный сценарий:
1. Поток 1 делает Enqueue
2. Поток 2 (из пула), начавщий выполнять метод SenderJob проходит проверку в while и доходит до места А, не войдя в lock.

            while (this.msgQueue.Count > 0
              && !this.senderPaused)
            {
                string message;
                // A 
                lock (this.queueSync)
                {
                    message = this.msgQueue.Dequeue();
                }

                RaiseReceived(message);
                Thread.Sleep(this.timeout);
            }


3. Поток 1 последовательно вызывает Pause и Resume. В итоге посредством магических пассов (выделенный код) в пуле появляется еще один поток (3), выполняющий SenderJob.

        // Возобновление отправок
        public void Resume()
        {
            if (!this.senderPaused) return;

            lock (this.queueSync)
            {
                this.senderPaused = false;

                if (this.msgQueue.Count > 0)
                {

                    // все проверки выполняются, попадаем сюда
                    this.senderAlive = true;
                    ThreadPool.QueueUserWorkItem(SenderJob);
                }
            }
        }


4. Поток 3 успешно проходит метод SenderJob, вытащив из очереди единственный элемент.
5. Просыпается поток 2, заходит под lock ... и успешно обламывается на попытке извлечь элемент из уже пустой очереди. Вуаля.
Re: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 03:53
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно проверяющего очередь потока).


Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.

Также можешь глянуть http://files.rsdn.ru/12051/TaskPulling.rar класс LabeledThread, там решалась задача принципиально схожая с твоей.
Re[2]: Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 02.07.09 04:35
Оценка:
Здравствуйте, desco, Вы писали:

D>стремный сценарий:

D>1. Поток 1 делает Enqueue
D>2. Поток 2 (из пула), начавщий выполнять метод SenderJob проходит проверку в while и доходит до места А, не войдя в lock.
D>3. Поток 1 последовательно вызывает Pause и Resume. В итоге посредством магических пассов (выделенный код) в пуле появляется еще один поток (3), выполняющий SenderJob.
D>4. Поток 3 успешно проходит метод SenderJob, вытащив из очереди единственный элемент.
D>5. Просыпается поток 2, заходит под lock ... и успешно обламывается на попытке извлечь элемент из уже пустой очереди. Вуаля.

Спасибо! Вчера голову сломал во время поиска подобного
Я так понимаю это вылечится одной дополнительной проверкой:

public void Resume()
{
    if (!this.senderPaused) return;

    lock(this.queueSync)
    {
        this.senderPaused = false;

        if (this.msgQueue.Count > 0 && !this.senderAlive)
        {
            this.senderAlive = true;
            ThreadPool.QueueUserWorkItem(SenderJob);
        }
    }
}

U>Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.
А ивентами не сильно дороже это будет? Объект ядра как-никак...

U>Также можешь глянуть http://files.rsdn.ru/12051/TaskPulling.rar класс LabeledThread, там решалась задача принципиально схожая с твоей.

Спасибо, поглядимс.
Re[3]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 04:48
Оценка: 10 (1)
Здравствуйте, Пельмешко, Вы писали:

U>>Для этого можно использовать AutoResetEvent, вызывая WaitOne для усыпления и Set() для пробуждения потока.

П>А ивентами не сильно дороже это будет? Объект ядра как-никак...

Это точно лучше, чем поток периодически будить, причем непонятно с какой частотой.
Re: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 08:02
Оценка:
Здравствуйте, Пельмешко, Вы писали:

Не знаю тему или не в тему, но работая с очередью определенного размера очень удобно использовать семафоры.
Во всяком случае для твоей задач не блокируеющее чтение.
и солнце б утром не вставало, когда бы не было меня
Re: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 08:33
Оценка:
Здравствуйте, Пельмешко, Вы писали:

А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику

    public sealed class MessageManager: IDisposable
    {
        private readonly Queue<string> msgQueue;
        private readonly object queueSync;
        private volatile int timeout;

        private readonly Semaphore _semaphore;
        private readonly EventWaitHandle 
            _MonitorEvent, _PauseEvent;
        private readonly Thread _QueueMonitor;
        private bool _terminate = false;
        private List<Action<string>> _actions;

        public void Dispose()
        {
            Abort();
            _terminate = true;
            _PauseEvent.Set();
            _MonitorEvent.Set();
            _semaphore.Release(1);
        }

        public MessageManager()
        {
            this.msgQueue = new Queue<string>(32);
            this.queueSync = new object();
            this.timeout = 1000;
            _actions = new List<Action<string>>();
            _semaphore = new Semaphore(0, int.MaxValue);
            _MonitorEvent = new EventWaitHandle(false, EventResetMode.ManualReset);
            _PauseEvent = new EventWaitHandle(true, EventResetMode.ManualReset);
            _QueueMonitor = new Thread(MonitorRoutine);
            _QueueMonitor.Start();
        }

        internal class ActionWrapper
        {
            private readonly Action<string> _Act;
            private readonly string _ActionData;
            public void Callback(object State)
            {
                _Act(_ActionData);
            }

            public ActionWrapper(string Data, Action<string> Act)
            {
                _Act = Act;
                _ActionData = Data;
            }
        }

        private void MonitorRoutine()
        {
            do
            {
                _PauseEvent.WaitOne();
                if (_terminate) break;
                _semaphore.WaitOne();
                if (_terminate) break;

                string message = "";
                lock (queueSync)
                {
                    if (msgQueue.Count != 0)
                    { 
                        message = msgQueue.Dequeue();
                    }
                }

                if (message.Length > 0)
                {
                    lock (_actions)
                    {
                        foreach (Action<string> A in _actions)
                        {
                            if (_terminate) break;
                            ActionWrapper Act = new ActionWrapper(message, A);
                            ThreadPool.QueueUserWorkItem(Act.Callback, Act);
                        }
                    }
                }
                if (_terminate) break;
                _MonitorEvent.WaitOne(timeout, true);
            } 
            while (true);
        }

        // Таймаут задержки между посылками события
        public int Timeout
        {
            get { return this.timeout; }
            set
            {
                if (value < 0) value = 100;
                this.timeout = value;
            }
        }

        public void AddReceiver(Action<string> Receiver)
        {
            lock (_actions)
            {
                _actions.Add(Receiver);
            }
        }

        public void RemoveReceiver(Action<string> Receiver)
        {
            lock (_actions)
            {
                _actions.Remove(Receiver);
            }
        }


        // Добавление сообщений в очередь
        public void EnqueueMessage(string message)
        {
            lock (this.queueSync)
            {
                msgQueue.Enqueue(message);
            }
            _semaphore.Release(1);
        }

        // Приостановка отправок
        public void Pause()
        {
            this._PauseEvent.Reset();
        }

        // Возобновление отправок
        public void Resume()
        {
            _PauseEvent.Set();
        }

        // Очистка очереди
        public void Abort()
        {
            lock (this.queueSync)
            {
                this.msgQueue.Clear();
            }
        }
    }

    class Program
    {
        private static void Action1(string message)
        {
            Console.WriteLine("Action1: {0}", message);
        }

        private static void Action2(string message)
        {
            Console.WriteLine("Action2: {0}", message);
        }

        static void Main(string[] args)
        {

            //using( MessageManager MS = new MessageManager())
            MessageManager MS = new MessageManager();
            {
                MS.AddReceiver(Action1);
                MS.AddReceiver(Action2);
                MS.Timeout = 1000;
                MS.EnqueueMessage("Message 1");
                MS.EnqueueMessage("Message 2");
                MS.EnqueueMessage("Message 3");
                Thread.Sleep(1000);
                MS.RemoveReceiver(Action2);
                MS.EnqueueMessage("Message 1");
                MS.EnqueueMessage("Message 2");
                MS.EnqueueMessage("Message 3");
                Thread.Sleep(3000);
                Console.WriteLine("Complete");
                Console.ReadLine();
                MS.Dispose();
            }
        } 
    }
Re[2]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 09:26
Оценка: +1
Здравствуйте, Аноним, Вы писали:

А>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


Зря лочишься на несколько объектов. Если нужен потокобезопасный объект, то лучше создать в нем readonly object lockObj = new object() и везде лочиться на него. Это практически гарантирует от любых проблем с потокобезопасностью. Используя же свой лок на каждую коллекцию легко что-нибудь напутать, скажем начать использовать какую-нибудь переменную класса одновременно из под лока на первую коллекцию и из под лока на вторую коллекцию.
Re[2]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 09:44
Оценка:
Здравствуйте, Аноним, Вы писали:

А>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


Не понял зачем тебе WainHandle (аж три)? Вроде достаточно одного для остановки потока при отсутствии сообщений, вместо PauseEvent хватит флажка bool isPaused, который выставляем в Pause() и останавливаем поток, если этот флажок выставлен при добавлении нового сообщений не будим поток. Зачем нужен MonitorEvent я вообще не понял.

Также строго говоря terminate нужно сделать volatile. Непонятно в чем смысл вызова waitHandle.Set в Dispose.
Re[3]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 09:53
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


U>Зря лочишься на несколько объектов. Если нужен потокобезопасный объект, то лучше создать в нем readonly object lockObj = new object() и везде лочиться на него. Это практически гарантирует от любых проблем с потокобезопасностью. Используя же свой лок на каждую коллекцию легко что-нибудь напутать, скажем начать использовать какую-нибудь переменную класса одновременно из под лока на первую коллекцию и из под лока на вторую коллекцию.


Спасибо за замечание, но я так не считаю. Лочить нужно только тот ресурс, к которому реально обращаемся, и на мимально возможное время. Это значительно снизит вероятность "пересечения" потоков на залоченных ресурсах, что благотворно сказывается на производительности. Чрезмерное же злоупотребление блокировками способно вообще свести на нет преимущества многопоточности. Вообще-то это аксиома многопоточного программинга В частности, здесь нет ни малейшего смысла лочить относительно редко изменяемый список подписчиков при добавлении или извлечении сообщения. Ну а по поводу "легко напутать" — ну что же делать, многопоточность требует особой внимательности и аккуратности, такова плата. Я вот не путаю

Зря я не уточнил — хотелось бы услышать прежде всего критику не логики (с ней всё в порядке в рамках сформулированной автором задачи), а реализации этой логики в рамках NET и C#, в чём я откровенно слаб.

Например, несколько странное желание автора темы иметь таймаут между рассылками приводит к необходимости заводить дополнительный эвент, но может можно обеспечить этот таймаут по-другому, без лишнего объекта? Только умоляю — про Suspend — не упоминать

Далее, насколько оправдано ручное управление списком подписчиков? Может есть стандартный механизм, позволяющий распараллелить генерацию событий по нескольким потокам?

Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET? И что здесь лучше — volatile или использование Interlocked.Exchange?

Может есть что-то ещё, неэффективно реализованное с точки зрения NET?

Спасибо.
Re[3]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 10:11
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>А как общественность отнесётся как такому варианту реализации? Хотелось бы увидеть конструктивную критику


U>Не понял зачем тебе WainHandle (аж три)? Вроде достаточно одного для остановки потока при отсутствии сообщений, вместо PauseEvent хватит флажка bool isPaused, который выставляем в Pause() и останавливаем поток, если этот флажок выставлен при добавлении нового сообщений не будим поток. Зачем нужен MonitorEvent я вообще не понял.


Три объекта:
1) Семафор — счётчик сообщений.
2) _MonitorEvent — обеспечивает таймаут
3) _PauseEvent — обеспечивает приостановку по команде "Пауза"

Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо. Ещё одно правило "Никогда не используйте Suspend для управления потоком". Рано или поздно, но Вы обязательно огребёте из-за него неприятностей.

Также строго говоря terminate нужно сделать volatile.
Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?

U>Непонятно в чем смысл вызова waitHandle.Set в Dispose.


Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.
Re[4]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 10:14
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Спасибо за замечание, но я так не считаю. Лочить нужно только тот ресурс, к которому реально обращаемся, и на мимально возможное время. Это значительно снизит вероятность "пересечения" потоков на залоченных ресурсах, что благотворно сказывается на производительности. Чрезмерное же злоупотребление блокировками способно вообще свести на нет преимущества многопоточности. Вообще-то это аксиома многопоточного программинга


На практике эта аксиома приводит к дидлокам и всевозможным глюкам связанным с многопоточностью. Усложнять код (а использование локов на несколько переменных это серьезное усложнение) ради производительности стоит только после того как профайлер показал, что именно здесь узкое место.

А>Например, несколько странное желание автора темы иметь таймаут между рассылками приводит к необходимости заводить дополнительный эвент, но может можно обеспечить этот таймаут по-другому, без лишнего объекта? Только умоляю — про Suspend — не упоминать


Разумеется можно. Добавляем на уровень класса время последней отправки, если поток пробудился, то если разность между текущим временем и временем последней отправки больше таймаута, то отправляем сообщения, если нет — спим дальше. Т.е. в классе достаточно одного WaitHandle, зачем там три я не понимаю.

А>Далее, насколько оправдано ручное управление списком подписчиков? Может есть стандартный механизм, позволяющий распараллелить генерацию событий по нескольким потокам?


Вроде бы там даже в идеале особо проще записать не получится.

А>Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET?


Volatile можно использовать без блокировок.

А>И что здесь лучше — volatile или использование Interlocked.Exchange?


Лучше volatile, не знаю как по скорости, но по лаконичности и безопасности лучше намного.
Re[2]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 10:19
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Пельмешко, Вы писали:


S>Не знаю тему или не в тему, но работая с очередью определенного размера очень удобно использовать семафоры.

S>Во всяком случае для твоей задач не блокируеющее чтение.
Посмотрел на очередь это кольцевой буфер, и если определить размер и использовать семафоры,
то при одном читателе можно использовать неблокирующее чтение.
и солнце б утром не вставало, когда бы не было меня
Re[4]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 10:26
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Три объекта:

А>1) Семафор — счётчик сообщений.

А зачем нужен счетчик сообщений?

А>2) _MonitorEvent — обеспечивает таймаут


Обеспечивается хранением времени последней отправки сообщений.

А>3) _PauseEvent — обеспечивает приостановку по команде "Пауза"


Обеспечивается хранением флажка isPaused.

Т.е. достаточно одного EventWaitHandle, одного DateTime и одного bool.

А>Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо.


waitHandle.WaitOne

А>Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?


Как мне объясняли, если у нас два процессора и на одном мы выставили флажок в true, то в кэше второго процессора флажок может висеть в состоянии false неопределенное время.

А>Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.


В общем случае логика есть, но в данном случае непонятно что полезного может сделать MonitorRoutine пробудившись в связи с Dispose.
Re[5]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 10:58
Оценка:
Здравствуйте, Undying, Вы писали:

U>На практике эта аксиома приводит к дидлокам и всевозможным глюкам связанным с многопоточностью. Усложнять код (а использование локов на несколько переменных это серьезное усложнение) ради производительности стоит только после того как профайлер показал, что именно здесь узкое место.


Давайте сойдёмся на том, что каждый будет поступать по-своему У Вас возникают блокировки — лочьте всё одним объектом, у меня не возникают — я как-нибудь так обойдусь, лады?


U>Разумеется можно. Добавляем на уровень класса время последней отправки, если поток пробудился, то если разность между текущим временем и временем последней отправки больше таймаута, то отправляем сообщения, если нет — спим дальше. Т.е. в классе достаточно одного WaitHandle, зачем там три я не понимаю.


То есть, если я правильно понял, Вы предлагаете периодически будить поток по таймауту, безотносительно наличия сообщений в очереди, так? Вообще-то именно этого я и пытался избежать. А если Вы что-то другое имели в виду, продемонстрируйте пожалуйста этот момент кодом.

А>>Или вот у меня объявлено private volatile int timeout. В нативе я ,ы не задумываясь использовал бы эту переменную без всяких блокировок, а как с этим в NET?


U>Volatile можно использовать без блокировок.


Я имел в виду, что если поступать по аналогии с нативным кодом, то здесь вообще никакая блокировка не нужна, ни volatile, ни какая-либо другая. Вопрос в том, насколько такая анология уместна применительно к NET. Не то, чтобы это особо важный вопрос, Interlocked jcnfnjxyj 'aatrnbdty/ просто хотелось бы определённости.

А>>И что здесь лучше — volatile или использование Interlocked.Exchange?


U>Лучше volatile, не знаю как по скорости, но по лаконичности и безопасности лучше намного.


Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости

Вообще-то лаконичность кода для меня — дело третье По моей шкале эффективность и читабельность кода имеют куда больший приоритет. А вот что Вы подразумеваете под "лучше по безопасности"? В каком смысле?
Re[6]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 11:10
Оценка:
А>Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости

Помотри http://msdn.microsoft.com/ru-ru/library/x13ttww7.aspx
Мое понимание Смысл при многоядерности не держать значение в кэше или регистах.
и солнце б утром не вставало, когда бы не было меня
Re[6]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 11:16
Оценка:
Здравствуйте, Аноним, Вы писали:

Посмотри http://www.rsdn.ru/article/dotnet/CSThreading2.xml#EKUAE
Автор(ы): Joseph Albahari
Дата: 27.06.2007
Окончание статьи, опубликованной в RSDN Magazine #1-2007. Рассматриваются особенности взаимодействия с апартаментами, потоковые таймеры, пулы потоков, BackgroundWorker, асинхронные методы и делегаты.
В статье использован материал из книги Joseph Albahari, Ben Albahari "C# 3.0 in a Nutshell" — http://www.oreilly.com/catalog/9780596527570/
и солнце б утром не вставало, когда бы не было меня
Re[5]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:19
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>Три объекта:

А>>1) Семафор — счётчик сообщений.

U>А зачем нужен счетчик сообщений?


А>>2) _MonitorEvent — обеспечивает таймаут


U>Обеспечивается хранением времени последней отправки сообщений.


А>>3) _PauseEvent — обеспечивает приостановку по команде "Пауза"


U>Обеспечивается хранением флажка isPaused.



U>Т.е. достаточно одного EventWaitHandle, одного DateTime и одного bool.


А>>Под "останавливаем поток" Вы что подразумеваете? Не Suspend, я надеюсь? Или Suspend в NET не то-же самое, что в нативе? Если то-же самое — спасибо, не надо.


U>waitHandle.WaitOne


По поводу этого я уже высказался. С таким подходом придётся периодически будить поток только для того, чтобы проверить состояние очереди. Меня это не устраивает. Пробуждение и усыпление потока — весьма дорогостоящая операция, и она будет бесполезно отбирать процессорное време у других потоков, которые, возможно, в этот момент заняты полезным делом. Меня это категорически не устраивает. Но если Вы подразумевали что-то другое, то продемонстрируйт пожалуйста кодом.

А>>Это точно? Потому, что нативном коде это было-бы совершенно не нужно. Что-то мне подсказывает, что и здесь оно ни к чему. Эта переменная единожды устанавливается в TRUE и некогда не сбрасывается в FALSE, зачем её лочить?


U>Как мне объясняли, если у нас два процессора и на одном мы выставили флажок в true, то в кэше второго процессора флажок может висеть в состоянии false неопределенное время.


Любопытно, кто это Вам сказал? К сожалению, Вам неправильно объяснили. Кэши процессоров имеют специальные механизмы, поддерживающие их в актуальном состоянии, в том числе — в многопроцессорных машишах. И какие-либо блокировки на это никак не влияют. А иначе критические секции, да и другие средства синхронизации, оказались бы в большинстве случаев бесполезным хламом.

А>>Ну тут всё просто. Мы не знаем, где находится поток в момент вызова Dispose. Возможно, он ждёт один из этих объектов, вызов Set позволит потоку максимально быстро среагировать на команду и завершиться, тем более, что иначе он из ожидания _PauseEvent или семафора может не выйти никогда.


U>В общем случае логика есть, но в данном случае непонятно что полезного может сделать MonitorRoutine пробудившись в связи с Dispose.


То есть как что?! Она завершится, что же ещё? А иначе поток просто останется висеть на одном из этих объектов до самого вызова ExitProcess, после чего будет просто жёстко терминирован. Если до вызова ExitProcess дело вообще дойдёт — кто его знает, этот NET. А то ведь может статься, что только TerminateProcess остановит это безобразие
Re[6]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 11:22
Оценка:
Здравствуйте, Аноним, Вы писали:

А>То есть, если я правильно понял, Вы предлагаете периодически будить поток по таймауту, безотносительно наличия сообщений в очереди, так? Вообще-то именно этого я и пытался избежать. А если Вы что-то другое имели в виду, продемонстрируйте пожалуйста этот момент кодом.


Примерно так (локи опущены):

DateTime lastSendingTime;
void MonitorRoutine()
{
  while (!terminate)
  {
    DateTime curTime = DateTime.UtcNow;
    TimeSpan interval = curTime - lastSendingTime;
    if (interval < timeout)
    {
      waitHandle.WaitOne(timeout - interval, false);
      continue;
    }
    
    if (msgQueue.Count == 0)
    {
      waitHandle.WaitOne();
    }
    else
    {
      //Отправляем сообщение

      lastSendingTime = DateTime.UtcNow;
      waitHandle.WaitOne(timeout, false);
    }
  }
}

        public void EnqueueMessage(string message)
        {
            lock (this.queueSync)
            {
                msgQueue.Enqueue(message);
            }
            if (!isPaused)
              waitHandle.Set();
              
        }

        volatile bool isPaused = false;
        public void Pause()
        {
          isPaused = true;
          waitHandle.WaitOne();
        }

        // Возобновление отправок
        public void Resume()
        {
          isPaused = false;
          waitHandle.Set();
        }


А>Вообще-то лаконичность кода для меня — дело третье По моей шкале эффективность и читабельность кода имеют куда больший приоритет. А вот что Вы подразумеваете под "лучше по безопасности"? В каком смысле?


Безопаснее в том смысле, что код с volatile гораздо сложнее сломать изменениями, чем код с Interlocked.Exchange.
Re[7]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:25
Оценка:
Здравствуйте, Serginio1, Вы писали:


А>>Что volatile обеспечивает атомарность, это я понял. Я просто не в курсе, как именно она это делает, поэтому оно вызывает некоторый напряг В то-же время с Interlocked.Exchange вроде всё понятно, там сидит банальный вызов практически одноимённой нативной функции, по крайней мере на Windows. Хотелось бы и с volatile такой-же определённости


S>Помотри http://msdn.microsoft.com/ru-ru/library/x13ttww7.aspx

S>Мое понимание Смысл при многоядерности не держать значение в кэше или регистах.

Вот это уже весомо, это — Аргумент Спасибо. И отдельное спасибо за ссылку на статью — проштудирую, как только буду уверен, что ничто не отвлечёт.
Re[6]: Thread-safe specialized queue
От: Undying Россия  
Дата: 02.07.09 11:29
Оценка:
Здравствуйте, Аноним, Вы писали:

А>По поводу этого я уже высказался. С таким подходом придётся периодически будить поток только для того, чтобы проверить состояние очереди. Меня это не устраивает.


Не нужно периодически будить поток, код в другой подветке.

А>Любопытно, кто это Вам сказал? К сожалению, Вам неправильно объяснили. Кэши процессоров имеют специальные механизмы, поддерживающие их в актуальном состоянии, в том числе — в многопроцессорных машишах. И какие-либо блокировки на это никак не влияют. А иначе критические секции, да и другие средства синхронизации, оказались бы в большинстве случаев бесполезным хламом.


Volatile принудительно сбрасывает кэши процессоров, если переменная таким образом не помечена и используется не из под лока, то устаревшее значение переменной может жить в кэшах процессоров неопределенное время (т.е. до тех пор пока кто-нибудь другой кэш не сбросит).

А>То есть как что?! Она завершится, что же ещё? А иначе поток просто останется висеть на одном из этих объектов до самого вызова ExitProcess, после чего будет просто жёстко терминирован. Если до вызова ExitProcess дело вообще дойдёт — кто его знает, этот NET. А то ведь может статься, что только TerminateProcess остановит это безобразие


Да, торможу. Про то что в MonitorRoutine terminate проверяется я забыл.
Re[7]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:32
Оценка:
Здравствуйте, Serginio1, Вы писали:

Так. Если я правильно понял, то может статься, что поле нестатическое поле класса может оказаться расположенным в регистре процессора? Забавно

А под кэшем Вы что подразумевали? Я не совсем уловил.
Re[4]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 11:42
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок


При запуске на машине с числом физических ядер более одного, нативный volitile становится злой шуткой вместо гарантированной синхронизации.
Re[7]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 12:07
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>То есть, если я правильно понял, Вы предлагаете периодически будить поток по таймауту, безотносительно наличия сообщений в очереди, так? Вообще-то именно этого я и пытался избежать. А если Вы что-то другое имели в виду, продемонстрируйте пожалуйста этот момент кодом.


U>Примерно так (локи опущены):


U>
U>DateTime lastSendingTime;
U>void MonitorRoutine()
U>{
U>  while (!terminate)
U>  {
U>    DateTime curTime = DateTime.UtcNow;
U>    TimeSpan interval = curTime - lastSendingTime;
U>    if (interval < timeout)
U>    {
U>      waitHandle.WaitOne(timeout - interval, false);
U>      continue;
U>    }
    
U>    if (msgQueue.Count == 0)
U>    {
U>      waitHandle.WaitOne();
U>    }
U>    else
U>    {
U>      //Отправляем сообщение

U>      lastSendingTime = DateTime.UtcNow;
U>      waitHandle.WaitOne(timeout, false);
U>    }
U>  }
U>}

U>        public void EnqueueMessage(string message)
U>        {
U>            lock (this.queueSync)
U>            {
U>                msgQueue.Enqueue(message);
U>            }
U>            if (!isPaused)
U>              waitHandle.Set();
              
U>        }

U>        volatile bool isPaused = false;
U>        public void Pause()
U>        {
U>          isPaused = true;
U>          waitHandle.WaitOne();
U>        }

U>        // Возобновление отправок
U>        public void Resume()
U>        {
U>          isPaused = false;
U>          waitHandle.Set();
U>        }

U>


К сожалению, приведённый Вами код не соответствует сформулированной автором теме задаче. В частности, у Вас добавление сообщения в очередь автоматически приводит к снятию с паузы, а этого происходить не должно. Попытка же этот исправить без добавления эвента неизбежно привёдет либо к возникновению гонок, либо к необходимости будить поток, даже в состоянии паузы. Да собственно, гонки ми в этом коде имеют место — вызов EnqueueMessage может легко вклиниться if (msgQueue.Count == 0) и waitHandle.WaitOne(), а залочить это не получится.

Вообще, Вам не кажется, что Вы несколько непоследовательны? С одной стороны предлагаете всё лочить одним объектом, дабы на корню убить гипотетические деадлоки, хотя один объект блокировки таких гарантий всё равно дать не может. С другой — готовы идти на усложнение логики, лишь бы избавиться от одного-двух объектов, а ведь усложнение логики — прямой путь к появлению логических ошибок, к тому-же трудноуловимых в многопоточном коде.

А>>Вообще-то лаконичность кода для меня — дело третье По моей шкале эффективность и читабельность кода имеют куда больший приоритет. А вот что Вы подразумеваете под "лучше по безопасности"? В каком смысле?


U>Безопаснее в том смысле, что код с volatile гораздо сложнее сломать изменениями, чем код с Interlocked.Exchange.


Вероятно, Вы правы. Хотя в нативе мы как-то обходимся Впрочем, по поводу преимуществ volatile убедительно сказано здесь
Автор: Serginio1
Дата: 02.07.09


Давайте всё-таки сделаем допущение, что с логикой всё в порядке и сосредоточимся на особенностях её реализации на C#, мне именно это более всего интересно

PS Надеюсь, автор темы простит меня за то, что я так беспардонно в неё вторгся. Я нечаянно
Re[8]: Thread-safe specialized queue
От: Кондраций Россия  
Дата: 02.07.09 12:20
Оценка: +1
Здравствуйте, Аноним, Вы писали:

А>Здравствуйте, Serginio1, Вы писали:


А>Так. Если я правильно понял, то может статься, что поле нестатическое поле класса может оказаться расположенным в регистре процессора? Забавно

А что мешает? Это просто кусок памяти, RAMы.

А>А под кэшем Вы что подразумевали? Я не совсем уловил.

Кэш ядра процессора.
Сообщение заговорено потомственным колдуном, целителем и магом в девятом поколении!
Модерирование или минусование сообщения ведет к половому бессилию, венерическим заболеваниям, венцу безбрачия и диарее!
Re[7]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 12:28
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Аноним, Вы писали:


А>>По поводу этого я уже высказался. С таким подходом придётся периодически будить поток только для того, чтобы проверить состояние очереди. Меня это не устраивает.


U>Не нужно периодически будить поток, код в другой подветке.


Мне как-то удобнее смотреть в плоском виде Но я видел и там отписался.

А>>Любопытно, кто это Вам сказал? К сожалению, Вам неправильно объяснили. Кэши процессоров имеют специальные механизмы, поддерживающие их в актуальном состоянии, в том числе — в многопроцессорных машишах. И какие-либо блокировки на это никак не влияют. А иначе критические секции, да и другие средства синхронизации, оказались бы в большинстве случаев бесполезным хламом.


U>Volatile принудительно сбрасывает кэши процессоров, если переменная таким образом не помечена и используется не из под лока, то устаревшее значение переменной может жить в кэшах процессоров неопределенное время (т.е. до тех пор пока кто-нибудь другой кэш не сбросит).


Ещё раз, вот смотрите. Имеем переменную, расположенную в памяти, допустим int32 Value = 10. Допустим, два потока на разных процессорах уже обращались к ней для чтения и она оказалась в кэшах обоих процессоров. Теперь наступает момент, когда эти два потока на тех-же процессорах пытаются выполнить следующий псевдокод
  EnterCriticalSection(CS);
  Value = Value + 5;
  LeaveCriticalSection(CS);


Если бы не существовало механизма поддержания когерентности кэшей процессоров, то каждый из потоков имел бы дело с собственной переменной Value в своём кэше,. и переменная Value в каждом из кэшей оказалась бы равна 15. То есть от применения критической секции не было бы никакого проку. Но так как такой механизм таки существует, и существует он на "железном" уровне, то ничего подобного не происходит, критические секции работают, а переменная Value окажется равна 20, как ей и полагается. Давайте Вы хорошенько это обдумаете, и мы к этому вопросу больше возвращаться не будем, договорились? Скучно, ей-Богу
Re[8]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 12:37
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Здравствуйте, Serginio1, Вы писали:


А>Так. Если я правильно понял, то может статься, что поле нестатическое поле класса может оказаться расположенным в регистре процессора? Забавно


При оптимизации даже при обноядерном процессоре.
А>А под кэшем Вы что подразумевали? Я не совсем уловил.
Кэш ядра. Он у каждого ядра свой, а так как потоки могут выполняться на разных ядрах, то и дашшые одного кэша могут быть невидимы для другого.
Но тут нужно смотреть архитектуру.
и солнце б утром не вставало, когда бы не было меня
Re[9]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 12:38
Оценка:
Здравствуйте, Кондраций, Вы писали:

К>Здравствуйте, Аноним, Вы писали:


А>>Здравствуйте, Serginio1, Вы писали:


А>>Так. Если я правильно понял, то может статься, что поле нестатическое поле класса может оказаться расположенным в регистре процессора? Забавно

К>А что мешает? Это просто кусок памяти, RAMы.

А>>А под кэшем Вы что подразумевали? Я не совсем уловил.

К>Кэш ядра процессора.

Да ничто вобщем-то не мешает, просто как-то нелогично. А скорее просто не привычно

PS RAM всё-таки принято применять к чисто ОЗУ, к регистрам процессоров оно ка-бы не совсем подходит.
Re[5]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 12:47
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Здравствуйте, Аноним, Вы писали:


А>>Или вот у меня объявлено private volatile int timeout. В нативе я ы не задумываясь использовал бы эту переменную без всяких блокировок


А>При запуске на машине с числом физических ядер более одного, нативный volitile становится злой шуткой вместо гарантированной синхронизации.


Честно говоря, совсем Вас не понял Не моглибы Вы повторить это в несколько более развёрнутом виде? Спасибо.
Re[8]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 12:49
Оценка: +1
Здравствуйте, Аноним, Вы писали:


U>>Volatile принудительно сбрасывает кэши процессоров, если переменная таким образом не помечена и используется не из под лока, то устаревшее значение переменной может жить в кэшах процессоров неопределенное время (т.е. до тех пор пока кто-нибудь другой кэш не сбросит).


А>Ещё раз, вот смотрите. Имеем переменную, расположенную в памяти, допустим int32 Value = 10. Допустим, два потока на разных процессорах уже обращались к ней для чтения и она оказалась в кэшах обоих процессоров. Теперь наступает момент, когда эти два потока на тех-же процессорах пытаются выполнить следующий псевдокод

А>
А>  EnterCriticalSection(CS);
А>  Value = Value + 5;
А>  LeaveCriticalSection(CS);
А>


Смотри выделенное. лок соответствует критической секцией
и солнце б утром не вставало, когда бы не было меня
Re[9]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 12:55
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Кэш ядра. Он у каждого ядра свой,


Вообще-то это не для всех многоядерников справедливо, есть и с, грубо говоря, совмещённым...Ну там довольно сложный механизм. По поводу когерентности, я не хочу повторяться, вот здесь
Автор:
Дата: 02.07.09
посмотрите.
Re[9]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 13:03
Оценка: :))
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Аноним, Вы писали:



U>>>Volatile принудительно сбрасывает кэши процессоров, если переменная таким образом не помечена и используется не из под лока, то устаревшее значение переменной может жить в кэшах процессоров неопределенное время (т.е. до тех пор пока кто-нибудь другой кэш не сбросит).


А>>Ещё раз, вот смотрите. Имеем переменную, расположенную в памяти, допустим int32 Value = 10. Допустим, два потока на разных процессорах уже обращались к ней для чтения и она оказалась в кэшах обоих процессоров. Теперь наступает момент, когда эти два потока на тех-же процессорах пытаются выполнить следующий псевдокод

А>>
А>>  EnterCriticalSection(CS);
А>>  Value = Value + 5;
А>>  LeaveCriticalSection(CS);
А>>


S>Смотри выделенное.


Признаться, я не понял, что нужно смотреть, и что значит "лок соответствует критической секцией" В WinApi мы просто вызываем EnterCriticalSection, при чём тут лок? Вы можете с успехом заменить EnterCriticalSection на WaitForSingleObject(Semaphore), ничего не поменяется. Здесь нет никаких иницируемых системой сбросов кэша, этого просто не нужно. Когерентность обеспечивается механизмом самих кэшей, на уровне железа. Вообще принудительный сброс кэша нужен только в очень редких, весьма экзотических, случааях.

В общем, я не знаю, как это ещё нужно объяснять, и более к этому возвращаться не намерен. Если не понятно — более ничем не могу помочь, извините Могу только посоветовать ознакомиться в общих чертах с архитектурой современных процессоров.
Re[10]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 13:13
Оценка:
Здравствуйте, Аноним, Вы писали:

http://rsdn.ru/article/dotnet/CSThreading2.xml#ENUAE
Автор(ы): Joseph Albahari
Дата: 27.06.2007
Окончание статьи, опубликованной в RSDN Magazine #1-2007. Рассматриваются особенности взаимодействия с апартаментами, потоковые таймеры, пулы потоков, BackgroundWorker, асинхронные методы и делегаты.
В статье использован материал из книги Joseph Albahari, Ben Albahari "C# 3.0 in a Nutshell" — http://www.oreilly.com/catalog/9780596527570/

Имелось ввиду Барьеры в памяти.
В свое время шел разговор и об кэшах процессоров.
и солнце б утром не вставало, когда бы не было меня
Re[7]: Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 02.07.09 13:55
Оценка:
Здравствуйте, Undying, Вы писали:
U>Примерно так (локи опущены):

А если вот так?
public sealed class MessageSender
{
    public event Action<string> MessageReceived;

    private readonly ManualResetEvent sendEvent;
    private readonly Queue<string> msgQueue;
    private readonly object syncRoot;

    private volatile bool isPaused;
    private int interval;

    public MessageSender()
    {
        this.sendEvent = new ManualResetEvent(false);
        this.msgQueue = new Queue<string>(32);
        this.syncRoot = new object();
        this.interval = 1000;

        ThreadPool.QueueUserWorkItem(SenderJob);
    }

    // Интервал между посылками события
    public int Interval
    {
        get { return this.interval; }
        set
        {
            if(value < 0) value = 100;
            this.interval = value;
        }
    }

    // Метод потока-отправителя
    private void SenderJob(object state)
    {
        while(true)
        {
            this.sendEvent.WaitOne();

            string message = null;
            lock(this.syncRoot)
            {
                if (this.msgQueue.Count > 0)
                {
                    message = this.msgQueue.Dequeue();
                }
                else this.sendEvent.Reset();
            }

            if (message != null)
            {
                // Возбуждение события
                Action<string> receiver = this.MessageReceived;
                if (receiver != null) receiver(message);
            }

            Thread.Sleep(this.interval);
        }
    }

    // Добавление сообщений в очередь
    public void EnqueueMessage(string message)
    {
        lock(this.syncRoot)
        {
            this.msgQueue.Enqueue(message);

            if (!this.isPaused)
            {
                this.sendEvent.Set();
            }
        }
    }

    // Приостановка отправок
    public void Pause()
    {
        lock (this.syncRoot)
        {
            this.isPaused = true;
            this.sendEvent.Reset();
        }
    }

    // Возобновление отправок
    public void Resume()
    {
        if   (this.isPaused)
        lock (this.syncRoot)
        {
            this.isPaused = false;
            if (this.msgQueue.Count > 0)
            {
                this.sendEvent.Set();
            }
        }
    }

    // Очистка очереди
    public void Abort()
    {
        lock(this.syncRoot)
        {
            this.msgQueue.Clear();
        }
    }
}


Работа с WaitHandle в локах осознанно, так как поток отправителя сам сбрасывает ивент... То есть может выйти так, что:
1. Поток-отправитель проверит очередь, убедится что в ней нету сообщений.
2. Другой поток добавит в очередь сообщение и включит ивент в сигнальное состояние.
3. Убедившись, что сообщений нет, поток-отправитель, сбросит событие.
4. В итоге отправитель ждёт события, а в очереди 1 сообщение.

Работа устраивает, проблем не наблюдается, но хочется довести до ума чтобы "уж наверняка"...
Re[11]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 14:25
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Аноним, Вы писали:


S>http://rsdn.ru/article/dotnet/CSThreading2.xml#ENUAE
Автор(ы): Joseph Albahari
Дата: 27.06.2007
Окончание статьи, опубликованной в RSDN Magazine #1-2007. Рассматриваются особенности взаимодействия с апартаментами, потоковые таймеры, пулы потоков, BackgroundWorker, асинхронные методы и делегаты.
В статье использован материал из книги Joseph Albahari, Ben Albahari "C# 3.0 in a Nutshell" — http://www.oreilly.com/catalog/9780596527570/

S>Имелось ввиду Барьеры в памяти.
S>В свое время шел разговор и об кэшах процессоров.

Я просмотрел раздел "Барьеры в памяти и асинхронная изменчивость (volatility)", Вы ведь его имели в виду? Но при чём тут кэш процессора? Ключевая фраза могут кэшироваться в регистрах CPU, ну всё правильно, видимо. Я не знаю, может Вы просто не в курсе, что "регистры процессора" и "L1, L2 кэши процессора" зто абсолютно разные, никак не связанные вещи? Не обижайтесь, если это не так, и Вы в курсе этой разницы, но у меня просто не осталось других идей А то, что там названо "барьерами", это действительно, самые обычные критические секции, к но NET, судя по всему, добавляет внутрь защищённого блока ещё и код, копирующий значения переменных из регистров в ОЗУ. Но дело в том, что на самом деле этот код копирует из регистров в кэш. Понимаете, современные процессы вообще не умеют работать напрямую с ОЗУ, но сами они об этом не знают. Пытаясь записать что-то в ОЗУ, они на самом деле пишут в кэш, и при чтении читают из кэша. Но само ядро об этом даже не догадывается, да и не нужно ему это. Контролеер кэша делает кэш прозрачным для ядра, а следовательно и для программы. Именно контроллер следит, чтобы значение по адресу в озу, к которму обращается ядро, совпадало с тем, что лежит в кэше. И он-же обеспечивает непротиворечивость данных в кэшах процессоров на многопроцессорной системе.

В общем, это меня утомило, честное слово Все, кто предпочитает думать, что volatile и Lock обладают каким-то волшебством, недоступным нативному, даже ассемблерному, коду, тот, разумеется волен в своих желаниях. А кто хочет разобраться, советую хотя-бы в общих чертах ознакомиться с устройством процессоров. А здесь об этом говорить как-то не очень подходяще.
Re[8]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 02.07.09 14:37
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>Здравствуйте, Undying, Вы писали:

U>>Примерно так (локи опущены):

П>Работа устраивает, проблем не наблюдается, но хочется довести до ума чтобы "уж наверняка"...


А зачем вам while(true), лишняя логика по проверке очереди на ноль для Reset-а и флаг isPaused?

можно, например, так:

private void SenderJob(object state)
{
            sendEvent.WaitOne();

            string message;
            lock (msgQueue)
            {
                message = _queues.Dequeue();
            }
            
            if (message != null)
            {
                //непосредственно перед
                lock(syncRoot)
                {
                    Thread.Sleep(interval);
                }

                Action<string> receiver = this.MessageReceived;
                if (receiver != null) receiver(message);
            }
}

public void EnqueueMessage(string message)
{
        lock(msgQueue)
        {
            msgQueue.Enqueue(message);
        }
        ThreadPool.QueueUserWorkItem(SenderJob, null);
}
Re[12]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 14:42
Оценка:
Здравствуйте, Аноним, Вы писали:


А>В общем, это меня утомило, честное слово Все, кто предпочитает думать, что volatile и Lock обладают каким-то волшебством, недоступным нативному, даже ассемблерному, коду, тот, разумеется волен в своих желаниях. А кто хочет разобраться, советую хотя-бы в общих чертах ознакомиться с устройством процессоров. А здесь об этом говорить как-то не очень подходяще.

Я же оговорился, что свое время речь шла о кэшах процессоров, и от том, что зависит от архитектуры.
Буду рад если дашь толковую ссылку на Контролеер кэшей. Особенно когда ядра будут исчисляться сотнямию
volatility так же как и в нативе просто запрещает оптимизацию для кэширования значений в CPU.
А Lock это обычная критическая секция. Так, что Net в этом плане недалеко от натива ушел
и солнце б утром не вставало, когда бы не было меня
Re[10]: Thread-safe specialized queue
От: jedi Мухосранск  
Дата: 02.07.09 14:47
Оценка: :)
Здравствуйте, <Аноним>, Вы писали:

А>Здравствуйте, Serginio1, Вы писали:


А>Признаться, я не понял, что нужно смотреть, и что значит "лок соответствует критической секцией" В WinApi мы просто вызываем EnterCriticalSection, при чём тут лок? Вы можете с успехом заменить EnterCriticalSection на WaitForSingleObject(Semaphore), ничего не поменяется. Здесь нет никаких иницируемых системой сбросов кэша, этого просто не нужно. Когерентность обеспечивается механизмом самих кэшей, на уровне железа. Вообще принудительный сброс кэша нужен только в очень редких, весьма экзотических, случааях.


АААА.... Я хочу такое железо как у него. Почему мне не такое продали???
... << RSDN@Home 1.2.0 alpha 4 rev. 1228>>
Re[9]: Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 02.07.09 14:54
Оценка:
Здравствуйте, Аноним, Вы писали:
А>В общем, это меня утомило, честное слово Все, кто предпочитает думать, что volatile и Lock обладают каким-то волшебством, недоступным нативному, даже ассемблерному, коду, тот, разумеется волен в своих желаниях. А кто хочет разобраться, советую хотя-бы в общих чертах ознакомиться с устройством процессоров. А здесь об этом говорить как-то не очень подходяще.

Ща перечитал топик и никак не пойму что и кому Вас утомило доказывать.
Зато я понял:
1. Чем чаще употребляешь слово "нативный" в managed-ветке, тем ты авторитетнее.
2. Советовать всем изучить архитектуру CPU — это просто высший пилотаж!



Здравствуйте, Arboz, Вы писали:
П>>Работа устраивает, проблем не наблюдается, но хочется довести до ума чтобы "уж наверняка"...

A>А зачем вам while(true), лишняя логика по проверке очереди на ноль для Reset-а и флаг isPaused?

Действительно, и зачем только такое понадобилось?

А перечитайте начало топика, посмотрите какие требования к логике представлены нужны и что подсказал Undying по поводу постоянного создания потоков.
                lock(syncRoot)
                {
                    Thread.Sleep(interval);
                }
Даже не знаю что Вам на ЭТО ответить...
Re[13]: Thread-safe specialized queue
От: Аноним  
Дата: 02.07.09 15:23
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Аноним, Вы писали:



А>>В общем, это меня утомило, честное слово Все, кто предпочитает думать, что volatile и Lock обладают каким-то волшебством, недоступным нативному, даже ассемблерному, коду, тот, разумеется волен в своих желаниях. А кто хочет разобраться, советую хотя-бы в общих чертах ознакомиться с устройством процессоров. А здесь об этом говорить как-то не очень подходяще.

S> Я же оговорился, что свое время речь шла о кэшах процессоров, и от том, что зависит от архитектуры.
S>Буду рад если дашь толковую ссылку на Контролеер кэшей. Особенно когда ядра будут исчисляться сотнямию
S> volatility так же как и в нативе просто запрещает оптимизацию для кэширования значений в CPU.
S>А Lock это обычная критическая секция. Так, что Net в этом плане недалеко от натива ушел

Извините, если всё-таки обидел, не правильно поняв, я не хотел. Вы имели в виду, что кэши ранее обсуждались где-то в другом месте, а не в статье? И к каким выводам пришло обсуждение? Случайно не помните, хотя-бы где и когда, это было, любопытно было-бы взглянуть.

По поводу ссылок — с удовольствием-бы, но я их не сохранил. Всё-таки это не вчера было, и даже не позавчера Если не ошибаюсь, на IXBT было несколько очень неплохих статей. Да я думаю, найти будет не сложно, во всяком случае не помню, чтобы это вызвало какие-то затруднения. Надо только не забывать, что Intel и AMD строит свои кэши по-разному. Одноко для обеспечения когерентности существует достаточно старый протокол, принятый ещё в 90-х годах., и все производители его поддерживают. Название протокола — извините, тоже забыл
Re[12]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 16:22
Оценка:
Здравствуйте, Аноним, Вы писали:
Если про кэши ядра я оговорился, то с кэшами процессоров не совсем понятно
Тогда если так хорошо умельцы из MS придумали
Thread.VolatileRead и Thread.VolatileWrite упомянутые в той статье.
http://msdn.microsoft.com/ru-ru/library/system.threading.thread.volatileread.aspx
http://msdn.microsoft.com/ru-ru/library/system.threading.thread.volatilewrite.aspx
и солнце б утром не вставало, когда бы не было меня
Re[14]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 16:41
Оценка:
Здравствуйте, Аноним, Вы писали:

На обиженных воду . Мне самому интересно разобраться. О том что ты написал я сам смутно помнил по этому и сомневался
Но вот такие комментарии http://msdn.microsoft.com/ru-ru/library/th4b7fw3.aspx
их только развивают. Никакой конкретики, если только процессоронезависимость Net, Ява
и солнце б утром не вставало, когда бы не было меня
Re[14]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 02.07.09 16:41
Оценка:
Здравствуйте, Аноним, Вы писали:

На обиженных воду . Мне самому интересно разобраться. О том что ты написал я сам смутно помнил по этому и сомневался
Но вот такие комментарии http://msdn.microsoft.com/ru-ru/library/th4b7fw3.aspx
их только развивают. Никакой конкретики, если только процессоронезависимость Net, Ява
и солнце б утром не вставало, когда бы не было меня
Re[10]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 02.07.09 17:50
Оценка:
Здравствуйте, Пельмешко, Вы писали:


П>А перечитайте начало топика, посмотрите какие требования к логике представлены нужны и что подсказал Undying по П>поводу постоянного создания потоков.

давайте по порядку

П>Потребовалось сделать велосипед со следующим поведением:

П>* Класс олицетворяет собой очередь сообщений.
П>* В класс из нескольких потоков поступают сообщения, а так же есть подписчики на эти сообщения.
П>* Класс должен поочерёдно извлекать сообщения из очереди и отправлять их подписчикам события.и
тут все требования, как я понимаю, удовлетворены, т.к. мое предложение по модификации вашего кода не касается этих требований.

П>* Между отправками сообщений должен соблюдаться заданный временной интервал.

соблюдается
П>* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно П>проверяющего очередь потока).
В моем случае нет собщений — нет ни одного работающего потока, в вашем случае есть — ожидающий добавления сообщения.
С этой точки зрения мое предложение лучше.
П>* В очереди может быть достаточно много сообщений, поэтому реализация очереди не должна плодить потоки на каждое.
Обращаю ваше внимание на формулировку — в моем случае потоки не плодятся. Покажите, где создается поток?
П>* Должна быть возможность приостановить/возобновить отправки.
соблюдается

П>
П>                lock(syncRoot)
П>                {
П>                    Thread.Sleep(interval);
П>                }
П>
Даже не знаю что Вам на ЭТО ответить...


Ну уж скажите что-нибудь. Аргументированное, пожалуйста.
Re[11]: Thread-safe specialized queue
От: Пельмешко Россия blog
Дата: 02.07.09 19:05
Оценка:
Здравствуйте, Arboz, Вы писали:
П>>* Между отправками сообщений должен соблюдаться заданный временной интервал.
A>соблюдается
Ага, перед отправкой первого так же, хотя он ни в чём не виноват и может быть быть отправлен без задержки по определению (требуется интервал между отправками сообщений).

П>>* При отсутствии сообщений в очереди не должно создаваться никаких накладных расходов (например, в виде постоянно П>проверяющего очередь потока).

A>В моем случае нет собщений — нет ни одного работающего потока, в вашем случае есть — ожидающий добавления сообщения.
A>С этой точки зрения мое предложение лучше.

Ага, Вы забиваете пул задачами, потоки не "плодятся"
А зафиг тогда очередь? Почему сообщение отправителю через state не отправить?
В конце-концов, где гарантия, что пул выдержит 100000 сообщений?

A>Ну уж скажите что-нибудь. Аргументированное, пожалуйста.


                lock(syncRoot)
                {
                    Thread.Sleep(interval);
                }

А как Вам такой сценарий: работа потока может быть прервана после выхода из лока, и следующий SenderJob может пройти лок и отправить следующее сообщение первым, то есть не будет соблюдаться порядок? Маловероятно? А если подписчик на событие долго (дольше интервала) обрабатывает сообщение?

И вообще это выглядит очень странно: лок ресурса на время.
Я считаю это не интуитивно понятным и использованием lock не по назначению.
Вы часто такое применение lock видели?
Re[15]: Thread-safe specialized queue
От: Аноним  
Дата: 03.07.09 01:48
Оценка: 6 (2)
Здравствуйте, Serginio1, Вы писали:

S>Здравствуйте, Аноним, Вы писали:


S> На обиженных воду . Мне самому интересно разобраться. О том что ты написал я сам смутно помнил по этому и сомневался

S>Но вот такие комментарии http://msdn.microsoft.com/ru-ru/library/th4b7fw3.aspx
S>их только развивают. Никакой конкретики, если только процессоронезависимость Net, Ява

В своё время отсутствие чёткой информации от MS и сподвигло меня на разбирательство с современным состоянием дел

По поводу сказанного в MSDN — ну что тут скажешь? За комментариями надо-бы обращаться к авторам. Со своей стороны могу только предположить:
NET позиционируется как полностью кроссплатформенная. Исходя из этого можно предположить, что возможно появление среды исполнения NET для некой гипотетической платформы, в которой управления кэшами производится программно, и на этот случай разработчики специально оговаривают, что возьмут эту задачу на себя. Хотя лично мне несколько затруднительно представить, как может функционировать такая многопроцессорная система без синхронизации кэшей.

Сейчас попробовал поискать инфу в сети. Как иожидалось, ссылок — вагон и маленькая тележка Где-нибудь среди них наверняка есть и про синхронизацию кэшей, но на разгребание этой кучи надо время, которого нет. Но вот с первой страницы:

первое, Рихтер:

Однако кэш-линии сильно усложняют обновление памяти в многопроцессорной среде. Вот небольшой пример:

1. Процессор 1 считывает байт, извлекая этот и смежные байты в свою кэш-линию.
2. Процессор 2 считывает тот же байт, а значит, и тот же набор байтов, что и процессор 1; извлеченные байты помещаются в кэш-линию процессора 2.
3. Процессор 1 модифицирует байт памяти, и этот байт записывается в его кэш линию. Но эти изменения еще не записаны в оперативную память.
4. Процессор 2 повторно считывает тот же байт Поскольку он уже помещен в кэш-линию этого процессора, последний не обращается к памяти и, следова тельно, не «видит" новое значение данного байта.

Такой сценарий был бы настоящей катастрофой. Но разработчики чипов прекрасно осведомлены об этой проблеме и учитывают её при проектировании своих процессоров В частности, когда один из процессоров модифицирует байты в своей кэш линии, об этом оповещаются другие процессоры, и содержимое их кэш-линий объяв ляется недействительным. Таким образом, в примере, приведенном выше, после изменения байта процессором 1, кэш процессора 2 был бы объявлен недействительным. На этапе 4 процессор 1 должен сбросить содержимое своего кэша в оперативную память, а процессор 2 — повторно обратиться к памяти и вновь заполнить свою кэш линию Как видите, кэш-линии, которые, как правило, увеличивают быстродействие процессора, в многопроцессорных машинах могут стать причиной снижения произ водительности.


И вот неплохая статья об общих принципах построения кэшей, правда о многопроцессорности там не упоминается.
Re[10]: Thread-safe specialized queue
От: Аноним  
Дата: 03.07.09 03:06
Оценка:
Здравствуйте, Пельмешко, Вы писали:

Здравствуйте, jedi, Вы писали:

Господа, есть предложение придерживаться при общении хотя-бы некоторого подобия норм профессиональной этики. Аргументы типа "сам дурак" возможно и котируются в дворовой компании, но в среде профессионалов они вызывают некоторое удивление и недоумение, мягко говоря.
Re[11]: Thread-safe specialized queue
От: jedi Мухосранск  
Дата: 03.07.09 04:05
Оценка:
Здравствуйте, jedi, Вы писали:

J>Здравствуйте, <Аноним>, Вы писали:


А>>Здравствуйте, Serginio1, Вы писали:


А>>Признаться, я не понял, что нужно смотреть, и что значит "лок соответствует критической секцией" В WinApi мы просто вызываем EnterCriticalSection, при чём тут лок? Вы можете с успехом заменить EnterCriticalSection на WaitForSingleObject(Semaphore), ничего не поменяется. Здесь нет никаких иницируемых системой сбросов кэша, этого просто не нужно. Когерентность обеспечивается механизмом самих кэшей, на уровне железа. Вообще принудительный сброс кэша нужен только в очень редких, весьма экзотических, случааях.


J>АААА.... Я хочу такое железо как у него. Почему мне не такое продали???


Уважаемый, Аноним, приношу извинения. Неправильно понял и сморозил глупость. Обещаю исправиться

Тут другой вопрос возникает. Понятно, что когерентность кешей хорошо работает в современнх многоядерных процах, где все процы на одном камне. А вот интересно как обстоит дело с кешами в архитектурах, где процы реально раздельные. Ведь процессоры "далеко" и если они начнут следить за когерентностью кешей, не убьет ли это саму идею кеша? Или таких архитектур просто нет на сегодняшний день?
... << RSDN@Home 1.2.0 alpha 4 rev. 1228>>
Re[8]: Thread-safe specialized queue
От: Undying Россия  
Дата: 03.07.09 05:04
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>Работа устраивает, проблем не наблюдается, но хочется довести до ума чтобы "уж наверняка"...


Так-то нормально. Но если завтра задача усложнится, например, наряду с обычными появятся срочные сообщения, которые нужно будет отправлять мгновенно, то как ты ее будешь решать? Thread.Sleep(interval) там уже нельзя будет использовать.

Еще можешь terminate добавить и Dispose, как в примере Анонима, чтобы была возможность штатного завершения потока.
Re[12]: Thread-safe specialized queue
От: Аноним  
Дата: 03.07.09 05:31
Оценка: 6 (2)
Здравствуйте, jedi, Вы писали:

J>Уважаемый, Аноним, приношу извинения. Неправильно понял и сморозил глупость. Обещаю исправиться


Пустяки, забудим. У всех у нас есть нервы, и далеко не всегда этот факт идёт нам на пользу

J>Тут другой вопрос возникает. Понятно, что когерентность кешей хорошо работает в современнх многоядерных процах, где все процы на одном камне. А вот интересно как обстоит дело с кешами в архитектурах, где процы реально раздельные. Ведь процессоры "далеко" и если они начнут следить за когерентностью кешей, не убьет ли это саму идею кеша? Или таких архитектур просто нет на сегодняшний день?


Ну почему-же, есть.

Я знакомился с данным вопросом несколько лет назад, и всё написанное ниже базируется не на точных знаниях, а на воспоминаниях, на которые, к сожалению, не всегда можно полностью положиться

Не стоит забывать, что многопроцессорные системы появились намного раньше, чем многоядерные процессоры. В доисторические времена, когда компьютеры были большими, каждый производитель решал эту проблему по-своему. Появление новой модели было достаточно редким явлением, и особой необходимости в едином стандарте просто не было. Но потом появились достаточно серьёзные модели микропроцессоров. Однако эти первые модели обладали всё-таки ограниченной вычислительной мощьностью. Сам собой встал вопрос об увеличении мощности компьютеров путём создания многопроцессорных систем, благо такой опыт уже был накоплен. Тогда и возникла идея разработки унифицированного протокола синхронизации кэшей, что и было сделано где-то в середине 90-х. Смутно припоминаю, что вроде как он базировался на каком-то более раннем протоколе, но это не факт. Этот протокол был принят всеми производителями и они стали включать его поддержку в свои модели. Физически, насколько помню, у процессоров предусмотрена специальная внешняя шина, по которой контроллеры кэшей договариваются друг с другом. Ну а когда появились многоядерники, то этот протокол был реализован и для них, что было вобщем-то даже проще, чем для многопроцессорного варианта.

Убить идею кэша конечно можно, о чём и Рихтер пишет, но это уже зависит от автора программы. Нужно просто учитывать эти особенности при реализации многопоточности, уделяя достаточно внимания эффективности кода. В частности, весьма желательно минимизировать взаимодействие потоков между собой. Когда каждый из потоков имеет дело со своим обособленным набором данных, то и вероятность инвалидации линейки в кэше сводится к минимуму. Да и производители чипов не сидят сложа руки, работают над оптимизацией алгоритмов управления кэшем. В современных процессорах линия кэша обычно имеет размер 32 или 64 байта, при её инвалидации требуется обновить только эти байты, а не весь кэш. Это, кстати напоминает нам а крайней желательности выравнивания переменных на эту границы — если часть структуры окажется в одной, а другая в другой линейке, то придётся обновить в два раза больше байтов. Но в NET, я думаю, это не столь актуально, среда исполнения наверняка учитывает такие нюансы. Двухуровневый кэш тоже значительно снижает остроту проблемы, позволяя второму уровню обмениваться памятью одновременно с обработкой процессором данных.

Так что не всё так печально, как это может показаться на первый взгляд Но учитывать такие тонкости всё-таки стоит.
Re[13]: Thread-safe specialized queue
От: jedi Мухосранск  
Дата: 03.07.09 05:53
Оценка:
Здравствуйте, <Аноним>, Вы писали:

А>Не стоит забывать, что многопроцессорные системы появились намного раньше, чем многоядерные процессоры. В доисторические времена, когда компьютеры были большими, каждый производитель решал эту проблему по-своему. Появление новой модели было достаточно редким явлением, и особой необходимости в едином стандарте просто не было. Но потом появились достаточно серьёзные модели микропроцессоров. Однако эти первые модели обладали всё-таки ограниченной вычислительной мощьностью. Сам собой встал вопрос об увеличении мощности компьютеров путём создания многопроцессорных систем, благо такой опыт уже был накоплен. Тогда и возникла идея разработки унифицированного протокола синхронизации кэшей, что и было сделано где-то в середине 90-х. Смутно припоминаю, что вроде как он базировался на каком-то более раннем протоколе, но это не факт. Этот протокол был принят всеми производителями и они стали включать его поддержку в свои модели. Физически, насколько помню, у процессоров предусмотрена специальная внешняя шина, по которой контроллеры кэшей договариваются друг с другом. Ну а когда появились многоядерники, то этот протокол был реализован и для них, что было вобщем-то даже проще, чем для многопроцессорного варианта.


А>Убить идею кэша конечно можно, о чём и Рихтер пишет, но это уже зависит от автора программы. Нужно просто учитывать эти особенности при реализации многопоточности, уделяя достаточно внимания эффективности кода. В частности, весьма желательно минимизировать взаимодействие потоков между собой. Когда каждый из потоков имеет дело со своим обособленным набором данных, то и вероятность инвалидации линейки в кэше сводится к минимуму. Да и производители чипов не сидят сложа руки, работают над оптимизацией алгоритмов управления кэшем. В современных процессорах линия кэша обычно имеет размер 32 или 64 байта, при её инвалидации требуется обновить только эти байты, а не весь кэш. Это, кстати напоминает нам а крайней желательности выравнивания переменных на эту границы — если часть структуры окажется в одной, а другая в другой линейке, то придётся обновить в два раза больше байтов. Но в NET, я думаю, это не столь актуально, среда исполнения наверняка учитывает такие нюансы. Двухуровневый кэш тоже значительно снижает остроту проблемы, позволяя второму уровню обмениваться памятью одновременно с обработкой процессором данных.


А>Так что не всё так печально, как это может показаться на первый взгляд Но учитывать такие тонкости всё-таки стоит.


Спасибо, познавательно

P.S. Вы бы зарегистрировались, я думаю народ бы отблагодарил оценками за информацию
... << RSDN@Home 1.2.0 alpha 4 rev. 1228>>
Re[12]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 03.07.09 06:31
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>Ага, перед отправкой первого так же, хотя он ни в чём не виноват и может быть быть отправлен без задержки по определению (требуется интервал между отправками сообщений).

Поставьте, так понравившеюся вам секцию (lock + Sleep), после отправки сообщения,
тогда будет точное удовлетворение этому условию.

П>Ага, Вы забиваете пул задачами, потоки не "плодятся"

Да, не плодятся.
П>А зафиг тогда очередь? Почему сообщение отправителю через state не отправить?
Очередь гарантирует заданную последовательность обработки ваших сообщений.
Последовательность помещение делегатов в ThreadPool не гарантирует порядок их выполнения, и хотя он сейчас FIFO,

Проблема, которую требуется решить, в первую очередь требует понимания того, как работает подготовка к отправке в ThreadPool. Внутренне, он поддерживает очередь переданных ему рабочих элементов. Когда поток в пуле доступен для исполнения работы, он возвращается к этой очереди и вытягивает из нее следующий элемент. Порядок, в котором происходит эта обработка, не задокументирован и на него совершенно точно не следует полагаться (поскольку он может измениться и, скорее всего, изменится в будущих версиях.
Сейчас он реализован очень просто: очередь «первым вошел – первым вышел» (first-in-first-out – FIFO).


П>В конце-концов, где гарантия, что пул выдержит 100000 сообщений?

В документации нет упоминаний про ограничения... Тем более что —

Сейчас он реализован очень просто: очередь «первым вошел – первым вышел»


В оригинале статьи рассказывается как решаются задачи, подобные вашей.

П>А как Вам такой сценарий: работа потока может быть прервана после выхода из лока, и следующий SenderJob может пройти лок и отправить следующее сообщение первым, то есть не будет соблюдаться порядок?

Порядок гарантируется выбранной структурой хранения — а именно очередью, с синхронизированным доступом.

П>А если подписчик на событие долго (дольше интервала) обрабатывает сообщение?

Да, подобной проблемы мой код не решает. Так же как и ваш.


П>И вообще это выглядит очень странно: лок ресурса на время.

П>Я считаю это не интуитивно понятным и использованием lock не по назначению.
П>Вы часто такое применение lock видели?
Это искусственный таймаут — я не вижу тут никакого совсем никакого криминала.
Re[9]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 03.07.09 06:39
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Пельмешко, Вы писали:


П>>Работа устраивает, проблем не наблюдается, но хочется довести до ума чтобы "уж наверняка"...


U>Так-то нормально. Но если завтра задача усложнится, например, наряду с обычными появятся срочные сообщения, которые нужно будет отправлять мгновенно, то как ты ее будешь решать?

А решать буду так: ThreadPoolPriority
Да, Sleep-а не будет

U>Еще можешь terminate добавить и Dispose, как в примере Анонима, чтобы была возможность штатного завершения потока.

Это я оставил на откуп автору.
Re[10]: Thread-safe specialized queue
От: Undying Россия  
Дата: 03.07.09 07:43
Оценка:
Здравствуйте, Arboz, Вы писали:

A>А решать буду так: ThreadPoolPriority


А зачем нужны такие сложности?
Re[12]: Thread-safe specialized queue
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 03.07.09 08:04
Оценка:
Здравствуйте, Пельмешко, Вы писали:

П>Ага, Вы забиваете пул задачами, потоки не "плодятся"

П>А зафиг тогда очередь? Почему сообщение отправителю через state не отправить?
П>В конце-концов, где гарантия, что пул выдержит 100000 сообщений?

Выдержит, так очередь ограничена только памятью. Другой вопрос, что ограничить эту очередь,
семафорами.
и солнце б утром не вставало, когда бы не было меня
Re[11]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 03.07.09 08:21
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Arboz, Вы писали:


A>>А решать буду так: ThreadPoolPriority


U>А зачем нужны такие сложности?

Да не сказать:
В нашем случае — две очереди, вместо одной _queues

_priorityQueue = new Queue<string>(),
_standardQueue = new Queue<string>();


и логика

private void SenderJob(object state)
{
            sendEvent.WaitOne();

            string message;
            lock(_priorityQueue)
            {
              message = (_priorityQueue.Count > 0 ? 
                _priorityQueue : _standardQueue).Dequeue();
            }

            if (message != null)
            {
                Action<string> receiver = this.MessageReceived;
                if (receiver != null) receiver(message);

                // :) а таймаут оставляем
                lock(syncRoot)
                {
                    Thread.Sleep(interval);
                }

            }
}

public void EnqueueMessage(string message, bool priority)
{
        lock(_priorityQueue) {
            (isPriority ? _priorityQueue : _standardQueue).Enqueue(qc);
        }

        ThreadPool.QueueUserWorkItem(SenderJob, null);
}
Re[12]: Thread-safe specialized queue
От: Undying Россия  
Дата: 03.07.09 08:41
Оценка:
Здравствуйте, Arboz, Вы писали:

A>В нашем случае — две очереди, вместо одной _queues


И где в приведенном коде срочные сообщения? Срочные это значит, что они должны быть отправлены как можно быстрее. Значит, во-первых, если мы напихали в очередь 10 срочных сообщений все они должны быть отправлены друг за другом без всяких пауз. Во-вторых, если поток отправил несрочное сообщение и теперь спит установленный для несрочных сообщений интервал, то при появлении срочного сообщения поток должен проснуться и немедленно обработать срочное сообщение. В твоем коде я не вижу никакой разницы в обработке срочных и несрочных сообщений.
Re[13]: Thread-safe specialized queue
От: Arboz Россия  
Дата: 03.07.09 09:45
Оценка:
Здравствуйте, Undying, Вы писали:

U>Здравствуйте, Arboz, Вы писали:


A>>В нашем случае — две очереди, вместо одной _queues


U>И где в приведенном коде срочные сообщения? Срочные это значит, что они должны быть отправлены как можно быстрее. Значит, U>во-первых, если мы напихали в очередь 10 срочных сообщений все они должны быть отправлены друг за другом без всяких пауз. U>Во-вторых, если поток отправил несрочное сообщение и теперь спит установленный для несрочных сообщений интервал, то при U>появлении срочного сообщения поток должен проснуться и немедленно обработать срочное сообщение.


Опять же — не вижу проблемы.
заводим еще

private static readonly EventWaitHandle _prior = new EventWaitHandle(false, EventResetMode.ManualReset);


далее

private void SenderJob(object state)
{
            sendEvent.WaitOne();

            string message;
            bool isPriority;
            lock(_priorityQueue)
            {
              //первыми выбираем приоритетные сообщения
              message = (_priorityQueue.Count > 0 ? _priorityQueue : _standardQueue).Dequeue();
              isPriority = _priorityQueue.Count > 0;
              //если еще есть приоритетные, то сбрасываем ожидание, иначе закрываем кран
              if(isPriority) 
                 _prior.Set()
              else
                 _prior.Reset();
            }
            
            if (message != null)
            {
                //вызов обработчиков
                Action<string> receiver = this.MessageReceived;
                if (receiver != null) receiver(message);

       
                //если это обычное сообщение, то таймаут
                if(!isPriority)
                {
                    lock(syncRoot)
                    {
                       _prior.WaitOne(interval);
                    }
                }

            }
}

public void EnqueueMessage(string message, bool isPriority)
{
        lock(_priorityQueue) {
            (isPriority ? _priorityQueue : _standardQueue).Enqueue(message);
        }

        if(isPriority)
             _prior.Set();

        ThreadPool.QueueUserWorkItem(SenderJob, null);
}




U>В твоем коде я не вижу никакой разницы в обработке срочных и несрочных сообщений.

срочные имеют более высокий приоритет, т.к. отправляются первыми

О чем мы спорим? Подобным подходом возможно реализовать любую логику.
Re[10]: Thread-safe specialized queue
От: Кондраций Россия  
Дата: 03.07.09 19:38
Оценка:
Здравствуйте, <Аноним>, Вы писали:

...

А>Да ничто вобщем-то не мешает, просто как-то нелогично. А скорее просто не привычно


А>PS RAM всё-таки принято применять к чисто ОЗУ, к регистрам процессоров оно ка-бы не совсем подходит.

Ты не понял , я имел в виду, что процессору пофиг, какое там поле обрабатываться должно, он с RAMой работает.
Скажем так: регистр — просто место обработки, этакий workplace. Сейчас в нём одно, через милисекунду — другое. Конечно, регистр — не RAM .
... << RSDN@Home 1.2.0 alpha 4 rev. 1111>>
Сообщение заговорено потомственным колдуном, целителем и магом в девятом поколении!
Модерирование или минусование сообщения ведет к половому бессилию, венерическим заболеваниям, венцу безбрачия и диарее!
Re[11]: Thread-safe specialized queue
От: Аноним  
Дата: 04.07.09 07:00
Оценка:
Здравствуйте, Кондраций, Вы писали:

К>Здравствуйте, <Аноним>, Вы писали:


К>...


А>>Да ничто вобщем-то не мешает, просто как-то нелогично. А скорее просто не привычно


А>>PS RAM всё-таки принято применять к чисто ОЗУ, к регистрам процессоров оно ка-бы не совсем подходит.

К>Ты не понял , я имел в виду, что процессору пофиг, какое там поле обрабатываться должно, он с RAMой работает.
К>Скажем так: регистр — просто место обработки, этакий workplace. Сейчас в нём одно, через милисекунду — другое. Конечно, регистр — не RAM .

Действительно, не понял. У меня в тот день вообще что-то понималка сильно глючила
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.