Вопрос про неблокирующий буфер
От: SergASh  
Дата: 28.04.15 19:37
Оценка:
Привет всем!

Имеем класс MessageGateway, скелет которого ниже. Идея в том, что из разных потоков
вызывается MessageGateway.Instance.ConveyMessage( "что-то там..." ), сообщению присваивается
порядковый номер и метка времени. После этого сообщение передается дальше по конвейеру (здесь не показано).

Из другого потока код может попросить отдать ему все (GetAllMessages) или часть сообщений (GetRecentMessages).

Требуется:
1. В любой момент времени хранить не более N сообщений.
2. Не хранить сообщения, добавленные ранее, чем M секунд назад.
3. Потокобезопасно отдавать все актуальные сообщения (GetAllMessages)
4. Потокобезопасно отдавать сообщения с порядковым номером большим некоторого произвольного числа (GetRecentMessages)
5. Не блокировать поток при вызове ConveyMessage.
6. Не копировать сообщения в GetAllMessages и в GetRecentMessages.
7. Неохотно, но готов смириться с блокировкой при вызове GetAllMessages и GetRecentMessages, но они не должны блокировать добавление новых сообщений.

1 решается кольцевым буфером. 2-4 можно решить обвешав все локами.

Насколько я понимаю, обычный лок ничем не поможет, если надо вернуть перечислитель.
Тут надо либо копировать всю последовательность, то есть прости-прощай №6, либо вызывать Monitor.Exit из метода Dispose перечислителя.
В последнем случае если про Dispose забудут, то все ляжет.

Как сделать №5 пока не представляю.

  Скрытый текст
public class MessageGateway
{
  #region Singleton
  private MessageGateway()
  {}
  public static MessageGateway Instance
  {
    get { return instance__.Value; }
  }
  private static readonly Lazy<MessageGateway> instance__ = new Lazy<MessageGateway>( () => new MessageGateway() );
  #endregion // Singleton

  private long identity_;

  private static readonly long maximumMessagesStored__ = 1000;
  private static readonly TimeSpan maximumMessageRetainPeriod__ = TimeSpan.FromSeconds( 600 );

  private void Enqueue( Message message )
  {
    throw new NotImplementedException();
  }

  public void ConveyMessage( string message )
  {
    var msg = new Message
              {
                MessageId = Interlocked.Increment( ref identity_ ),
                TimeStamp = DateTime.UtcNow,
                Text = message
              };
    Enqueue( msg );
  }

  public IEnumerable<Message> GetAllMessages()
  {
    throw new NotImplementedException();
  }
  public IEnumerable<Message> GetRecentMessages( long startingFromId )
  {
    throw new NotImplementedException();
  }
}
public class Message
{
  public long MessageId { get; set; }
  public DateTime TimeStamp { get; set; }
  public string Text { get; set; }
}
Отредактировано 28.04.2015 19:38 SergASh . Предыдущая версия .
Re: Вопрос про неблокирующий буфер
От: Sinix  
Дата: 28.04.15 19:51
Оценка:
Здравствуйте, SergASh, Вы писали:


SAS>Как сделать №5 пока не представляю.

Я бы начал с прототипа на Rx. Тем более что для всех пунктов готовые методы есть, если ничего не упустил.

Как правило или прототипа будет достаточно, или станет понятно, что от всей идеи придётся отказаться, т.к она не вписывается в дизайн остальной части софта.
Re[2]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 28.04.15 20:04
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Я бы начал с прототипа на Rx. Тем более что для всех пунктов готовые методы есть, если ничего не упустил.


Спасибо, посмотрю. Я Rx ещё в глаза не видел.
Сколько вам понадобилось времени на его освоение?
Time budget на эту штуку у меня очень скудный, увы.
Re[3]: Вопрос про неблокирующий буфер
От: Sinix  
Дата: 28.04.15 21:08
Оценка: 3 (1)
Здравствуйте, SergASh, Вы писали:

SAS>Сколько вам понадобилось времени на его освоение?


Часов 5 на примеры и параллельное пролистывание introtorx.com. Но это по вечерам в свободное время, т.е было время чтобы материал отлежался в мозгах. Добил недели через полторы по-моему.

SAS>Time budget на эту штуку у меня очень скудный, увы.

А тогда упс. Rx мягко говоря не самая простая и понятная штука. Предлагаю подождать, может, ещё вариантов подбросят.
Re: Вопрос про неблокирующий буфер
От: Sharov Россия  
Дата: 29.04.15 09:06
Оценка:
Здравствуйте, SergASh, Вы писали:

SAS>Привет всем!


SAS>Имеем класс MessageGateway, скелет которого ниже. Идея в том, что из разных потоков

SAS>вызывается MessageGateway.Instance.ConveyMessage( "что-то там..." ), сообщению присваивается
SAS>порядковый номер и метка времени. После этого сообщение передается дальше по конвейеру (здесь не показано).

SAS>Из другого потока код может попросить отдать ему все (GetAllMessages) или часть сообщений (GetRecentMessages).


SAS>Требуется:


