Вопрос по boost lockfree queue
От: Engler Беларусь  
Дата: 26.04.15 11:27
Оценка:
Коллеги,

Есть есть boost lock free очередь. 2 producer и 1 consumer.
(Код основан на примере из буста. Только тип сообщения из очереди не int , а мой кастомный ). Пытаюсь замерить среднее время сообщения в очереди.
Соответсвено перед тем как послать сообщение, получаем текущее время.
Записываем в сообщение. На принимающей стороне вычитываем сообщение.
Получаем текущее время. Переводим в наносекунды. Суммируем.
В конце делим получившееся время на кол-во сообщений.
Но у меня получаются старнные результаты.

Если ставить lock все работает как и ожидалось, нормально (имеется ввиду producerLock в методe produce). Результат будет дальше.
Если лок снять, то происходят странности.

  Исходный код
#include <iostream>
#include <mutex>

#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>

typedef struct Data
{
    uint64_t sec;
    uint64_t nano;
} DataType;

boost::atomic<bool> done (false);
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::atomic_uint64_t sum(0);

boost::lockfree::queue<DataType ,
    boost::lockfree::fixed_sized<true>,
    boost::lockfree::capacity < 128 >
    > queue;

const int iterations = 1 * 1000 * 1000;
const int producer_thread_count = 2;
const int consumer_thread_count = 1;

std::mutex producerLock;

void producer()
{
    DataType msg;

    struct timespec localTime;
    for (int i = 0; i != iterations; ++i)
    {
        ++producer_count;
        clock_gettime( CLOCK_REALTIME,  &localTime );

        msg.sec = localTime.tv_sec;
        msg.nano = localTime.tv_nsec;

        producerLock.lock();
        while (!queue.push(msg));
        producerLock.unlock();
    }
}




inline void calc(DataType& msg)
{
    struct timespec localTime;
    clock_gettime( CLOCK_REALTIME, &localTime );

    uint64_t sec = (localTime.tv_sec - msg.sec);
    uint64_t nsec = (localTime.tv_nsec - msg.nano) + (1000L * 1000L * 1000L * sec);
    sum += nsec;
    ++consumer_count;
}

void consumer(void)
{
    DataType msg;
    while (!done) {

        while (queue.pop(msg)) {
                calc(msg);
        }
    }

    while (queue.pop(msg))
    {
        calc(msg);
    }
}


void PrintResults()
{
    std::cout << "produced   " << producer_count << " objects." << std::endl;
    std::cout << "consumed   " << consumer_count << " objects." << std::endl;
    std::cout << "Total time " << sum << " ns." << std::endl;
    std::cout << "Messages   " << sum / consumer_count << " ns per msg" << std::endl;
}

int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;

    boost::thread_group producer_threads, consumer_threads;

    for (int i = 0; i != producer_thread_count; ++i)
      producer_threads.create_thread(producer);

    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);

    producer_threads.join_all();
    done = true;
    consumer_threads.join_all();

    PrintResults();

    return 0;
}


Total time = это сколько суммарно мы насчитали нано секунд
(1с = 1,000,000,000 нано секунд ).
Программку запускал под утилитой time, что бы проверить результаты.
Вариант, с Lock работате "как надо", т.е суммарно Total time ~= user time (из утилиты time)

  Результат с раскомментированным Lock
boost::lockfree::queue is lockfree
produced   2000000 objects.
consumed   2000000 objects.
Total time 2748765264 ns.
Messages   1374 ns per msg

real    0m1.374s
user    0m2.868s
sys     0m0.880s


Без Lock результаты не совпадают. Причем очень сильно.

  Результат с закомментированным Lock
boost::lockfree::queue is lockfree
produced   2000000 objects.
consumed   2000000 objects.
Total time 67803937295 ns.
Messages   33901 ns per msg

real    0m0.541s
user    0m1.603s
sys     0m0.002s


  Подробнее о системе
Centos 7: 3.10.0-123.el7.x86_64
g++ (GCC) 4.8.3 20140911 (Red Hat 4.8.3-9)
boost 1.58_0: system and thread
Буст собирал сам с этим же компилятором.


Очень внимательно изучил документацию. Не один раз.

Ограничения на тип выполнены:

Requirements:
•T must have a copy constructor
•T must have a trivial assignment operator
•T must have a trivial destructor


Вот метод push
Говорят thread-safe.

Что пробовал:
Заменял DataType на DataType* ( т.е передовать не сообщение а указатель )

Линковать 32 и 64 битные релизные версии библиотеки.
(В случае с 32 бибтной версией, естесвенно компилировалал с ключом -m32 )

Различные версии gcc ( 4.8.2 / 4.8.3 / 4.9.1 / еще что то там ).
Различные версии linux: Alt / Ubuntu 14.04 LTS / Centos 7

Игрался с выравниванием струткруты.
Ииспользовал метод push_bounded
Использовал очередь с динамическим выделением объектов
Пробовал создавать соощение в динамической памяти, а не на стеке.
Пробовал создавать каждый раз новое сообщение, и слать его.
....

Пока ничего не помогло...


Есть у кого какие-нибудь идеи?
Отредактировано 26.04.2015 11:29 Engler . Предыдущая версия .
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.