Добрый день!
Выбираю библиотеку для записи в кафку.
Можете порекомендовать или предостеречь?
Пока нашел 3шт:
https://github.com/edenhill/librdkafka — Сишная
https://github.com/mfontanini/cppkafka
https://github.com/Morgan-Stanley/modern-cpp-kafka
Что еще стоит знать при работе с кафкой?
Здравствуйте, avovana, Вы писали:
A>Пока нашел 3шт:
A>https://github.com/edenhill/librdkafka — Сишная
Там есть якобы плюсовое API в очень старом стиле. Его и использую.
A>https://github.com/mfontanini/cppkafka
A>https://github.com/Morgan-Stanley/modern-cpp-kafka
Это С++ обертки над первой либой
Здравствуйте, Nuzhny, Вы писали:
A>>https://github.com/edenhill/librdkafka — Сишная
N>Там есть якобы плюсовое API в очень старом стиле. Его и использую.
Спасибо за ответ!
Такой пример нашел:
https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp
Изначально хотел увидеть обычный синхронный sent с ошибкой в случае чего.
Где-то прочитал, что библиотека под такой не затачивалась.
Что нужно юзать асинхронный.
Ок. Буду привыкать.
Всплывает концепция с асинхронной отправкой и коллбэком.
Ладно бы так. Но, оказывается, этот колбэк вызывается в недрах poll() или flash().
Которые нужно периодически вызывать.
Оказывается, у producer есть внутренний буфер с
1) сообщениями для отправки после вызова sent
2) отправленными сообщениями со статусами ОТПРАВЛЕНО ОК/НЕ ОТПРАВЛЕНО
Правильно понимаю, что коллбэк выгребает(с удалением) из этого внутреннего буфера?
А чтобы вызвался коллбэк нужно вызыватать poll()?
Как советовали в примере, можно его вызывать сразу после sent.
Т.е. алгоритм такой:
И когда вызовется коллбэк, залогировать если отправлено, послать еще раз, если нет. Так?
Какая скорость(сообщений/сек) получалась при такой асинхронной отправке?
При завершение программы такой код приводят.
/* Wait for final messages to be delivered or fail.
* flush() is an abstraction over poll() which
* waits for all messages to be delivered. */
std::cerr << "% Flushing final messages..." << std::endl;
producer->flush(10*1000 /* wait for max 10 seconds */);
Вроде бы, ждёт чтобы отправить все сообщения. Но в то же время ограничивает 10 секундами?
Небольшая заминка у меня в том, что в текущей реализации есть отправители, наследуемые от абстрактного класса и переопределившие send:
В основной логике в бесконечном цикле выгребаются сообщения из очереди и вызывается такой синхронный send класса-наследника.
Но, получается, с асинхронным send кафки нужно будет подумать...