Проблема при работе с std::queue.
От: .erax  
Дата: 09.11.04 11:09
Оценка:
Пишу сервер, все входящие сообщения храню в контейнере std::queue, т.к. работают с одной очередью много потоков,- реализована внешняя синхронизация. Во время тестирования системы, на DDOS (поводилось под ОС Win 2000 Profesional build 5.00.2195 SP3 RAM 512 MB AMD Athlon 2.5 Barton SL-FRN2). Загрузка процессора 100% при этом время ядра 90-95%, в TaskManager-е, заметил, что происходит 10 — 40 тысяч ошибочных обращений к памяти в секунду. Возможная причина: приняв пакет с данными от клиента сервер запоминает его в пямяти, причем страница памяти перегрузается в файл подкачки и помечается как не доступная, далее при обращении к ней генерируется прерывание и ядро системы загрузает её в ОП из свопа.
Вопрос: есть ли способ запретить сохранение страниц в память. Ну а если мой диагноз неверен, буду рад услышать Вашы рассуждения. Заранее благодарен.

P.S. При отключении сохранения пакетов в очередь, т.е. получаю и игнорирую его. Используя подключение типа LOOP_BACK скорость получения данных была в районе 200 MB в секунду время ядра 5-10%.
Re: Проблема при работе с std::queue.
От: ssm Россия  
Дата: 09.11.04 11:18
Оценка:
Здравствуйте, .erax, Вы писали:

E>P.S. При отключении сохранения пакетов в очередь, т.е. получаю и игнорирую его. Используя подключение типа LOOP_BACK скорость получения данных была в районе 200 MB в секунду время ядра 5-10%.


покажи
1.сохранение пакетов в очереди
2.цикл обработки сообщений из очереди
Re[2]: Проблема при работе с std::queue.
От: .erax  
Дата: 09.11.04 11:47
Оценка:
Здравствуйте, ssm, Вы писали:

ssm>Здравствуйте, .erax, Вы писали:


E>>P.S. При отключении сохранения пакетов в очередь, т.е. получаю и игнорирую его. Используя подключение типа LOOP_BACK скорость получения данных была в районе 200 MB в секунду время ядра 5-10%.


ssm>покажи

ssm>1.сохранение пакетов в очереди
ssm>2.цикл обработки сообщений из очереди

//////////////////////////////////////////////////////
int cMPPacketQueue::Push(SServerPacket *_packet)
{
size_t size;
assert(_packet);
try
{
size = m_packetQueue.size();
if(size <= m_QueueMaxLimit)
m_packetQueue.push(_packet);
else
return 0;
}
catch(...)
{
return 0;
}
return (int)size;
}

inline bool cMPPacketQueue::IsEmpty(){return m_packetQueue.empty();}
////////////////////////////////////////////////////////
while(isWorking)
{
mtserverClass->m_queue_mutex.LockMutex();
{
isEmpty = mtserverClass->m_queue.IsEmpty();
}
mtserverClass->m_queue_mutex.UnlockMutex();

if(!isEmpty)
{
mtserverClass->m_queue_mutex.LockMutex();
{
sp= mtserverClass->m_queue.Pop();
assert(sp);
}
mtserverClass->m_queue_mutex.UnlockMutex();
if(sp->spHeader.isRelayPacket)
mtserverClass->Send(sp);
else
mtserverClass->OnReceive( \
sp->spHeader.userIdFrom,
sp->spHeader.RawSize,
sp->lpData );
safe_delete(sp);

}
else
{
Sleep(1);
}
mtserverClass->m_server_info_mutex.LockMutex();
{
isWorking = mtserverClass->m_isWorking;
}
mtserverClass->m_server_info_mutex.UnlockMutex();
}
Re[3]: Проблема при работе с std::queue.
От: ssm Россия  
Дата: 09.11.04 12:13
Оценка:
Здравствуйте, .erax, Вы писали:



1. при добавлении пакетов в очередь, надо ее блокировать. у тебя этим и не пахнет
2. std::queue не имеет метода Pop(), у нее есть front() и pop()
3. нафига столько блокировок в основном цикле? вот версия с критической секцией:


    bool bTask;
    while(!bFinish || !tasks.empty())
    {        
        

        bTask = !tasks.empty();
        if(bTask)
        {
            cs.Lock();
            if(!tasks.empty())
            {
                task = tasks.front();
                tasks.pop();            
            }            
            cs.Unlock();
        }
        

        if(task)
        {    
            
            //...
        }
        else
        {
            ::Sleep(50);
        }

    }


