Kafka С++ client lib
От: avovana Россия  
Дата: 14.03.21 15:09
Оценка:
Добрый день!

Выбираю библиотеку для записи в кафку.
Можете порекомендовать или предостеречь?

Пока нашел 3шт:
https://github.com/edenhill/librdkafka — Сишная
https://github.com/mfontanini/cppkafka
https://github.com/Morgan-Stanley/modern-cpp-kafka

Что еще стоит знать при работе с кафкой?
Re: Kafka С++ client lib
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 14.03.21 16:22
Оценка:
Здравствуйте, avovana, Вы писали:

A>Пока нашел 3шт:

A>https://github.com/edenhill/librdkafka — Сишная

Там есть якобы плюсовое API в очень старом стиле. Его и использую.


A>https://github.com/mfontanini/cppkafka

A>https://github.com/Morgan-Stanley/modern-cpp-kafka

Это С++ обертки над первой либой
Re[2]: Kafka С++ client lib
От: avovana Россия  
Дата: 15.03.21 13:49
Оценка:
Здравствуйте, Nuzhny, Вы писали:

A>>https://github.com/edenhill/librdkafka — Сишная


N>Там есть якобы плюсовое API в очень старом стиле. Его и использую.


Спасибо за ответ!
Такой пример нашел:
https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp

Изначально хотел увидеть обычный синхронный sent с ошибкой в случае чего.
Где-то прочитал, что библиотека под такой не затачивалась.
Что нужно юзать асинхронный.

Ок. Буду привыкать.
Всплывает концепция с асинхронной отправкой и коллбэком.
Ладно бы так. Но, оказывается, этот колбэк вызывается в недрах poll() или flash().
Которые нужно периодически вызывать.

Оказывается, у producer есть внутренний буфер с
1) сообщениями для отправки после вызова sent
2) отправленными сообщениями со статусами ОТПРАВЛЕНО ОК/НЕ ОТПРАВЛЕНО

Правильно понимаю, что коллбэк выгребает(с удалением) из этого внутреннего буфера?
А чтобы вызвался коллбэк нужно вызыватать poll()?
Как советовали в примере, можно его вызывать сразу после sent.

Т.е. алгоритм такой:


И когда вызовется коллбэк, залогировать если отправлено, послать еще раз, если нет. Так?

Какая скорость(сообщений/сек) получалась при такой асинхронной отправке?

При завершение программы такой код приводят.
  /* Wait for final messages to be delivered or fail.
   * flush() is an abstraction over poll() which
   * waits for all messages to be delivered. */
  std::cerr << "% Flushing final messages..." << std::endl;
  producer->flush(10*1000 /* wait for max 10 seconds */);

Вроде бы, ждёт чтобы отправить все сообщения. Но в то же время ограничивает 10 секундами?
Re[3]: Kafka С++ client lib
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 15.03.21 13:57
Оценка:
Здравствуйте, avovana, Вы писали:

A>И когда вызовется коллбэк, залогировать если отправлено, послать еще раз, если нет. Так?

A>Какая скорость(сообщений/сек) получалась при такой асинхронной отправке?

Да, всё так. Я использую как раз самый классический вариант практически без изменений. И у меня всё очень лайтово: я посылаю сообщения из сервиса локально, частота посылок — частота кадров в видео, то есть 25-30 раз в секунду. Разумеется всё работает быстро, нагрузки никакой.
Re[4]: Kafka С++ client lib
От: avovana Россия  
Дата: 16.03.21 09:40
Оценка:
Небольшая заминка у меня в том, что в текущей реализации есть отправители, наследуемые от абстрактного класса и переопределившие send:


В основной логике в бесконечном цикле выгребаются сообщения из очереди и вызывается такой синхронный send класса-наследника.
Но, получается, с асинхронным send кафки нужно будет подумать...
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.