Чтение из сети и обработка
От: tryAnother  
Дата: 09.02.22 12:28
Оценка:
Приветствую.
Есть программа (windows сервис) которая принимает поток udp пакетов от специальной железки (в выделенной сети) примерно 20 000 пакетов в секунду примерно по 1 КБ.
Сейчас программа организована так, есть поток, который просыпается примерно раз в 20 мс и забирает из приемного буфера все принятые пакеты (размер приемного буфера устанавливается на примерно 5000 пакетов)
Вычитанные пакеты копируются каждый в свой буфер. Буферы складываются в лист. После того как вычитаны все пакеты в текущем такте обработке, данный список под критической секцией добавляется (splice) в другой и ставится event другому потоку о готовности.
Другой поток забирает список себе, сортирует в нем пакеты обрабатывает некоторые флаги складывает в новый список и ставит event следующему потоку обработки.
Следующий поток уже занимается собственно обработкой данных в пакетах, паковкой и тд.

Код написан давно и возможно скоро будет проводится рефакторинг, поэтому вопрос что можно улучшить. Особенно интересует обработка и передача данных между потоками.

Попробовал сделать вычитывание на основе синхронного и асинхронного boost::asio производительность оказалась недостаточная, стали происходить потери пакетов от железки (единицы пакетов в секунду при передаче 20000) при том что тестовый код никак больше не обрабатывал пакеты, а только смотрел за шагом номера пакета.
Поэтому сейчас оставлен периодически просыпающийся поток с setsockopt(sock, SO_RCVBUF, ...) и ioctlsocket(sock, FIONREAD, ...).

Пакет хранится в векторе, обернутом самодельным подсчетом ссылок и обрабатывается самодельным пулом (по историческим причинам), на сколько я знаю подсчет ссылок не лучший вариант для обработки в нескольких потоках из-за синхронизации кэшей процессора можно получить серьезную просадку производительности.
Что тут можно сделать? Перейти на boost подсчет ссылок но там смысл тот же.
В общем может кто подскажет интересные решения
Re: Чтение из сети и обработка
От: reversecode google
Дата: 09.02.22 13:05
Оценка: -4
оставьте как есть
или приложите усилия в изучения данной тематики в интернете

объяснения как лучше сделать в данной задаче
тянет на приличное финансовое вознаграждение

я просто сомневаюсь что hft-ник с опытом написания сорм или им подобных систем
зайдет бесплатно делиться опытом
Re: Чтение из сети и обработка
От: B0FEE664  
Дата: 09.02.22 13:26
Оценка:
Здравствуйте, tryAnother, Вы писали:

A>Код написан давно и возможно скоро будет проводится рефакторинг, поэтому вопрос что можно улучшить. Особенно интересует обработка и передача данных между потоками.

...
A>В общем может кто подскажет интересные решения
Решение выглядит адекватным, зачем переписывать — не понятно.
Единственное, что можно оптимизировать, если это ещё не сделано — выделение памяти под буферы. В идеале все буферы выделяются при старте, а потом переиспользуются. Для этого можно организовать вторую очередь передающую отработанные буферы обратно в поток вычитывающий пакеты.
Про подсчёт ссылок не совсем понятно: чтобы избежать одновременного доступа из двух потоков логичнее использовать std::unique_ptr, но тут многое зависит от того, как организован "самодельный пул (по историческим причинам)". Я для аналогичных задач тоже использую самоделку: класс содержащий вектор защищённый мьютексом с двумя операциями: Push — на добавление одного элемента и Swap — на извлечение всего добавленного с обменом на уже использованный (и очищенный) вектор. (вектор при clear не освобождает память, так что перезаказ памяти не происходит)
И каждый день — без права на ошибку...
Re: Чтение из сети и обработка
От: Mr.Delphist  
Дата: 09.02.22 14:01
Оценка: 2 (1)
Здравствуйте, tryAnother, Вы писали:

A>Попробовал сделать вычитывание на основе синхронного и асинхронного boost::asio производительность оказалась недостаточная, стали происходить потери пакетов от железки (единицы пакетов в секунду при передаче 20000) при том что тестовый код никак больше не обрабатывал пакеты, а только смотрел за шагом номера пакета.


https://rsdn.org/forum/network/5427299.1
Автор:
Дата: 13.01.14
Re: Чтение из сети и обработка
От: Pzz Россия https://github.com/alexpevzner
Дата: 09.02.22 15:48
Оценка:
Здравствуйте, tryAnother, Вы писали:

A>Пакет хранится в векторе, обернутом самодельным подсчетом ссылок и обрабатывается самодельным пулом (по историческим причинам), на сколько я знаю подсчет ссылок не лучший вариант для обработки в нескольких потоках из-за синхронизации кэшей процессора можно получить серьезную просадку производительности.


Самодельный пул пакетов фиксированной длинны (вероятно, там сделан именно он) может быть очень быстрым. В разы, если не на порядки, быстрее, чем "стандартная" аллокация из кучи.

Насчет счетчиков ссылок я бы особенно не парился. Это становится проблемой, когда несколько потоков с разных ядер процессора ну прям одновременно этот счетчик ссылок теребят. А у вас, я подозреваю, такое вот столкновение активности с разных ядер редко происходит.

Единственное, что смущает, что пакеты вычитываются периодически по времени, а не по мере готовности. Казалось бы, запустили операцию чтения (либо блокирующуюся, либо асинхронную), как пакет пришел, она сразу его и вернула. Зачем еще сколько-то миллисекунд ждать?

В общем, на вид все разумно сделано. В чем цель переделки-то? Надо отталкиваться от того, что в реальности мешает/плохо работает, а не заниматься общим улучшайзингом ради усиления степени красоты мира.
Re: Чтение из сети и обработка
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 10.02.22 00:43
Оценка:
Здравствуйте, tryAnother, Вы писали:

A>Вычитанные пакеты копируются каждый в свой буфер. Буферы складываются в лист. После того как вычитаны все пакеты в текущем такте обработке, данный список под критической секцией добавляется (splice) в другой и ставится event другому потоку о готовности.

Можно попробовать сделать свои аллокаторы которые будут предвыделять память большим куском и использовать её по алгоритму циклического буфера. Возможно, этого будет достаточно.

A>Попробовал сделать вычитывание на основе синхронного и асинхронного boost::asio производительность оказалась недостаточная, стали происходить потери пакетов от железки (единицы пакетов в секунду при передаче 20000) при том что тестовый код никак больше не обрабатывал пакеты, а только смотрел за шагом номера пакета.

A>Поэтому сейчас оставлен периодически просыпающийся поток с setsockopt(sock, SO_RCVBUF, ...) и ioctlsocket(sock, FIONREAD, ...).
Пытайся предвыделять память сразу большими кусками после чего используй её как циклический буфер для входящих пакетов на протяжении жизни программы. Это значит что тебе надо сделать буфер на 100.000Кб, например, на 5секунд трафика. Размер определяется экспериментально поэтому можно и больше. Чем меньше у тебя выделений памяти, тем лучше, но в идеале их не должно быть в процессе работы программы. Дальше оптимизируй копирования, в идеале у тебя вся работа должна быть на этом же буфере. Потом избавься от критических секций т.к. при большом буфере вряд ли обработчик не успеет обработать данные, но надо повесить ассерты на это. Потом прибей поток который вычитывает данные из сети к одному ядру и такое же сделай для обработчиков, должно помочь с кэш мисами. Естественно, все сокеты должны быть асинхронные, никаких "поток просыпается раз в 20мс" быть не должно.

A>В общем может кто подскажет интересные решения

Профилируй, посмотри сколько раз выделяется память, сколько копирований, что за алгоритм сортировки у тебя используется и как он работает.
Sic luceat lux!
Отредактировано 10.02.2022 0:58 Kernan . Предыдущая версия .
Re[2]: Чтение из сети и обработка
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 10.02.22 00:45
Оценка: +2
Здравствуйте, reversecode, Вы писали:

R>тянет на приличное финансовое вознаграждение

Не тянет. Стандартные подходы давно известны и везде описаны.
R>я просто сомневаюсь что hft-ник с опытом написания сорм или им подобных систем
Видимо, ты знаешь что это за 20000 пакетов в секунду.
R>зайдет бесплатно делиться опытом
Зачем тогда форум нужен? О политике трындеть?
Sic luceat lux!
Отредактировано 10.02.2022 0:57 Kernan . Предыдущая версия .
Re: Чтение из сети и обработка
От: kaa.python Ниоткуда РСДН профессионально мёртв и завален ватой.
Дата: 10.02.22 01:04
Оценка: +1
Здравствуйте, tryAnother, Вы писали:

A>Что тут можно сделать? Перейти на boost подсчет ссылок но там смысл тот же.

A>В общем может кто подскажет интересные решения

Я бы попробовал сделать следующее:

1) Убрать "просыпается" и базироваться на событиях типа "пакет пришел". Можно сделать через condition_variable, о оно будет уходить в ядро спать, и при том что у тебя 20К пакетов в секунду, может оказаться что дешевле ждать на спинлоках.
2) Сделать специальный аллокатор что бы убрать все копирования данных, асинхронное IO должно писать сразу в твою предаллоцированную память. Сделать должно быть очень просто т.к. буферы фактически фиксированного размера и можно выделать разом большой кусок который делить на участки по 1Кб.
3) Для обмена данными между разными фазами лучше использовать либо какой-то lock-free контейнер, либо что-то из tbb (именно с них я бы и начал, хорошая очень производительность). Так как у тебя данные будут в предвыделенных буферах (см. пункт 2), то ты можешь просто чиселку класть в контейнер в качестве индикатора буфера с данными которые надо обработать.
Re[3]: Чтение из сети и обработка
От: reversecode google
Дата: 10.02.22 01:10
Оценка: -2
обожаю когда выборочно цитируют разводя флейм)

очевидно что почти все описано и на писано
только почему то даже автор асио работающий в hft(Тёмыч так сказал) не соизволил ничего внятного реализовать

но топик же не о том?

тс явно хочет что бы кто то прочитал его топик
систематизировал его проблему в своем мозгу
потом погуглил за него
перечитал за него
разложил по полочкам у себя
и красиво по пунктам вступил с ним в дискуссию подробно разжевывая
что бы ему все все стало понятно

вы сделаете ему это ?
потому что как по мне это тянет уже на финансовое вознаграждение

ну итд

ps всем форумом ждем когда ВЫ вложите свой опыт и время для того что бы озарить ТС
Re[4]: Чтение из сети и обработка
От: tryAnother  
Дата: 10.02.22 08:54
Оценка:
Здравствуйте, reversecode, Вы писали:

R>тс явно хочет что бы кто то прочитал его топик

R>систематизировал его проблему в своем мозгу

да было бы интересно послушать мысли предолжения других людей,
возможно получить ссылки на github с чем то похожим

R>потом погуглил за него


нет, гуглить за меня не надо,
но если что-то интересно в голове есть прямо сейчас и этим не сложно поделится то почему бы и нет

R>разложил по полочкам у себя

R>и красиво по пунктам вступил с ним в дискуссию подробно разжевывая
R>что бы ему все все стало понятно

было бы здорово, но на это я не рассчитывал
Re: Чтение из сети и обработка
От: ArtDenis Россия  
Дата: 10.02.22 09:01
Оценка: +1
Здравствуйте, tryAnother, Вы писали:

A>Код написан давно и возможно скоро будет проводится рефакторинг, поэтому вопрос что можно улучшить. Особенно интересует обработка и передача данных между потоками.


Не совсем понятно что нужно улучшать: производительность, архитектуру, или понятность/поддерживаемость кода?

