Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 08.08.19 16:25
Оценка:
Приветствую!
Хочу разобраться с тем, как сделать работу с двумя буферами на С++ без явного использования мьютексов в качестве упражнения. Реальный прототип — это два изображения, а не целые числа, с двумя мьютексами и условными переменными у меня работает, но хочется сделать это проще, быстрее и посовременней.
Есть два буфера, в один из которых что-то записывается в одном потоке, а другой, в который уже записалось, обрабатывается и выводится на консоль:
    int buf[2] = { 0 }; // Two buffers
    size_t currInd = 0; // Current ready buffer index


Генерация данных происходит в отдельном потоке Produce, а "обработка" и вывод на консоль в основном потоке. Синхронизироваться пытаюсь с помощью очереди с указателем на текущий буфер и std::promise.
В результате должны выводиться последовательно числа от 10 до 50, но оно падает где-то посередине с std::future_error: Promise already satisfied.
Вот. Начал задумываться над всем этим после подсказки eao197 (ему спасибо большое), захотелось разобраться самому, а не брать сразу sobjectizer, но как-то тяжеловато даётся многопоточность на современном С++.

Сам код:
#include <iostream>
#include <ctime>
#include <future>
#include <thread>
#include <random>
#include <deque>

///
struct Data
{
    Data(int* pBuf) : m_pBuf(pBuf)
    {
    }

    int* m_pBuf = nullptr;
    std::promise<bool> m_promise;
};

///
void Produce(bool& stopFlag, std::deque<Data>& tasks)
{
    std::cout << "Produce initialization..." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 + rand() % 1000));

    int numbers = 10;

    for (; !stopFlag;)
    {
        while (tasks.empty());

        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        tasks.front().m_pBuf[0] = ++numbers;
        tasks.front().m_promise.set_value(true);
        std::cout << "Produce next " << numbers << std::endl;
    }

    std::cout << "Produce finish" << std::endl;
}

///
int main()
{
    srand(1024);

    int buf[2] = { 0 }; // Two buffers
    size_t currInd = 0; // Cureent ready buffer

    bool stopFlag = false;

    std::deque<Data> tasks;

    auto produceTh = std::thread(Produce, std::ref(stopFlag), std::ref(tasks));

    // Wait for initialization and first result
    {
        tasks.emplace_back(&buf[0]);
        std::future<bool> answer = tasks.back().m_promise.get_future();
        answer.wait();
        std::cout << "Initialization result " << answer.get() << " with value " << buf[0] << std::endl;
        tasks.pop_back();
    }

    // Work with swap buffers
    for (; !stopFlag;)
    {
        tasks.emplace_back(currInd ? &buf[0] : &buf[1]); // Push next buf
        std::future<bool> answer = tasks.back().m_promise.get_future();

        auto resNumber = buf[currInd];
        std::cout << "Read prev result " << resNumber << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        std::cout << "Process prev result " << resNumber << std::endl;

        currInd = currInd ? 0 : 1;
        answer.wait();
        std::cout << "Get produce result " << answer.get() << " with value " << buf[currInd] << std::endl;

        tasks.pop_back();

        if (buf[currInd] > 50)
        {
            stopFlag = true;
        }
    }

    produceTh.join();

    std::cout << "The end" << std::endl;
    return 0;
}
Re: Swap 2 buffers
От: andrey.desman  
Дата: 08.08.19 16:41
Оценка:
Здравствуйте, Nuzhny, Вы писали:

А как оно вообще должно работать?
!tasks.empty() тебе не гарантирует, что tasks.front() вернет что-то сомысленное или ссылку на что-то осмысленное.
m_promise.set_value(true) не гарантирует, что произойдет переключение на того, кто ждет, и он успеет сделать pop_back().
Даже с учетом слипов.
Re: Swap 2 buffers
От: watchmaker  
Дата: 08.08.19 16:43
Оценка:
Здравствуйте, Nuzhny, Вы писали:


N>
N>        tasks.front().m_promise.set_value(true);
N>

По сути тут вызывается m_promise.set_value в цикле много-много раз. Конечно, это работать не будет: в promise можно положить значение только раз.

N>
N>        while (tasks.empty());
N>

Тут читается tasks, который записывается без синхронизации из другого потока. Тоже баг
Re[2]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 03:20
Оценка:
Здравствуйте, andrey.desman, Вы писали:

AD>А как оно вообще должно работать?


Как обычно в этих случаях и работает: в один буфер пишем со звуковой карты, а другой слушаем — потом меняем, в один буфер рисуем, а другой выводим на экран — потом меняем. Приём старый, решается мьютексами и condition variable, а мне интересно без них.
Re[2]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 03:29
Оценка:
Здравствуйте, watchmaker, Вы писали:

W>По сути тут вызывается m_promise.set_value в цикле много-много раз. Конечно, это работать не будет: в promise можно положить значение только раз.

Да, так оно и есть, ошибка об этом и говорит.

W>Тут читается tasks, который записывается без синхронизации из другого потока. Тоже баг

Я как-то не могу прикинуть, сколько надо примитивов, чтобы всё заработало и каких. Так-то с 2-мя мьютексами и двумя condition variable работает: produce ждёт v1 — вывод одного буфера завершён, начинает заполнять, устанавливает v2, что он закончил. Основной поток ждёт v2, начинает вывод того, что сгенерировалось и выводит, устанавливает v1, что он закончил.
Но мня в процессе чтения книжки про conrurrency захотелось сделать это на более высоком уровне.
Я не прошу написать код, интересно, как другие решают эту задачу, её-то уже тонны лет. Видимо, одного механизма типа promise мало, надо минимум 2. Или не их совсем.
Re: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 03:55
Оценка:
N>Есть два буфера, в один из которых что-то записывается в одном потоке, а другой, в который уже записалось, обрабатывается и выводится на консоль:

Выглядит, как стандартная задача производитель-потребитель (producer/consumer). Есть очередь, в которую производитель кладет указатели на буферы с готовыми данными, а потребитель их обрабатывает и ждет на условной переменной, когда нет готовых буферов. Если не хочется выделять память под буфер каждый раз (или по другой причине), то освобожденные буферы отдаются обратно через другую очередь. Тогда будет две очереди, два мьютекса и две условных переменных (для буферов идущих в одну сторону — с данными и в другую — пустыми). Возможно, в данной задаче очередь — это единственный указатель.

Можно сделать lockfree очереди, но тогда придется использовать специфические для OS примитивы для ожидания потока, когда ему нужно ждать (нет пустых буферов или нет заполненных). Для Linux — это futex, для Win есть аналог, начиная с win8 и 2012, кажется, для Mac, похоже, используют семафоры.

Еще, в конкретно этой задаче, возможно, можно сделать, чтобы оба потока работали в режиме считал данные в буфер-обработал-считал-обработал. И, если чтение или обработка требуют эксклюзивного доступа — использовать мьютексы (один на чтение в буфер, другой на обработку). Останется только гарантировать упорядочивание обработки, чтобы ранее считанные данные и обработаны были раньше, но с мьютексами это совсем просто.
Отредактировано 09.08.2019 4:09 Ssd13 . Предыдущая версия .
Re[2]: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 04:21
Оценка:
N>>
N>>        tasks.front().m_promise.set_value(true);
N>>

W>По сути тут вызывается m_promise.set_value в цикле много-много раз. Конечно, это работать не будет: в promise можно положить значение только раз.

Как раз эта часть должна работать. Там каждый раз в tasks кладут новый promise. Немного нестандартный подход, но, похоже, это единственная часть, которая написана правильно, потому что на этом promise делается синхронизация. И то, что положили в данные в одном потоке, смогут прочитать на другом.
Отредактировано 09.08.2019 5:42 Ssd13 . Предыдущая версия .
Re[3]: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 04:29
Оценка:
N>Как обычно в этих случаях и работает: в один буфер пишем со звуковой карты, а другой слушаем — потом меняем, в один буфер рисуем, а другой выводим на экран — потом меняем. Приём старый, решается мьютексами и condition variable, а мне интересно без них.

