Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 23.11.21 06:54
Оценка: -3
Думаю так или иначе почти всем с этим приходится работать, так что достойно обсуждения.

Возьмем, для примера, из образцового проекта:

  Скрытый текст
public interface IEventBus
{
    void Publish(IntegrationEvent @event);

    void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;
}


Как бы представлено в общем виде — публикация, подписка, отписка. Все ли вам тут нравится?

Предлагаю типа такой вариант:

  Скрытый текст
    public interface IEventBus
    {
        Task PublishAsync(IntegrationEvent @event);

        Task SubscribeAsync<TEvent>()
            where TEvent : IntegrationEvent;

        Task UnsubscribeAsync<TEvent>()
            where TEvent : IntegrationEvent;

        IAsyncEnumerable<IntegrationEvent> SelectEventsAsync();

        Task ConfirmAsync(string deliveryTag);
    }


1. Добавлена асинхронность.
2. Не обязательно для каждого типа событий создавать свой обработчик. Получаем все типы событий, на которые подписались — если хотите только определенные события — то создайте новый инстанс IEventBus и сделайте подписку только на определенные — вполне логично, кмк.
3. Спорный момент — подтверждение сделано явным в виде отдельного метода. Т.к. теперь не вызываем обработчики событий а просто их возвращаем — то делаем явно.

Что используете вы?
Re: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 23.11.21 14:35
Оценка: 3 (1)
Здравствуйте, Shmj, Вы писали:

S> Task SubscribeAsync<TEvent>()

S> where TEvent : IntegrationEvent;

Стейтфул? Фу, бяка.

S>3. Спорный момент — подтверждение сделано явным в виде отдельного метода.


Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь.

S>Что используете вы?


public interface IEventPublisher
{
    Task PublishAsync(
        Uri source,
        string eventCode,
        object payload,
        IDictionary<string, object>? headers = null,
        PublishEventParameters? parameters = null,
        CancellationToken cancellation = default);
}

public interface IEventSubscriber
{
    // IMPORTANT: dispose the returned result to stop the subscription.
    // cancellation WILL NOT cancel the subscription after subscribe completion.
    // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
    [MustUseReturnValue]
    Task<IAsyncDisposable> SubscribeAsync(
        Uri source,
        string? eventCode,
        Type payloadType,
        [InstantHandle] Func<EventMessage, CancellationToken, Task> eventHandler,
        EventSubscriptionParameters? subscriptionParameters = null,
        CancellationToken cancellation = default);

    // Do we need generic method here?
    // IMPORTANT: dispose the returned result to stop the subscription.
    // cancellation WILL NOT cancel the subscription after subscribe completion.
    // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
    [MustUseReturnValue]
    Task<IAsyncDisposable> SubscribeAsync<T>(
        Uri source,
        string? eventCode,
        [InstantHandle] Func<EventMessage<T>, CancellationToken, Task> eventHandler,
        EventSubscriptionParameters? subscriptionParameters = null,
        CancellationToken cancellation = default)
        where T : notnull;
}
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Отредактировано 23.11.2021 14:37 Ночной Смотрящий . Предыдущая версия .
Re[2]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 23.11.21 16:43
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

S>>3. Спорный момент — подтверждение сделано явным в виде отдельного метода.


НС>Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь.


А что если не забыл — а не захотел?

S>>Что используете вы?


  Скрытый текст
