Реализация паттерна Async Completion Token
От: eao197 Беларусь http://eao197.blogspot.com
Дата: 20.05.05 14:40
Оценка: 20 (3)
Вот здесь профессор Дуглас Шмидт описывает паттерн Asynchronous Completion Token. Суть патерна в том, что клиент, выполняя запрос к серверу, передает на сервер специальный объект (Asynchronous Completion Token -- ACT). Сервер возвращает этот объект клиенту в ответе. Благодоря ACT клиент понимает, на какой именно из его запросов ответил сервер. Особенно это удобно при асинхронном взаимодействии клиента и сервера, когда по одному каналу связи идут потоки запросов и ответов. Ключевым моментом в паттерне Asynchronous Completion Token является то, что объект ACT, в общем случае, является непрозрачным для сервера (т.е. сервер совершенно не знает, что именно находится в ACT).

Забавно, что я наткнулся на описание этого паттерна вскоре после того, как реализовал с помощью ObjESSty подобный механизм. В моем случае требовалось организовать цепочку процессов (которые, возможно, работают на разных узлах сети) и которые в асинхронном режиме обслуживают запросы (транзакции) клиента, подключенного к одному из концов этой цепочки (эта цепочка выглядит для клиента как один сервер). Причем каждая транзакция состоит из нескольких сообщений. Например, одна из транзакций состоит из трех сообщений:
1. send -- инициируется клиентом и содержит описание необходимых действий. Клиент повторяет его до тех пор, пока не получит send_result.
2. send_result -- отсылается клиенту в ответ на send. Повторяется до тех пор, пока клиент не пришлет send_finish. Сделано это для того, чтобы в случае потери единичного send_result клиент не инициировал запрос send еще раз.
3. send_finish -- отсылается клиентом в ответ на send_result. Это сообщение говорит серверу, что клиент полностью завершил транзакцию и что сервер может удалить у себя информацию о данной транзакции.
Такая трехфазная схема позволяет организовать асинхронное взаимодействие между клиентом и сервером через один канал. Клиент может инициировать несколько send не дожидаясь ответа на предудущие send. Однако, эта схема требует, чтобы каждая транзакция имела уникальный идентификатор.

Вопрос был в том, как представить идентификаторы транзакций. Особенность еще и в том, что к серверу могли подключаться несколько клиентов, каждый из которых использует собственные идентификаторы. А внутри сервера сообщения от разных клиентов могли смешиваться. В типовом случае сервер состоял из трех процессов:
— первый являлся шлюзом между клиентами и остальной частью сервера (gate). Этих процессов может быть несколько, в зависимости от того, какой транспорт предпочитает конкретный клиент;
— второй процесс (router) занимался выбором конкретного процесса для выполнения запроса клиента. В простейшем случае сервер содержал только один router, к которому подключаются все gate-ы;
— третий процесс (service) занимался выполнением запросов клиентов. Этих процессов может быть несколько.
Таким образом, в один процесс router стекаются запросы от всех клиентов. И router должен получать вместе с каждым запросом уникальный идентификатор запроса. И последующие в цепочке запросы так же должны получать уникальные идентификаторы.

Если бы в качестве идентификаторов транзакций использовались какие-либо простые значения, например уникальные целые числа (UID), то каждый процесс в цепочке, который мог обрабатывать запросы нескольких клиентов, был бы вынужден сохранять полученный от клиента UID, назначать транзакции собственный UID и передавать дальше именно собственный UID. Получив ответ процессу небходимо было по своему UID восстановить исходный UID и ретранслировать ответ клиенту уже с исходным UID. А это означало бы, что все процессы в цепочке должны были бы быть state-ful. Но для некоторых процессов это было слишком накладно. Для того же router-а например.

Поэтому здесь хорошо подходит идея ACT в качестве идентификатора транзакции. Каждый процесс в цепочке получает от предыдущего узла ACT, строит на его основе собственный ACT, сохраняя прямо в нем исходное значение, и передает запрос дальше со своим ACT. Получив ответ, процесс из своего ACT извлекает исходное значение и уже с ним ретранслирует ответ.

Вопрос здесь лишь в том, как удобно сделать работу с непрозрачными ACT. В случае с ObjESSty он разрешился посредством использования механизма subclassing by extension. Для идентификатора транзакции был заведен базовый класс:

namespace mbsms_2
{

class    MBSMS_2_TYPE    trx_id_t
    :    public oess_1::stdsn::serializable_t
    {
        OESS_SERIALIZER_EX( trx_id_t, MBSMS_2_TYPE )
    public :
        virtual ~trx_id_t();

        /*!
            \brief Клонировать объект.
        */
        virtual std::auto_ptr< trx_id_t >
        clone() const = 0;
    };

}


