ревью multi-queue processor
От: Hydrophobia  
Дата: 06.09.19 12:10
Оценка:
Доброго времени суток!

Решил тут попробывать себя в одну кипрскую контору, дали мне задание:

Необходимо сделать универсальный (библиотечный) обработчик нескольких очередей, позволяющий:

добавлять в очередь из нескольких продюсеров (из разных потоков)
подписаться/отписаться на разбор очереди в отдельном потоке одним фиксированным консьюмером (одна очередь — один консьюмер)

Общее требования к обработчику очередей:

потокобезопасный
эффективный, быстрый, масштабируемый
легко расширяемый (с точки зрения кода)


и набор интерфейсов с "неправильной" реализацией:

  пример предоставленного кода
#pragma once
#include <map>
#include <list>
#include <thread>
#include <mutex>

template<typename Key, typename Value>
struct IConsumer
{
    virtual void Consume(Key id, const Value &value)
    {
        id;
        value;
    }
};

#define MaxCapacity 1000

template<typename Key, typename Value>
class MultiQueueProcessor
{
public:
    MultiQueueProcessor() :
        running{ true },
        th(std::bind(&MultiQueueProcessor::Process, this)) {}

    ~MultiQueueProcessor()
    {
        StopProcessing();
        th.join();
    }

    void StopProcessing()
    {
        running = false;
    }

    void Subscribe(Key id, IConsumer<Key, Value> * consumer)
    {
        std::lock_guard<std::recursive_mutex> lock{ mtx };
        auto iter = consumers.find(id);
        if (iter == consumers.end())
        {
            consumers.insert(std::make_pair(id, consumer));
        }
    }

    void Unsubscribe(Key id)
    {
        std::lock_guard<std::recursive_mutex> lock{ mtx };
        auto iter = consumers.find(id);
        if (iter != consumers.end())
            consumers.erase(id);
    }

    void Enqueue(Key id, Value value)
    {
        std::lock_guard<std::recursive_mutex> lock{ mtx };
        auto iter = queues.find(id);
        if (iter != queues.end())
        {
            if (iter->second.size() < MaxCapacity)
                iter->second.push_back(value);
        }
        else
        {
            queues.insert(std::make_pair(id, std::list<Value>()));
            iter = queues.find(id);
            if (iter != queues.end())
            {
                if (iter->second.size() < MaxCapacity)
                    iter->second.push_back(value);
            }
        }
    }

    Value Dequeue(Key id)
    {
        std::lock_guard<std::recursive_mutex> lock{ mtx };
        auto iter = queues.find(id);
        if (iter != queues.end())
        {
            if (iter->second.size() > 0)
            {
                auto front = iter->second.front();
                iter->second.pop_front();
                return front;
            }
        }
        return Value{};
    }

protected:
    void Process()
    {
        while (running)
        {
            Sleep(10);
            std::lock_guard<std::recursive_mutex> lock{ mtx };
            for (auto iter = queues.begin(); iter != queues.end(); ++iter)
            {
                auto consumerIter = consumers.find(iter->first);
                if (consumerIter != consumers.end())
                {
                    Value front = Dequeue(iter->first);
                    if (front != Value{})
                        consumerIter->second->Consume(iter->first, front);
                }
            }
        }
    }

protected:
    std::map<Key, IConsumer<Key, Value> *> consumers;
    std::map<Key, std::list<Value>> queues;

    bool running;
    std::recursive_mutex mtx;
    std::thread th;
};


Ограничения — C++ и boost.

Я написал реализацию, но она их чем-то не удовлетворила (const мало, да лямбд не обнаружено, да аллокаций много). Моя реализация доступна по линку

https://www.dropbox.com/s/lfqbaxpnz58rmal/multiqueue-processor.zip?dl=1

Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"

Update1: Добавил интерфейсы
Отредактировано 07.09.2019 6:21 Hydrophobia . Предыдущая версия .
c++ multithreading
Re: ревью multi-queue processor
От: Denis Ivlev  
Дата: 06.09.19 16:04
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H> потокобезопасный