Если производительность, то первую очередь надо сделать профилирование и выявить места узкие места. По данным профайлера можно будет решить, стоит ли ускорять текущее решение или требуется другой подход, а возможно и изменение архитектуры.
[ 🎯 Дартс-лига Уфы | 🌙 Программа для сложения астрофото ]
Re[2]: Чтение из сети и обработка
От: tryAnother  
Дата: 10.02.22 09:03
Оценка:
Здравствуйте, B0FEE664, Вы писали:

BFE>Решение выглядит адекватным, зачем переписывать — не понятно.


хотелось узнать как это делают другие люди, более опытные
переделывать не самоцель, а повод к изучению и экспериментам

BFE>Единственное, что можно оптимизировать, если это ещё не сделано — выделение памяти под буферы. В идеале все буферы выделяются при старте, а потом переиспользуются. Для этого можно организовать вторую очередь передающую отработанные буферы обратно в поток вычитывающий пакеты.


сейчас почти так и есть, часть буферов выделяется при старте, а потом гоняются по кругу, если на паковке происходит какой то затык, то новые буферы выделяются при чтении и идут в пул
удобство тут в том, что не нужно следить за временем жизни пакетов, они освобождаться могут не только после поковки в 3 потоке, но и во втором по некоторым признакам из пакета (флаги валидности, или не удалось отсортировать из за приема дубля (железка может слать) или прямо в первом потоке чтения (есть таймауты пока не установится режим работы железки)

BFE>Про подсчёт ссылок не совсем понятно: чтобы избежать одновременного доступа из двух потоков логичнее использовать std::unique_ptr, но тут многое зависит от того, как организован "самодельный пул (по историческим причинам)". Я для аналогичных задач тоже использую самоделку: класс содержащий вектор защищённый мьютексом с двумя операциями: Push — на добавление одного элемента и Swap — на извлечение всего добавленного с обменом на уже использованный (и очищенный) вектор. (вектор при clear не освобождает память, так что перезаказ памяти не происходит)


каждый буфер у меня смысла защищать критической секцией смысла нет, у меня к ним нет доступа одновременного из разных потоков
защищается счетчик ссылок
Re[2]: Чтение из сети и обработка
От: tryAnother  
Дата: 10.02.22 09:25
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>Здравствуйте, tryAnother, Вы писали:


A>>Пакет хранится в векторе, обернутом самодельным подсчетом ссылок и обрабатывается самодельным пулом (по историческим причинам), на сколько я знаю подсчет ссылок не лучший вариант для обработки в нескольких потоках из-за синхронизации кэшей процессора можно получить серьезную просадку производительности.


Pzz>Самодельный пул пакетов фиксированной длинны (вероятно, там сделан именно он) может быть очень быстрым. В разы, если не на порядки, быстрее, чем "стандартная" аллокация из кучи.


нет, пул используется "универсальный", хранит в списке освобожденные объекты в которых должен быть встроен счетчик.
я смотрел на boost:pool, но у него не все хорошо с использованием в нескольких потоках, нужна внешняя синхронизация

Pzz>Насчет счетчиков ссылок я бы особенно не парился. Это становится проблемой, когда несколько потоков с разных ядер процессора ну прям одновременно этот счетчик ссылок теребят. А у вас, я подозреваю, такое вот столкновение активности с разных ядер редко происходит.


в целом да при передаче от потока в поток весть список просто переносится под критической сессией,
проблема возможно при возврате объекта в пул после обработки пакета (в деструкторе обертки буфера), это может происходить в любом из потоков,
конечно можно было бы собирать обработанные пакеты в отдельный список и возвращать в пул сразу по многу, но это не так удобно, как просто полагаться на деструктор.

Pzz>Единственное, что смущает, что пакеты вычитываются периодически по времени, а не по мере готовности. Казалось бы, запустили операцию чтения (либо блокирующуюся, либо асинхронную), как пакет пришел, она сразу его и вернула. Зачем еще сколько-то миллисекунд ждать?


такой алгоритм чтения был выработан исторически, на старых машинах (~10 лет назад) так работало стабильнее,
процессору не приходилось просыпаться по каждому пакету и запускать его обработку.
в целом такой алгоритм чтения работает хорошо, фактически я копирую все собранные данные из сокета к себе за один присест, а потом поток отдыхает
был эксперимент по чтению пакетов по мере приема в цикле, стали появляется пропуски в пакетах, причем пропуски одного порядка что в программе написанной на С что на Python

Pzz>В общем, на вид все разумно сделано. В чем цель переделки-то? Надо отталкиваться от того, что в реальности мешает/плохо работает, а не заниматься общим улучшайзингом ради усиления степени красоты мира.


личные интерес, желание посмотреть как сделано у других
Re[2]: Чтение из сети и обработка
От: tryAnother  
Дата: 10.02.22 09:48
Оценка:
Здравствуйте, Kernan, Вы писали:

K>Здравствуйте, tryAnother, Вы писали:


A>>Вычитанные пакеты копируются каждый в свой буфер. Буферы складываются в лист. После того как вычитаны все пакеты в текущем такте обработке, данный список под критической секцией добавляется (splice) в другой и ставится event другому потоку о готовности.

K>Можно попробовать сделать свои аллокаторы которые будут предвыделять память большим куском и использовать её по алгоритму циклического буфера. Возможно, этого будет достаточно.

да, я сейчас изучаю эту возможность
проблема в том, что пакеты нужно сортировать перед обработкой, удалять дубли
если память будет одним куском нужно ее както размечать, чтоб обрабатывать пакеты не по порядку в памяти, чтото вроде

template<size_t S>
struct Buf
{
char data[S];
Buf<S> * next;
}


K>Пытайся предвыделять память сразу большими кусками после чего используй её как циклический буфер для входящих пакетов на протяжении жизни программы. Это значит что тебе надо сделать буфер на 100.000Кб, например, на 5секунд трафика. Размер определяется экспериментально поэтому можно и больше. Чем меньше у тебя выделений памяти, тем лучше, но в идеале их не должно быть в процессе работы программы. Дальше оптимизируй копирования, в идеале у тебя вся работа должна быть на этом же буфере.


сейчас почти так и есть, выделяется на старте некоторое количество буферов, ресайзятся они правда в момент чтения, но тк потом эти векторы не удаляются а переносятся в пул, то аллокаций лишних не будет,
кроме алокаций собственно элементов листа, то там счет на десятки байт (как я понимаю в реализации stl от МС)

K>Потом избавься от критических секций т.к. при большом буфере вряд ли обработчик не успеет обработать данные, но надо повесить ассерты на это. Потом прибей поток который вычитывает данные из сети к одному ядру и такое же сделай для обработчиков, должно помочь с кэш мисами.


дело в том, что время обработки не постоянно, большая часть времени просто копирование данных с пакета, но иногда сбор статистики и перепаковка ранее вычитанных данных, что может занимать до полусукунды, чтение на столько прерывать нельзя,
с привязкой потоков к ядрам, надо посмотреть, хорошая мысль, спасибо!

K> Естественно, все сокеты должны быть асинхронные, никаких "поток просыпается раз в 20мс" быть не должно.


A>>В общем может кто подскажет интересные решения

K>Профилируй, посмотри сколько раз выделяется память, сколько копирований, что за алгоритм сортировки у тебя используется и как он работает.

сортировка имеется ввиду, что переупорядочивает пакеты по их номеру (железка ставит номер), и выкидывает дубли
небольшой буфер в который складываются пакеты если нарушается порядок номеров, в ожидании переставленного пакета,
если буфер заполнился а пакет не пришел, то считам его потерянным и выталкиваем приняыте пакеты на обработку,
если пошла новая последовательность (железка перегрузилась) старые выбрасываем и начинаем считать заново
Re[2]: Чтение из сети и обработка
От: tryAnother  
Дата: 10.02.22 09:50
Оценка:
Здравствуйте, ArtDenis, Вы писали:

AD>Здравствуйте, tryAnother, Вы писали:


AD>Не совсем понятно что нужно улучшать: производительность, архитектуру, или понятность/поддерживаемость кода?

AD>Если производительность, то первую очередь надо сделать профилирование и выявить места узкие места. По данным профайлера можно будет решить, стоит ли ускорять текущее решение или требуется другой подход, а возможно и изменение архитектуры.

скорее архитектуру, или понятность/поддерживаемость кода
с производительностью сейчас приемлемо
Re[3]: Чтение из сети и обработка
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 10.02.22 13:42
Оценка:
Здравствуйте, tryAnother, Вы писали:

A>если память будет одним куском нужно ее както размечать, чтоб обрабатывать пакеты не по порядку в памяти, чтото вроде

Это неправильно. Ты не выкупаешь принципа который стоит за плоским буфером, принцип пространственной локальности. Тебе не нужно физически реордерить пакеты или удалять дубликаты, достаточно создать список индексов(адресов, называй как хочешь) входящего буфера и отдать их в обработчик в нужном порядке. Индексы, кстати, вкладываются в подобный плоский массив.

A>с привязкой потоков к ядрам, надо посмотреть, хорошая мысль, спасибо!

Это не поможет если ядро всё время будет спотыкаться о кэш мисс и залезать в память подгружая нужные данные. Подумай, зачем нужно прибивать поток к ядру в данном контексте.

A>сортировка имеется ввиду, что переупорядочивает пакеты по их номеру (железка ставит номер), и выкидывает дубли

Я знаю что имеется ввиду, не надо ничего выталкивать, надо работать с индексами и просто их двигать. А сортировка, может у тебя там пузырёк или что-то что не даёт стабильной сложности на твоих сценариях.
Sic luceat lux!
Re[4]: Чтение из сети и обработка
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 10.02.22 13:47
Оценка:
Здравствуйте, reversecode, Вы писали:

R>но топик же не о том?

Почему бы не начать про это? В этом и смысл подобных топиков когда задача ТС это только повод обсудить как бы лучше сделать и уже как бы не для ТС, а вообще, а может и немного в сторону, но по теме. До того как ВладД2 не стал таким политически базированным тут были интересные рассуждения начинавшиеся как раз с подобных вопросов.
R>тс явно хочет что бы кто то прочитал его топик
Гугл ему мало поможет. В гугле какие-то абстракции всегда, ответы всегда в книгах и научных статьях, но знания оттуда ещё надо суметь использовать.
Sic luceat lux!
Re: Чтение из сети и обработка
От: Умака Кумакаки Ниоткуда  
Дата: 11.02.22 17:54
Оценка: +2
Здравствуйте, tryAnother, Вы писали:


A>Попробовал сделать вычитывание на основе синхронного и асинхронного boost::asio производительность оказалась недостаточная, стали происходить потери пакетов от железки (единицы пакетов в секунду при передаче 20000) при том что тестовый код никак больше не обрабатывал пакеты, а только смотрел за шагом номера пакета.


99% кривые руки, асио не напрягаясь может хендлить на одном ядре на порядок больше пакетов. уверен там аллокации на каждый чих. код в студию.
нормально делай — нормально будет
Re: Чтение из сети и обработка
От: DiPaolo Россия  
Дата: 11.02.22 22:00
Оценка:
Идея следующая. Это нечто схожее с методом двойной буферизации в компьютерной графике (не знаю, может так уже давно не делают , когда пока один буфер отрисовывается, второй в то же время заполняется данными.

Можно завести пул буферов (по одному на каждый поток, он же этап обработки + запас), каждый из которых будет фиксированного размера на 1200 пакетов (судя по вашим данным: 20000/20 мс + 20% сверху). Пул можно взять размером в 6 или 9 элементов для начала.

Далее, создать три потока: один уже есть — читающий, второй будет делать первую обработку, третий — второй описанный вами этап обработки. Никаких копирований. Вместо этого все выполняется на одних и тех же объектах.

Процесс происходит следующим образом:
1) первый читающий поток берет первый свободный буфер из 1200 элементов из пула, пишет в него. Как только записал в очередь (в те самые 1200 элементов должны умещаться все пакеты за один такт), помечает его готовым ко второму этапу
2) второй поток, как только понимает, что готов буфер для обработки, начинает работу с этим буфером. Для начала можно просто висеть в цикле с yield, spin lock или sleep (зависит от скоростей обработки в первом и втором потоках — это позже можно подтюнить; потом можно переделать на ивенты — тут тоже надо смотреть конкретные условия выполнения и задачу). При этом проставляет флажки в тех же элементах прямо по месту. Как только обработал весь буфер (1200 элементов), помечает весь буфер как готовый к 3му этапу.
3) третий действует как второй. в конце помечает буфер как свободный, который может взять первый поток. Ничего подчищать или ресетить не надо для экономии времени.