Персистенс сообщений не рассматриваете?

SAS>Как сделать №5 пока не представляю.


Через какие-нибудь две очереди. Навскидку если. Либо Concurrent*что-нибудь там
Кодом людям нужно помогать!
Re[2]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 29.04.15 11:08
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Персистенс сообщений не рассматриваете?


Нет. Разве что в качестве последнего средства если ничего умнее не придумаю.

SAS>>Как сделать №5 пока не представляю.


S>Через какие-нибудь две очереди. Навскидку если. Либо Concurrent*что-нибудь там


Про две очереди можно подробнее?

Что до ConcurrentQueue и компании, то я туда первым делом посмотрел.
Непонятно как можно без блокировки ограничивать длину очереди. Про пункт 2 тоже непонятно.
Все упирается в то, что TryDequeue не поддерживает предикаты.
Re[3]: Вопрос про неблокирующий буфер
От: Sharov Россия  
Дата: 29.04.15 11:13
Оценка:
Здравствуйте, SergASh, Вы писали:

SAS>Здравствуйте, Sharov, Вы писали:


S>>Персистенс сообщений не рассматриваете?


SAS>Нет. Разве что в качестве последнего средства если ничего умнее не придумаю.


SAS>>>Как сделать №5 пока не представляю.


S>>Через какие-нибудь две очереди. Навскидку если. Либо Concurrent*что-нибудь там


SAS>Про две очереди можно подробнее?


Одна очередь для внутреннего потребления, другая для внешнего. И они должны быть синхронизированы. Но это overkill,
т.е. худшее из возможных решений, так что забудем...

SAS>Что до ConcurrentQueue и компании, то я туда первым делом посмотрел.

SAS>Непонятно как можно без блокировки ограничивать длину очереди. Про пункт 2 тоже непонятно.
SAS>Все упирается в то, что TryDequeue не поддерживает предикаты.

Сделать таймер, который каждые M секунд отрабатывал бы по очереди.
Кодом людям нужно помогать!
Re[4]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 29.04.15 11:35
Оценка:
Здравствуйте, Sharov, Вы писали:

SAS>>Что до ConcurrentQueue и компании, то я туда первым делом посмотрел.

SAS>>Непонятно как можно без блокировки ограничивать длину очереди. Про пункт 2 тоже непонятно.
SAS>>Все упирается в то, что TryDequeue не поддерживает предикаты.

S>Сделать таймер, который каждые M секунд отрабатывал бы по очереди.


То есть потокобезопасность достигается тем, что удаляет из очереди только поток таймера, и он всегда единственный?
Тут все осложняется тем, что в пиковый период за М секунд в очередь может попасть гораздо больше разрешенных N
сообщений, возможно на 2-3 порядка больше. Так что длину очереди желательно ограничивать в момент вставки.
Re[5]: Вопрос про неблокирующий буфер
От: Sharov Россия  
Дата: 29.04.15 11:57
Оценка:
Здравствуйте, SergASh, Вы писали:

S>>Сделать таймер, который каждые M секунд отрабатывал бы по очереди.


SAS>То есть потокобезопасность достигается тем, что удаляет из очереди только поток таймера, и он всегда единственный?


Не понял ...

SAS>Тут все осложняется тем, что в пиковый период за М секунд в очередь может попасть гораздо больше разрешенных N

SAS>сообщений, возможно на 2-3 порядка больше. Так что длину очереди желательно ограничивать в момент вставки.

Похоже без локов не получится, смотрите в сторону read/write lock'ов.
Кодом людям нужно помогать!
Re[6]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 29.04.15 12:16
Оценка:
Здравствуйте, Sharov, Вы писали:

S>Здравствуйте, SergASh, Вы писали:


S>>>Сделать таймер, который каждые M секунд отрабатывал бы по очереди.


SAS>>То есть потокобезопасность достигается тем, что удаляет из очереди только поток таймера, и он всегда единственный?


S>Не понял ...


Ваше предложение я понял так, что удаляет из очереди только один поток — поток таймера. В таком слачае последовательность
операций
не нужно делать атомарной, а значит можно обойтись без блокировки.

Если удаление разрешать и разных потоков, то последовательность придется защищать локом.
Re[7]: Вопрос про неблокирующий буфер
От: Sharov Россия  
Дата: 29.04.15 12:23
Оценка:
Здравствуйте, SergASh, Вы писали:

SAS>Здравствуйте, Sharov, Вы писали:


S>>Здравствуйте, SergASh, Вы писали:


S>>>>Сделать таймер, который каждые M секунд отрабатывал бы по очереди.


SAS>>>То есть потокобезопасность достигается тем, что удаляет из очереди только поток таймера, и он всегда единственный?


S>>Не понял ...


SAS>Ваше предложение я понял так, что удаляет из очереди только один поток — поток таймера. В таком слачае последовательность

SAS>операций
SAS> SAS>не нужно делать атомарной, а значит можно обойтись без блокировки.

Типа того.

SAS>Если удаление разрешать и разных потоков, то последовательность придется защищать локом.