H> эффективный, быстрый, масштабируемый
H> легко расширяемый (с точки зрения кода)

Я бы тоже отказал. Первое, что смутило — никаких тестов, никакой документации, не понятно почему не на гитхабе. shared_ptr? Зачем? Ты же на плюсах пишешь, а не на джаве, map вместо unordered_map. Интерфейс мутный, понять как это надо использовать сложно. push в очередь при ее заполнии начинает молча продалбывать сообщения, настроить это поведение негде, никаких подсказок о таком поведении нет. MultiQueueProcessor стал узким местом все продьюсеры на нем встают в очередь. Дальше надоело.

Я бы ожидал что-то вроде этого (псевдокод):

queue_builder<T> builder;

auto queue1 = builder.create_circullar_queue(100);
auto queue2 = builder.create_drop_oldest_queue(100);

template <class T>
struct consumer
{
    void process(T&& data)
    {
        std::cout << data;
    }
};

consumer cons1;

queue1.set_consumer(cons1);

for (int i = 0; i < 10; ++i)
    std::async([&]() { queue1.push(i); });

queue1.remove_consumer();
Re: ревью multi-queue processor
От: AndrewJD США  
Дата: 06.09.19 17:10
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>и набор интерфейсов. Ограничения — C++ и boost.

Как интерфейс ожидался от очереди? Это важно, поскольку эффективная реализация невозможна при кривом интерфейсе(типа STL-like).

H>Общее требования к обработчику очередей:

H> потокобезопасный
H> эффективный, быстрый, масштабируемый
H> легко расширяемый (с точки зрения кода)
H> подписаться/отписаться на разбор очереди в отдельном потоке одним фиксированным консьюмером (одна очередь — один консьюмер)
H>Я написал реализацию, но она их чем-то не удовлетворила (const мало, да лямбд не обнаружено, да аллокаций много).

Разве не достаточно что пункт №2 не выполнен? Тормозная, использующий много аллокаций и как следствие не масштабируемая реализация. Никак не использовано ограничение: много писателей, один читатель.
Если это какая-то финансовая контора для которой критична скорость — результат ожидаем.


H>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"

Используя тот же Boost Asio можно было все проще и эффективней сделать.
"For every complex problem, there is a solution that is simple, neat,
and wrong."
Re: ревью multi-queue processor
От: lpd Черногория  
Дата: 06.09.19 17:24
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>https://www.dropbox.com/s/lfqbaxpnz58rmal/multiqueue-processor.zip?dl=1


H>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"


Все не смотрел, но использование sleep() в MultiQueueProcessor:Process() очень странно. Лучше использовать conditional variable. Здесь вроде описано: статья
У сложных вещей обычно есть и хорошие, и плохие аспекты.
Берегите Родину, мать вашу. (ДДТ)
Re: ревью multi-queue processor
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 07.09.19 02:32
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"

1. Самое важное. Если это то, что ты выложил это то, что ты послал, это уже фейл. Его ни собрать, ни потестить нельзя. Смотреть код ИМХО смысла нет после этого. Про тесты сказали, но ИМХО, надо хотя бы минимальный тестовый код в main иметь чтобы это запускалось и выводило что-то. Юнит тесты это дополнительная штука за + к баллам.
2. Ты нифига не понял что от тебя хотят. Если бы ты привёл конкретное задание, то было бы проще.
Sic luceat lux!
Re[2]: ревью multi-queue processor
От: Hydrophobia  
Дата: 07.09.19 05:51
Оценка:
Здравствуйте, AndrewJD, Вы писали:

H>>и набор интерфейсов. Ограничения — C++ и boost.

AJD>Как интерфейс ожидался от очереди? Это важно, поскольку эффективная реализация невозможна при кривом интерфейсе(типа STL-like).

