10K problem for keep-alive utility
От: avovana Россия  
Дата: 08.11.23 21:28
Оценка:
Добрый день, дорогие форумчане!

Есть файл со списком из 10 000 записей "ip+port".
Нужно мониторить состояние подключения по адресу. Реализовать чек "жив ли сервер", фактически.
И оповещать — выводить в консоль и файл запись "ts + address + up/down".
Всё это нужно делать быстро.

1ая реализация
epoll + main thread + output
Придумал, что можно в epoll получать список fd, у которых случилось событие "соединение упало/соединение установлено".
В моменте выводить информацию. Идти к следующему fd, ...
Минус реализации, что проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл. А в это время уже события на fd новые могут придти.

2ая реализация
epoll + threads + background output
Создаём пулл потоков с theads_number = hardware_concurrency. Чтобы вышло сколько ядер, столько и потоков. Честное распараллеливание.
К примеру, будет 4.
class ThreadPool {
using Task = std::future<void>;
public:
    explicit ThreadPool(std::size_t threads_num);
    ~ThreadPool();

    template <typename F, typename ... Args>
    void addTask(F f, Args&& ... args) {
        {
            std::lock_guard<std::mutex> l(cv_m_);
            tasks_.emplace(std::async(std::launch::deferred, std::forward<F>(f), std::forward<Args>(args)...));
        }
        condition_.notify_one();
    }

private:
    std::mutex cv_m_;
    std::condition_variable_any condition_;
    std::queue<Task> tasks_;
    std::vector<std::jthread> workers_;
};

ThreadPool::ThreadPool(std::size_t threads_num)
{
    for (std::size_t i = 0; i < threads_num; ++i) {
        workers_.emplace_back(
                [this](std::stop_token stop_token){
                    while (true) {
                        std::unique_lock<std::mutex> lk(this->cv_m_);
                        condition_.wait(lk, stop_token, [this](){
                            return !this->tasks_.empty();
                        });
                        if (stop_token.stop_requested()) {
                            return;
                        }
                        if (!this->tasks_.empty()) {
                            auto f = std::move(this->tasks_.front());
                            this->tasks_.pop();
                            lk.unlock();
                            f.get();
                        }
                    }
                }
        );
    }
}

Получаем от epoll список fd. Делим его на 4 — получаем 4 списка. Каждый отдаём на обработку в пул потоков.
Минус в том, что сгородили целый пулл потоков ради всего-лишь вывода в файл. И файл-то один. Нужно же синхронизировать к нему доступ. Через мьютекс? Вся многопоточность убьётся об него.

Какие ещё могут быть идеи?

Подумал над memory mapped file + спин лок. Спин локом защищаем общую переменную — смещение. Поток подготовил строку для вставки в файл. Теперь ему нужно узнать по какому смещению её записать.
Он лочит спин лок, сохраняет себе смещение, обновляет его — прибавляет к нему длину строки, которую сейчас вставит. Отпускает спин лок. Вставляет по полученному смещению строку.
Т.е. критическая секция получилась маленькая.
Re: 10K problem for keep-alive utility
От: watchmaker  
Дата: 09.11.23 00:06
Оценка: 1 (1)
Здравствуйте, avovana, Вы писали:

A> 1ая реализация

Конечно же ты замерил производительность, и она тебя не устроила? Не устроила чем?
Потому что реализация хоть и простая, но если одного потока хватает для обработки всех событий, то она же и будет быстрой.


A> А в это время уже события на fd новые могут придти.

Ну и что?
Они не потеряются. А если запись в output тормозит, то всё равно не сможешь в output вывести об этом сообщение, даже если сумеешь обработать событие на fd раньше.

Уметь обрабатывать события раньше имеет смысл только если почему-то нужно узнавать более точное время, когда оно наступило. Или если нужно уметь дедуплицировать события: например, если произошёл переход up->down->up, но в output ещё не успели записать строку про up->down, то не записывать в output ничего вообще — как будто соединение и не падало.

A> синхронно записывая в консоль, файл.

Можешь писать асинхронно :)
Добавить файл в epoll и точно так же слушать на нём события.
Можно и одним потоком обойтись, но ниже будет вариант с парой, который может быть проще для восприятия и реализации — потому что стандартные примитивы очередей и future/promise проще, чем автомат поверх событий epoll.



A> проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл

Скорее имеет смысл подготовить данные про группу дескрипторов и один раз записать в файл, а не на каждый дескриптор дёргать вывод в файл. То есть либо использовать, условно говоря, один writev вместо многих write, либо использовать обычную буферизацию записи (просто при выводе в консоль она может быть всего лишь построчной по умолчанию).

A> Подумал над memory mapped file + спин лок. Спин локом защищаем общую переменную — смещение. Поток подготовил строку для вставки в файл. Теперь ему нужно узнать по какому смещению её записать.

A> Он лочит спин лок, сохраняет себе смещение, обновляет его — прибавляет к нему длину строки, которую сейчас вставит. Отпускает спин лок. Вставляет по полученному смещению строку.
A> Т.е. критическая секция получилась маленькая.

У тебя есть замер производительности, который обнаружил проблемы в записи в файл в этом месте? Правда?
Это выглядит как злющая предварительная оптимизация.

