Коллеги,
Есть есть boost lock free очередь. 2 producer и 1 consumer.
(Код основан на
примере из буста. Только тип сообщения из очереди не int , а мой кастомный ). Пытаюсь замерить среднее время сообщения в очереди.
Соответсвено перед тем как послать сообщение, получаем текущее время.
Записываем в сообщение. На принимающей стороне вычитываем сообщение.
Получаем текущее время. Переводим в наносекунды. Суммируем.
В конце делим получившееся время на кол-во сообщений.
Но у меня получаются старнные результаты.
Если ставить lock все работает как и ожидалось, нормально (имеется ввиду producerLock в методe produce). Результат будет дальше.
Если лок снять, то происходят странности.
| Исходный код |
| #include <iostream>
#include <mutex>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
typedef struct Data
{
uint64_t sec;
uint64_t nano;
} DataType;
boost::atomic<bool> done (false);
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::atomic_uint64_t sum(0);
boost::lockfree::queue<DataType ,
boost::lockfree::fixed_sized<true>,
boost::lockfree::capacity < 128 >
> queue;
const int iterations = 1 * 1000 * 1000;
const int producer_thread_count = 2;
const int consumer_thread_count = 1;
std::mutex producerLock;
void producer()
{
DataType msg;
struct timespec localTime;
for (int i = 0; i != iterations; ++i)
{
++producer_count;
clock_gettime( CLOCK_REALTIME, &localTime );
msg.sec = localTime.tv_sec;
msg.nano = localTime.tv_nsec;
producerLock.lock();
while (!queue.push(msg));
producerLock.unlock();
}
}
inline void calc(DataType& msg)
{
struct timespec localTime;
clock_gettime( CLOCK_REALTIME, &localTime );
uint64_t sec = (localTime.tv_sec - msg.sec);
uint64_t nsec = (localTime.tv_nsec - msg.nano) + (1000L * 1000L * 1000L * sec);
sum += nsec;
++consumer_count;
}
void consumer(void)
{
DataType msg;
while (!done) {
while (queue.pop(msg)) {
calc(msg);
}
}
while (queue.pop(msg))
{
calc(msg);
}
}
void PrintResults()
{
std::cout << "produced " << producer_count << " objects." << std::endl;
std::cout << "consumed " << consumer_count << " objects." << std::endl;
std::cout << "Total time " << sum << " ns." << std::endl;
std::cout << "Messages " << sum / consumer_count << " ns per msg" << std::endl;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
PrintResults();
return 0;
}
|
| |
Total time = это сколько суммарно мы насчитали нано секунд
(1с = 1,000,000,000 нано секунд ).
Программку запускал под утилитой time, что бы проверить результаты.
Вариант, с Lock работате "как надо", т.е суммарно Total time ~= user time (из утилиты time)
| Результат с раскомментированным Lock |
| boost::lockfree::queue is lockfree
produced 2000000 objects.
consumed 2000000 objects.
Total time 2748765264 ns.
Messages 1374 ns per msg
real 0m1.374s
user 0m2.868s
sys 0m0.880s
|
| |
Без Lock результаты не совпадают. Причем очень сильно.
| Результат с закомментированным Lock |
| boost::lockfree::queue is lockfree
produced 2000000 objects.
consumed 2000000 objects.
Total time 67803937295 ns.
Messages 33901 ns per msg
real 0m0.541s
user 0m1.603s
sys 0m0.002s
|
| |
| Подробнее о системе |
| Centos 7: 3.10.0-123.el7.x86_64
g++ (GCC) 4.8.3 20140911 (Red Hat 4.8.3-9)
boost 1.58_0: system and thread
Буст собирал сам с этим же компилятором. |
| |
Очень внимательно изучил документацию. Не один раз.
Ограничения на тип выполнены:
Requirements:
•T must have a copy constructor
•T must have a trivial assignment operator
•T must have a trivial destructor
Вот метод push
Говорят thread-safe.
Что пробовал:
Заменял DataType на DataType* ( т.е передовать не сообщение а указатель )
Линковать 32 и 64 битные релизные версии библиотеки.
(В случае с 32 бибтной версией, естесвенно компилировалал с ключом -m32 )
Различные версии gcc ( 4.8.2 / 4.8.3 / 4.9.1 / еще что то там ).
Различные версии linux: Alt / Ubuntu 14.04 LTS / Centos 7
Игрался с выравниванием струткруты.
Ииспользовал метод push_bounded
Использовал очередь с динамическим выделением объектов
Пробовал создавать соощение в динамической памяти, а не на стеке.
Пробовал создавать каждый раз новое сообщение, и слать его.
....
Пока ничего не помогло...
Есть у кого какие-нибудь идеи?