Swap 2 buffers
От: Nuzhny Россия https://github.com/Nuzhny007
Дата: 08.08.19 16:25
Оценка:
Приветствую!
Хочу разобраться с тем, как сделать работу с двумя буферами на С++ без явного использования мьютексов в качестве упражнения. Реальный прототип — это два изображения, а не целые числа, с двумя мьютексами и условными переменными у меня работает, но хочется сделать это проще, быстрее и посовременней.
Есть два буфера, в один из которых что-то записывается в одном потоке, а другой, в который уже записалось, обрабатывается и выводится на консоль:
    int buf[2] = { 0 }; // Two buffers
    size_t currInd = 0; // Current ready buffer index


Генерация данных происходит в отдельном потоке Produce, а "обработка" и вывод на консоль в основном потоке. Синхронизироваться пытаюсь с помощью очереди с указателем на текущий буфер и std::promise.
В результате должны выводиться последовательно числа от 10 до 50, но оно падает где-то посередине с std::future_error: Promise already satisfied.
Вот. Начал задумываться над всем этим после подсказки eao197 (ему спасибо большое), захотелось разобраться самому, а не брать сразу sobjectizer, но как-то тяжеловато даётся многопоточность на современном С++.

Сам код:
#include <iostream>
#include <ctime>
#include <future>
#include <thread>
#include <random>
#include <deque>

///
struct Data
{
    Data(int* pBuf) : m_pBuf(pBuf)
    {
    }

    int* m_pBuf = nullptr;
    std::promise<bool> m_promise;
};

///
void Produce(bool& stopFlag, std::deque<Data>& tasks)
{
    std::cout << "Produce initialization..." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 + rand() % 1000));

    int numbers = 10;

    for (; !stopFlag;)
    {
        while (tasks.empty());

        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        tasks.front().m_pBuf[0] = ++numbers;
        tasks.front().m_promise.set_value(true);
        std::cout << "Produce next " << numbers << std::endl;
    }

    std::cout << "Produce finish" << std::endl;
}

///
int main()
{
    srand(1024);

    int buf[2] = { 0 }; // Two buffers
    size_t currInd = 0; // Cureent ready buffer

    bool stopFlag = false;

    std::deque<Data> tasks;

    auto produceTh = std::thread(Produce, std::ref(stopFlag), std::ref(tasks));

    // Wait for initialization and first result
    {
        tasks.emplace_back(&buf[0]);
        std::future<bool> answer = tasks.back().m_promise.get_future();
        answer.wait();
        std::cout << "Initialization result " << answer.get() << " with value " << buf[0] << std::endl;
        tasks.pop_back();
    }

    // Work with swap buffers
    for (; !stopFlag;)
    {
        tasks.emplace_back(currInd ? &buf[0] : &buf[1]); // Push next buf
        std::future<bool> answer = tasks.back().m_promise.get_future();

        auto resNumber = buf[currInd];
        std::cout << "Read prev result " << resNumber << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
        std::cout << "Process prev result " << resNumber << std::endl;

        currInd = currInd ? 0 : 1;
        answer.wait();
        std::cout << "Get produce result " << answer.get() << " with value " << buf[currInd] << std::endl;

        tasks.pop_back();

        if (buf[currInd] > 50)
        {
            stopFlag = true;
        }
    }

    produceTh.join();

    std::cout << "The end" << std::endl;
    return 0;
}
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.