К тому же с таким подходом не всё так просто: нужно будет ведь ещё как-то уведомлять читателя о том, откуда из файла уже можно читать данные. Читатель тогда тоже должен какой-то счётчик со смещением чтения иметь, который не совпадает со смещением для записи. И который не тривиально обновлять, так как записи блоков будет завершаться не в монотонном порядке — и счётчик обязан будет уметь как задерживаться, если за ним идёт дыра, так и прыгать на несколько записей вперёд, когда в дыру закончится запись.

Можно посмотреть как это делается правильно в lock-free-like очередях поверх циклических буферов. Но это не совсем просто и, главное, слишком избыточно.


A>Получаем от epoll список fd. Делим его на 4 — получаем 4 списка. Каждый отдаём на обработку в пул потоков.

A>Минус в том, что сгородили целый пулл потоков ради всего-лишь вывода в файл. И файл-то один. Нужно же синхронизировать к нему доступ. Через мьютекс? Вся многопоточность убьётся об него.

Идея почти нормальная. Только откуда-то из непонятного места возникает синхронизация к файлу.

Заводи два типа потоков:
Собственно, это почти всё.
Каких-то сложных потоков данных или блокировок нет — всё собирается из стандартных примитивов.
В фантастическом случае, если например промежуточная очередь тормозит, то понятно как её можно заменить (уверен, что кто-нибудь посоветует тут поиграть со всякими lock-free очередями).

И легко сделать дедупликацию событий, чтобы промежуточная очередь не росла бесконечно и чтобы программа не упала по превышению памяти, если вдруг запись в output застопорится. Так как обрабатывающий поток будет видеть, что его же блок событий с предыдущей итерации ещё не был передан для записи, и может легко сделать агрегацию (опять же, можно это делать не блокируя соседние потоки, если они есть).
Re[2]: 10K problem for keep-alive utility
От: reversecode google
Дата: 09.11.23 00:10
Оценка: 16 (1)
да это он домашку решает https://rsdn.org/forum/unix/8630780.1
Автор: avovana
Дата: 06.11.23
Re: 10K problem for keep-alive utility
От: reversecode google
Дата: 09.11.23 00:18
Оценка:
а еще есть iouring

вообщем то вопрос наверное на умение и понимание устройста реакторов
или на знание фреймворков ?

ну берите asio или seastar и колбасте
Re[3]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 06:52
Оценка:
Здравствуйте, reversecode, Вы писали:

R>да это он домашку решает https://rsdn.org/forum/unix/8630780.1
Автор: avovana
Дата: 06.11.23

Нет. Это другая домашка Та из собеса минувших дней )

Кстати, там есть не совсем очевидные ответы. Спасибо, что стали отвечать. Получил их в том числе с Линукс форума, с cpplang.slack. Дополню сейчас в той ветки, какие мысли ещё высказывались.
Отредактировано 09.11.2023 7:04 avovana . Предыдущая версия .
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 06:53
Оценка:
Здравствуйте, reversecode, Вы писали:

R>а еще есть iouring


R>вообщем то вопрос наверное на умение и понимание устройста реакторов

R>или на знание фреймворков ?

R>ну берите asio или seastar и колбасте


Справедливо. Уточню, что условие — не использовать фреймворки.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 07:02
Оценка:
Здравствуйте, watchmaker, Вы писали:

W>Второй тип потоков — перекладыватель в output. Он работает в единственном экземпляре.

W>В бесконечном цикле ждёт событие "в очереди что-то появилось", под мьютексом делает swap со своей локальной пустой очередью, и потом начинает писать из своей локальной очереди все события в файл/консоль без каких-либо синхронизаций.
W>
W>Собственно, это почти всё.
W>Каких-то сложных потоков данных или блокировок нет — всё собирается из стандартных примитивов.
W>В фантастическом случае, если например промежуточная очередь тормозит, то понятно как её можно заменить (уверен, что кто-нибудь посоветует тут поиграть со всякими lock-free очередями).

W>И легко сделать дедупликацию событий, чтобы промежуточная очередь не росла бесконечно и чтобы программа не упала по превышению памяти, если вдруг запись в output застопорится. Так как обрабатывающий поток будет видеть, что его же блок событий с предыдущей итерации ещё не был передан для записи, и может легко сделать агрегацию (опять же, можно это делать не блокируя соседние потоки, если они есть).


Спасибо за такой развёрнутый ответ! Есть над чем подумать.
То есть, возможная более простая, понятная и быстрая реализация:
1 поток слушает по epoll соединению, перекладывает в общую очередь под мьютексом, cond var нотифаит другой поток разгребатель после записи. Другой поток не читает их под мьютексом а берёт себе, чтобы уже параллельно не мешая 1му потоку писать.

Правильно понял?

А как сделать этот swap? Чтобы быстро взять данные? Т.е. не использовать какую-то очередь, а просто буфер, указатель на который меняем? Для нового же нужно выделить место? malloc с каким-то значением? Но мы же не знаем сколько epoll поток вычитает. Или фиксируем сколько читать этой константой len — malloc(len)? А в случае быстрой работы не хочется дергать системные функции.