интерфейсы для IConsumer и MultiQueueProcessor были предоставлены, но, насколько я понял, сигнатуру методов можно было менять. Все методы были типа

void Consume(Key key, Value value);

H>>Общее требования к обработчику очередей:

H>> потокобезопасный
H>> эффективный, быстрый, масштабируемый
H>> легко расширяемый (с точки зрения кода)
H>> подписаться/отписаться на разбор очереди в отдельном потоке одним фиксированным консьюмером (одна очередь — один консьюмер)
H>>Я написал реализацию, но она их чем-то не удовлетворила (const мало, да лямбд не обнаружено, да аллокаций много).

AJD>Разве не достаточно что пункт №2 не выполнен? Тормозная, использующий много аллокаций и как следствие не масштабируемая реализация. Никак не использовано ограничение: много писателей, один читатель.


Нашел описание и реализации MRSW очередей и локов, попробую через них + фиксированный размер очереди (массив). Можете порекомендовать литературу для чтения (на англ)?

AJD>Если это какая-то финансовая контора для которой критична скорость — результат ожидаем.


Ага, она самая.


H>>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"

AJD>Используя тот же Boost Asio можно было все проще и эффективней сделать.

Надо с бустом разобраться.
Re[2]: ревью multi-queue processor
От: Hydrophobia  
Дата: 07.09.19 06:03
Оценка:
Здравствуйте, lpd, Вы писали:


H>>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"


lpd>Все не смотрел, но использование sleep() в MultiQueueProcessor:Process() очень странно. Лучше использовать conditional variable. Здесь вроде описано: статья


Я пробывал через CV, но почему-то лок на мьютексе CV был самым узким местом, вероятно надо было делать notify_one() на вставке и освобождать лок как можно скорее при пробуждении.
Re[2]: ревью multi-queue processor
От: Hydrophobia  
Дата: 07.09.19 06:10
Оценка:
Здравствуйте, Kernan, Вы писали:

H>>Пожалуйста, прокоментируйте реализацию, архитектуру и пр. Очень интересно создать "продуктовое решение"

K>1. Самое важное. Если это то, что ты выложил это то, что ты послал, это уже фейл. Его ни собрать, ни потестить нельзя. Смотреть код ИМХО смысла нет после этого. Про тесты сказали, но ИМХО, надо хотя бы минимальный тестовый код в main иметь чтобы это запускалось и выводило что-то. Юнит тесты это дополнительная штука за + к баллам.

Моя практика показывает, что на юнит-тесты никто не смотрит и никакие доп. баллы не зачисляет.

K>2. Ты нифига не понял что от тебя хотят. Если бы ты привёл конкретное задание, то было бы проще.


Текст задания я привел в изначальном посте, интерефейсы для IConsumer и MultiQueueProcessor были даны. Я понял, что есть N поставщиков данных, есть К потребителей. Через подписку потребитель получает те данные, на которые подписался. В реализации старался учитывать, что поставщиков и потребителей может быть под 1M в разных пропорциях от 1М потребителей без поставщиков до 1М поставщиков и потребителей. Основной проблемой (и затыком в производительности) стала выборка актуальных пар (поставщик:потребитель) и рассылка данных.
Re[3]: ревью multi-queue processor
От: PM  
Дата: 07.09.19 07:24
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>Нашел описание и реализации MRSW очередей и локов, попробую через них + фиксированный размер очереди (массив). Можете порекомендовать литературу для чтения (на англ)?


Насколько я понял задание, нужна multi-producer, single-consumer queue, remark раньше много написал на эту тему. На английском есть у него на сайте: http://www.1024cores.net/home/lock-free-algorithms/queues
Re[4]: ревью multi-queue processor
От: Hydrophobia  
Дата: 07.09.19 08:15
Оценка:
Здравствуйте, PM, Вы писали:

H>>Нашел описание и реализации MRSW очередей и локов, попробую через них + фиксированный размер очереди (массив). Можете порекомендовать литературу для чтения (на англ)?



MWSR конечно же.

