помогите написать коллектор траффика
От: deadka  
Дата: 05.11.07 12:29
Оценка:
Здравствуйте, всемогущий и всезнающий all!

Задача — написать нечто вроде коллектора траффика. Циска маршрутизатор 7206-G1 посылает netflow-траффик по udp (примерно 300 кб в секунду). Нужно все это счастье поймать через сокет, распарсить, и по определенному критерию некоторое количество записей сложить в базу данных (примерно 10% от общего количества). Обращаюсь потому что не могу решить задачу без потерь — где-то 3 или 4 процента теряются стабильно . Архитектура приложения примерно такова: while (1) { recvfrom(...buf...); // далее этот буфер парсится, нужное загоняется в mysql-базу }. Другой вариант тоже пробовал — в несколько потоков — один исключительно слушает сокет и кладет данные в буфер, другой фильтрует буфер и кладет в другой буфер, третий содержимое второго буфера кладет в mysql — эффект — те же три процента. В то же время если класть не в базу данных, а, допустим, в файл — то все нормально, потери крохотные — все же udp.

Основная проблема сейчас в том, что пока парсятся данные, полученные через сокет "курсор" кода не успевает вернуться к recvfrom и пакет теряется . Пробовал увеличить размер буфера сокета путем setsockopt() с параметром SO_RCVBUF — не помогло, размер как был 8192 так и остался (проверял через getsockopt), в то же время WSAGetLastError() вернул 0 . А размер netflow-пакета — 1464 байт, то есть, как я понимаю, если бы я один пропустил — то он все равно должен был бы прийти в следующем буфере. Таким образом возникли следующий вопросы:

1) C учетом того, что размер буфера сокета равен 8192 байт, а размер пакет — 1464 байт — должен ли сокет накапливать пропущенные пакеты в буфере?
2) Как можно сделать чтобы он накапливал?
3) Как лучше организовать работу — последовательно или в несколько потоков.

Операционная система — windows xp, MS VC 6.0. Код приведен ниже. Спасибо всем, кто откликнется!
/**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**/

#include <windows.h>
#include <winsock.h>
#include <stdio.h>
#include "mysql.h"

WSADATA ws;
SOCKET s;
struct sockaddr_in addr,Addr;
hostent *d_addr;
unsigned char pkt[1464000]=""; // буфер под netflow-пакет
char szBuf[1000];
int err;
int rcvbufsz = 1000*1464;


typedef struct { // то, что лежит в пакете — 30 записей
unsigned int srcaddr;
unsigned int dstaddr;
unsigned int nexthop;
unsigned short input;
unsigned short output;
unsigned int dPkts;
unsigned int dOctets;
unsigned int first;
unsigned int last;
unsigned short srcport;
unsigned short dstport;
unsigned char pad1;
unsigned char tcp_flags;
unsigned char prot;
unsigned char tos;
unsigned short pointx;
unsigned short pointy;
unsigned int dbindex;
} dbstruct;

int WINAPI WinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance,LPSTR lpCmdLine,int nShowCmd)
{
dbstruct dbsInstance[30]; // экземпляры для занесения в базу данных
MYSQL mysql,*sock;
mysql_init(&mysql);
if (!(sock = mysql_real_connect(&mysql,"localhost","root","deadka","flowtest",3306,NULL,0)))
{
MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
exit(1);
}


if (FAILED (WSAStartup (MAKEWORD( 1,1 ), &ws)))
{
//printf("Error in WSAStartup(...)\n");
return -1;
}

s = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (s == INVALID_SOCKET)
{
return -1;
}


memset(&addr,0,sizeof(sockaddr_in));
addr.sin_addr.s_addr =htons(INADDR_ANY);
addr.sin_family = PF_INET;
addr.sin_port = htons(55555);



if (bind(s,(sockaddr*)(&addr),sizeof(addr))==SOCKET_ERROR)
{
return -1;
}






int nSizer=sizeof(int);
int _1=getsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)(&rcvbufsz),&nSizer);
err=WSAGetLastError();
setsockopt (s, SOL_SOCKET, SO_RCVBUF,(char *)(&rcvbufsz),sizeof(int));
err=WSAGetLastError();
_1=getsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)(&rcvbufsz),&nSizer);