Можно, в принципе, тогда заранее определить размер общего буфера, в который пишет 1ый поток. И этот же размер будет у локального буфера у 2ого потока. Общая len. Тогда 1ый поток получив набор дескрипторов сможет записать только сколько влезет в len. Оповестить. А с остальными что делать? Может, получил 10 событий = 10 fd, а в этом фиксированном буфере выделено байт только для 5ти.
Отредактировано 09.11.2023 7:06 avovana . Предыдущая версия . Еще …
Отредактировано 09.11.2023 7:05 avovana . Предыдущая версия .
Re[3]: 10K problem for keep-alive utility
От: reversecode google
Дата: 09.11.23 08:26
Оценка: +2
ну тоесть они хотят что бы вы им написали в качестве тестового задания ядро tokio из раста на си или с++ с ворк стилиногом?

не ну понятно что на коленке его можно накидать в условных десяток другой строчек
но я бы предложил им для начала закинуть вам аванс хотя бы 5к баксов
и 5к баксов после выполнения
а что бы не кинули
сделать это через гаранта
Re: 10K problem for keep-alive utility
От: Zhendos  
Дата: 09.11.23 09:23
Оценка:
Здравствуйте, avovana, Вы писали:

A>Добрый день, дорогие форумчане!


A>Есть файл со списком из 10 000 записей "ip+port".

A>Нужно мониторить состояние подключения по адресу. Реализовать чек "жив ли сервер", фактически.
A>И оповещать — выводить в консоль и файл запись "ts + address + up/down".
A>Всё это нужно делать быстро.

A>1ая реализация

A>epoll + main thread + output
A>Придумал, что можно в epoll получать список fd, у которых случилось событие "соединение упало/соединение установлено".
A>В моменте выводить информацию. Идти к следующему fd, ...
A>Минус реализации, что проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл. А в это время уже события на fd новые могут придти.

Тогда нужно использовать io_uring, тогда и файлы можно будет писать асихнронно.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 11:46
Оценка:
Здравствуйте, Zhendos, Вы писали:

Z>Тогда нужно использовать io_uring, тогда и файлы можно будет писать асихнронно.


Спасибо, не сталкивался ещё с таким.
Re[4]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 12:28
Оценка:
Здравствуйте, reversecode, Вы писали:

R>ну тоесть они хотят что бы вы им написали в качестве тестового задания ядро tokio из раста на си или с++ с ворк стилиногом?

Что такое ворк стилиног?

Скорее просто решение epoll + что-нибудь нативное. М.б. пул потоков. Не особо вот как Вы описали что-то мощное. На пару часов.
Re[5]: 10K problem for keep-alive utility
От: reversecode google
Дата: 09.11.23 12:33
Оценка: 1 (1)
ну вот видите вы погулите и узнаете
как без него в нетворкинг шагать я не особо понимаю

а чего там они хотели я хз
я вообще если какие то домашки беру на собесах
то не парюсь крутыми решениями
мне за это не платят
и всегда понимаю домашки — как задачу абы как, что бы дальше было о чем позвиздеть с собеседующим при сдаче

так что делайте в лоб на epoll
максимум еще можете какуюто mutex+cv очередь на воркерах накидать
и сойдет
Re[5]: 10K problem for keep-alive utility
От: so5team https://stiffstream.com
Дата: 09.11.23 13:27
Оценка: +1
Здравствуйте, avovana, Вы писали:

R>>ну тоесть они хотят что бы вы им написали в качестве тестового задания ядро tokio из раста на си или с++ с ворк стилиногом?

A>Что такое ворк стилиног?

В общих словах: Work Stealing
Re: 10K problem for keep-alive utility
От: SkyDance Земля  
Дата: 09.11.23 19:41
Оценка: +1
A>Всё это нужно делать быстро.

Начинать надо с определения "быстро". Что значит "быстро"? Вот, предположим, на машине, где выполняется код, сеть "мигнула", и все соединения сбросились. Что нужно сделать "быстро", вывести 10.000 строк в консоль? А потом еще 10.000 что восстановилось? А, простите, зачем? Человеку это быстро не переварить, а машине из консоли читать неудобно.

Иными словами, в чем смысл очень быстро и неблокирующе обнаруживать up/down для тысяч соединений, если все равно этой информацией никак разумно не воспользоваться? Следует уточнить требования.

Ну и еще. Предположим, вы сделаете два потока, в одном пишете в консоль, в другом получаете события up/down. Где там bottleneck? Имеет ли значение то, что вы обнаружите потерю соединения на 0.1 (или 1, или даже 5 секунд) быстрее? Ведь все равно консоль занята выводом предыдущих сообщений. Или вы хотите правильно timestamp'ы вывести? Тогда да, два потока, в одном только обработка данных epoll (или io_uring, а еще лучше kqueue), и неблокирующая запись в message queue другого потока.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 22:15
Оценка:
Здравствуйте, SkyDance, Вы писали:

SD>Ну и еще. Предположим, вы сделаете два потока, в одном пишете в консоль, в другом получаете события up/down. Где там bottleneck? Имеет ли значение то, что вы обнаружите потерю соединения на 0.1 (или 1, или даже 5 секунд) быстрее? Ведь все равно консоль занята выводом предыдущих сообщений. Или вы хотите правильно timestamp'ы вывести? Тогда да, два потока, в одном только обработка данных epoll (или io_uring, а еще лучше kqueue), и неблокирующая запись в message queue другого потока.