Массив из двух структур: данные, флаг готовности данных и мьютекс. Потребитель захватывает мьютекс и если данные готовы — обрабатывает, сбрасывает флаг, отпускает мьютекс. Производитель наоборот, захватывает мьютекс, если буфер чист, записывает данные, ставит флаг, отпускает мьютекс. Только надо что-то придумать, чтобы не получилось, что при отсутствии данных в обоих структурах потребитель начнет их перебирать очень быстро, а производитель заполнит оба буфера и потребитель считает сначала более поздние данные. Можно, например, завести атомарный счетчик, сколько буферов заполнил производитель, чтобы потребитель делал count%2 и пытался захватить мьютекс на этой структуре с данными. Или каждый производитель и потребитель ведут свой счетчик (неатомарный) и не пытаются захватывать мьютекс на неподходящей структуре с буфером.
Отредактировано 09.08.2019 4:31 Ssd13 . Предыдущая версия .
Re: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 05:42
Оценка: 5 (1) +1
Во-первых, в твоем коде stopFlag должен быть атомарным. Суть атомарных переменных в первую очередь в том, что они обеспечивают синхронизацию памяти. Грубо говоря, если ты сделал чтение и записи в одном потоке, а затем записал атомарную переменную, то в другом потоке, увидев запись в атомик, также увидят и предыдущие записи в неатомарные переменные. Иначе, компилятор или процессор их может переупорядочить, а атомик такую оптимизацию запрещает (до атомика можно переупорядочивать, но нельзя его пересекать). Кроме того, неатомарная переменная может быть закеширована (компилятор считает, что только текущий поток может к ней обращаться).

Также твой task может быть модифицирован в main и одновременно прочитан в producer. Его нужно защитить мьютексом.

Вот мой код, описание в предыдущем комментарии.
  Код
#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>

struct Data
{
    Data( int* pBuf ) : m_pBuf( pBuf )
    {}

    int* m_pBuf = nullptr;
    bool m_empty = true;
    std::mutex m_mutex;
};

void Produce( std::atomic<bool>& stopFlag, Data* data )
{
    std::cout << "Produce initialization..." << std::endl;

    int numbers = 10;
    uint32_t done_counter = 0;

    while( !stopFlag )
    {
        auto& curr_data = data[done_counter % 2];
        std::unique_lock lock( curr_data.m_mutex );
        if( !curr_data.m_empty )
        {
            lock.unlock();
            // busy wait
            std::this_thread::sleep_for( std::chrono::microseconds( 10 ));
            continue;
        }

        std::this_thread::sleep_for( std::chrono::milliseconds( rand() % 100 ));
        curr_data.m_pBuf[0] = ++numbers;
        curr_data.m_empty = false;

        done_counter++;

        std::cout << "Produce next " << numbers << std::endl;
    }

    std::cout << "Produce finish" << std::endl;
}

int main()
{
    srand( 1024 );

    int buf[2] = {0}; // Two buffers
    uint32_t done_counter = 0; // buffers proccessed
    std::atomic<bool> stopFlag = false;
    Data data[2] = { buf, buf + 1 };

    auto produceTh = std::thread( Produce, std::ref( stopFlag ), data );

    // Work with swap buffers
    while( !stopFlag )
    {
        auto& curr_data = data[done_counter % 2];

        std::unique_lock lock( curr_data.m_mutex );
        if( curr_data.m_empty )
        {
            lock.unlock();
            // busy wait
            std::this_thread::sleep_for( std::chrono::microseconds( 10 ));
            continue;
        }

        auto& curr_buff = buf[done_counter % 2];
        // use data in buffer
        std::cout << "Get produce result with value "
                  << curr_buff << std::endl;

        curr_data.m_empty = true;
        done_counter++;

        if( curr_buff > 50 )
            stopFlag = true;
    }

    produceTh.join();

    std::cout << "The end" << std::endl;
    return 0;
}
Re[3]: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 05:47
Оценка:
W>>По сути тут вызывается m_promise.set_value в цикле много-много раз. Конечно, это работать не будет: в promise можно положить значение только раз.
N>Да, так оно и есть, ошибка об этом и говорит.

