Re[2]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 10.11.23 21:51
Оценка:
Здравствуйте, SkyDance, Вы писали:

SD> Тогда да, два потока, в одном только обработка данных epoll (или io_uring, а еще лучше kqueue), и неблокирующая запись в message queue другого потока.


И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.
www.blinnov.com
Re: 10K problem for keep-alive utility
От: reversecode google
Дата: 10.11.23 22:00
Оценка:
кажеться netch80 который работает в люксофте(если не ошибаюсь)
будет очень не доволен
не проще было бы его попросить вас по блату принять?
Re[3]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 09:39
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.


Вот да. Тоже мысль пришла, что epoll_wait с событием на чтение + последующий read смогут показать, что соединение закрыто штатно:
n = read(socketfd, buffer, ...)
if (n == 0) // peer disconnected
    break;
else if (n == -1) // error
{
    perror("read");
    break;
}
else // received 'n' bytes
{
    printf("%.*s", n, buffer);
}


А не штатно? Сеть пропала, сервер взорвался. Нашёл такое на so:

epoll_wait will return a EPOLLHUP or EPOLLERR for the socket if the other side disconnects. EPOLLHUP and EPOLLERR are set automatically but you can also set the newer EPOLLRDHUP which explicitly reports peer shutdown.
Also if you use send with the flag MSG_NOSIGNAL it will set EPIPE on closed connections.
int resp = send ( sock, buf, buflen, MSG_NOSIGNAL );

if ( resp == -1 && errno == EPIPE ) { /* other side gone away */ }

Much nicer than getting a signal.

Есть ответы про фичу сокета KEEP-ALIVE. Но там нет единого мнения насколько это нормально работает.

upd.
1. Ещё что-то с MSG_PEAK придумывают. Когда можно посмотреть если ли что-то во внутреннем буфере, не выгребая из него.
2. https://www.rsdn.org/forum/info/FAQ.network.socket.winlin
KEEP-ALIVE посылает пакеты редко. Советуют реализовать свой протокол проверки поверх tcp.
Отредактировано 11.11.2023 12:19 avovana . Предыдущая версия . Еще …
Отредактировано 11.11.2023 11:25 avovana . Предыдущая версия .
Re[4]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 11.11.23 13:04
Оценка: +1
Здравствуйте, avovana, Вы писали:

A>Здравствуйте, landerhigh, Вы писали:


L>>И вот после всего этого будет очень забавно обнаружить, что необходимо также отслеживать half-open TCP соединения.


A>Вот да. Тоже мысль пришла, что epoll_wait с событием на чтение + последующий read смогут показать, что соединение закрыто штатно:


Дело в том, что не существует API, которое позволяет обнаруживать такие зомби TCP соединения.

Для их быстрого обнаружения (ждать TCP keepalive в большинстве случаев слишком долго, не говоря уже о том, что он опциональный) нужно в протокол вносить heartbeat.
А это сразу уже другой уровень.

В этом фундаментальная проблема таких опросников.
www.blinnov.com
Re: 10K problem for keep-alive utility
От: Ip Man Китай  
Дата: 11.11.23 13:29
Оценка:
Два потока.

— Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
— Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 17:02
Оценка:
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.

А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison

#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size> 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); // pushByMOve?
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic <size_t>  _tail;  // tail(input) index
  Element    _array[Capacity];
  std::atomic<size_t>   _head; // head(output) index
};

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{    
  const auto current_tail = _tail.load(std::memory_order_relaxed); 
  const auto next_tail = increment(current_tail); 
  if(next_tail != _head.load(std::memory_order_acquire))                           
  {    
    _array[current_tail] = item;
    _tail.store(next_tail, std::memory_order_release); 
    return true;
  }
  
  return false; // full queue

}


// Pop by Consumer can only update the head (load with relaxed, store with release)
//     the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);  
  if(current_head == _tail.load(std::memory_order_acquire)) 
    return false; // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head), std::memory_order_release); 
  return true;
}

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
  // snapshot with acceptance of that this comparison operation is not atomic
  return (_head.load() == _tail.load()); 
}


// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
  const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
  return (next_tail == _head.load());
}


template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
  return (_tail.is_lock_free() && _head.is_lock_free());
}