Вот, да. Спасибо. Тоже думал про баланс — быстрее определить или быстрее выводить.
Re[6]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 09.11.23 22:23
Оценка:
Здравствуйте, reversecode, Вы писали:

R>так что делайте в лоб на epoll

R>максимум еще можете какуюто mutex+cv очередь на воркерах накидать
R>и сойдет

Да. Это самое понятное решение. Можно 1 воркер сделать, можно по числу ядер. Тред пул с очередью, mutex, cv, как Вы подметили и я привёл код.
Я сейчас задумался над предложением watchmaker. Он писал про буфер, а не соединяющую поток очередь. Что у каждого потока по буферу. Когда epoll считывающий поток получает набор фд он закидывает их в буфер.
Логгер поток, к примеру, по cv просыпается и всё что делает — swap указатели. Отдаёт указатель на буфер что у него есть(к примеру, такого же размера), забирает указатель на буффер с заполненными данными.
Это кажется быстро и кэш френдли.

Нюанс в том, что не понятно какой этот буфер делать. Если чтобы влезло 5000 fd, то может придти 7000 fd, не влезут в этот буфер. Или же логгер поток еще не записал данные в файл/консоль. Что делать epoll потоку? А ему надо быстро откинуть данные, чтобы epoll'ить снова.
malloc'ом каждый раз не хочется пользоваться. Системными функциями в целом не хочется пользоваться. Чтобы не давать повод ОС к выгрузки процесса.

Что можно с этим буфером сделать? Есть идеи? Какой-то циклический прикрутить? Но там проблема с набеганием указателя писателя.

upd. А может и не страшно, что писатель перезаписывает данные.
Т.е. да. Баланс между:
1. сейчас точно знать состояние.
Или же
2. историю хочется знать.
Наверное, для 1 — цикл буфер. Для 2 — std::queue<Task> tasks_ из сниппета выше.
Отредактировано 09.11.2023 22:35 avovana . Предыдущая версия . Еще …
Отредактировано 09.11.2023 22:30 avovana . Предыдущая версия .
Re[7]: 10K problem for keep-alive utility
От: reversecode google
Дата: 09.11.23 22:54
Оценка:
лучше перечитать тз задачи
а еще лучше если можно уточнять вопросы по задаче у постановщиков
потому что это по факту то что я сказал
написать нетворк фреймворк на коленке, за спасибо и поговорить это не делается

есть разные подходы
epoll
1) в разных воркерах
2) в одном, а результаты через очередь раздаются на других воркеров
итд
есть разные режимы epoll когда можно всеми форкерами ждать в одном epoll

вообщем долго сложно не понятно
когда не ясно чего хочет постановщик задачи
и когда у вас мало знаний о том как где в каких фреймворках используют epoll
что бы завалить собеседующего контр аргументами, мол а в известном нетворк фрейморвке сделано так
а в другом извесном сделано сяк
и вот такие плюсы и минусы
Re[8]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 10.11.23 11:12
Оценка:
Здравствуйте, reversecode, Вы писали:

R>вообщем долго сложно не понятно

R>когда не ясно чего хочет постановщик задачи
R>и когда у вас мало знаний о том как где в каких фреймворках используют epoll
R>что бы завалить собеседующего контр аргументами, мол а в известном нетворк фрейморвке сделано так
R>а в другом извесном сделано сяк
R>и вот такие плюсы и минусы

Классно. Спасибо за беседу! Реализую парочку вариантов. Посмотрим на результат
Re[9]: 10K problem for keep-alive utility
От: reversecode google
Дата: 10.11.23 12:14
Оценка: 1 (1)
народ на zig реализовывал некоторые варианты
помоему первые два
даже с воркстилингом + uring
смутно помню результаты очень приблизительно сравнимы
даже с go сравнивали

https://gist.github.com/kprotty/5a41e9612657de00788478a7dde43d78

в других ревизиях надо смотреть
они там разные варианты реализовывали
сейчас глянул уже оставили только один
Отредактировано 10.11.2023 12:20 reversecode . Предыдущая версия .
Re[10]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 10.11.23 19:01
Оценка:
Здравствуйте, reversecode, Вы писали:

R>народ на zig реализовывал некоторые варианты

R>помоему первые два
R>даже с воркстилингом + uring
R>смутно помню результаты очень приблизительно сравнимы
R>даже с go сравнивали

R>https://gist.github.com/kprotty/5a41e9612657de00788478a7dde43d78


R>в других ревизиях надо смотреть

R>они там разные варианты реализовывали
R>сейчас глянул уже оставили только один

Ну вот мы тоже общались по задаче. Что, мол, можно сделать вариант с запуском в баше этой утилиты в несколько экзэмпляров.
Как я понял, входящий список отслеживаемых адресов просто бьётся. К примеру, на 4. И запускаются 4 экземпляра. Что тогда утилиту можно сделать однопоточной.