PM>Насколько я понял задание, нужна multi-producer, single-consumer queue, remark раньше много написал на эту тему. На английском есть у него на сайте: http://www.1024cores.net/home/lock-free-algorithms/queues


спасибо, почитаю.
Re[3]: ревью multi-queue processor
От: AndrewJD США  
Дата: 09.09.19 17:17
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>Нашел описание и реализации MRSW очередей и локов, попробую через них + фиксированный размер очереди (массив). Можете порекомендовать литературу для чтения (на англ)?


Например
"For every complex problem, there is a solution that is simple, neat,
and wrong."
Re: ревью multi-queue processor
От: VVV Россия  
Дата: 09.09.19 19:33
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

..попробовать... (к делу не относится, но, всё же, порядок должен быть!)
H>    эффективный, быстрый
H>            Sleep(10);
            std::lock_guard<std::recursive_mutex> lock{ mtx };
...
H>                        consumerIter->second->Consume(iter->first, front);

1. Ничего со Sleep не может быть "быстрым и эффективным". Sleep(10) -это всего лишь 100 раз в секунду и, судя по коду, всего лишь по _одному_ 'Value' из каждой очереди.
2. Если приглядеться, то вся обработка идёт в функции Process, т.е. в контексте одного потока, хотя в задании сказано, что на каждую очередь может быть свой поток-обработчик. Т.е. ты реализовал _однопоточную_ обработку очередей вместо многопоточной.
3. Ты блокируешь через std::lock_guard<std::recursive_mutex> lock{ mtx }; доступ к очередям пока обрабатывается consumerIter->second->Consume(iter->first, front);, таким образом в этот момент никто не может обратиться к очереди — ни записать, ни прочитать, т.е. если обработка одной задачи займёт минуту в Consume, то _все_ потоки будут бесцельно висеть на этом мутексе. Для примера: если у тебя веб-сервер и очереди — это запросы GET на страницы, то одна лишь долгогенерируемая страница подвесит весь сервер!

Что можно сделать (не претендую на конечную истину):
1. не делать MQP активным, т.е. не должно там быть такой функции как Process, всё равно она не сможет вызвать функции в контексте другого потока.
2. сделать на каждую очередь мутекс доступа и эвент наличия данных в очереди.
например:
struct QueueHandle
{
   HANDLE mtx;
   HANDLE hasData;
   Queue *queue;
};


Эта структура или указатель или ссылка или... на неё возвращаются из Subsctibe.

Писатель пишет (псевдокод):
WaitForSingleObject(qh->mtx);
qh->queue->push_back(...);
SetEvent(qh->hasData)
ReleaseMutex(qh->mtx);


Читатель читает (псевдокод):
WaitForSingleObject(qh->hasData);
while(true)
{
   WaitForSingleObject(qh->mtx);
   Value *d=qh->queue->GetData();
   RelaseMutex(qh->mtx);
   if(d == NULL) 
     break;

   Consume(id_, d);
}


При данном подходе данные будут обрабатываться в контексте нужного потока, обработка будет многопоточной и отдельный поток не будет "тормозить" всю программу (ну, кроме как, если queue->GetData повиснет, то будет, но, обычно, это очень лёгкая функция.. (просьба к lock-free тут не апеллировать, тут это не нужно IMHO)).
Re[3]: ревью multi-queue processor
От: Mr.Delphist  
Дата: 10.09.19 06:08
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

H>Моя практика показывает, что на юнит-тесты никто не смотрит и никакие доп. баллы не зачисляет.


Самому педалить легче, ибо понимаешь, как этим можно будет пользоваться
Re[2]: ревью multi-queue processor
От: Hydrophobia  
Дата: 10.09.19 06:45
Оценка:
Здравствуйте, VVV, Вы писали:


VVV>..попробовать... (к делу не относится, но, всё же, порядок должен быть!)