суть в том, чтобы входить в крит. секцию, только когда это действительно необходимо. а у тебя получется все время происходит синхронизация

4. в cMPPacketQueue::Push(SServerPacket *_packet) нужен семафор на m_QueueMaxLimit потоков
Re[4]: Проблема при работе с std::queue.
От: .erax  
Дата: 09.11.04 12:40
Оценка:
Здравствуйте, ssm, Вы писали:

ssm>Здравствуйте, .erax, Вы писали:




ssm>1. при добавлении пакетов в очередь, надо ее блокировать. у тебя этим и не пахнет

Я же сказал что использую внешнюю синхроницацию.
ssm>2. std::queue не имеет метода Pop(), у нее есть front() и pop()

SServerPacket* cMPPacketQueue::Pop()
    {
        if(m_packetQueue.empty())
            return NULL;
        SServerPacket* _packet = NULL;
        try
        {
            _packet = m_packetQueue.front();
            assert(_packet);
            m_packetQueue.pop();
        }
        catch(...)
        {
            return NULL;
        }
        return _packet;
    }


ssm>3. нафига столько блокировок в основном цикле? вот версия с критической секцией:

ssm>суть в том, чтобы входить в крит. секцию, только когда это действительно необходимо. а у тебя получется все время происходит синхронизация
Да, я извиняюсь, что сразу не написал этого. Вчера тестировал момент
на возможность тормозов изза синхронизации. Вместо мютексов в винде используются крит. секции, ну а в Линуксе,- мютексы.
Дошло до того, что тело функций Lock() и Unlock() оставил пустыми, т.е. никакой синхронизации небыло вообще. Цирк в том, что тест стабильно проработал целую ночь, 5 компов с 300 клиентами на каждом, плюс 200 на локалхосте, итого 1700 клиентов, сам сервак разбился на 27 читающих потоков по 64 клиента на каждый, прлюс один поток-акцептор (в котором происходит вызов accept) и один поток для обработки очереди сообщений. Тест заключался в том, что клиент шлет пакет типа:

struct testPack
{
  DWORD dwTime;
  char dummyData[64K-sizeof(DWORD)];
}


сервер отправлает его обратно клиенту, на котором собирается статистика по времени прохождения пакета, модифицируется время и снова отправляется клиенту.
Прикол в том, что без синхронизации все отработало 16 часов без единого гюка. Через сервер прошло ~580 ГБ данных (89% с локалхоста).
Время ядра осталось на том же уровне.

Конечно можно сделать вывод о потокозащищенности std::queue .
Я ж говорю, что ядро грузится изза промахов по памяти.
ssm>4. в cMPPacketQueue::Push(SServerPacket *_packet) нужен семафор на m_QueueMaxLimit потоков
???
Re[5]: Проблема при работе с std::queue.
От: ssm Россия  
Дата: 09.11.04 12:59
Оценка:
Здравствуйте, .erax, Вы писали:

E>Здравствуйте, ssm, Вы писали:


ssm>>Здравствуйте, .erax, Вы писали:




ssm>>1. при добавлении пакетов в очередь, надо ее блокировать. у тебя этим и не пахнет

E>Я же сказал что использую внешнюю синхроницацию.

в каком смысле?

E>Время ядра осталось на том же уровне.


что и с блокировками? нереально!!!

E>Конечно можно сделать вывод о потокозащищенности std::queue .



E>Я ж говорю, что ядро грузится изза промахов по памяти.

ssm>>4. в cMPPacketQueue::Push(SServerPacket *_packet) нужен семафор на m_QueueMaxLimit потоков
E>???

смысл твоего m_QueueMaxLimit какой?


эта..., а ну ка убери все

try
{
  //...
}
catch(...)
{
  //...
}


они все равно нафиг не нужны, std::queue не кидает исключений.
Re[6]: проверь просто пустой цикл
От: ssm Россия  
Дата: 09.11.04 13:05
Оценка:
Здравствуйте, ssm, Вы писали:



while(isWorking)
{
    mtserverClass->m_queue_mutex.LockMutex();
    {
        isEmpty = mtserverClass->m_queue.IsEmpty();
    }
    mtserverClass->m_queue_mutex.UnlockMutex();

    Sleep(1);
    
    mtserverClass->m_server_info_mutex.LockMutex();
    {
        isWorking = mtserverClass->m_isWorking;
    }
    mtserverClass->m_server_info_mutex.UnlockMutex();
}
Re[6]: Проблема при работе с std::queue.
От: .erax  
Дата: 09.11.04 13:22
Оценка:
Здравствуйте, ssm, Вы писали:

E>>Я же сказал что использую внешнюю синхроницацию.

ssm> в каком смысле?
... внешнюю для класса cMPPacketQueue. Этот клас просто некоторый интекфейс, просто в случае какой либо смены контейнера придется переписань несколько строк в этом классе. Была идея делать синхронизацию внутри класса, но из-за особенностей мютексов (на которых до вчера все работало) возникали взаимоблокировки в рамках одного потока.

E>>Время ядра осталось на том же уровне.

ssm>что и с блокировками? нереально!!!
веришь, у самого глаза на лоб полезли вчера вечером когда убрал синхронизацию, т.к. думал что именно в ней и дело, я б с такими вобросами в RSDN не обращался.

E>>Конечно можно сделать вывод о потокозащищенности std::queue .

ssm>


E>>Я ж говорю, что ядро грузится изза промахов по памяти.

ssm>>>4. в cMPPacketQueue::Push(SServerPacket *_packet) нужен семафор на m_QueueMaxLimit потоков
E>>???

ssm>смысл твоего m_QueueMaxLimit какой?

самый простой когда размер очереди сообщений больше m_QueueMaxLimit в нее пакеты не ложим(просто забываем).
тупо,- но действенно.


ssm>эта..., а ну ка убери все


ssm>
ssm>try
ssm>{
ssm>  //...
ssm>}
ssm>catch(...)
ssm>{
ssm>  //...
ssm>}
ssm>


ssm>они все равно нафиг не нужны, std::queue не кидает исключений.

о еще как выбрасывает особенно когда свободной памяти 0 байт.
обнаружил это случайно,- провтыкал както — пакеты в очередь ложу, но не выгребаю, когда очередь заняла 2 ГБ памяти (больше в 32х разрядных системах нельзя выделять под нужды одного процесса, т.е. можно адресовать 4 но остальные два — резерв системы), — ооо такие исключения посыпались, что мама не горюй. А сейчас когда памяти нет, пакет тоже просто теряется, чем сохраняет работосмособность всей системы, я думаю лучже забыть один пакет чем вырубить весь сервак.
Re[7]: Проблема при работе с std::queue.
От: ssm Россия  
Дата: 09.11.04 13:36
Оценка: +1
Здравствуйте, .erax, Вы писали:

ssm>>они все равно нафиг не нужны, std::queue не кидает исключений.

E>о еще как выбрасывает особенно когда свободной памяти 0 байт.
E> обнаружил это случайно,

убери для эксперимента, что бы проверить что действительно так тупит
ты с пустым цыклом проверял?
Re[7]: проверь просто пустой цикл
От: .erax  
Дата: 09.11.04 13:39
Оценка:
Здравствуйте, ssm, Вы писали:

ssm>Здравствуйте, ssm, Вы писали:




ssm>
ssm>while(isWorking)
ssm>{
ssm>    mtserverClass->m_queue_mutex.LockMutex();
ssm>    {
ssm>        isEmpty = mtserverClass->m_queue.IsEmpty();
ssm>    }
ssm>    mtserverClass->m_queue_mutex.UnlockMutex();

ssm>    Sleep(1);
    
ssm>    mtserverClass->m_server_info_mutex.LockMutex();
ssm>    {
ssm>        isWorking = mtserverClass->m_isWorking;
ssm>    }
ssm>    mtserverClass->m_server_info_mutex.UnlockMutex();
ssm>}
ssm>


если без Sleep(1) загрузка ядра порядка ~40-45% с мютексами, ~35-40% c крит секциями, ~1-3% без ничего(с заглушками).
Ну а если со слипом, то ничего не видно, т.к. общая загрузка — 0%.
Проблема в ПРОМАХАХ по ПАМЯТИ!!!!
Re[8]: Проблема при работе с std::queue.
От: .erax  
Дата: 09.11.04 13:41
Оценка:
Здравствуйте, ssm, Вы писали:

ssm>убери для эксперимента, что бы проверить что действительно так тупит

ssm>ты с пустым цыклом проверял?

Синхронизацию проверил вдоль и поперек, по циклу оставил пост,- почитай.
Re[8]: проверь просто пустой цикл
От: ssm Россия  
Дата: 09.11.04 14:22
Оценка:
Здравствуйте, .erax, Вы писали:


E>если без Sleep(1) загрузка ядра порядка ~40-45% с мютексами, ~35-40% c крит секциями, ~1-3% без ничего(с заглушками).