Как я понял, там ошибка возникает из-за того, что Produce пытается положить данные в тот же таск, который был на предыдущей итерации. Из-за того, что main не прочитал эти данные и не пересоздал task. Т.е. ошибка из-за многократной записи, но потому что main не успел сделать tasks.pop_back() и создать новый task. Если бы оба потока успевали отрабатывать по очереди — ошибки не было бы.
Re: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 05:55
Оценка:
Если делать без busy wait, потребуются условные переменные.
Re[2]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 06:13
Оценка:
Здравствуйте, Masterspline, Вы писали:

M>Во-первых, в твоем коде stopFlag должен быть атомарным. Суть атомарных переменных в первую очередь в том, что они обеспечивают синхронизацию памяти. Грубо говоря, если ты сделал чтение и записи в одном потоке, а затем записал атомарную переменную, то в другом потоке, увидев запись в атомик, также увидят и предыдущие записи в неатомарные переменные. Иначе, компилятор или процессор их может переупорядочить, а атомик такую оптимизацию запрещает (до атомика можно переупорядочивать, но нельзя его пересекать). Кроме того, неатомарная переменная может быть закеширована (компилятор считает, что только текущий поток может к ней обращаться).


О, точно, за atomic спасибо.

M>Также твой task может быть модифицирован в main и одновременно прочитан в producer. Его нужно защитить мьютексом.

M>Вот мой код, описание в предыдущем комментарии.

Ну, это как раз и получится код с двумя мьютексами и двумя условными переменными (ну или wait в цикле).
Re[3]: Swap 2 buffers
От: sergii.p  
Дата: 09.08.19 06:22
Оценка: +1
Здравствуйте, Masterspline, Вы писали:

N>>>
N>>>        tasks.front().m_promise.set_value(true);
N>>>

W>>По сути тут вызывается m_promise.set_value в цикле много-много раз. Конечно, это работать не будет: в promise можно положить значение только раз.

M>Как раз эта часть должна работать. Там каждый раз в tasks кладут новый promise. Немного нестандартный подход, но, похоже, это единственная часть, которая написана правильно, потому что на этом promise делается синхронизация. И то, что положили в данные в одном потоке, смогут прочитать на другом.


Producer может успеть положить дважды значение по tasks.front(). При этом consumer не успеет запихнуть новый элемент в tasks. По хорошему, producer должен и положить новые данные в буфер. Но тогда совсем не понятно зачем там promise — ведь если данные в очереди, значит они уже обработаны.
Вообще мне кажется, эта задача проще решается через conditional_variable и TS зря пытается уйти от этого решения.
Re[3]: Swap 2 buffers
От: Masterspline  
Дата: 09.08.19 06:23
Оценка:
N>Я как-то не могу прикинуть, сколько надо примитивов, чтобы всё заработало и каких.

Если тебе нужно ожидание без busy wait, то нужна условная переменная, а значит и mutex. Если нужно реализовать ожидание обоих потоков — две условных переменных. Мьютексов тоже будет два, потому что он защищает данные, с которыми работает поток (два потока — два мьютекса).

Можно попробовать продумать ожидание на одной условной переменной, т.к. ждать будет максимум один поток (либо производитель, либо потребитель). Но код, однозначно, станет сложнее и менее стандартным. А значит, проще запутаться и сложнее разобраться (новому участнику проекта). А главное, если потоков-производителей (или потребителей) станет не один, а два, то может получится, что на этой переменной ждут и производитель и потребитель и notify_all разбудит обоих...