Для начала ставятся всякие трейсы, чтобы понять, как ведет себя заполненность очередей по трем потокам. После чего либо тюнится (слипы, если есть, размеры буферов и прочие параметры) один раз под ваши условия, либо делается адаптивная подстройка (чтобы не терять буфера, и чтобы никто не ждал, пока другие потоки освободят буфер).

При таком подходе не будет перевыделяться память, количество ожиданий в семафорах/критических секциях и прочее сведено к минимуму. Не то чтобы lock-free, но минимальные блокировки. И еще плюс: никаких лишних копирований памяти. Да, по памяти будет бОльший расход.

Код примерно такой навскидку:
template<class T>
class BufferQueue {
public:
   struct Buffer {
      std::vector<T> m_elements[m_elementCount];
      int m_fullness; // сколько по факту валидных записанных элементов (пишет первый поток)
   }

public:
   // все операции по лоченью будут тут;
   // некоторые операции возможно получится сделать даже без лока
   // (какое-нибудь чтение, например)
   T &takeBufferFor(Operation op);
   void releaseBufferFor(T &buffer, Operation op);

   ...

private:
   // эти параметры можно будет потом покрутить под свои нужды и особенности
   const auto m_size = 6;
   const auto m_elementCount = 1200;
   ...

   std::vector<Buffer> m_queue[m_size];
};