E>Ну а если со слипом, то ничего не видно, т.к. общая загрузка — 0%.

блин, ну теперь начинай добавлять по одному методу, пока не увидишь, что РЕАЛЬНО грузит проц.

E>Проблема в ПРОМАХАХ по ПАМЯТИ!!!!


да погоди, не надо тока пока кричать, я думаю, что проблема в каком-то из методов
mtserverClass->Send(...);
mtserverClass->OnReceive(...);
safe_delete(...);

бери на вооружение метод научного коментирования и сообщи результаты.
Re[9]: проверь просто пустой цикл
От: .erax  
Дата: 09.11.04 14:55
Оценка:
Здравствуйте, ssm, Вы писали:


ssm>да погоди, не надо тока пока кричать, я думаю, что проблема в каком-то из методов

ssm>mtserverClass->Send(...);
Проверено когда отключал очередь вообще. Т.е. recv и сразу send. Здесь все нормально.

ssm>mtserverClass->OnReceive(...);

просто заглушка в ней ничего нет, сделана для интерфйса с наследующим классом.

ssm>safe_delete(...);

макрос if(...) {detete ...; ...=NULL}

... эта штука ядро не грузит вообще (по крайней мере в диспетчере задач визуально не заметно, проц в целом — да на все 100%).

void CPULoadThread()
{
    int *i;
    while(1)
    {
        i = new int[10240];
        memset(i, 0, 10240);

        for(int j=0; j < 10240; j++)
        {
            i[j] = rand();
        }

        int summ = 0;
        
        for(j = 0; j < 10240; j++)
            summ+=i[j];

        double mul = 0.0;

        for(j = 1024; j > 0; j--)
            mul*=double(i[j]);

        int rndsumm = 0;

        j=10240;
        while(j--)
        {
            rndsumm+=i[rand()%10240];
        }
        delete[] i;
    }
}

и гденибудь по кнопочке сделай так...

HANDLE m_cpu_load_thead = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)CPULoadThread, NULL, 0, NULL);
CloseHandle(m_cpu_load_thead);




ssm>бери на вооружение метод научного коментирования и сообщи результаты.


... остались операции по работе с очередью.
Re[10]: других вариантов нет
От: ssm Россия  
Дата: 09.11.04 15:08
Оценка:
Здравствуйте, .erax, Вы писали:


E>Проверено когда отключал очередь вообще. Т.е. recv и сразу send. Здесь все нормально.


проверь с очередью.
у тебя три метода, два из которых не в счет, остался только mtserverClass->Send(...);


E>... остались операции по работе с очередью.

оставь только операции с очередью и убери mtserverClass->Send(...), что бы убедится
Re[11]: других вариантов нет
От: .erax  
Дата: 09.11.04 16:01
Оценка:
Здравствуйте, ssm, Вы писали:

E>>... остались операции по работе с очередью.

ssm>оставь только операции с очередью и убери mtserverClass->Send(...), что бы убедится

да, дело в операциях с очередью.
Re: Проблема при работе с std::queue.
От: e-Xecutor Россия  
Дата: 10.11.04 08:55
Оценка:
Здравствуйте, .erax, Вы писали:

std::queue ты по умолчанию на deque юзаешь?

Попробуй такой классик:
template <class T>
class CyclicQueue{
public:
  explicit CyclicQueue(int prealloc=0)
  {
    if(prealloc>0)
    {
      data=new T[prealloc];
      size=prealloc;
      end=data+size;
    }else
    {
      data=0;
      size=0;
      end=0;
    }
    head=data;
    tail=data;
    count=0;
  }
  CyclicQueue(const CyclicQueue& src)
  {
    if(src.size==0)
    {
      data=0;
      end=0;
      head=0;
      tail=0;
      size=0;
      count=0;
    }else
    {
      data=new T[src.size];
      head=data;
      tail=data;
      size=src.size;
      end=data+size;
      count=0;
      T* ptr=src.tail;
      for(int i=0;i<src.count;i++)
      {
        Push(*ptr);
        ptr++;
        if(ptr==src.end)ptr=src.data;
      }
    }
  }
  ~CyclicQueue()
  {
    if(data)delete [] data;
  }
  void Push(const T& item)
  {
    if(count==size)Realloc(size*2);
    if(head==end)head=data;
    *head=item;
    head++;
    count++;
  }
  T& Front()
  {
    if(count==0)throw std::runtime_error("CQ: attempt to use Front() on empty queue");
    return *tail;
  }
  bool Pop(T& item)
  {
    if(count==0)return false;//throw std::runtime_error("CQ: attempt to call Pop() on empty queue");
    item=*tail;
    *tail=T();
    tail++;
    if(tail==end)tail=data;
    count--;
    return true;
  }
  void Pop()
  {
    if(count==0)throw std::runtime_error("CQ: attempt to call Pop() on empty queue");
    *tail=T();
    tail++;
    if(tail==end)tail=data;
    count--;
  }
  int Count()const{return count;}

protected:
  void Realloc(int sz)
  {
    if(sz==0)sz=16;
    T *newdata=new T[sz];
    T *ptr=tail;
    T *out=newdata;
    for(int i=0;i<count;i++)
    {
      *out=*ptr;
      ptr++;
      if(ptr==end)ptr=data;
      out++;
    }
    if(data)delete [] data;
    size=sz;
    data=newdata;
    end=data+size;
    tail=data;
    head=data+count;
  }
  int count,size;
  T* data;
  T* end;
  T* head;
  T* tail;
}; //class CyclicQueue


