public interface IEventBus
{
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
}
Как бы представлено в общем виде — публикация, подписка, отписка. Все ли вам тут нравится?
Предлагаю типа такой вариант:
Скрытый текст
public interface IEventBus
{
Task PublishAsync(IntegrationEvent @event);
Task SubscribeAsync<TEvent>()
where TEvent : IntegrationEvent;
Task UnsubscribeAsync<TEvent>()
where TEvent : IntegrationEvent;
IAsyncEnumerable<IntegrationEvent> SelectEventsAsync();
Task ConfirmAsync(string deliveryTag);
}
1. Добавлена асинхронность.
2. Не обязательно для каждого типа событий создавать свой обработчик. Получаем все типы событий, на которые подписались — если хотите только определенные события — то создайте новый инстанс IEventBus и сделайте подписку только на определенные — вполне логично, кмк.
3. Спорный момент — подтверждение сделано явным в виде отдельного метода. Т.к. теперь не вызываем обработчики событий а просто их возвращаем — то делаем явно.
Что используете вы?
Re: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
S> Task SubscribeAsync<TEvent>() S> where TEvent : IntegrationEvent;
Стейтфул? Фу, бяка.
S>3. Спорный момент — подтверждение сделано явным в виде отдельного метода.
Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь.
S>Что используете вы?
public interface IEventPublisher
{
Task PublishAsync(
Uri source,
string eventCode,
object payload,
IDictionary<string, object>? headers = null,
PublishEventParameters? parameters = null,
CancellationToken cancellation = default);
}
public interface IEventSubscriber
{
// IMPORTANT: dispose the returned result to stop the subscription.
// cancellation WILL NOT cancel the subscription after subscribe completion.
// IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
[MustUseReturnValue]
Task<IAsyncDisposable> SubscribeAsync(
Uri source,
string? eventCode,
Type payloadType,
[InstantHandle] Func<EventMessage, CancellationToken, Task> eventHandler,
EventSubscriptionParameters? subscriptionParameters = null,
CancellationToken cancellation = default);
// Do we need generic method here?
// IMPORTANT: dispose the returned result to stop the subscription.
// cancellation WILL NOT cancel the subscription after subscribe completion.
// IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
[MustUseReturnValue]
Task<IAsyncDisposable> SubscribeAsync<T>(
Uri source,
string? eventCode,
[InstantHandle] Func<EventMessage<T>, CancellationToken, Task> eventHandler,
EventSubscriptionParameters? subscriptionParameters = null,
CancellationToken cancellation = default)
where T : notnull;
}
Здравствуйте, Ночной Смотрящий, Вы писали: S>>3. Спорный момент — подтверждение сделано явным в виде отдельного метода. НС>Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь.
А что если не забыл — а не захотел? S>>Что используете вы?
Скрытый текст
НС>public interface IEventSubscriber НС>{ НС> // IMPORTANT: dispose the returned result to stop the subscription. НС> // cancellation WILL NOT cancel the subscription after subscribe completion. НС> // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures. НС> [MustUseReturnValue] НС> Task<IAsyncDisposable> SubscribeAsync( НС> Uri source, НС> string? eventCode, НС> Type payloadType, НС> [InstantHandle] Func<EventMessage, CancellationToken, Task> eventHandler, НС> EventSubscriptionParameters? subscriptionParameters = null, НС> CancellationToken cancellation = default);
У вас есть плюс, в сравнении с версией eShopOnContainers — не нужно для каждой подписки создавать отдельный объект-обработчик.
Здравствуйте, Shmj, Вы писали:
НС>>Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь. S>А что если не забыл — а не захотел?
Кинь исключение. Или, если вспомнишь страшную фразу "логика на исключениях", можно передать cancel делегат, который, если дернуть его за яйца, отменит confirm.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[2]: Образцово-показательный интерфейс для EventBus
НС> // IMPORTANT: dispose the returned result to stop the subscription.
НС> // cancellation WILL NOT cancel the subscription after subscribe completion.
НС> // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
НС>
Вот, кстати, этот момент у вас наиболее неочевидный.
Вы передаете CancellationToken, он умеет отменять как сам процесс подписки так и, очевидно, выполнение обработчика (который тоже принимает CancellationToken). Так почему же не использовать его и для отмены подписки?
И еще вопрос. Вы внутри метода SubscribeAsync, очевидно, вызываете подтверждение обработки события. Так? У меня это отдельный метод.
Так вот вопрос — у вас для каждого нового события создается новая задача или поток? И обработку следующего вы начинаете только после того, как закончили с предыдущим? Или же обрабатываете параллельно? Вопрос очень важный — что если я хочу, чтобы обработчики работали одновременно, а не в один поток? Или еще больше — что если я хочу ограничить количество одновременных потоков для обработчиков — сделать очередь?
Здравствуйте, Shmj, Вы писали:
S>Вы передаете CancellationToken, он умеет отменять как сам процесс подписки
Да.
S> так и, очевидно, выполнение обработчика
Нет. Там другой токен.
S> (который тоже принимает CancellationToken). Так почему же не использовать его и для отмены подписки?
Потому что это разные сценарии.
S>И еще вопрос. Вы внутри метода SubscribeAsync, очевидно, вызываете подтверждение обработки события. Так?
Нет. Успешное выполнение означает автоматический коммит. Исключение — откат. В нативных клиентах брокеров иногда бывает специальный флажок, autocommit, который это поведение отключает, и тогда нужно звать commit вручную. Но это очень опасная практика, и по факту потребности в таком ни разу не возникло. Поэтому в публичных универсальных интерфейсах такого режима нет.
S>Так вот вопрос — у вас для каждого нового события создается новая задача или поток?
Это целиком зависит от нативного клиента конкретного брокера. Обычно да, отдельный поток создается.
S> И обработку следующего вы начинаете только после того, как закончили с предыдущим?
Зависит от настройки FetchCount. Сколько она выставлена, столько параллельных обработок запускается максимум.
S>Или еще больше — что если я хочу ограничить количество одновременных потоков для обработчиков — сделать очередь?
Не нужно ничего самому делать. Этот функционал уже есть во всех известных мне нативных клиентах.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[4]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
S>> так и, очевидно, выполнение обработчика НС>Нет. Там другой токен.
А где вы его берете? В своем методе SubscribeAsync вы, очевидно, вызываете eventHandler и предаете ему этот другой CancellationToken. Откуда вы берете этот другой CancellationToken?
Re[5]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>В смиысле где? Нормально спроектированное асинк API его обычно содержит в параметрах всех методов.
Пояснил на примере, там где !!!! — вопрос:
static void SubscribeAsync(Func<string, CancellationToken, Task> eventHandler,
CancellationToken cancellation = default)
{
// Тут словили событие (для упрощения string)string eventMessage = "event1";
// Теперь вызываете eventHandler не дожидаясь выполнения (чтобы продолжать отлавливать новые события).
_ = Task.Factory.StartNew(() =>
{
CancellationToken cancellation2 = CancellationToken.None;
// !!!! А вот при вызове eventHandler нужен этот другой cancellation2, который отменит eventHandler, а не процесс подписки!
await eventHandler(eventMessage, cancellation2);
// тут подтверждаем обработку.
});
}
Здравствуйте, Shmj, Вы писали:
НС>>В смиысле где? Нормально спроектированное асинк API его обычно содержит в параметрах всех методов. S>Пояснил на примере, там где !!!! — вопрос:
Я ничего в твоем примере не понял, там какая то каша. Никто в коде подписки обработчик события руками не вызывает. Схематически оно выглядит примерно так:
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>Я ничего в твоем примере не понял, там какая то каша. Никто в коде подписки обработчик события руками не вызывает. Схематически оно выглядит примерно так: НС>
В вашем вызове вы передаете canc. Где его берет nativeClient? Видимо использует тот же cancellation, который вы ему передаете, ведь так? Получается у вас 1 единственный cancellation по сути. Так?
Почему тогда вы его не используете для отмены подписки? В nativeClient.SubscribeAsync он что отменяет?
По сути вы переписали nativeClient таким же, как ваш метод. А что если nativeClient работает по классической схеме событий — давайте такой случай рассмотрим для наглядности — там же вручную придется все делать. К примеру клиент для RabbitMQ так и работает — на событиях.
Здравствуйте, Shmj, Вы писали:
S>В вашем вызове вы передаете canc. Где его берет nativeClient?
Это в контексте разговора важно? Где то в самом начале стека есть CancellationTokenSource.
S> Видимо использует тот же cancellation, который вы ему передаете, ведь так?
Нет, не так.
S>По сути вы переписали nativeClient таким же, как ваш метод.
Ничего я не переписывал.
S>К примеру клиент для RabbitMQ так и работает — на событиях.
Ну, в кролике не предусмотрели отмены, увы. Поэтому в его случае приходится втаскивать токен более высокого уровня, например от события остановки хоста.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
S>Думаю так или иначе почти всем с этим приходится работать, так что достойно обсуждения.
S>Возьмем, для примера, из образцового проекта: S>Что используете вы?
Для меня эти проекты от dotnet-architecture — хороший пример того, как делать не надо. Там нет ничего, кроме говнокода.
Re[2]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Vladek, Вы писали:
V>Для меня эти проекты от dotnet-architecture — хороший пример того, как делать не надо. Там нет ничего, кроме говнокода.
Давайте свой IEventBus — обкашляем.
Re[10]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
S>>>Как его получает nativeClient? НС>>Вызывает конструктор CancellationTokenSource. S>Откуда nativeClient знает, когда нужно вызвать CancellationTokenSource?
В смысле откуда? А что, по твоему, может являться причиной отмены обработки события? В нашем случае, примеру, это может быть пришедший извне сигнал отмены конкретной таски, таймаут по времени ее обработки, сигнал для soft shutdown хоста. Во всех трех случаях есть соответствующие способы отреагировать и позвать CancellationTokenSource.Cancel.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[14]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>В смысле откуда? А что, по твоему, может являться причиной отмены обработки события?
Дело не в причине — а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами?
Т.е. тот CancellationToken, что вы передаете в свой метод SubscribeAsync — не может отменить исполнение eventHandler. А тот что может — не в вашей власти — его создает nativeClient.
НС>В нашем случае, примеру, это может быть пришедший извне сигнал отмены конкретной таски
И как это реализовано в интерфейсах? Вот нужно отменить исполнение конкретного eventHandler, который только что передал в SubscribeAsync. Как? У вас же нет метода для этого.
Re[15]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
НС>>В смысле откуда? А что, по твоему, может являться причиной отмены обработки события? S>Дело не в причине
Дело именно в причине, так как именно причина отмены ответит на вопрос как это все инициировать и откуда всять токен.
S>- а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами?
Например так.
S>И как это реализовано в интерфейсах?
Зачем это реализовывать в интерфейсах? Это особенность реализации. А в интерфейсах есть универсальный CancellationToken.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[16]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>Дело именно в причине, так как именно причина отмены ответит на вопрос как это все инициировать и откуда всять токен.
Вот вы сделали Dispose для подписки — а отменится ли автоматом исполнение уже запущенного eventHandler?
S>>- а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами?
НС>Например так.
И к чему линковать будем?
S>>И как это реализовано в интерфейсах? НС>Зачем это реализовывать в интерфейсах? Это особенность реализации. А в интерфейсах есть универсальный CancellationToken.
Почему вы называете его универсальным, если он он не отменяет eventHandler?
Re[17]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
НС>>Дело именно в причине, так как именно причина отмены ответит на вопрос как это все инициировать и откуда всять токен. S>Вот вы сделали Dispose для подписки — а отменится ли автоматом исполнение уже запущенного eventHandler?
Непонятен вопрос. Как тебе надо, так и сделай.
S>>>- а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами? НС>>Например так. S>И к чему линковать будем?
К оригинальному токену.
S>>>И как это реализовано в интерфейсах? НС>>Зачем это реализовывать в интерфейсах? Это особенность реализации. А в интерфейсах есть универсальный CancellationToken. S>Почему вы называете его универсальным, если он он не отменяет eventHandler?
Потому что он универсальный. А почему он должен отменять eventHandler?
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[2]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
S>И еще вопрос. Вы внутри метода SubscribeAsync, очевидно, вызываете подтверждение обработки события. Так? У меня это отдельный метод.
По идее за это брокер должен отвечать -- если сообщение принято подписчиком и нет исключения, то считаем что оно вообще
успешно принято. Как минимум с тз брокера.
Кодом людям нужно помогать!
Re[4]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Shmj, Вы писали:
S>Здравствуйте, Vladek, Вы писали:
V>>Для меня эти проекты от dotnet-architecture — хороший пример того, как делать не надо. Там нет ничего, кроме говнокода.
S>Давайте свой IEventBus — обкашляем.
Раньше были какие-то зачатки, я вроде бы использовал это с MVVM: https://github.com/vborovikov/vividkit/blob/master/src/Core/VividKit/RequestModel/Infrastructure/DefaultEventDispatcher.cs — идея была в том, что один компонент использовал диспетчер для публикации событий, а любой другой компонент, заинтересованный в этом событии, должен был реализовать обработчик события. И диспетчер автомагически выполнял функции медиатора между компонентами. Один диспетчер можно вкладывать в другой, и строить на этом дополнительную логику — но я оставил это пока на будущее, когда реально понадобится.
На данный момент я использую только CQRS: https://github.com/vborovikov/relay/blob/master/src/Relay/RequestModel/IRequestDispatcher.cs — в основном в UI — этот интерфейс удачно встраивается в контроллеры ASP.NET MVC, модели вида MVVM (класс Presenter в том же проекте), консольные приложения, использующие System.CommandLine, и даже недавно использовал его для отправки и получения команд и запросов через MSMQ. Это интерфейс позволяет мне ядро приложения надёжно изолировать от UI каким бы он не был. Именно такой API меня устраивает — минимум методов и всяких ритуалов вокруг них.
Re[5]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Sharov, Вы писали:
S>>>А почему стейтфул? НС>>Потому что нет параметра кого подписываем. S>Т.е. реализация обработки будет в самом подписчике?
Это вопрос к ТС.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[6]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
S>>Т.е. реализация обработки будет в самом подписчике? НС>Это вопрос к ТС.
Не совсем, я не про детали, а про идею. Stateful потому что обработчик будет назначен через конструктор или
setter? Т.е. это ненужная связанность для подписчика (реализация IEventBus), так?
Кодом людям нужно помогать!
Re[7]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Sharov, Вы писали:
S>>>Т.е. реализация обработки будет в самом подписчике? НС>>Это вопрос к ТС. S>Не совсем, я не про детали, а про идею.
Все равно не ко мне — не я этот интерфейс выдумал.
S> Stateful потому что обработчик будет назначен через конструктор или S>setter?
Стейтфул потому что если у метода нет параметров и возвращаемого значения, то единственный способ сделать с его помощью что то полезное — поменять стейт.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[8]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
S>> Stateful потому что обработчик будет назначен через конструктор или S>>setter? НС>Стейтфул потому что если у метода нет параметров и возвращаемого значения, то единственный способ сделать с его помощью что то полезное — поменять стейт.
Там отличие только в параметре типа обработчика, и сразу stateful?
ТС
Task SubscribeAsync<TEvent>()
where TEvent : IntegrationEvent;
исходный
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
Или критиковались сразу оба варианта? Просто я не понял как наличие или отсутствие параметра типа
влияет на состояние?
Кодом людям нужно помогать!
Re[8]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
НС>Стейтфул потому что если у метода нет параметров и возвращаемого значения, то единственный способ сделать с его помощью что то полезное — поменять стейт.
Можно полюбопытствовать — занимаетесь ли разработкой архитектуры? Просто не каждый же сразу обратит внимание.
Re[9]: Образцово-показательный интерфейс для EventBus
Здравствуйте, Ночной Смотрящий, Вы писали:
S>>Можно полюбопытствовать — занимаетесь ли разработкой архитектуры? НС>Почти каждый рабочий день. У меня, собственно, две основные обязанности — архитектура и качество кода.
Могли бы порекомендовать какую-нибудь литературу, которая вам помогла?
Re[11]: Образцово-показательный интерфейс для EventBus