Здравствуйте, всемогущий и всезнающий all!
Задача — написать нечто вроде коллектора траффика. Циска маршрутизатор 7206-G1 посылает netflow-траффик по udp (примерно 300 кб в секунду). Нужно все это счастье поймать через сокет, распарсить, и по определенному критерию некоторое количество записей сложить в базу данных (примерно 10% от общего количества). Обращаюсь потому что не могу решить задачу без потерь

— где-то 3 или 4 процента теряются стабильно

. Архитектура приложения примерно такова: while (1) { recvfrom(...buf...); // далее этот буфер парсится, нужное загоняется в mysql-базу }. Другой вариант тоже пробовал — в несколько потоков — один исключительно слушает сокет и кладет данные в буфер, другой фильтрует буфер и кладет в другой буфер, третий содержимое второго буфера кладет в mysql — эффект — те же три процента. В то же время если класть не в базу данных, а, допустим, в файл — то все нормально, потери крохотные — все же udp.
Основная проблема сейчас в том, что пока парсятся данные, полученные через сокет "курсор" кода не успевает вернуться к recvfrom и пакет теряется

. Пробовал увеличить размер буфера сокета путем setsockopt() с параметром SO_RCVBUF — не помогло, размер как был 8192 так и остался (проверял через getsockopt), в то же время WSAGetLastError() вернул 0

. А размер netflow-пакета — 1464 байт, то есть, как я понимаю, если бы я один пропустил — то он все равно должен был бы прийти в следующем буфере. Таким образом возникли следующий вопросы:
1) C учетом того, что размер буфера сокета равен 8192 байт, а размер пакет — 1464 байт — должен ли сокет накапливать пропущенные пакеты в буфере?
2) Как можно сделать чтобы он накапливал?
3) Как лучше организовать работу — последовательно или в несколько потоков.
Операционная система — windows xp, MS VC 6.0. Код приведен ниже. Спасибо всем, кто откликнется!
/**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**//**/
#include <windows.h>
#include <winsock.h>
#include <stdio.h>
#include "mysql.h"
WSADATA ws;
SOCKET s;
struct sockaddr_in addr,Addr;
hostent *d_addr;
unsigned char pkt[1464000]=""; // буфер под netflow-пакет
char szBuf[1000];
int err;
int rcvbufsz = 1000*1464;
typedef struct { // то, что лежит в пакете — 30 записей
unsigned int srcaddr;
unsigned int dstaddr;
unsigned int nexthop;
unsigned short input;
unsigned short output;
unsigned int dPkts;
unsigned int dOctets;
unsigned int first;
unsigned int last;
unsigned short srcport;
unsigned short dstport;
unsigned char pad1;
unsigned char tcp_flags;
unsigned char prot;
unsigned char tos;
unsigned short pointx;
unsigned short pointy;
unsigned int dbindex;
} dbstruct;
int WINAPI WinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance,LPSTR lpCmdLine,int nShowCmd)
{
dbstruct dbsInstance[30]; // экземпляры для занесения в базу данных
MYSQL mysql,*sock;
mysql_init(&mysql);
if (!(sock = mysql_real_connect(&mysql,"localhost","root","deadka","flowtest",3306,NULL,0)))
{
MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
exit(1);
}
if (FAILED (WSAStartup (MAKEWORD( 1,1 ), &ws)))
{
//printf("Error in WSAStartup(...)\n");
return -1;
}
s = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (s == INVALID_SOCKET)
{
return -1;
}
memset(&addr,0,sizeof(sockaddr_in));
addr.sin_addr.s_addr =htons(INADDR_ANY);
addr.sin_family = PF_INET;
addr.sin_port = htons(55555);
if (bind(s,(sockaddr*)(&addr),sizeof(addr))==SOCKET_ERROR)
{
return -1;
}
int nSizer=sizeof(int);
int _1=getsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)(&rcvbufsz),&nSizer);
err=WSAGetLastError();
setsockopt (s, SOL_SOCKET, SO_RCVBUF,(char *)(&rcvbufsz),sizeof(int));
err=WSAGetLastError();
_1=getsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)(&rcvbufsz),&nSizer);
int sockbufsize = 0, size = sizeof(int);
err = getsockopt(s, SOL_SOCKET, SO_RCVBUF,(char *)&sockbufsize, &size);
err=WSAGetLastError();
int notUsed = sizeof(sockaddr);
int n;
while (1)
{
n=recvfrom(s,(char*)pkt,sizeof(pkt),0,((struct sockaddr *) &Addr),¬Used); // получим пакет
int nRealRecords(0);
for (register int ind=0; ind<pkt[3]; ++ind)
{
int i=24+ind*48;
if ((0!=pkt[i+40]) || (0!=pkt[i+41]) || (0!=pkt[i+42]) || (0!=pkt[i+43])) // если запись соотв. некоторому критерию
{
unsigned short px=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)%(7+1);
unsigned short py=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)/(7+1);
if ((px==1) || (py==1)) // если запись соотв. некоторому критерию
{
dbsInstance[nRealRecords].srcaddr=(((((pkt[i]<<8)+pkt[i+1])<<8)+pkt[i+2])<<8)+pkt[i+3];
dbsInstance[nRealRecords].dstaddr=(((((pkt[i+4]<<4)+pkt[i+5])<<8)+pkt[i+6])<<8)+pkt[i+7];
dbsInstance[nRealRecords].nexthop=(((((pkt[i+8]<<4)+pkt[i+9])<<8)+pkt[i+10])<<8)+pkt[i+11];
dbsInstance[nRealRecords].input=(pkt[i+12]<<8)+pkt[i+13];
dbsInstance[nRealRecords].output=(pkt[i+14]<<8)+pkt[i+15];
dbsInstance[nRealRecords].dPkts=(((((pkt[i+16]<<8)+pkt[i+17])<<8)+pkt[i+18])<<8)+pkt[i+19];
dbsInstance[nRealRecords].dOctets=(((((pkt[i+20]<<8)+pkt[i+21])<<8)+pkt[i+22])<<8)+pkt[i+23];
dbsInstance[nRealRecords].first=(((((pkt[i+24]<<8)+pkt[25])<<8)+pkt[26])<<8)+pkt[27];
dbsInstance[nRealRecords].last=(((((pkt[28]<<8)+pkt[29])<<8)+pkt[30])<<8)+pkt[31];
dbsInstance[nRealRecords].srcport=(pkt[i+32]<<8)+pkt[i+33];
dbsInstance[nRealRecords].dstport=(pkt[i+34]<<8)+pkt[i+35];
dbsInstance[nRealRecords].pad1=pkt[i+36];
dbsInstance[nRealRecords].tcp_flags=pkt[i+37];
dbsInstance[nRealRecords].prot=pkt[i+38];
dbsInstance[nRealRecords].tos=pkt[i+39];
dbsInstance[nRealRecords].pointx=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)%(7+1);
dbsInstance[nRealRecords].pointy=(((((((pkt[i+40]<<8)+pkt[i+41])<<8)+pkt[i+42])<<8)+pkt[i+43])-120)/(7+1);
dbsInstance[nRealRecords].dbindex=(((((pkt[i+44]<<8)+pkt[i+45])<<8)+pkt[i+46])<<8)+pkt[i+47];
nRealRecords++;
}
}
}
if (nRealRecords) // выбрали какие-нибудь записи
{
memset(szBuf,'\0',sizeof(szBuf));
sprintf(szBuf,"insert into nfpkt(srcaddr,dstaddr,dpkts,dOctets,tfirst,tlast,srcport,dstport,prot,pointx,pointy,dbindex) values");
for (register unsigned int i=0; i<nRealRecords; ++i)
{
dbstruct dbsInstanc=dbsInstance[i];
sprintf(szBuf+strlen(szBuf),"(%lu,%lu,%lu,%lu,%lu,%lu,%u,%u,%d,%u,%u,%lu),",dbsInstanc.srcaddr,dbsInstanc.dstaddr,dbsInstanc.dPkts,dbsInstanc.dOctets,dbsInstanc.first,dbsInstanc.last,dbsInstanc.srcport,dbsInstanc.dstport,dbsInstanc.prot,dbsInstanc.pointx,dbsInstanc.pointy,dbsInstanc.dbindex);
}
szBuf[strlen(szBuf)-1]=';';
if(mysql_query(sock,szBuf)) MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
}
if (n!=1464) {
MessageBox(NULL,"YEAH!!!","",MB_OK); }
//if(mysql_query(sock,"insert into nfpkt(srcaddr,dstaddr,dpkts,dOctets,tfirst,tlast,srcport,dstport,prot,pointx,pointy,dbindex) values(3553440986,103617602,2,60,2643119385,1424818202,14383,7996,17,1,2,55929945);")) MessageBox(NULL,mysql_error(sock),"Error",MB_ICONERROR);
int r=123;
}
closesocket(s);
return 0;
}