Re[3]: Свежая статья (разбор примера machine_control)
От: so5team https://stiffstream.com
Дата: 09.08.17 07:59
Оценка: 6 (1)
Здравствуйте, dr. Acula, Вы писали:

DA>Так вот вопрос.

DA>Есть ликакие-то подводные камни, на которые следует обратить внимание при совместной работе SO и boost::asio?

Главный камень -- это то, что для Asio нужен свой набор рабочих потоков, на которых будет крутиться asio::io_service::run(), а SO нужен свой набор рабочих потоков, на которых будут работать SO-шные агенты. И просто так эти наборы рабочих потоков не подружить.

У нас сейчас есть реализация однопоточного SO, в которой Asio и SO работают на одном потоке (фактически, там работает asio::io_service::run(), а SO пользуется средствами Asio для диспетчеризации своих событий). Но она именно что однопоточная, реализации для совместной работы Asio и SO на thread_pool-е пока еще нет.

Но даже если Asio и SO используют один и тот же рабочий контекст, все равно идея об вызове Asio-шных accept/read/write из агентов не выглядит разумной. Дело в том, что если read/write синхронные и блокирующие, то агент просто заблокирует рабочую нить и это не позволит другим агентам использовать эту рабочую нить. Можно попробовать использовать async_read/async_write, но тогда нужно будет обвешивать агентов stand-ами, дабы не получилось, что Asio дергает коллбэк для async-операции на одной рабочей нити, а SO дергает событие этого же агента на другой.

А почему вы решили, что вам здесь нужны акторы? По вашему описанию создается ощущение, что вам достаточно родных средств самого Asio. Так, у вас будет какой-то объект, реагирующий на успешные accept-ы. Он создает другие объекты, которые получают готовый сокет и выполняют асинхронные I/O операции посредством вызова async_read/async_write. Что-то вроде:
class connection_handler final : public std::enable_shared_from_this<connection_handler> {
  asio::tcp::socket socket_;
  asio::io_service::strand strand_;
  ...
  void on_receive(const asio::error_code & ec, std::size_t bytes_received) {
    if(!ec) {
      ... // Какая-то обработка полученных данных.
      // Запись ответа.
      socket_.async_write(some_outgoing_buffer(),
          strand_.wrap(std::bind(connection_handler::on_send, shared_from_this(), _1, _2)));
    }
    else { /* Обработка ошибки */ }
  }
  void on_send(const asio::error_code & ec, std::size_t bytes_sent) {
    if(!ec) {
      ... // Какая-то реакция на успешную запись.
    }
    else { /* Обработка ошибки */ }
  }
  ...
};

Есть ощущение, что вот такого использования Asio вам может хватить и без привлечения какого-либо акторного фреймворка.

Если же все-таки нужны кроме обработчиков I/O операций еще и акторы, то на данный момент единственный способ сделать такое -- это иметь отдельные объекты для I/O операций и отдельных агентов для выполнения прикладной логики. Так, I/O-объекты будут вычитывать данные из сокетов и пересылать эти данные агентам. Агенты будут обрабатывать полученные от I/O-объектов данные, выполнять какую-то прикладную работу и будут генерировать ответные данные для отсылки. Отсылку же будут выполнять I/O-объекты. Может получиться что-то вроде:
// Сообщение для передачи прочитанных данных прикладному агенту.
struct data_read final : public so_5::message_t {
  std::vector<std::uint8_t> data_;
  std::shared_ptr<connection_handler> io_handler_;
  data_read(std::vector<std::uint8_t> data, std::shared_ptr<connection_handler> io_handler)
    : data_(std::move(data)), io_handler_(std::move(io_handler))
  {}
};

// Объект, который обрабатывает сокет.
class connection_handler final : public std::enable_shared_from_this<connection_handler> {
  asio::tcp::socket socket_;
  asio::io_service::strand strand_;
  ...
  so_5::mbox_t receiver_; // Адрес агента, которому нужно отсылать прочитанные данные.
  ...
  std::vector<std::uint8_t> data_; // Буфер-приемник для чтения данных.
  ...
  void start() {
    // Начинаем читать входящие данные.
    data_.resize(some_appropriate_size);
    socket_.async_read_some(asio::buffer(data_),
      strand_.wrap(std::bind(connection_handler::on_receive, shared_from_this(), _1, _2)));
    ...
  }
  void on_receive(const asio::error_code & ec, std::size_t bytes_received) {
    if(!ec) {
      // Отсылаем данные агенту для обработки.
      data_.resize(bytes_received);
      so_5::send<data_read>(receiver_, std::move(data_), shared_from_this());
    }
    else { /* Обработка ошибки */ }
  }
  ...
};

// Агент, который получает входящие данные.
class data_handler final : public so_5::agent_t {
public:
  data_handler(context_t ctx, ...) : so_5::agent_t(std::move(ctx)), ... {
    // Создаем подписку на прочитанные данные.
    so_subscribe_self().event(data_handler::on_data_read);
    ...
  }
  ...
private:
  void on_data_read(mhood_t<data_read> cmd) {
    ...
  }
};


В этом случае вообще нет надобности создавать thread_pool для Asio, достаточно будет и одной рабочей нити, на которой будет работать asio::io_service::run(). Ну а SO-шных агентов можно будет распределить по тем контекстам, которые им нужны. Тут от специфики задачи зависит.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.