template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди получая экзэмляр.
Отредактировано 11.11.2023 17:10 avovana . Предыдущая версия . Еще …
Отредактировано 11.11.2023 17:09 avovana . Предыдущая версия .
Re[5]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 11.11.23 17:08
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>Для их быстрого обнаружения (ждать TCP keepalive в большинстве случаев слишком долго, не говоря уже о том, что он опциональный) нужно в протокол вносить heartbeat.

L>А это сразу уже другой уровень.

Получается, нужно крутиться в бесконечном цикле.
Когда read = 0:
+отлогировать утрату соединения/падение сервера
+выкинуть этот fd - epoll_ctl(DEL,...)
+close(fd)
+узнать какой ip за ним стоял
+создать новый сокет с новым fd
+epoll_ctl(ADD, fd)
+вызвать неблокирующий connect по такому ip
+в epoll_wait похоже придёт событие(EPOLLIN похоже), что соединение установлено
+отлогировать восстановление соединения

Нормальная логика?
Отредактировано 11.11.2023 19:45 avovana . Предыдущая версия .
Re[6]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 13.11.23 19:58
Оценка:
Здравствуйте, avovana, Вы писали:


A>Получается, нужно крутиться в бесконечном цикле.


Нет, нужно сначала на уровне прикладного протокола договориться, кто именно и как часто будет отсылать heartbeat. Обычно это делает сервер, но возможны варианты.
Без этого облом возникает уже вот на самом первом шаге

+отлогировать утрату соединения/падение сервера

www.blinnov.com
Re[7]: 10K problem for keep-alive utility
От: Pzz Россия https://github.com/alexpevzner
Дата: 13.11.23 20:34
Оценка:
Здравствуйте, avovana, Вы писали:

A>Нюанс в том, что не понятно какой этот буфер делать. Если чтобы влезло 5000 fd, то может придти 7000 fd, не влезут в этот буфер. Или же логгер поток еще не записал данные в файл/консоль. Что делать epoll потоку? А ему надо быстро откинуть данные, чтобы epoll'ить снова.


Они имеют тенденцию по нескольку штук за раз всегда приходить. Не тысячами и даже не сотнями.

И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...
Re[7]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:49
Оценка:
Здравствуйте, landerhigh, Вы писали:

L>Нет, нужно сначала на уровне прикладного протокола договориться, кто именно и как часто будет отсылать heartbeat. Обычно это делает сервер, но возможны варианты.

L>Без этого облом возникает уже вот на самом первом шаге

L>

L>+отлогировать утрату соединения/падение сервера


Считаем, что сервер не реализует какой-то протокол поверх TCP. Тогда остаётся мониторинг лишь по EPOLLIN + read() == 0, чтобы отлавливать хотя бы корректное закрытие сокета на стороне сервера.
Re[8]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:50
Оценка:
Здравствуйте, Pzz, Вы писали:

A>>Нюанс в том, что не понятно какой этот буфер делать. Если чтобы влезло 5000 fd, то может придти 7000 fd, не влезут в этот буфер. Или же логгер поток еще не записал данные в файл/консоль. Что делать epoll потоку? А ему надо быстро откинуть данные, чтобы epoll'ить снова.


Pzz>Они имеют тенденцию по нескольку штук за раз всегда приходить. Не тысячами и даже не сотнями.

Как вариант — кольцевой буффер какого-то "среднего" размера.

Pzz>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...

На чём реализовывал?
Re[9]: 10K problem for keep-alive utility
От: Pzz Россия https://github.com/alexpevzner
Дата: 14.11.23 07:51
Оценка:
Здравствуйте, avovana, Вы писали:

Pzz>>И 10К соединений — это совсем немного по нашим временам. У меня есть личный опыт 300К...

A>На чём реализовывал?

На Си.
Re[2]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:52
Оценка:
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


А ведь второй же thread, а не процесс. Тогда можно и без shared memory. Просто есть на стеке в main созданный до создания 2ого треда кольцевой буфер с таким-то capacity. И каждый поток его юзает.
Re[6]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 07:56
Оценка:
Здравствуйте, reversecode, Вы писали:

R>ну вот видите вы погулите и узнаете

R>как без него в нетворкинг шагать я не особо понимаю

R>а чего там они хотели я хз

R>я вообще если какие то домашки беру на собесах
R>то не парюсь крутыми решениями
R>мне за это не платят
R>и всегда понимаю домашки — как задачу абы как, что бы дальше было о чем позвиздеть с собеседующим при сдаче

R>так что делайте в лоб на epoll