Который описывался на DDL следующим образом:
{type    mbsms_2::trx_id_t
    {abstract}
    {extensible}
    {subclassing_by_extension}
}


Реальные идентификаторы транзакций должны были создаваться путем наследования от trx_id_t. Например, вот так:
// В качестве реального идентификатора используется уникальное целое число.
class trx_id_t
    :    public mbsms_2::trx_id_t
    {
    OESS_SERIALIZER( trx_id_t )
    private :
        unsigned int    m_uid;

    public :
        trx_id_t()
            :    m_uid( 0 )
            {}
        trx_id_t(
            unsigned int uid )
            :    m_uid( uid )
            {}

        virtual std::auto_ptr< mbsms_2::trx_id_t >
        clone() const
            {
                return std::auto_ptr< mbsms_2::trx_id_t >(
                        new trx_id_t( *this ) );
            }

        unsigned int
        uid() const
            {
                return m_uid;
            }
    };

{type    trx_id_t
    {extensible}
    {subclassing_by_extension {extension_of mbsms_2::trx_id_t}}

    {attr m_uid {of oess_1::uint_t}}
}


Для того, чтобы передавать производные от mbsms_2::trx_id_t идентификаторы транзакций в сообщениях send, send_result, send_finish нужно использовать указатели на mbsms_2::trx_id_t и динамически созданные объекты конкретных типов идентификаторов. Для упрощения работы с арибутами-указателями был создан вспомогательный сериализуемый умный указатель (на самом деле он не такой уже и умный -- для копирования объектов используется их клонирование):
class    MBSMS_2_TYPE    trx_id_wrapper_t
    :    public oess_1::stdsn::shptr_skeleton_t<
                trx_id_t,
                oess_1::stdsn::cloneable_policy_t< trx_id_t > >
    {
        //! Псевдоним для базового типа.
        typedef oess_1::stdsn::shptr_skeleton_t<
                trx_id_t,
                oess_1::stdsn::cloneable_policy_t< trx_id_t > >
            base_type_t;
        OESS_SERIALIZER_EX( trx_id_wrapper_t, MBSMS_2_TYPE )
        OESS_1_SHPTR_IFACE( trx_id_wrapper_t,
                mbsms_2::trx_id_t,
                base_type_t )
    };

{type    mbsms_2::trx_id_wrapper_t
    {attr    m_ptr {of {extension_of} mbsms_2::trx_id_t}}
}


Далее в сообщениях хранятся именно mbsms_2::trx_id_wrapper_t:
class    MBSMS_2_TYPE send_t :
    public mbapi_3::msg_t
{
    OESS_SERIALIZER_EX( send_t, MBSMS_2_TYPE )
    public :
        //! Конструктор по умолчанию.
        /*!
            Присваивает всем полям пустые значения.
        */
        send_t();
        //! Инициализирующий конструктор.
        send_t(
            //! Идентификатор данной транзакции на стороне отправителя
            //! сообщения send_t.
            const mbsms_2::trx_id_wrapper_t & originator_trx,
            ....... );
        /*!
            \since 2.0.0

            Конструктор для случая перемаршрутизации сообщения.
            Явно задается новый originator_trx, остальные атрибуты берутся
            из исходного объекта.
        */
        send_t(
            //! Идентификатор данной транзакции на стороне отправителя сообщения.
            const mbsms_2::trx_id_wrapper_t & originator_trx,
            //! Исходный объект.
            const send_t & original );
        ...
    private :
        //! Идентификатор данной транзакции на стороне отправителя
        //! сообщения send_t.
        /*! \since v.2.0.0 */
        mbsms_2::trx_id_wrapper_t m_originator_trx;
        ...
};

{type    mbsms_2::send_t
    {extensible}
    {super mbapi_3::msg_t}

    {attr    m_originator_trx    {of    mbsms_2::trx_id_wrapper_t}}
    ...
}


