Re[5]: Свежая статья (разбор примера machine_control)
От: so5team https://stiffstream.com
Дата: 09.08.17 09:01
Оценка: 6 (1)
Здравствуйте, dr. Acula, Вы писали:

DA>С отправкой агентам — все прозрачно, so_5::send<...> рещает проблему.

DA>А как в обратную сторону передавать что-то I/O потокам в такой архитектуре?

Можно сделать как-то так (набросок): в сообщении data_read передается умный указатель на connection_handler. Поэтому у connection_handler-а можно дергать методы. Для того, чтобы инициировать запись со стороны агента в публичный интерфейс connection_handler добавляется вспомогательный метод initiate_write. Этот initiate_write задействует Asio-шный post для того, чтобы инициировать запись в сокет на контексте одного из I/O-потоков.

Получится, что агент дергает публичный метод connection_handler::initiate_write через умный указатель из сообщения data_read. Внутри initiate_write инициируется отложенная запись на контексте рабочих потоков Asio (т.е. что-то аналогичное so_5::send).

Тут фокус в том, чтобы не протухла ссылка на буфер с исходящими данными. Поэтому в наброске ниже я добавил в connection_handler поле outgoing_data_, в которое исходящие данные перемещаются перед записью. Если же возможна ситуация, когда агент может инициировать несколько send-ов (при этом следующий send может возникнуть еще до того, как закончится предыдущий), то нужно будет сделать какую-то более хитрую схему. Но это вряд ли будет представлять проблему. Так, initiate_write может постить через asio::post не вызов socket_.async_send, а операцию добавления очередного буфера в список буферов с исходящими данными. Тут большой простор для фантазии.

// Класс, который обрабатывает соединение.
class connection_handler final : public std::enable_shared_from_this<connection_handler> {
  asio::tcp::socket socket_;
  ...
  std::vector<std::uint8_t> outgoing_data_;
public:
  void initiate_write(std::vector<std::uint8_t> data) {
    outgoing_data_ = std::move(data);
    asio::post(socket_.get_executor(),
      [self=shared_from_this()] {
        self->socket_.async_send(
          asio::buffer(self->outgoing_data_),
          strand_.wrap(std::bind(connection_handler::on_sent, self, _1, _2)));
      });
  }
  ...
};
...
// Сообщение для передачи прочитанных данных прикладному агенту.
struct data_read final : public so_5::message_t {
  std::vector<std::uint8_t> data_;
  std::shared_ptr<connection_handler> io_handler_;
  data_read(std::vector<std::uint8_t> data, std::shared_ptr<connection_handler> io_handler)
    : data_(std::move(data)), io_handler_(std::move(io_handler))
  {}
};
...
// Класс агента, который обрабатывает данные.
class data_handler final : public so_5::agent_t {
  ...
private:
  void on_data_read(mhood_t<data_read> cmd) {
    ... // Какие-то действия над прочитанными данными.
    // Здесь нам нужно записать что-то в ответ.
    std::vector<std::uint8_t> outgoing_data{ ... };
    cmd->io_handler_->initiate_write(std::move(outgoing_data));
  }
};
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.