int sockbufsize = 0, size = sizeof(int);
err = getsockopt(s, SOL_SOCKET, SO_RCVBUF,(char *)&sockbufsize, &size);


err=WSAGetLastError();
int notUsed = sizeof(sockaddr);
int n;
while (1)
{
n=recvfrom(s,(char*)pkt,sizeof(pkt),0,((struct sockaddr *) &Addr),&notUsed); // получим пакет


int nRealRecords(0);


for (register int ind=0; ind<pkt[3]; ++ind)
{
int i=24+ind*48;
if ((0!=pkt[i+40]) || (0!=pkt[i+41]) || (0!=pkt[i+42]) || (0!=pkt[i+43])) // если запись соотв. некоторому критерию
{
unsigned short px=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)%(7+1);
unsigned short py=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)/(7+1);
if ((px==1) || (py==1)) // если запись соотв. некоторому критерию
{
dbsInstance[nRealRecords].srcaddr=(((((pkt[i]<<8)+pkt[i+1])<<8)+pkt[i+2])<<8)+pkt[i+3];
dbsInstance[nRealRecords].dstaddr=(((((pkt[i+4]<<4)+pkt[i+5])<<8)+pkt[i+6])<<8)+pkt[i+7];
dbsInstance[nRealRecords].nexthop=(((((pkt[i+8]<<4)+pkt[i+9])<<8)+pkt[i+10])<<8)+pkt[i+11];
dbsInstance[nRealRecords].input=(pkt[i+12]<<8)+pkt[i+13];
dbsInstance[nRealRecords].output=(pkt[i+14]<<8)+pkt[i+15];
dbsInstance[nRealRecords].dPkts=(((((pkt[i+16]<<8)+pkt[i+17])<<8)+pkt[i+18])<<8)+pkt[i+19];
dbsInstance[nRealRecords].dOctets=(((((pkt[i+20]<<8)+pkt[i+21])<<8)+pkt[i+22])<<8)+pkt[i+23];
dbsInstance[nRealRecords].first=(((((pkt[i+24]<<8)+pkt[25])<<8)+pkt[26])<<8)+pkt[27];
dbsInstance[nRealRecords].last=(((((pkt[28]<<8)+pkt[29])<<8)+pkt[30])<<8)+pkt[31];
dbsInstance[nRealRecords].srcport=(pkt[i+32]<<8)+pkt[i+33];
dbsInstance[nRealRecords].dstport=(pkt[i+34]<<8)+pkt[i+35];
dbsInstance[nRealRecords].pad1=pkt[i+36];
dbsInstance[nRealRecords].tcp_flags=pkt[i+37];
dbsInstance[nRealRecords].prot=pkt[i+38];
dbsInstance[nRealRecords].tos=pkt[i+39];
dbsInstance[nRealRecords].pointx=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)%(7+1);
dbsInstance[nRealRecords].pointy=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)/(7+1);
dbsInstance[nRealRecords].dbindex=(((((pkt[i+44]<<8)+pkt[i+45])<<8)+pkt[i+46])<<8)+pkt[i+47];
nRealRecords++;
}
}
}
if (nRealRecords) // выбрали какие-нибудь записи
{

memset(szBuf,'\0',sizeof(szBuf));
sprintf(szBuf,"insert into nfpkt(srcaddr,dstaddr,dpkts,dOctets,tfirst,tlast,srcport,dstport,prot,pointx,pointy,dbindex) values");
for (register unsigned int i=0; i<nRealRecords; ++i)
{
dbstruct dbsInstanc=dbsInstance[i];
sprintf(szBuf+strlen(szBuf),"(%lu,%lu,%lu,%lu,%lu,%lu,%u,%u,%d,%u,%u,%lu),",dbsInstanc.srcaddr,dbsInstanc.dstaddr,dbsInstanc.dPkts,dbsInstanc.dOctets,dbsInstanc.first,dbsInstanc.last,dbsInstanc.srcport,dbsInstanc.dstport,dbsInstanc.prot,dbsInstanc.pointx,dbsInstanc.pointy,dbsInstanc.dbindex);
}
szBuf[strlen(szBuf)-1]=';';
if(mysql_query(sock,szBuf)) MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
}