struct Element {
   // используется всеми
   int size;
   int property1;

   // флаги и прочее, нужное для второго потока
   int flags;
   ...


   // то, что выставляет третий поток
   int state;
   ...
};

// создаем очередь буферов
BufferQueue<Element> queue;

// запускаем три потока и каким-то образом проваливаем всем эту очередь
...
Патриот здравого смысла
Re[2]: Чтение из сети и обработка
От: tryAnother  
Дата: 14.02.22 09:59
Оценка:
Здравствуйте, Умака Кумакаки, Вы писали:

УК>Здравствуйте, tryAnother, Вы писали:



A>>Попробовал сделать вычитывание на основе синхронного и асинхронного boost::asio производительность оказалась недостаточная, стали происходить потери пакетов от железки (единицы пакетов в секунду при передаче 20000) при том что тестовый код никак больше не обрабатывал пакеты, а только смотрел за шагом номера пакета.


УК>99% кривые руки, асио не напрягаясь может хендлить на одном ядре на порядок больше пакетов. уверен там аллокации на каждый чих. код в студию.


#include <stdio.h>
#include <tchar.h>

#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

namespace ba = boost::asio;
using ba::ip::udp;

enum { PACKET_LEN = 1100 };


class CalcStat
{
public:
    CalcStat()
        : seq_(-1)
    {
    }