Но я вижу в этом минус. Как в system design — проблема с популярной личностью с шардами. Один шард нагружен, который обслуживает личность, другие простаивают.
Так и тут одно подмножество адресов, допустим, будет постоянно мигать/много событий. Другие нет. Тогда на этот один из 4ёх экзэмляров утилиты упадёт основная работа.
А утилита однопоточная.

Если же многопоточная:
1 поток eloll_wait и отдаёт через общую очередь таски, как в снипетте. И есть потоки разгребатели. То лучше будет.
Re[2]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 10.11.23 21:51
Оценка:
Здравствуйте, SkyDance, Вы писали:

SD> Тогда да, два потока, в одном только обработка данных epoll (или io_uring, а еще лучше kqueue), и неблокирующая запись в message queue другого потока.


И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.
www.blinnov.com
Re: 10K problem for keep-alive utility
От: reversecode google
Дата: 10.11.23 22:00
Оценка:
кажеться netch80 который работает в люксофте(если не ошибаюсь)
будет очень не доволен
не проще было бы его попросить вас по блату принять?
Re[3]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 09:39
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.


Вот да. Тоже мысль пришла, что epoll_wait с событием на чтение + последующий read смогут показать, что соединение закрыто штатно:
n = read(socketfd, buffer, ...)
if (n == 0) // peer disconnected
    break;
else if (n == -1) // error
{
    perror("read");
    break;
}
else // received 'n' bytes
{
    printf("%.*s", n, buffer);
}


А не штатно? Сеть пропала, сервер взорвался. Нашёл такое на so:

epoll_wait will return a EPOLLHUP or EPOLLERR for the socket if the other side disconnects. EPOLLHUP and EPOLLERR are set automatically but you can also set the newer EPOLLRDHUP which explicitly reports peer shutdown.
Also if you use send with the flag MSG_NOSIGNAL it will set EPIPE on closed connections.
int resp = send ( sock, buf, buflen, MSG_NOSIGNAL );

if ( resp == -1 && errno == EPIPE ) { /* other side gone away */ }

Much nicer than getting a signal.

Есть ответы про фичу сокета KEEP-ALIVE. Но там нет единого мнения насколько это нормально работает.

upd.
1. Ещё что-то с MSG_PEAK придумывают. Когда можно посмотреть если ли что-то во внутреннем буфере, не выгребая из него.
2. https://www.rsdn.org/forum/info/FAQ.network.socket.winlin
KEEP-ALIVE посылает пакеты редко. Советуют реализовать свой протокол проверки поверх tcp.
Отредактировано 11.11.2023 12:19 avovana . Предыдущая версия . Еще …
Отредактировано 11.11.2023 11:25 avovana . Предыдущая версия .
Re[4]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 11.11.23 13:04
Оценка: +1
Здравствуйте, avovana, Вы писали:

A>Здравствуйте, landerhigh, Вы писали:


L>>И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.


A>Вот да. Тоже мысль пришла, что epoll_wait с событием на чтение + последующий read смогут показать, что соединение закрыто штатно:


Дело в том, что не существует API, которое позволяет обнаруживать такие зомби TCP соединения.

Для их быстрого обнаружения (ждать TCP keepalive в большинстве случаев слишком долго, не говоря уже о том, что он опциональный) нужно в протокол вносить heartbeat.
А это сразу уже другой уровень.

В этом фундаментальная проблема таких опросников.
www.blinnov.com
Re: 10K problem for keep-alive utility
От: Ip Man Китай  
Дата: 11.11.23 13:29
Оценка:
Два потока.

— Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
— Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 17:02
Оценка:
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.

А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison

#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size> 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); // pushByMOve?
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic <size_t>  _tail;  // tail(input) index
  Element    _array[Capacity];
  std::atomic<size_t>   _head; // head(output) index
};

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{    
  const auto current_tail = _tail.load(std::memory_order_relaxed); 
  const auto next_tail = increment(current_tail); 
  if(next_tail != _head.load(std::memory_order_acquire))                           
  {    
    _array[current_tail] = item;
    _tail.store(next_tail, std::memory_order_release); 
    return true;
  }
  
  return false; // full queue

}


// Pop by Consumer can only update the head (load with relaxed, store with release)
//     the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);  
  if(current_head == _tail.load(std::memory_order_acquire)) 
    return false; // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head), std::memory_order_release); 
  return true;
}

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
  // snapshot with acceptance of that this comparison operation is not atomic
  return (_head.load() == _tail.load()); 
}


// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
  const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
  return (next_tail == _head.load());
}


template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
  return (_tail.is_lock_free() && _head.is_lock_free());
}

template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди получая экзэмляр.
Отредактировано 11.11.2023 17:10 avovana . Предыдущая версия . Еще …
Отредактировано 11.11.2023 17:09 avovana . Предыдущая версия .
Re[5]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 17:08
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>Для их быстрого обнаружения (ждать TCP keepalive в большинстве случаев слишком долго, не говоря уже о том, что он опциональный) нужно в протокол вносить heartbeat.

L>А это сразу уже другой уровень.

Получается, нужно крутиться в бесконечном цикле.
Когда read = 0:
+отлогировать утрату соединения/падение сервера
+выкинуть этот fd - epoll_ctl(DEL,...)
+close(fd)
+узнать какой ip за ним стоял
+создать новый сокет с новым fd
+epoll_ctl(ADD, fd)
+вызвать неблокирующий connect по такому ip
+в epoll_wait похоже придёт событие(EPOLLIN похоже), что соединение установлено
+отлогировать восстановление соединения