спасибо за ревью, но Вы посмотрели код, который прислали вместе с тестовым заданием. Моя реализация доступна на dropbox по линку из поста
Re[3]: ревью multi-queue processor
От: VVV Россия  
Дата: 10.09.19 10:55
Оценка:
Здравствуйте, Hydrophobia, Вы писали:


H>спасибо за ревью, но Вы посмотрели код, который прислали вместе с тестовым заданием.


Ок.

пример предоставленного кода

понял как "пример предоставленного ТОБОЙ кода".

Смотрим твой код:
1. sleep в библиотеке в

эффективный, быстрый

использоваться не должен! К тому же, у тебя нет функции для изменения этого таймаута.

2. Вместо одного потока, жрущего процессорное время, ты сделал maxThreads потоков. Если посмотреть что делает потоковая функция Process без консумеров и без данных в очередях, то это просто пустой бесконечный цикл:

    void Process()
    {

        while (running)
        {
            continue;
        }
    }


скорее всего, 100% загрузки процессора, а на мобильных устройствах — ещё и бешеный разряд батарейки.

3. Самый главный недостаток реализации: ты вызываешь link.mConsumer->Consume(link.mKey, value); из своего потока, а не из потока консумера! Это значит, что Tls (thread local storage) консумера будет этому консумеру _недоступен_. И потоконебезопасные библиотеки невозможно будет использовать в этом коде.

4. возможный AV:
        while (running)
        {
            if (!links.get(iter, link)) {
                ++iter;
                continue;
            }
//
// в этом временном интервале может случиться Unsubscribe и удаление консумера из памяти
//

            try {
                auto value = Dequeue(link.mKey);
                if (value)
                    link.mConsumer->Consume(link.mKey, value); //<<<=== тут указатель mConsumer может быть уже "протухший"
            }
            catch (...) {
            }
        }


5. не очень понял идею links
6. не существенно, но в интерфейсы обычно не вставляют виртуальный деструктор:
struct IConsumer
{
    virtual ~IConsumer() noexcept {} // <<<=== не нужен здесь.
    virtual void Consume(Key const &key, std::shared_ptr<Value> value) = 0;
};
Re[4]: ревью multi-queue processor
От: Hydrophobia  
Дата: 11.09.19 07:21
Оценка:
Здравствуйте, VVV, Вы писали:

VVV>Смотрим твой код:

VVV>1. sleep в библиотеке в
VVV>

VVV>эффективный, быстрый

VVV> использоваться не должен! К тому же, у тебя нет функции для изменения этого таймаута.

VVV>2. Вместо одного потока, жрущего процессорное время, ты сделал maxThreads потоков. Если посмотреть что делает потоковая функция Process без консумеров и без данных в очередях, то это просто пустой бесконечный цикл:

VVV>скорее всего, 100% загрузки процессора, а на мобильных устройствах — ещё и бешеный разряд батарейки.


таймаут контролируется при создании MultiQueueProcessor и должен быть рассчитан исходя из данных о потребителях (среднее время Consume) и желаемой длины очереди данных. 100% загрузки не будет, т.к. висит sleep, можно переделать на CV, эффект тот же.

VVV>3. Самый главный недостаток реализации: ты вызываешь link.mConsumer->Consume(link.mKey, value); из своего потока, а не из потока консумера! Это значит, что Tls (thread local storage) консумера будет этому консумеру _недоступен_. И потоконебезопасные библиотеки невозможно будет использовать в этом коде.


задача была "вызвать consume подписчика", если бы задача стояла "послать уведомление подписчику и он бы сам выгребал данные из очереди" то было бы так, как Вы описали.


VVV>4. возможный AV:


указатель на Consumer — std::shared_ptr, объект links хранит свою копию std::shared_ptr<IConsumer>, поэтому вызов будет безопасным.


VVV>5. не очень понял идею links


идея простая — иметь список валидных пар {очередь, потребитель} для уменьшения издержек при выборке "есть данные, кому их можно отдать?" и какой-то балансировки. Например, есть 1М потребителей и 1М очередей, как быстро найти связи между ними и обеспечить более-менее равномерную выдачу?