    void OnPacket(const char * packet)
    {
        int seq = get_seq(packet);
        if(seq_ != -1)
        {
            ++seq_;
            if(seq_ != seq)
                printf("Sequence broken, recv %d expect %d, dif %d\n", seq, seq_, seq-seq_);
        }
        seq_ = seq;
    }
protected:
    static int get_seq(const char * packet)
    {
        int seq;
        memcpy(&seq, packet + 4, sizeof(seq));
        return seq;
    }

protected:
    int        seq_;
};

// ========================================================= //

class async_server: boost::noncopyable
{
public:
    async_server(ba::io_service& io_service, short port)
        : io_service_(io_service),
        socket_(io_service, udp::endpoint(udp::v4(), port))
    {
        socket_.async_receive_from(
            ba::buffer(data_, PACKET_LEN), sender_endpoint_,
            boost::bind(&async_server::handle_receive_from, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred)
            );
    }

    void handle_receive_from(const boost::system::error_code& error,
        size_t bytes_recvd)
    {
        stat_.OnPacket(data_);

        socket_.async_receive_from(
            ba::buffer(data_, PACKET_LEN), sender_endpoint_,
            boost::bind(&async_server::handle_receive_from, this,
            ba::placeholders::error,
            ba::placeholders::bytes_transferred));
    }

private:
    ba::io_service    &    io_service_;
    udp::socket            socket_;
    udp::endpoint        sender_endpoint_;
    char                data_[PACKET_LEN];
    CalcStat            stat_;
};