if (n!=1464) {
MessageBox(NULL,"YEAH!!!","",MB_OK); }
//if(mysql_query(sock,"insert into nfpkt(srcaddr,dstaddr,dpkts,dOctets,tfirst,tlast,srcport,dstport,prot,pointx,pointy,dbindex) values(3553440986,103617602,2,60,2643119385,1424818202,14383,7996,17,1,2,55929945);")) MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
int r=123;



}



closesocket(s);

return 0;
}
Re: помогите написать коллектор траффика
От: Michael Chelnokov Украина  
Дата: 05.11.07 12:46
Оценка:
Здравствуйте, deadka, Вы писали:

D>где-то 3 или 4 процента теряются стабильно


Поток чтения действительно не должен заниматься ничем кроме чтения. Кроме того, можешь сделать несколько таких (одинаковых) потоков. Смысл в том чтобы всегда хоть один из потоков висел в recvfrom, это должно уменьшить потери. Если, конечно, потери действительно вызваны переполнением буфера сокета, а не сетью. Да, и приемный буфер сокета можешь сделать побольше (SO_RCVBUF).

Ну а остальное разделять на потоки особого смысла не имеет, если и один парсер-писатель в базу справляется. Если не справляется, то имеет смысл не разделять парсинг и запись в базу между разными потоками, а запустить несколько одинаковых парсер-писателей. Очевидно что "тормоза" тут в ожидании ответа от сервера БД.
Re[2]: помогите написать коллектор траффика
От: deadka  
Дата: 05.11.07 15:50
Оценка:
Здравствуйте, Михаил, спасибо за ответ.

MC>Поток чтения действительно не должен заниматься ничем кроме чтения. Кроме того, можешь сделать несколько таких (одинаковых) потоков. Смысл в том >чтобы всегда хоть один из потоков висел в recvfrom, это должно уменьшить потери. Если, конечно, потери действительно вызваны переполнением буфера >сокета, а не сетью. Да, и приемный буфер сокета можешь сделать побольше (SO_RCVBUF).


1) Вы имеете в виду запустить несколько абсолютно одинаковых потоков, каждый из которых будет слушать сокет?

2) В этом и был один из моих вопросов — вызывал я setsockopt с параметром SO_RCVBUF (в коде указано) — но буфер не менялся — как был 8192 байт, так и остался (возвращаемое значение функции не дает ошибки, WSAGetLastError=0). Может быть у сокета какой-то флажок имеется, который позволяет/не позволяет размер буфера менять, не знаете случайно?

MC>Ну а остальное разделять на потоки особого смысла не имеет, если и один парсер-писатель в базу справляется. Если не справляется, то имеет смысл не >разделять парсинг и запись в базу между разными потоками, а запустить несколько одинаковых парсер-писателей. Очевидно что "тормоза" тут в ожидании >ответа от сервера БД.


Наверное вы правы, попробую. Еще раз спасибо!
Re[3]: помогите написать коллектор траффика
От: Michael Chelnokov Украина  
Дата: 05.11.07 18:35
Оценка:
Здравствуйте, deadka, Вы писали:

D>1) Вы имеете в виду запустить несколько абсолютно одинаковых потоков, каждый из которых будет слушать сокет?


"Слушать"? Нет, вызывать recvfrom в цикле. Постарайся минимизировать паузы между вызовами recvfrom, т.е. ничего не делать кроме добавления пакета в очередь на обработку.

D>2) В этом и был один из моих вопросов — вызывал я setsockopt с параметром SO_RCVBUF (в коде указано) — но буфер не менялся


Попробуй поменьше значения взять, реализация вольна игнорировать попытки установки размера буфера, тем более такого большого размера.
(и извини, большие куски кода тут я обычно не смотрю, на работе хватает)
Re: помогите написать коллектор траффика
От: zabbix  
Дата: 05.11.07 19:09
Оценка:
Здравствуйте, deadka, Вы писали:

D>Здравствуйте, всемогущий и всезнающий all!


D>Задача — написать нечто вроде коллектора траффика. Циска маршрутизатор 7206-G1 посылает netflow-траффик по udp (примерно 300 кб в секунду). Нужно все это счастье поймать через сокет, распарсить, и по определенному критерию некоторое количество записей сложить в базу данных (примерно 10% от общего количества).


— можно принимать данные в нескольких потоках (например, через completion ports)
— сразу пихать в базу — медленно, кидай в бинарник, потом распарсишь
//wbr
Re: помогите написать коллектор траффика
От: Anton Batenev Россия https://github.com/abbat
Дата: 06.11.07 05:06
Оценка:
Здравствуйте, deadka, Вы писали:

D> Задача — написать нечто вроде коллектора траффика. Циска маршрутизатор 7206-G1 посылает netflow-траффик по udp (примерно 300 кб в секунду).


300 KB/s — это трафик NetFlow, или трафик, который прошел через киску?
Если первое, то это ~200 пакетов/с и объем обсчитываемого трафика ~30 MB/s
Даже если после парсинга остается около 10%, то это ~600 flow записей/с и, как я понял, запросов на вставку в MySQL, если трафик не аггрегируется в течении некоторого времени перед вставкой в базу.
Лучше забыть про MySQL, при такой вводной.

D> Обращаюсь потому что не могу решить задачу без потерь — где-то 3 или 4 процента теряются стабильно


При таких скоростях, размер буфера должен быть около 30MB.

D>3) Как лучше организовать работу — последовательно или в несколько потоков.


Асинхронные сокеты с аггрегацией данных.
Поток получает пакет, парсит его, закидывает во временный буфер, проверяет время аггрегации, если время не вышло, возвращается за новой порцией данных. Если время вышло, сбрасывает данные одним пакетом во временную таблицу на сервере и очищает буфер. Дальше данные обрабатываются уже сервером из временной таблицы (раскидываются по клиентам, начисляются деньги и т.д.). В этом случае, у тебя потоки блокируют друг-друга только на операциях работы с буфером аггрегации. Сброс данных на сервер одним пакетом достаточно быстр. Узкое горлышко — быстро найти нужную запись в буфере аггрегации для обновления в ней данных. Решается в зависимости от задачи.

Вот как-то так, может, немного сумбурно, но подобная реализация (правда на C#) у меня проверена временем и прекрасно справляется со своими с задачами.
Re[2]: помогите написать коллектор траффика
От: Michael Chelnokov Украина  
Дата: 06.11.07 12:53
Оценка:
Здравствуйте, zabbix, Вы писали:

Z>(например, через completion ports)


При всем моем уважении к IOCP... В данном случае это будет лишь усложнение решения (если автор с IOCP не знаком), которое не даст никакого выигрыша.
Re: помогите написать коллектор траффика
От: fefelov Россия  
Дата: 06.11.07 16:59
Оценка:
Здравствуйте, deadka, Вы писали:

D> много всего про netflow


Буквально неделю назад я написал мини-NetFlow-collector. Не забавы ради, а изучения Apache MINA и NetFlow для. В процессе написания обнаружено:

a) Краткая (но полезная) справка по NetFlow — http://netflow.caligare.com/
Почитайте, пригодится.

b) NetFlow — это уже агрегатор, причем "the amount of export data being about 1.5 percent of the switched traffic in the router".
Исходя из этого не могу понять, каким образом у вас NetFlow генерирует трафик 300 кБайт/с (ведь это 2,4 Мбит,с)? Какой сумасшедшего объема трафик генерируют ваши пользователи?