НС>public interface IEventSubscriber
НС>{
НС> // IMPORTANT: dispose the returned result to stop the subscription.
НС> // cancellation WILL NOT cancel the subscription after subscribe completion.
НС> // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
НС> [MustUseReturnValue]
НС> Task<IAsyncDisposable> SubscribeAsync(
НС> Uri source,
НС> string? eventCode,
НС> Type payloadType,
НС> [InstantHandle] Func<EventMessage, CancellationToken, Task> eventHandler,
НС> EventSubscriptionParameters? subscriptionParameters = null,
НС> CancellationToken cancellation = default);


У вас есть плюс, в сравнении с версией eShopOnContainers — не нужно для каждой подписки создавать отдельный объект-обработчик.
Отредактировано 23.11.2021 16:53 Shmj . Предыдущая версия . Еще …
Отредактировано 23.11.2021 16:44 Shmj . Предыдущая версия .
Re[3]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 23.11.21 17:20
Оценка:
Здравствуйте, Shmj, Вы писали:

НС>>Отличные грабли. Забыл метод позвать и привет многократные обработки одного события и бесконечно растущая очередь.

S>А что если не забыл — а не захотел?

Кинь исключение. Или, если вспомнишь страшную фразу "логика на исключениях", можно передать cancel делегат, который, если дернуть его за яйца, отменит confirm.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[2]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 23.11.21 19:58
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>
НС>    // IMPORTANT: dispose the returned result to stop the subscription.
НС>    // cancellation WILL NOT cancel the subscription after subscribe completion.
НС>    // IMPORTANT: use parameters.SubscriptionFailedCallback to handle non-recoverable subscription failures.
НС>


Вот, кстати, этот момент у вас наиболее неочевидный.

Вы передаете CancellationToken, он умеет отменять как сам процесс подписки так и, очевидно, выполнение обработчика (который тоже принимает CancellationToken). Так почему же не использовать его и для отмены подписки?

И еще вопрос. Вы внутри метода SubscribeAsync, очевидно, вызываете подтверждение обработки события. Так? У меня это отдельный метод.

Так вот вопрос — у вас для каждого нового события создается новая задача или поток? И обработку следующего вы начинаете только после того, как закончили с предыдущим? Или же обрабатываете параллельно? Вопрос очень важный — что если я хочу, чтобы обработчики работали одновременно, а не в один поток? Или еще больше — что если я хочу ограничить количество одновременных потоков для обработчиков — сделать очередь?
Отредактировано 24.11.2021 4:46 Shmj . Предыдущая версия . Еще …
Отредактировано 23.11.2021 21:01 Shmj . Предыдущая версия .
Re[3]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 09:18
Оценка: +1
Здравствуйте, Shmj, Вы писали:

S>Вы передаете CancellationToken, он умеет отменять как сам процесс подписки


Да.

S> так и, очевидно, выполнение обработчика


Нет. Там другой токен.

S> (который тоже принимает CancellationToken). Так почему же не использовать его и для отмены подписки?


Потому что это разные сценарии.

S>И еще вопрос. Вы внутри метода SubscribeAsync, очевидно, вызываете подтверждение обработки события. Так?


Нет. Успешное выполнение означает автоматический коммит. Исключение — откат. В нативных клиентах брокеров иногда бывает специальный флажок, autocommit, который это поведение отключает, и тогда нужно звать commit вручную. Но это очень опасная практика, и по факту потребности в таком ни разу не возникло. Поэтому в публичных универсальных интерфейсах такого режима нет.

S>Так вот вопрос — у вас для каждого нового события создается новая задача или поток?


Это целиком зависит от нативного клиента конкретного брокера. Обычно да, отдельный поток создается.

S> И обработку следующего вы начинаете только после того, как закончили с предыдущим?


Зависит от настройки FetchCount. Сколько она выставлена, столько параллельных обработок запускается максимум.

S>Или еще больше — что если я хочу ограничить количество одновременных потоков для обработчиков — сделать очередь?


Не нужно ничего самому делать. Этот функционал уже есть во всех известных мне нативных клиентах.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[4]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 10:32
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

S>> так и, очевидно, выполнение обработчика

НС>Нет. Там другой токен.

А где вы его берете? В своем методе SubscribeAsync вы, очевидно, вызываете eventHandler и предаете ему этот другой CancellationToken. Откуда вы берете этот другой CancellationToken?
Re[5]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 10:34
Оценка:
Здравствуйте, Shmj, Вы писали:

S>А где вы его берете?


В смиысле где? Нормально спроектированное асинк API его обычно содержит в параметрах всех методов.

S>Откуда вы берете этот другой CancellationToken?


Вопрос непонятен.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[6]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 10:48
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>В смиысле где? Нормально спроектированное асинк API его обычно содержит в параметрах всех методов.


Пояснил на примере, там где !!!! — вопрос:

        static void SubscribeAsync(Func<string, CancellationToken, Task> eventHandler,
            CancellationToken cancellation = default)
        {
            // Тут словили событие (для упрощения string)
            string eventMessage = "event1";

            // Теперь вызываете eventHandler не дожидаясь выполнения (чтобы продолжать отлавливать новые события).
            _ = Task.Factory.StartNew(() =>
            {
                CancellationToken cancellation2 = CancellationToken.None;

                // !!!! А вот при вызове eventHandler нужен этот другой cancellation2, который отменит eventHandler, а не процесс подписки!
                await eventHandler(eventMessage, cancellation2);
                // тут подтверждаем обработку.
            });
        }
Отредактировано 24.11.2021 11:00 Shmj . Предыдущая версия . Еще …
Отредактировано 24.11.2021 10:50 Shmj . Предыдущая версия .
Re[7]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 12:36
Оценка:
Здравствуйте, Shmj, Вы писали:

НС>>В смиысле где? Нормально спроектированное асинк API его обычно содержит в параметрах всех методов.

S>Пояснил на примере, там где !!!! — вопрос:

Я ничего в твоем примере не понял, там какая то каша. Никто в коде подписки обработчик события руками не вызывает. Схематически оно выглядит примерно так:
async static void SubscribeAsync(
  Func<string, CancellationToken, Task> eventHandler,
  CancellationToken cancellation = default)
{
  await nativeClient.SubscribeAsync(
        (nativeParams, canc) => eventHandler(GetModelParams(nativeParams), canc),
        cancellation);
}
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[8]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 12:48
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>Я ничего в твоем примере не понял, там какая то каша. Никто в коде подписки обработчик события руками не вызывает. Схематически оно выглядит примерно так:

НС>
НС>async static void SubscribeAsync(
НС>  Func<string, CancellationToken, Task> eventHandler,
НС>  CancellationToken cancellation = default)
НС>{
НС>  await nativeClient.SubscribeAsync(
НС>        (nativeParams, canc) => eventHandler(GetModelParams(nativeParams), canc),
НС>        cancellation);
НС>}
НС>


В вашем вызове вы передаете canc. Где его берет nativeClient? Видимо использует тот же cancellation, который вы ему передаете, ведь так? Получается у вас 1 единственный cancellation по сути. Так?

Почему тогда вы его не используете для отмены подписки? В nativeClient.SubscribeAsync он что отменяет?

По сути вы переписали nativeClient таким же, как ваш метод. А что если nativeClient работает по классической схеме событий — давайте такой случай рассмотрим для наглядности — там же вручную придется все делать. К примеру клиент для RabbitMQ так и работает — на событиях.
Отредактировано 24.11.2021 13:02 Shmj . Предыдущая версия . Еще …
Отредактировано 24.11.2021 13:00 Shmj . Предыдущая версия .
Отредактировано 24.11.2021 12:58 Shmj . Предыдущая версия .
Re[9]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 13:45
Оценка:
Здравствуйте, Shmj, Вы писали:

S>В вашем вызове вы передаете canc. Где его берет nativeClient?


Это в контексте разговора важно? Где то в самом начале стека есть CancellationTokenSource.

S> Видимо использует тот же cancellation, который вы ему передаете, ведь так?


Нет, не так.

S>По сути вы переписали nativeClient таким же, как ваш метод.


Ничего я не переписывал.

S>К примеру клиент для RabbitMQ так и работает — на событиях.


Ну, в кролике не предусмотрели отмены, увы. Поэтому в его случае приходится втаскивать токен более высокого уровня, например от события остановки хоста.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re: Образцово-показательный интерфейс для EventBus
От: Vladek Россия Github
Дата: 24.11.21 14:04
Оценка:
Здравствуйте, Shmj, Вы писали:

S>Думаю так или иначе почти всем с этим приходится работать, так что достойно обсуждения.


S>Возьмем, для примера, из образцового проекта:

S>Что используете вы?

Для меня эти проекты от dotnet-architecture — хороший пример того, как делать не надо. Там нет ничего, кроме говнокода.
Re[2]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 14:06
Оценка:
Здравствуйте, Vladek, Вы писали:

V>Для меня эти проекты от dotnet-architecture — хороший пример того, как делать не надо. Там нет ничего, кроме говнокода.


Давайте свой IEventBus — обкашляем.
Re[10]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 14:08
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>Это в контексте разговора важно? Где то в самом начале стека есть CancellationTokenSource.


Конечно важно. Как его получает nativeClient? Почему он не определен в интерфейсе?

Вы же через интерфейс работаете — как отменить исполнение eventHandler, если CancellationToken, который в него передается, не определен в интерфейсе?
Отредактировано 24.11.2021 14:47 Shmj . Предыдущая версия .
Re[11]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 14:41
Оценка:
Здравствуйте, Shmj, Вы писали:

НС>>Это в контексте разговора важно? Где то в самом начале стека есть CancellationTokenSource.

S>Конечно важно.

Мне не понятно почему.

S>Как его получает nativeClient?


Вызывает конструктор CancellationTokenSource.

S>Почему он не определен в интерфейсе?


В каком?
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[12]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 15:00
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

S>>Как его получает nativeClient?

НС>Вызывает конструктор CancellationTokenSource.

Откуда nativeClient знает, когда нужно вызвать CancellationTokenSource?

S>>Почему он не определен в интерфейсе?

НС>В каком?

В приведенном вами, конечно. Или у вас есть еще второй интерфейс, через который вы дергаете отмену исполнения eventHandler?
Отредактировано 24.11.2021 15:04 Shmj . Предыдущая версия .
Re[13]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 15:05
Оценка:
Здравствуйте, Shmj, Вы писали:

S>>>Как его получает nativeClient?

НС>>Вызывает конструктор CancellationTokenSource.
S>Откуда nativeClient знает, когда нужно вызвать CancellationTokenSource?

В смысле откуда? А что, по твоему, может являться причиной отмены обработки события? В нашем случае, примеру, это может быть пришедший извне сигнал отмены конкретной таски, таймаут по времени ее обработки, сигнал для soft shutdown хоста. Во всех трех случаях есть соответствующие способы отреагировать и позвать CancellationTokenSource.Cancel.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Re[14]: Образцово-показательный интерфейс для EventBus
От: Shmj Ниоткуда  
Дата: 24.11.21 15:43
Оценка:
Здравствуйте, Ночной Смотрящий, Вы писали:

НС>В смысле откуда? А что, по твоему, может являться причиной отмены обработки события?


Дело не в причине — а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами?

Т.е. тот CancellationToken, что вы передаете в свой метод SubscribeAsync — не может отменить исполнение eventHandler. А тот что может — не в вашей власти — его создает nativeClient.

НС>В нашем случае, примеру, это может быть пришедший извне сигнал отмены конкретной таски


И как это реализовано в интерфейсах? Вот нужно отменить исполнение конкретного eventHandler, который только что передал в SubscribeAsync. Как? У вас же нет метода для этого.
Re[15]: Образцово-показательный интерфейс для EventBus
От: Ночной Смотрящий Россия  
Дата: 24.11.21 17:37
Оценка:
Здравствуйте, Shmj, Вы писали:

НС>>В смысле откуда? А что, по твоему, может являться причиной отмены обработки события?

S>Дело не в причине

Дело именно в причине, так как именно причина отмены ответит на вопрос как это все инициировать и откуда всять токен.

S>- а в том, как вы его будете отменять, если CancellationToken находится во владении nativeClient, а не передается вами?


Например так.

S>И как это реализовано в интерфейсах?


Зачем это реализовывать в интерфейсах? Это особенность реализации. А в интерфейсах есть универсальный CancellationToken.
... << RSDN@Home 1.3.17 alpha 5 rev. 62>>
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.