int recv_async(short port)
{
    printf("ASYNC\n");
    ba::io_service io_service;
    async_server s(io_service, port);
    io_service.run();
    return 0;
}

// ========================================================= //

int recv_sync(short port)
{
    printf("SYNC\n");
    ba::io_service    io_service;
    CalcStat        stat;

    udp::socket sock(io_service, udp::endpoint(ba::ip::udp::v4(), port)); 
    udp::endpoint sender_ep; 

    boost::array<char, PACKET_LEN>    data;
    while(true)
    {
        sock.receive_from(ba::buffer(data), sender_ep); 
        stat.OnPacket(data.data());
    }
    return 0;
}

// ========================================================= //

namespace
{
    static int get_pending_count(int socket)
    {
        LONG lPendingSize = 0;
        ioctlsocket(socket, FIONREAD, (ULONG*)&lPendingSize);
        return lPendingSize / PACKET_LEN;
    }

    static void set_socket_buf(int sock)
    {
        DWORD dwWsaBufferSize = (PACKET_LEN + 28) * 0x1000; // + UDP header size
        setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (LPCSTR)&dwWsaBufferSize, sizeof(dwWsaBufferSize));
    }

    static int prepare_socket(short port)
    {
        int sock = socket(AF_INET, SOCK_DGRAM, 0);

        SOCKADDR_IN sockAddr;
        memset(&sockAddr,0,sizeof(sockAddr));
        sockAddr.sin_family = AF_INET;
        sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        sockAddr.sin_port = htons(port);
        bind(sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr));

        set_socket_buf(sock);

        return sock;
    }
}

int read_periodic(short port)
{
    printf("PERIODIC\n");
    CalcStat        stat;
    int sock = prepare_socket(port);

    char packet[PACKET_LEN];
    
    while(true)
    {
        const int packet_awailable = get_pending_count(sock);
        for (int i = 0; i < packet_awailable; ++i)
        {
            int nRead = recvfrom(sock, packet, PACKET_LEN, 0, NULL, NULL);
            _ASSERT(nRead == PACKET_LEN);
            stat.OnPacket(packet);
        }
        Sleep(20);
    }
    closesocket(sock);

    return 0;
}

// ========================================================= //

int main(int argc, char* argv[])
{
    short port = 1024;

    try
    {
        if(argc == 2)
        {
            if(     strcmp(argv[1], "sync") == 0)        return recv_sync(port);
            else if(strcmp(argv[1], "async") == 0)        return recv_async(port);
            else if(strcmp(argv[1], "periodic") == 0)    return read_periodic(port);
        }
        printf("use [sync, async, periodic]\n");
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.