Нормальная логика?
Отредактировано 11.11.2023 19:45 avovana . Предыдущая версия .
Re[6]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 13.11.23 19:58
Оценка:
Здравствуйте, avovana, Вы писали:


A>Получается, нужно крутиться в бесконечном цикле.


Нет, нужно сначала на уровне прикладного протокола договориться, кто именно и как часто будет отсылать heartbeat. Обычно это делает сервер, но возможны варианты.
Без этого облом возникает уже вот на самом первом шаге

+отлогировать утрату соединения/падение сервера

www.blinnov.com
Re[7]: 10K problem for keep-alive utility
От: Pzz Россия https://github.com/alexpevzner
Дата: 13.11.23 20:34
Оценка:
Здравствуйте, avovana, Вы писали:

A>Нюанс в том, что не понятно какой этот буфер делать. Если чтобы влезло 5000 fd, то может придти 7000 fd, не влезут в этот буфер. Или же логгер поток еще не записал данные в файл/консоль. Что делать epoll потоку? А ему надо быстро откинуть данные, чтобы epoll'ить снова.


Они имеют тенденцию по нескольку штук за раз всегда приходить. Не тысячами и даже не сотнями.

И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...
Re[7]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:49
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>Нет, нужно сначала на уровне прикладного протокола договориться, кто именно и как часто будет отсылать heartbeat. Обычно это делает сервер, но возможны варианты.

L>Без этого облом возникает уже вот на самом первом шаге

L>

L>+отлогировать утрату соединения/падение сервера


Считаем, что сервер не реализует какой-то протокол поверх TCP. Тогда остаётся мониторинг лишь по EPOLLIN + read() == 0, чтобы отлавливать хотя бы корректное закрытие сокета на стороне сервера.
Re[8]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:50
Оценка:
Здравствуйте, Pzz, Вы писали:

A>>Нюанс в том, что не понятно какой этот буфер делать. Если чтобы влезло 5000 fd, то может придти 7000 fd, не влезут в этот буфер. Или же логгер поток еще не записал данные в файл/консоль. Что делать epoll потоку? А ему надо быстро откинуть данные, чтобы epoll'ить снова.


Pzz>Они имеют тенденцию по нескольку штук за раз всегда приходить. Не тысячами и даже не сотнями.

Как вариант — кольцевой буффер какого-то "среднего" размера.

Pzz>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...

На чём реализовывал?
Re[9]: 10K problem for keep-alive utility
От: Pzz Россия https://github.com/alexpevzner
Дата: 14.11.23 07:51
Оценка:
Здравствуйте, avovana, Вы писали:

Pzz>>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...

A>На чём реализовывал?

На Си.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:52
Оценка:
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


А ведь второй же thread, а не процесс. Тогда можно и без shared memory. Просто есть на стеке в main созданный до создания 2ого треда кольцевой буфер с таким-то capacity. И каждый поток его юзает.
Re[6]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:56
Оценка:
Здравствуйте, reversecode, Вы писали:

R>ну вот видите вы погулите и узнаете

R>как без него в нетворкинг шагать я не особо понимаю

R>а чего там они хотели я хз

R>я вообще если какие то домашки беру на собесах
R>то не парюсь крутыми решениями
R>мне за это не платят
R>и всегда понимаю домашки — как задачу абы как, что бы дальше было о чем позвиздеть с собеседующим при сдаче

R>так что делайте в лоб на epoll

R>максимум еще можете какуюто mutex+cv очередь на воркерах накидать
R>и сойдет

Сделал решение на epoll. Если кому-нибудь хочется посмотреть, пишите, поделюсь
Может, вместе оценим что сделано хорошо, что можно улучшить)
Re[2]: 10K problem for keep-alive utility
От: reversecode google
Дата: 14.11.23 07:58
Оценка: +1
в расте так же
один тред разгребает epoll и кладет в SPMC lock free буфер
остальные воркеры разгребают эту очередь

> avovana

смотрите сорсы libfev
как почти единственную подобную имплементацию где есть явные SPMC lockfree
там правда есть и дргие варианты воркеров и бенчмарки в отдельной репе того же чела
Отредактировано 14.11.2023 8:30 reversecode . Предыдущая версия .
Re[7]: 10K problem for keep-alive utility
От: reversecode google
Дата: 14.11.23 07:59
Оценка: +1
зашарите сюда как его возможное решения, когда пройдете или не пройдете собес
Re[8]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 09:25
Оценка:
Здравствуйте, reversecode, Вы писали:

R>зашарите сюда как его возможное решения, когда пройдете или не пройдете собес

Окей.

Асинхронный исполнитель функций с отложенным выполнением.
Смотрите, что получилось допилить. Ребята с тм группы cpp помогли финализировать с std::function<void>() + bind.
До этого не понимал как инстанцировать очередь с типом таска/джобы, которая может быть инстанцирована с любым типом функции.

#include <iostream>
#include <functional>
#include <thread>

using namespace std;

#include <chrono>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