VVV>6. не существенно, но в интерфейсы обычно не вставляют виртуальный деструктор:


Это C++ и наличие реализации виртуального деструктора в интерфейсе необходимо. Иначе будет утечка памяти при удалении указателя на объект через базовый.

class IConsumer {};
class Consumer : IConsumer {};

IConsumer *ptr = new Consumer(...);
delete ptr; // без вирт. деструктора удалится только часть объекта, принадлежащая к IConsumer.
Re[5]: ревью multi-queue processor
От: VVV Россия  
Дата: 11.09.19 16:03
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

Попробую ещё раз проревьювить твой код:

1. sleep не должен использоваться в подобной библиотеке. Только из-за наличия sleep в том месте, где он у тебя используется, твой код могли забраковать

2.H>таймаут контролируется при создании MultiQueueProcessor и должен быть рассчитан исходя из данных о потребителях

кто и по каким критериям "должен" рассчитать таймаут? Это задача может быть трудновыполнима или совсем невыполнима. Например, это очереди звонков в разных колл-центрах, с 8:00 до 17:00 — 1000 звонков в секунду, а с 17:00 до 8:00 — 0 звонков и в праздники мы не работаем — 0 звонков. Но есть одна очередь — может быть всего 1 звонок за всё время, но его надо обработать "моментально", т.к. это приказ на запуск ракет с ЯО по врагу. Рассчитай тут таймаут? И даже если бы ты сделал "адаптивный" таймаут (больше данных обрабатывается в единицу времени — меньше таймаут и наоборот, если данных на N итераций подряд нет, то таймаут увеличивается), то это тоже не решение для многопоточной реализации.

VVV>>3. Самый главный недостаток реализации: ты вызываешь link.mConsumer->Consume(link.mKey, value); из своего потока, а не из потока консумера! Это значит, что Tls (thread local storage) консумера будет этому консумеру _недоступен_. И потоконебезопасные библиотеки невозможно будет использовать в этом коде.


H>задача была "вызвать consume подписчика", если бы задача стояла "послать уведомление подписчику и он бы сам выгребал данные из очереди" то было бы так, как Вы описали.


Задание говорит: подписаться/отписаться на разбор очереди в отдельном потоке одним фиксированным консьюмером (одна очередь — один консьюмер). Или перевод на русский хромает, но тут сказано, что разбор очереди должен происходить в потоке консумера, у тебя же подписка идёт в потоке консумера, а обработка в другом потоке, что противоречит заданию.

VVV>>4. возможный AV:


H>указатель на Consumer — std::shared_ptr, объект links хранит свою копию std::shared_ptr<IConsumer>, поэтому вызов будет безопасным.

Ты не должен использовать шаред_птр на переданный тебе 'интерфейс' IConsumer. Твоя библиотека _не_должна_ распоряжаться временем жизни этих интерфейсов. И не должна вызывать к этим указателям delete! Твоя библиотека не владеет информацией откуда пришёл интерфейс и как он был создан: по new, по malloc, в стеке или это, вообще, статический объект static или этот интерфейс пришёл из DLL со своим менеджером памяти или, даже, эта DLL написана на другом языке: VB, Delphi, ... .

Мне кажется, что по условию задачи надо было сделать многопоточную реализацию вместо однопоточной, которую тебе дали в качестве примера. При этом сохранить все интерфейсы:

и набор интерфейсов с "неправильной" реализацией:


т.е. интерфейсы правильные, а вот реализация однопоточная ("неправильная"). Таким образом, тебе надо было оставить сигнатуры функций как они есть, но сделать новую многопоточную реализацию.

void Subscribe(Key id, IConsumer<Key, Value> * consumer);

Вот так и надо было оставить сигнатуру функции и в себе хранить просто указатель consumer.

VVV>>5. не очень понял идею links