Но у таймера и будет другой поток, просто я предполагал неблокирующую очередь.
Кодом людям нужно помогать!
Re[8]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 29.04.15 12:44
Оценка:
Здравствуйте, Sharov, Вы писали:

SAS>>Если удаление разрешать и разных потоков, то последовательность придется защищать локом.


S>Но у таймера и будет другой поток, просто я предполагал неблокирующую очередь.


Очередь само собой должна быть неблокирующей, иначе быстрых параллельных вставок не будет.

Проблема в том, что стандартная ConcurrentQueue не умеет выполнять приведенную мной
последовательность атомарно. Что приводит к мысли завести отдельный поток, из которого
будет выполняться удаление, и будить его а) по таймеру, б) при вставке новых сообщений
если обнаруживается, что очередь превысила допустимую длину. Последнее можно проверить
без блокировок.
Re: Вопрос про неблокирующий буфер
От: WolfHound  
Дата: 29.04.15 18:37
Оценка:
Здравствуйте, SergASh, Вы писали:

SAS>3. Потокобезопасно отдавать все актуальные сообщения (GetAllMessages)

Данный метод должен удалять сообщения из очереди или нет?

SAS>4. Потокобезопасно отдавать сообщения с порядковым номером большим некоторого произвольного числа (GetRecentMessages)

Тот же вопрос плюс, что происходит с сообщениями, у которых номер меньше?
... << RSDN@Home 1.2.0 alpha 5 rev. 62>>
Пусть это будет просто:
просто, как только можно,
но не проще.
(C) А. Эйнштейн
Re[2]: Вопрос про неблокирующий буфер
От: SergASh  
Дата: 30.04.15 09:38
Оценка:
Здравствуйте, WolfHound, Вы писали:

WH>Здравствуйте, SergASh, Вы писали:


SAS>>3. Потокобезопасно отдавать все актуальные сообщения (GetAllMessages)

WH>Данный метод должен удалять сообщения из очереди или нет?

SAS>>4. Потокобезопасно отдавать сообщения с порядковым номером большим некоторого произвольного числа (GetRecentMessages)

WH>Тот же вопрос плюс, что происходит с сообщениями, у которых номер меньше?

Это немодифицирующие методы, удалять они ничего не будут.
Имелось в виду, что они должны позволять себя вызывать одновременно из нескольких потоков
и не разрушать буфер если в середине итерации происходит вставка.

Логика вот какая:
  1. Клиент подключается
  2. Первым делом получает все накопленные до сих пор сообщения (GetAllMessages)
  3. Сообщения, появившиеся после подключения, передаются клиенту напрямую. К буферу это не относится.
  4. Клиент становится на паузу, ну или соединение обрывается.
  5. Клиент переподключается.
  6. Зная номер последнего полученного сообщения, клиент вызывает GetRecentMessages чтобы дополучить то, что он пропустил
  7. Если во время обрыва часть неполученных сообщений была удалена, то клиент их уже никогда не увидит. Такова жизнь
  8. Сообщения, появившиеся после переподключения, передаются клиенту напрямую.

ConcurrentQueue, как мне кажется, рашает задачу без блокировок если гарантировать, что удаления выполняются только из одного потока.
Re: Вопрос про неблокирующий буфер
От: Iso12  
Дата: 30.04.15 21:44
Оценка:
Здравствуйте, SergASh, Вы писали:


Можно попробовать использовать ConcurrentDictionary<TKey, TValue>. Как key использовать ваш порядковый номер сообщения, как value — само сообщение.
(К сожелению под рукой нет студии)

public class MyMessageBuffer
{
...
 private ConcurrentDictionary<long, MyMessageClass> messageDictionary = ConcurrentDictionary<long, MyMessageClass>();
 private long firstIndentity;
 private long lastIdentity;
...


firstIndentity и lastIdentity- первый и последний порядковый номер сообщения. Доступ к ним и операторы присвоения должны быть атомарными (Class Interlocked).


В GetAllMessages() реалезуем:

MyMessageClass message =null; 
List <MyMessageClass> retList = new List<MyMessageClass>();
for (long index = firstIndentity; index<=lastIdentity; index++)
{
  if(messageDictionary.TryGetValue(index, out message))
    retList.Add(message);
}
return retMesssage;


В GetRecentMessages(long startingFromId) делаете тоже самое , только стартовый индекс у вас будет startingFromId.

Успехов
Re[2]: Вопрос про неблокирующий буфер
От: Iso12  
Дата: 02.05.15 07:59
Оценка:
Пункт 2 реалезуется, как тут уже подсказали, добавлением Timer в классе MyMessageBuffer.
Пункт 1 реалезуете в ConveyMessage методе. После занесения сообщения, проверяете кличество сообщений в Dictionary. Если больше чем Max, то удаляете первое сообщение. Удаление сообщений надо будет сихронизировать lock-ом, чтобы не получилось так, что например два параллельных потока при полном буфере удалят, при добавлении новых сообщений, больше чем два старых сообщения.

Успехов
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.