using namespace std::chrono;
using namespace std;

template<typename F>
struct Job {
    time_point<system_clock> time_to_exec;
 F func;
    Job(const F &j, time_point<system_clock> t) : func(j), time_to_exec(t)
    {}

    void operator()() {func();}
     
};

template<typename F>
inline bool operator<(const Job<F> &lhs, const Job<F> &rhs) {
    return lhs.time_to_exec < rhs.time_to_exec;
};

template<typename F>
struct Executor
{
    Executor() {
        runner = thread(&Executor::running, this);
 }

    ~Executor() {
        {
  unique_lock<mutex> lock(queue_mutex);
  cv.notify_all();
  stop = true;
        }
  runner.join();
 }

 void running() {
  while(not stop) {
            unique_lock<mutex> lock(queue_mutex);

   if(jobs.empty()) {
                cv.wait(lock, [this] ()->bool {return not jobs.empty() || stop;});
            } else if (jobs.top().time_to_exec > system_clock::now()) {
                cv.wait_until(lock, jobs.top().time_to_exec, [this]() ->bool {return not jobs.empty() || stop;});
   } else {
    auto job = jobs.top();
    jobs.pop();
    lock.unlock();

    job();
   }
  }
 }

    //template<typename F>
    void exec(F f, milliseconds timeout) {
        Job<F> job(f, system_clock::now() + timeout);

  unique_lock<mutex> lock(queue_mutex);
  jobs.push(job);
  cv.notify_one();
 }

    priority_queue<Job<F>> jobs; // <-----------?... o_O
    mutex queue_mutex;
 condition_variable cv;
 thread runner;
 bool stop = false;
};

string str = "1234567";
void foo(int x) {
    cout << "foo: out finally!" << str[x] << endl;
};

int main()
{
    Executor<std::function<void()>> executor;
    executor.exec([](){cout << "lambda: out finally!\n";}, seconds{5});
    executor.exec(std::bind(foo, 5), seconds{7});
    cout<<"Hello World\n";

    std::this_thread::sleep_for(seconds{8});

    return 0;
}

https://godbolt.org/z/xdKoqExzo

Хотел к deadline сдачи домашки успеть. Сейчас решение без этого асинхронного исполнителя.
Думал его внедрить для асинхронного выполнения отложенных рекконектов.
Сейчас если подключение не получается, просто кладу в такую очередь с отложенным выполнением и проверкой очереди перед новым epoll_wait — выгребание из неё тех тасок/джобов на рекконект для которых пришло время.
А то если пробовать коннекится постоянно к серверу, который оказался недоступен и сразу делать рекконект — постоянно из-за него будет просыпание.
Решил сделать такой отложенный рекконект с exponential backoff — постепенно увеличение время рекконекта до какого-то предела.

Потом выложу код, можно будет подробнее посмотреть. Сейчас озвучил идею.
Re[8]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 15.11.23 22:19
Оценка:
Здравствуйте, avovana, Вы писали:

A>Считаем, что сервер не реализует какой-то протокол поверх TCP.


Тогда эта задача не имеет ни решения, ни смысла.
www.blinnov.com
Re[2]: 10K problem for keep-alive utility
От: ksandro Мухосранск  
Дата: 10.12.23 21:55
Оценка:
Здравствуйте, watchmaker, Вы писали:


A>> синхронно записывая в консоль, файл.

W>Можешь писать асинхронно
W>Добавить файл в epoll и точно так же слушать на нём события.

А что так можно? Я вроде слышал, что epoll с обычными файлами не работает
Re: 10K problem for keep-alive utility
От: ksandro Мухосранск  
Дата: 10.12.23 22:27
Оценка:
Здравствуйте, avovana, Вы писали:

A>Добрый день, дорогие форумчане!


A>Есть файл со списком из 10 000 записей "ip+port".

A>Нужно мониторить состояние подключения по адресу. Реализовать чек "жив ли сервер", фактически.
A>И оповещать — выводить в консоль и файл запись "ts + address + up/down".
A>Всё это нужно делать быстро.

A>1ая реализация

A>epoll + main thread + output
A>Придумал, что можно в epoll получать список fd, у которых случилось событие "соединение упало/соединение установлено".
A>В моменте выводить информацию. Идти к следующему fd, ...
A>Минус реализации, что проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл. А в это время уже события на fd новые могут придти.

A>Какие ещё могут быть идеи?


A>Подумал над memory mapped file + спин лок. Спин локом защищаем общую переменную — смещение. Поток подготовил строку для вставки в файл. Теперь ему нужно узнать по какому смещению её записать.

A>Он лочит спин лок, сохраняет себе смещение, обновляет его — прибавляет к нему длину строки, которую сейчас вставит. Отпускает спин лок. Вставляет по полученному смещению строку.
A>Т.е. критическая секция получилась маленькая.