Если у тебя много писателей и один читатель можно еще один трюк попробовать провернуть...
Re[12]: других вариантов нет
От: ssm Россия  
Дата: 10.11.04 09:43
Оценка:
Здравствуйте, .erax, Вы писали:

E>Здравствуйте, ssm, Вы писали:


E>>>... остались операции по работе с очередью.

ssm>>оставь только операции с очередью и убери mtserverClass->Send(...), что бы убедится

E>да, дело в операциях с очередью.


НЕТ, дело НЕ в операциях с очередью. я накалякал ниже простой примерчик, суть которого:
есть сервер, умеющий выполнять асинхронно комманду(Command::execute)
есть 100 клиентов(работают в отдельных потоках), каждый из которых просит сервер асинхронно выполнить 20 комманд. При выполнении примера, загрузка процессора 0-1%, если интерестно поиграй с количеством клиентов и комманд. Все сделано на одной критической секции, для std::queue


#include <algorithm>
#include <iostream>
#include <queue>

#include <atlbase.h>


class Command
{
public:
    Command()        
    {
    }

    void execute()
    {
        static int i = 0;
        std::cerr << ++i << std::endl;
    }    
};

class Server
{
public:
    Server()        
        :    joined(false)
    {
        threadHandle.Attach(AtlCreateThread(&Server::run, this));
    }

    void queueCommand(Command *cmd)
    {
        cs.Lock();
        commands.push(cmd);
        cs.Unlock();
    }
    
    static DWORD WINAPI run(Server *srv)
    {
        srv->runLoop();
        return 0;
    }

    void join()
    {    
        joined = true;
        ::WaitForSingleObject(threadHandle, INFINITE);        
    }
    
private:
    bool joined;
    CHandle threadHandle;
    std::queue<Command*> commands;
    CComAutoCriticalSection cs;
private:
    void runLoop()
    {
        while (true) 
        {
            if(!commands.empty())
            {
                Command *cmd = 0;

                //extract command
                cs.Lock();
                if(!commands.empty())
                {
                    cmd = commands.front();
                    commands.pop();                    
                }
                cs.Unlock();

                if(cmd)
                {
                    //execute command
                    cmd->execute();

                    //delete command
                    delete cmd;
                }
            }
            else if(joined)
            {
                break;
            }
            
            Sleep(1);
        }
    }        
};


class Client
{
public:
    void sendCommand(Server *server)
    {
        threadHandle.Attach(AtlCreateThread(&Client::run, server));
    }

    ~Client()
    {
        ::WaitForSingleObject(threadHandle, 10);
    }
private:
    CHandle threadHandle;
    

    static DWORD WINAPI run(Server *server)
    {
        for(int i = 0; i < 20; ++i)
        {
            server->queueCommand(new Command());
        }

        //даем время серверу
        Sleep(1);

        return 0;
    }
};



int main()
{        
    //создали сервер 
    Server srv;    
        
    //дали время раздуплится потоку сервера
    Sleep(100);
    
    //деструкторы клиентов ждут окончания
    //своих потоков 
    {

        //кол.во клиентских потоков
        const size_t clientCount = 100;    
        Client clients[clientCount];

        //запустить клиентские потоки
        std::for_each(
            clients, clients + clientCount, 
            std::bind2nd(std::mem_fun1_ref(&Client::sendCommand), &srv));

    }
    

    //ждем пока сервер не обработает все сообщения
    srv.join();
    
    return 0;
}
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.