Про lockfree я уже писал. Сделать можно, довольно легко, но будут примитивы для ожидания, специфичные для ОС. Правда, lockfree это называть слишком громко — будет массив атомарных указателей для свободных буферов и буферов с данными (по два указателя в обоих массивах). И нечто типа условной переменной (futex), на которой будет спать поток (по одной для производителя и потребителя). Этому futex нужно будет сделать notify, когда в соответствующий массив записывается атомарный указатель на буфер.

  Мой Futex для Linux
futex.hpp
Там в случае пустой очереди wait() придется сделать два раза. Т.е. пройтись по lockfree очереди с данными (у тебя массив из двух указателей), если оба нули, то сделать wait(), он установит флаг и сразу вернется, затем снова пройтись по пустой очереди и сделать wait() и вот тут поток уснет. На самом деле ты в любом случае будешь бегать по очереди с данными в цикле (while(!stopFlag)). И два wait() + лишний проход по очереди, лишь слегка замедлит засыпание потока, но ему и так по сути нечего делать. Сделано, чтобы не пропустить момент, когда другой поток добавил данные в очередь и сделал notify(). Но это уже не просто многопоточка, это уже lockfree, там нет эксклюзивного владения разделяемыми данными. Только при работе с локальными данными можно работать эксклюзивно, а, значит, данные того же Futex или очереди задач может в любой момент изменить любой поток.
Re[4]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 07:29
Оценка:
Здравствуйте, sergii.p, Вы писали:

SP>Producer может успеть положить дважды значение по tasks.front(). При этом consumer не успеет запихнуть новый элемент в tasks. По хорошему, producer должен и положить новые данные в буфер. Но тогда совсем не понятно зачем там promise — ведь если данные в очереди, значит они уже обработаны.

SP>Вообще мне кажется, эта задача проще решается через conditional_variable и TS зря пытается уйти от этого решения.

Мне интересно. Например, с помощью feature|async это можно решить очень легко:
  Код
///
class Producer
{
public:
    Producer(int initVal)
        : m_numbers(initVal)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100 + rand() % 1000));
    }

    int Generate()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        return ++m_numbers;
    }
    int Value()
    {
        return m_numbers;
    }

private:
    int m_numbers = 0;
};

///
int main()
{
    srand(1024);

    int buf[2] = { 0 }; // Two buffers
    size_t currInd = 0; // Current ready buffer index

    std::cout << "Produce initialization..." << std::endl;
    Producer prod(10);
    buf[0] = prod.Generate();

    ///
    struct Produce
    {
        Produce(Producer* prod)
            : m_prod(prod)
        {}

        bool operator()(int* buf)
        {
            *buf = m_prod->Generate();
            std::cout << "Produce next " << m_prod->Value() << std::endl;
            return true;
        }

        Producer* m_prod = nullptr;
    };
    Produce prodFunc(&prod);

    // Work with swap buffers
    for (;;)
    {
        std::future<bool> answer = std::async(std::launch::async, std::ref(prodFunc), (currInd ? &buf[0] : &buf[1]));

        auto resNumber = buf[currInd];
        std::cout << "Read prev result " << resNumber << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        std::cout << "Process prev result " << resNumber << std::endl;

        currInd = currInd ? 0 : 1;
        answer.wait();
        std::cout << "Get produce result " << answer.get() << " with value " << buf[currInd] << std::endl;

        if (buf[currInd] > 50)
        {
            break;
        }
    }

    std::cout << "The end" << std::endl;
    return 0;
}

Но тут сразу 2 минуса:
1. На каждой итерации цикла будет запускаться поток.
2. Инициализация объекта Producer будет происходить в одном потоке, а использоваться он станет каждый раз в новых. Это может быть критично (OpenCL использует TLS) и хочется избежать такого случая.