Процесс router использует для идентификации проходящих через него транзакций собственный тип идентификатора:
class    trx_id_t
    :    public mbsms_2::trx_id_t
    { 
        OESS_SERIALIZER( trx_id_t )
    private :
        //! Идентификатор клиента, от которого был получен исходный
        //! идентификатор транзакции.
        /*!
            Может быть пустым, если исходного идентификатора транзакции
            не было.
        */
        mbapi_3::client_dest_t    m_client;

        //! Исходный идентификатор транзакции, который был получен от
        //! клиента.
        /*!
            Может быть нулевым, если исходного идентификатора транзакции
            не было.
        */
        mbsms_2::trx_id_wrapper_t    m_original;

    public :
        /*!
            Устанавливает все значения пустыми значениями.
        */
        trx_id_t();
        /*!
            Полностью инициализирующий конструктор.
        */
        trx_id_t(
            const mbapi_3::client_dest_t & client,
            const mbsms_2::trx_id_wrapper_t & original );

        //! Клонирование объекта.
        std::auto_ptr< mbsms_2::trx_id_t >
        clone() const;

        //! Является ли объект пустым.
        /*!
            \return true, если m_client является пустым.
        */
        bool
        is_empty() const;

        //! Получить доступ к идентификатору клиента.
        const mbapi_3::client_dest_t &
        client() const;

        //! Получить доступ к идентификатор транзакции клиента.
        const mbsms_2::trx_id_wrapper_t &
        original() const;
    };

{type    aag_3::smsc_map::trx_id_t
    {extensible}
    {subclassing_by_extension {extension_of mbsms_2::trx_id_t}}

    {attr m_client {of mbapi_3::client_dest_t}}
    {attr m_original {of mbsms_2::trx_id_wrapper_t}}
}


И вот как router маршрутизирует сообщение send_t на нужный процесс-service:
void
route_send_to_smsc(
    const so_4::rt::agent_t & router_agent,
    const mbsms_2::send_t & original,
    const mbapi_3::client_dest_t & cprov_id,
    const mbapi_3::client_dest_t & smsc_id,
    //! Будет пустой строкой, если номер отправителя SMS заменять не нужно.
    const std::string & actual_sms_sender,
    const mbapi_3::server_dest_t & router_addr )
    {
        // Сообщение должно уйти sms-центру с нашим идентификатором транзакции.
        mbsms_2::send_t outgoing(
                mbsms_2::trx_id_wrapper_t(
                        new trx_id_t(
                                cprov_id,
                                original.query_originator_trx() ) ),
                original );

        if( actual_sms_sender.size() )
            // В процессе маршрутизации сообщению был назначен
            // другой номер отправителя.
            outgoing.set_sms_sender(
                    mbsms_2::phone_dest_t(
                            mbsms_2::unknown_smsc_id,
                            actual_sms_sender ) );

        so_log_1::logic[ router_agent ][ so_log_1::low ]
                [ send_routing_log_tag ]
                [ so_log_1::d() << "cprov: " << cprov_id
                        << ", sms_receiver: " << original.query_sms_receiver()
                        << ", smsc_id: " << smsc_id ]();

        mbapi_3::router::route( smsc_id, outgoing, router_addr );
    }


А вот так router маршрутизирует ответное сообщение от service к нужному клиенту:
void
route_send_result(
    const so_4::rt::agent_t & router_agent,
    const mbsms_2::send_result_t & original,
    const mbapi_3::client_dest_t & smsc_id,
    const mbapi_3::server_dest_t & router_addr )
    {
        // Извлекаем исходный trx_id.
        const trx_id_t * trx = original.query_originator_trx().cast_to(
                oess_1::stdsn::shptr_type_tag< trx_id_t >() );
        if( !trx )
            throw std::domain_error(
                    "pointer to originator trx_id is 0!" );

        mbsms_2::send_result_t outgoing(
                trx->original(),
                mbsms_2::trx_id_wrapper_t(
                        new trx_id_t( smsc_id,
                                original.query_recipient_trx() ) ),
                router_addr,
                original );

        so_log_1::logic[ router_agent ][ so_log_1::lowest ]
                [ send_result_routing_log_tag ]
                [ so_log_1::d() << "smsc_id: " << smsc_id
                        << ", cprov: " << trx->client()
                        << ", result: "
                        << original.query_result() ]();

        mbapi_3::router::route( trx->client(), outgoing, router_addr );
    }


При такой схеме идентификаторы транзакций (ACT) получаются непрозрачными, но с ними удобно работать, т.к. они являются производными от одного базового типа.

Конечно же, такую схему можно было бы реализовать и без ObjESSty. Например, можно было использовать в качестве идентификаторов транзакций строки. И затем процесс router формировал свою строку, в которую включал бы идентификатор клиента и исходный ACT. А затем парсил бы ее. Так, собственно, все и происходит на самом нижнем уровне в ObjESSty, просто это скрыто от программиста.




Подобный механизм "непрозрачных" ACT можно реализовать с использованием любой системы сериализации, это не только ObjESSty. И даже без системы сериализации, вручную. Просто здесь я показал, как это получилось сделать посредством ObjESSty.
... << RSDN@Home 1.1.4 beta 6a rev. 436>>


SObjectizer: <микро>Агентно-ориентированное программирование на C++.