R>максимум еще можете какуюто mutex+cv очередь на воркерах накидать
R>и сойдет

Сделал решение на epoll. Если кому-нибудь хочется посмотреть, пишите, поделюсь
Может, вместе оценим что сделано хорошо, что можно улучшить)
Re[2]: 10K problem for keep-alive utility
От: reversecode google
Дата: 14.11.23 07:58
Оценка: +1
в расте так же
один тред разгребает epoll и кладет в SPMC lock free буфер
остальные воркеры разгребают эту очередь

> avovana

смотрите сорсы libfev
как почти единственную подобную имплементацию где есть явные SPMC lockfree
там правда есть и дргие варианты воркеров и бенчмарки в отдельной репе того же чела
Отредактировано 14.11.2023 8:30 reversecode . Предыдущая версия .
Re[7]: 10K problem for keep-alive utility
От: reversecode google
Дата: 14.11.23 07:59
Оценка: +1
зашарите сюда как его возможное решения, когда пройдете или не пройдете собес
Re[8]: 10K problem for keep-alive utility
От: avovana Россия  
Дата: 14.11.23 09:25
Оценка:
Здравствуйте, reversecode, Вы писали:

R>зашарите сюда как его возможное решения, когда пройдете или не пройдете собес

Окей.

Асинхронный исполнитель функций с отложенным выполнением.
Смотрите, что получилось допилить. Ребята с тм группы cpp помогли финализировать с std::function<void>() + bind.
До этого не понимал как инстанцировать очередь с типом таска/джобы, которая может быть инстанцирована с любым типом функции.

#include <iostream>
#include <functional>
#include <thread>

using namespace std;

#include <chrono>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

using namespace std::chrono;
using namespace std;

template<typename F>
struct Job {
    time_point<system_clock> time_to_exec;
 F func;
    Job(const F &j, time_point<system_clock> t) : func(j), time_to_exec(t)
    {}

    void operator()() {func();}
     
};

template<typename F>
inline bool operator<(const Job<F> &lhs, const Job<F> &rhs) {
    return lhs.time_to_exec < rhs.time_to_exec;
};

template<typename F>
struct Executor
{
    Executor() {
        runner = thread(&Executor::running, this);
 }

    ~Executor() {
        {
  unique_lock<mutex> lock(queue_mutex);
  cv.notify_all();
  stop = true;
        }
  runner.join();
 }

 void running() {
  while(not stop) {
            unique_lock<mutex> lock(queue_mutex);

   if(jobs.empty()) {
                cv.wait(lock, [this] ()->bool {return not jobs.empty() || stop;});
            } else if (jobs.top().time_to_exec > system_clock::now()) {
                cv.wait_until(lock, jobs.top().time_to_exec, [this]() ->bool {return not jobs.empty() || stop;});
   } else {
    auto job = jobs.top();
    jobs.pop();
    lock.unlock();

    job();
   }
  }
 }

    //template<typename F>
    void exec(F f, milliseconds timeout) {
        Job<F> job(f, system_clock::now() + timeout);

  unique_lock<mutex> lock(queue_mutex);
  jobs.push(job);
  cv.notify_one();
 }

    priority_queue<Job<F>> jobs; // <-----------?... o_O
    mutex queue_mutex;
 condition_variable cv;
 thread runner;
 bool stop = false;
};

string str = "1234567";
void foo(int x) {
    cout << "foo: out finally!" << str[x] << endl;
};

int main()
{
    Executor<std::function<void()>> executor;
    executor.exec([](){cout << "lambda: out finally!\n";}, seconds{5});
    executor.exec(std::bind(foo, 5), seconds{7});
    cout<<"Hello World\n";

    std::this_thread::sleep_for(seconds{8});

    return 0;
}

https://godbolt.org/z/xdKoqExzo

Хотел к deadline сдачи домашки успеть. Сейчас решение без этого асинхронного исполнителя.
Думал его внедрить для асинхронного выполнения отложенных рекконектов.
Сейчас если подключение не получается, просто кладу в такую очередь с отложенным выполнением и проверкой очереди перед новым epoll_wait — выгребание из неё тех тасок/джобов на рекконект для которых пришло время.
А то если пробовать коннекится постоянно к серверу, который оказался недоступен и сразу делать рекконект — постоянно из-за него будет просыпание.
Решил сделать такой отложенный рекконект с exponential backoff — постепенно увеличение время рекконекта до какого-то предела.