Можно ли сделать так, чтобы код был такой же простой и без явных ручных синхронизаций мьютексами? Но при этом лишь один раз запустить поток и класс Producer prod(10) создать в нём.
Re[5]: Swap 2 buffers
От: andrey.desman  
Дата: 09.08.19 07:59
Оценка:
Здравствуйте, Nuzhny, Вы писали:

N>Но тут сразу 2 минуса:

N>1. На каждой итерации цикла будет запускаться поток.

Не обязательно, там может быть пул потоков.

Тут есть еще и 3-й минус — это обычный синхронный код. Можно напрямую дергать Producer::Generate() без всяких async и это бдует даже быстрее.
Re[6]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 08:08
Оценка:
Здравствуйте, andrey.desman, Вы писали:

AD>Не обязательно, там может быть пул потоков.


Как async'у можно сказать использовать пулл потоков? Кажется, что вручную такое сделать нельзя, а на самоуправство компилятора нет смысла надеяться.

AD>Тут есть еще и 3-й минус — это обычный синхронный код. Можно напрямую дергать Producer::Generate() без всяких async и это бдует даже быстрее.


Нет, асинхронный, это видно по перемешивающемуся выводу в консоль. Но можно поставить слипы точно по 1 секунде и посмотреть на время работы:
1. мой вариант — 0m41,117s
2. Прямой вызов Producer::Generate() — 1m21,116s
Re[3]: Swap 2 buffers
От: andrey.desman  
Дата: 09.08.19 08:23
Оценка:
Здравствуйте, Nuzhny, Вы писали:

AD>>А как оно вообще должно работать?

N>Как обычно в этих случаях и работает: в один буфер пишем со звуковой карты, а другой слушаем — потом меняем, в один буфер рисуем, а другой выводим на экран — потом меняем. Приём старый, решается мьютексами и condition variable, а мне интересно без них.

На таких задачах обычно есть какой-то дроп-скип, если кто-то чего-то вовремя не получил/не успел. И такие задачи обычно либо тикающие (в смысле на таймере сидят), либо синхронизируются на системе, с которой работают (vsync, push/record buffer). От этого и надо плясать.
Пытаться сделать очередь, пусть и из двух элементов, для которой ожидание необходимо, но при этом пытаться не ждать (на мьютексе, спин-локе — не важно), несколько странно.
Re[4]: Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 09.08.19 08:29
Оценка:
Здравствуйте, andrey.desman, Вы писали:

AD>Пытаться сделать очередь, пусть и из двух элементов, для которой ожидание необходимо, но при этом пытаться не ждать (на мьютексе, спин-локе — не важно), несколько странно.


Не будем забывать, что это учебная задача. Но всё равно не так уж и странно. Можно представить, что у нас Producer получает на вход картинку, на которой тяжёлая нейросеть пытается найти объекты. Вот мы и запускаем её в отдельном потоке, а пока обрабатываем результаты предыдущего распознавания. Подчеркну, что не стоит акцентировать вопрос на предметной области. Кажется, что такая задача весьма общая и может иметь такое же общее решение. Может быть у неё есть общепринятое название? Я бы с удовольствием загуглил, но не знаю что.
Re[7]: Swap 2 buffers
От: andrey.desman  
Дата: 09.08.19 08:30
Оценка:
Здравствуйте, Nuzhny, Вы писали:

AD>>Тут есть еще и 3-й минус — это обычный синхронный код. Можно напрямую дергать Producer::Generate() без всяких async и это бдует даже быстрее.

N>Нет, асинхронный, это видно по перемешивающемуся выводу в консоль. Но можно поставить слипы точно по 1 секунде и посмотреть на время работы:
N>1. мой вариант — 0m41,117s
N>2. Прямой вызов Producer::Generate() — 1m21,116s

Тут же цикл, где на каждой итерации запускается поток (std::async), а потом синхронно ожидается его заверешение (answer.wait()). Выигрышь по времени тут только из-за параллельно выполняющихся слипов. В принципе, если в цикле вместо слипа делать что-то осознанное, но если ничего не делать, то прямой вызов будет лучше.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.