Здравствуйте, dr. Acula, Вы писали:
DA>Так вот вопрос.
DA>Есть ликакие-то подводные камни, на которые следует обратить внимание при совместной работе SO и boost::asio?
Главный камень -- это то, что для Asio нужен свой набор рабочих потоков, на которых будет крутиться asio::io_service::run(), а SO нужен свой набор рабочих потоков, на которых будут работать SO-шные агенты. И просто так эти наборы рабочих потоков не подружить.
У нас сейчас
есть реализация однопоточного SO, в которой Asio и SO работают на одном потоке (фактически, там работает asio::io_service::run(), а SO пользуется средствами Asio для диспетчеризации своих событий). Но она именно что однопоточная, реализации для совместной работы Asio и SO на thread_pool-е пока еще нет.
Но даже если Asio и SO используют один и тот же рабочий контекст, все равно идея об вызове Asio-шных accept/read/write из агентов не выглядит разумной. Дело в том, что если read/write синхронные и блокирующие, то агент просто заблокирует рабочую нить и это не позволит другим агентам использовать эту рабочую нить. Можно попробовать использовать async_read/async_write, но тогда нужно будет обвешивать агентов stand-ами, дабы не получилось, что Asio дергает коллбэк для async-операции на одной рабочей нити, а SO дергает событие этого же агента на другой.
А почему вы решили, что вам здесь нужны акторы? По вашему описанию создается ощущение, что вам достаточно родных средств самого Asio. Так, у вас будет какой-то объект, реагирующий на успешные accept-ы. Он создает другие объекты, которые получают готовый сокет и выполняют асинхронные I/O операции посредством вызова async_read/async_write. Что-то вроде:
class connection_handler final : public std::enable_shared_from_this<connection_handler> {
asio::tcp::socket socket_;
asio::io_service::strand strand_;
...
void on_receive(const asio::error_code & ec, std::size_t bytes_received) {
if(!ec) {
... // Какая-то обработка полученных данных.
// Запись ответа.
socket_.async_write(some_outgoing_buffer(),
strand_.wrap(std::bind(connection_handler::on_send, shared_from_this(), _1, _2)));
}
else { /* Обработка ошибки */ }
}
void on_send(const asio::error_code & ec, std::size_t bytes_sent) {
if(!ec) {
... // Какая-то реакция на успешную запись.
}
else { /* Обработка ошибки */ }
}
...
};
Есть ощущение, что вот такого использования Asio вам может хватить и без привлечения какого-либо акторного фреймворка.
Если же все-таки нужны кроме обработчиков I/O операций еще и акторы, то на данный момент единственный способ сделать такое -- это иметь отдельные объекты для I/O операций и отдельных агентов для выполнения прикладной логики. Так, I/O-объекты будут вычитывать данные из сокетов и пересылать эти данные агентам. Агенты будут обрабатывать полученные от I/O-объектов данные, выполнять какую-то прикладную работу и будут генерировать ответные данные для отсылки. Отсылку же будут выполнять I/O-объекты. Может получиться что-то вроде:
// Сообщение для передачи прочитанных данных прикладному агенту.
struct data_read final : public so_5::message_t {
std::vector<std::uint8_t> data_;
std::shared_ptr<connection_handler> io_handler_;
data_read(std::vector<std::uint8_t> data, std::shared_ptr<connection_handler> io_handler)
: data_(std::move(data)), io_handler_(std::move(io_handler))
{}
};
// Объект, который обрабатывает сокет.
class connection_handler final : public std::enable_shared_from_this<connection_handler> {
asio::tcp::socket socket_;
asio::io_service::strand strand_;
...
so_5::mbox_t receiver_; // Адрес агента, которому нужно отсылать прочитанные данные.
...
std::vector<std::uint8_t> data_; // Буфер-приемник для чтения данных.
...
void start() {
// Начинаем читать входящие данные.
data_.resize(some_appropriate_size);
socket_.async_read_some(asio::buffer(data_),
strand_.wrap(std::bind(connection_handler::on_receive, shared_from_this(), _1, _2)));
...
}
void on_receive(const asio::error_code & ec, std::size_t bytes_received) {
if(!ec) {
// Отсылаем данные агенту для обработки.
data_.resize(bytes_received);
so_5::send<data_read>(receiver_, std::move(data_), shared_from_this());
}
else { /* Обработка ошибки */ }
}
...
};
// Агент, который получает входящие данные.
class data_handler final : public so_5::agent_t {
public:
data_handler(context_t ctx, ...) : so_5::agent_t(std::move(ctx)), ... {
// Создаем подписку на прочитанные данные.
so_subscribe_self().event(data_handler::on_data_read);
...
}
...
private:
void on_data_read(mhood_t<data_read> cmd) {
...
}
};
В этом случае вообще нет надобности создавать thread_pool для Asio, достаточно будет и одной рабочей нити, на которой будет работать asio::io_service::run(). Ну а SO-шных агентов можно будет распределить по тем контекстам, которые им нужны. Тут от специфики задачи зависит.