А нужна ли тут вообще многопоточность? Многопоточность позволяет использовать параллельно несколько ядер процессора и благодаря распараллеливанию увеличивает производительность. Но тут процессор не является узким местом. Узкие места тут: сеть, консоль и файл. По идее epoll должен прекрасно справляться с обработкой 10 тысяч соединений, наверно это самый нормальный вариант, многопоточность может только замедлить чтение, но никак не ускорить. Теперь про вывод в файл и на консоль, писать в файл ассинхронно сложнее, а если мы пишем синхронно в том же потоке, то это может затормозить нам и работу с сетью. Но вывод как на консоль так и в файлы буферизуется. Причем буферизация происходит и на уровне стандартной библиотеки и на уровне ОС, и может даже на уровне диска. Так вот, думаю, для того, чтобы не возникло тормозов, если вдруг epoll вернет за 1 раз слишком много событий, есть стандартная функция setvbuf(), которая позволяет настроить буффер вывода. Я думаю можно задать такой размер буффера, чтобы туда влезли события от всех 10 тысячь дескрипторов, и вызывать flush после того, как обработаны все дескрипторы. И все, больше ничего не надо.

Может конечно случиться так, что у нас слишком часто меняется состояние серверов, и слишком медленный диск, но и в этом случае использование очередей и отдельного пишущего потока лишь отсрочит проблемы, но не решит их полностью. В этом случае мы можем подумать, а так ли нам надо записывать все события, может если сокет меняет состояние 100 раз в секунду, достаточно записывать последнее состояние каждые 10 секунд? В этом случае мы можем сделать массив из 10000 элементов, каждый элемент содержит последнее состояние коннекшена, и флаг, было ли это состояние обновлено после последней записи. По событиям на сокете, мы обновляем состояние, а так же вешаем на epoll таймер, и по таймеру выводим информацию о коннекшенах, вот тут по идее уже можно бы наверное и многопоточность прикрутить, но не обязательно.

Насчет memory mapped файлов, я не уверен, что это что-то ускорит, ты все равно пишешь в файл последовательно и в итоге упираешься в скорость работы диска.
Re: 10K problem for keep-alive utility
От: fk0 Россия https://fk0.name
Дата: 27.01.24 11:41
Оценка:
Здравствуйте, avovana, Вы писали:

A>Добрый день, дорогие форумчане!


A>Есть файл со списком из 10 000 записей "ip+port".

A>Нужно мониторить состояние подключения по адресу. Реализовать чек "жив ли сервер", фактически.
A>И оповещать — выводить в консоль и файл запись "ts + address + up/down".
A>Всё это нужно делать быстро.

A>1ая реализация

A>epoll + main thread + output
A>Минус реализации, что проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл. А в это время уже события на fd новые могут придти.

Обработаешь позже. А без синхронизации будет нужна потеря данных (когда соединения устанавливаются быстрей, чем об этом пишешь).
Вывод куда-либо проще и удобней кстати вынести в отдельный поток (потому, что он блокируемый в отличие от сокетов и еполла).
И общая разделяемая переменная состояния (список статусов соединений). Плюс какой-то механизм побудки выводящего статус потока
(евенты в гуях, condvar или пайп в юниксе, дело десятое).

A>2ая реализация

A>epoll + threads + background output

Так далеко не уедешь, ибо C10K problem. Всё равно потребуется epoll в потоках. Что у тебя 1 тред, что тред пул,
на тред приходится очень много соединений и работа с ними не может быть блокирующей. В перспективе тред-пул конечно
лучше, но внутри тот же epoll в каждом треде.

A>Подумал над memory mapped file + спин лок. Спин локом защищаем общую переменную — смещение. Поток подготовил строку для вставки в файл. Теперь ему нужно узнать по какому смещению её записать.

A>Он лочит спин лок, сохраняет себе смещение, обновляет его — прибавляет к нему длину строки, которую сейчас вставит. Отпускает спин лок. Вставляет по полученному смещению строку.
A>Т.е. критическая секция получилась маленькая.

Почему просто в памяти (если процесс один) нельзя иметь переменную (массив), зачем какой-то файл?
Зачем спинлок, когда можно готовый rwlock или мьютекс из библиотеки. Они хоть управление другому треду отдать могут,
если блокировка длительная.
Re[8]: 10K problem for keep-alive utility
От: fk0 Россия https://fk0.name
Дата: 27.01.24 11:46
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...


Вот интересно, я понимаю что ОС внутри не только по номеру порта, но и по адресу идентифицирует.
То-есть 300к не выглядит нереалистично (хотя портов казалось бы 64к). Но если кто-то за НАТом сидит,
то уже кажется не реалистичным, т.к. в самом НАТе портов всего 64к...
Re[9]: 10K problem for keep-alive utility
От: Pzz Россия https://github.com/alexpevzner
Дата: 27.01.24 11:51
Оценка:
Здравствуйте, fk0, Вы писали:

Pzz>>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...


fk0> Вот интересно, я понимаю что ОС внутри не только по номеру порта, но и по адресу идентифицирует.

fk0>То-есть 300к не выглядит нереалистично (хотя портов казалось бы 64к). Но если кто-то за НАТом сидит,
fk0>то уже кажется не реалистичным, т.к. в самом НАТе портов всего 64к...

Почему нереалистично?

TCP-соединение идентифицируется четверкой чисел: {local-host, local-port, remote-host, remote-port}

Если два соединения различаются хоть в каком-то из этих параметров, они разные. Т.е., {local-host, local-port} могут быть одинаковыми хоть у всех 300К, если внешние различаются.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.