Потом выложу код, можно будет подробнее посмотреть. Сейчас озвучил идею.
Re[8]: 10K problem for keep-alive utility
От: landerhigh Пират  
Дата: 15.11.23 22:19
Оценка:
Здравствуйте, avovana, Вы писали:

A>Считаем, что сервер не реализует какой-то протокол поверх TCP.


Тогда эта задача не имеет ни решения, ни смысла.
www.blinnov.com
Re[2]: 10K problem for keep-alive utility
От: ksandro Мухосранск  
Дата: 10.12.23 21:55
Оценка:
Здравствуйте, watchmaker, Вы писали:


A>> синхронно записывая в консоль, файл.

W>Можешь писать асинхронно
W>Добавить файл в epoll и точно так же слушать на нём события.

А что так можно? Я вроде слышал, что epoll с обычными файлами не работает
Re: 10K problem for keep-alive utility
От: ksandro Мухосранск  
Дата: 10.12.23 22:27
Оценка:
Здравствуйте, avovana, Вы писали:

A>Добрый день, дорогие форумчане!


A>Есть файл со списком из 10 000 записей "ip+port".

A>Нужно мониторить состояние подключения по адресу. Реализовать чек "жив ли сервер", фактически.
A>И оповещать — выводить в консоль и файл запись "ts + address + up/down".
A>Всё это нужно делать быстро.

A>1ая реализация

A>epoll + main thread + output
A>Придумал, что можно в epoll получать список fd, у которых случилось событие "соединение упало/соединение установлено".
A>В моменте выводить информацию. Идти к следующему fd, ...
A>Минус реализации, что проходимся в цикле по выданному пулу fd синхронно записывая в консоль, файл. А в это время уже события на fd новые могут придти.

A>Какие ещё могут быть идеи?


A>Подумал над memory mapped file + спин лок. Спин локом защищаем общую переменную — смещение. Поток подготовил строку для вставки в файл. Теперь ему нужно узнать по какому смещению её записать.

A>Он лочит спин лок, сохраняет себе смещение, обновляет его — прибавляет к нему длину строки, которую сейчас вставит. Отпускает спин лок. Вставляет по полученному смещению строку.
A>Т.е. критическая секция получилась маленькая.


А нужна ли тут вообще многопоточность? Многопоточность позволяет использовать параллельно несколько ядер процессора и благодаря распараллеливанию увеличивает производительность. Но тут процессор не является узким местом. Узкие места тут: сеть, консоль и файл. По идее epoll должен прекрасно справляться с обработкой 10 тысяч соединений, наверно это самый нормальный вариант, многопоточность может только замедлить чтение, но никак не ускорить. Теперь про вывод в файл и на консоль, писать в файл ассинхронно сложнее, а если мы пишем синхронно в том же потоке, то это может затормозить нам и работу с сетью. Но вывод как на консоль так и в файлы буферизуется. Причем буферизация происходит и на уровне стандартной библиотеки и на уровне ОС, и может даже на уровне диска. Так вот, думаю, для того, чтобы не возникло тормозов, если вдруг epoll вернет за 1 раз слишком много событий, есть стандартная функция setvbuf(), которая позволяет настроить буффер вывода. Я думаю можно задать такой размер буффера, чтобы туда влезли события от всех 10 тысячь дескрипторов, и вызывать flush после того, как обработаны все дескрипторы. И все, больше ничего не надо.

Может конечно случиться так, что у нас слишком часто меняется состояние серверов, и слишком медленный диск, но и в этом случае использование очередей и отдельного пишущего потока лишь отсрочит проблемы, но не решит их полностью. В этом случае мы можем подумать, а так ли нам надо записывать все события, может если сокет меняет состояние 100 раз в секунду, достаточно записывать последнее состояние каждые 10 секунд? В этом случае мы можем сделать массив из 10000 элементов, каждый элемент содержит последнее состояние коннекшена, и флаг, было ли это состояние обновлено после последней записи. По событиям на сокете, мы обновляем состояние, а так же вешаем на epoll таймер, и по таймеру выводим информацию о коннекшенах, вот тут по идее уже можно бы наверное и многопоточность прикрутить, но не обязательно.

Насчет memory mapped файлов, я не уверен, что это что-то ускорит, ты все равно пишешь в файл последовательно и в итоге упираешься в скорость работы диска.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.