H>идея простая — иметь список валидных пар {очередь, потребитель} для уменьшения издержек при выборке "есть данные, кому их можно отдать?" и какой-то балансировки. Например, есть 1М потребителей и 1М очередей, как быстро найти связи между ними и обеспечить более-менее равномерную выдачу?


Связи найти очень просто — читать задание — (одна очередь — один консьюмер) — по key выбирается очередь и связанный с ней консумер. Т.е., даже в твоей реализации, в links достаточно хранить только лишь key, по которому найдём и очередь и консумера. Про 1М в задании не сказано, а сказано "обработчик нескольких очередей". И теперь уже не понял про "равномерную выдачу" — что это за сущность? что она делает? где про неё в задании, чтобы сравнить "что просили" и "что получилось"?


VVV>>6. не существенно, но в интерфейсы обычно не вставляют виртуальный деструктор:


H>Это C++ и наличие реализации виртуального деструктора в интерфейсе необходимо. Иначе будет утечка памяти при удалении указателя на объект через базовый.


В 'интерфейсах' виртуальный деструктор — нонсенс (в абстрактных классах — да, нормально). Смотри для примера COM-овские 'интерфейсы' IUnknown и т.д.

H>
H>class IConsumer {};
H>class Consumer : IConsumer {};

H>IConsumer *ptr = new Consumer(...);
H>delete ptr; // без вирт. деструктора удалится только часть объекта, принадлежащая к IConsumer.
H>


отсылаю к пункту 4 — твоя библиотека _не_должна_ вызывать delete к переданным ей указателям на 'интерфейсы'. Буква I в IConsumer намекает, что это Interface. По этой теме тебе стоит прочитать про 'абстрактные классы' и про 'интерфейсы' и про их сходства и различия и где какой применяется.

7. IMHO ты реализовал не то, что было в задании. В задании был пример очереди с многими потоками 'писателей' и одним потоком 'читателя'. Нужно было сделать очередь: много потоков писателей и много потоков читателей. Ты сделал много потоков писателей и пул потоков, которые умеют вызывать коллбэк функцию. Видимо, поэтому реализация "не подошла".

P.S. Но, в любом случае, ты всегда можешь остаться при своём мнении со временем всё встанет на свои места.
Re[4]: ревью multi-queue processor
От: Kernan Ниоткуда https://rsdn.ru/forum/flame.politics/
Дата: 12.09.19 16:55
Оценка:
Здравствуйте, VVV, Вы писали:

VVV>3. Самый главный недостаток реализации: ты вызываешь link.mConsumer->Consume(link.mKey, value); из своего потока, а не из потока консумера! Это значит, что Tls (thread local storage) консумера будет этому консумеру _недоступен_. И потоконебезопасные библиотеки невозможно будет использовать в этом коде.

А как будет выглядеть вызов из потока консьюмера?
Sic luceat lux!
Re[5]: ревью multi-queue processor
От: Hydrophobia  
Дата: 12.09.19 19:49
Оценка: 1 (1)
Здравствуйте, Kernan, Вы писали:


VVV>>3. Самый главный недостаток реализации: ты вызываешь link.mConsumer->Consume(link.mKey, value); из своего потока, а не из потока консумера! Это значит, что Tls (thread local storage) консумера будет этому консумеру _недоступен_. И потоконебезопасные библиотеки невозможно будет использовать в этом коде.

K>А как будет выглядеть вызов из потока консьюмера?

подозреваю что любой IPC метод подойдет (очередь сообщений, пайпы, файлы, сокеты, нотификации, пр).
Re[6]: ревью multi-queue processor
От: VVV Россия  
Дата: 12.09.19 20:36
Оценка:
Здравствуйте, Hydrophobia, Вы писали:

K>>А как будет выглядеть вызов из потока консьюмера?


H>подозреваю что любой IPC метод подойдет (очередь сообщений, пайпы, файлы, сокеты, нотификации, пр).


О! Ты не так прост!! Начальный топик был троллинг что ли???
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.