c) При обработке данных о трафике принято (IMHO) использовать *многоуровневую* агрегацию.
К примеру:
— NetFlow агрегирует (по определению) сессии (грубовато, смотри пункт d, но это так),
— софт, отключающий пользователей от инета, агрегирует трафик за малый период времени (например, 15 минут),
— бухгалтерский софт агрегирует трафик за большой период времени (например, месяц).
При этом, все более крупные агрегации выполняются независимо от более мелких.
В примере получается три базы данных:
— БД "сырой трафик" (т. е. NetFlow, в данном случае),
— БД "отключатель пользователей от интернета",
— БД бухгалтерии.
Из этого следует, что задача колектора NetFlow — максимально быстро скинуть полученные данные куда-нибудь в произвольной форме. Другие агрегаторы будут обрабатывать эти данные не в оперативном режиме.

d) NetFlow — настраиваемый протокол.
Я получаю NetFlow не от циски, но у моего устройства есть три важных параметра NetFlow:
— cache-entries — number of flows which can be in router's memory simultaneously,
— active-flow-timeout — maximum life-time of a flow,
— inactive-flow-timeout — how long to keep the flow active, if it is idle.
Думаю, что ваш агент NetFlow просто неправильно настроен.
Re[4]: помогите написать коллектор траффика
От: deadka  
Дата: 07.11.07 08:34
Оценка:
Здравствуйте, Michael Chelnokov, Вы писали:

MC>Здравствуйте, deadka, Вы писали:


D>>1) Вы имеете в виду запустить несколько абсолютно одинаковых потоков, каждый из которых будет слушать сокет?


MC>"Слушать"? Нет, вызывать recvfrom в цикле. Постарайся минимизировать паузы между вызовами recvfrom, т.е. ничего не делать кроме добавления пакета в очередь на обработку.


D>>2) В этом и был один из моих вопросов — вызывал я setsockopt с параметром SO_RCVBUF (в коде указано) — но буфер не менялся


MC>Попробуй поменьше значения взять, реализация вольна игнорировать попытки установки размера буфера, тем более такого большого размера.

MC>(и извини, большие куски кода тут я обычно не смотрю, на работе хватает)

Да, вы были правы, надо было поменьше буфер установить. Установил — и все пошло как надо. Спасибо большое за содействие! Дальнейшие вопросы видимо будут про то как увеличить быстродействие БД, но это уже в другую ветку пойду...
Re[2]: помогите написать коллектор траффика
От: deadka  
Дата: 14.11.07 08:21
Оценка:
Здравствуйте, Anton Batenev, Вы писали:

AB>Здравствуйте, deadka, Вы писали:


D>> Задача — написать нечто вроде коллектора траффика. Циска маршрутизатор 7206-G1 посылает netflow-траффик по udp (примерно 300 кб в секунду).


AB>300 KB/s — это трафик NetFlow, или трафик, который прошел через киску?

AB>Если первое, то это ~200 пакетов/с и объем обсчитываемого трафика ~30 MB/s
AB>Даже если после парсинга остается около 10%, то это ~600 flow записей/с и, как я понял, запросов на вставку в MySQL, если трафик не аггрегируется в течении некоторого времени перед вставкой в базу.
AB>Лучше забыть про MySQL, при такой вводной.

D>> Обращаюсь потому что не могу решить задачу без потерь — где-то 3 или 4 процента теряются стабильно


AB>При таких скоростях, размер буфера должен быть около 30MB.


D>>3) Как лучше организовать работу — последовательно или в несколько потоков.


AB>Асинхронные сокеты с аггрегацией данных.

AB>Поток получает пакет, парсит его, закидывает во временный буфер, проверяет время аггрегации, если время не вышло, возвращается за новой порцией данных. Если время вышло, сбрасывает данные одним пакетом во временную таблицу на сервере и очищает буфер. Дальше данные обрабатываются уже сервером из временной таблицы (раскидываются по клиентам, начисляются деньги и т.д.). В этом случае, у тебя потоки блокируют друг-друга только на операциях работы с буфером аггрегации. Сброс данных на сервер одним пакетом достаточно быстр. Узкое горлышко — быстро найти нужную запись в буфере аггрегации для обновления в ней данных. Решается в зависимости от задачи.

AB>Вот как-то так, может, немного сумбурно, но подобная реализация (правда на C#) у меня проверена временем и прекрасно справляется со своими с задачами.


Спасибо, я сделал похоже. ПОка работает .
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.