SPSC-очередь в C++0x
От: Roman Odaisky Украина  
Дата: 04.12.09 13:03
Оценка:
Понадобилось написать очередь с одним писателем и одним читателем (single-producer/single-consumer queue) для межпоточного взаимодействия. Как всегда, готовых приличных решений не обнаружилось, пришлось писать велосипед. За основу взял эту реализацию: http://www.ddj.com/hpc-high-performance-computing/210604448, которая представляет собой очередь на основе односвязного списка и требует DefaultConstructible для хранимого типа, потому что в очереди всегда должен быть хотя бы один элемент. Идеологическая неправильность последнего побудила меня вручную заниматься конструированием объектов и выделением памяти, для чего я воспользовался аллокатором (его интерфейс проще и безопаснее, чем использование operator new напрямую). Чтобы было интереснее, стал писать на C++0x. Получилось так:
template <class X, class Allocator = std::allocator<X>>
class spsc_queue
{
public:
    spsc_queue()
    {
        node* n = allocate_node();
        front(n);
        back(n);
        zombie(n);
    }

   ~spsc_queue()
    {
        node* n = zombie()->m_next;
        deallocate_node(zombie());

        while(n)
        {
            node* next = n->m_next;
            n->destroy_value();
            deallocate_node(n);
            n = next;
        }
    }

    spsc_queue(spsc_queue const &) = delete;
    spsc_queue& operator =(spsc_queue const &) = delete;

    void push(X const& x, bool do_gc = true)
    {
        node* n = make_node(x, NULL);
        back()->m_next = n;
        back(n);

        if(do_gc)
        {
            gc();
        }
    }

    void gc()
    {
        while(zombie() != front())
        {
            deallocate_node(detach_zombie());
        }
    }

    bool pop(X& result)
    {
        if(front() == back())
        {
            return false;
        }

        result = front()->m_next->m_value;
        front(front()->m_next);
        return true;
    }

private:
    struct node
    {
        node* m_next;
        X m_value;

        void construct_value(X const& x)
        {
            new(&m_value) X(x);
        }

        void destroy_value()
        {
            m_value.~X();
        }
    };

    typedef typename Allocator::template rebind<node>::other node_allocator;

    node* allocate_node()
    {
        return allocator().allocate(1);
    }

    void deallocate_node(node* n)
    {
        allocator().deallocate(n, 1);
    }

    node* make_node(X const& x, node* next)
    {
        node* n = zombie() != front() ? detach_zombie() : allocate_node();
        n->construct_value(x);
        n->m_next = next;
        return n;
    }

    node* detach_zombie()
    {
        node* prev = zombie();
        zombie(zombie()->m_next);
        zombie()->destroy_value();
        return prev;
    }

    node* zombie() const  { return m_zombie_and_allocator.first(); }
    void  zombie(node* n) { m_zombie_and_allocator.first() = n; }

    node_allocator& allocator() { return m_zombie_and_allocator.second(); }

    node* front() const  { return m_front.load(std::memory_order_relaxed); }
    void  front(node* n) { m_front.store(n, std::memory_order_relaxed); }

    node* back() const  { return m_back.load(std::memory_order_relaxed); }
    void  back(node* n) { m_back.store(n, std::memory_order_relaxed); }

    // Written by producer
    boost::compressed_pair<node *, node_allocator> m_zombie_and_allocator;
    std::atomic<node *> m_back;

    CACHELINE_PADDING; // unsigned char[64 примерно]

    // Written by consumer
    std::atomic<node *> m_front;

};

Читатель читает элементы и двигает указатель front вперед, обозначая им последний прочитанный элемент, писатель цепляет новые узлы в хвост, обновляет back и удаляет обработанные узлы по указателю zombie. Например:
[~] → [x1] → [x2] → [x3] → [x4] → nullptr
↑            ↑             ↑
zombie       front         back

Здесь читателю доступны x3 и x4, затем pop вернет false. Ближайший вызов gc разрушит x1 и x2 и удалит первые два узла, после чего zombie и front будут оба указывать на останки x2. Вызов push создаст узел (или, чтоб добру не пропадать, возьмет zombie, если тот еще не дошел до front), инициализирует его, прицепит его после x4 и заставит back указывать на новый узел.

Весь этот механизм atomic-free (std::atomic в коде — оптическая иллюзия, в современных процессорах операции над маленькими операндами и без того атомарны).

Чтобы заставить GCC 4.4 скомпилировать сие, сверх -std=c++0x требуется и другая черная магия. У GCC есть две реализации атомарных операций, одна lock-based и одна lock-free. Последняя активизируется только в том случае, если процессор поддерживает 1-, 2-, 4- и 8-байтовые атомарные инструкции. Естественно, мало какой 32-битный процессор умеет последнее, но это же не повод вообще отключать все механизмы lock-free! Приходится подло врать компилятору, что -D_GLIBCXX_ATOMIC_BUILTINS_8=1.

Затем, разработчики libstdc++, очевидно, решили, что «чукча не писатель», соответственно, реализации std::atomic::store в библиотеке нет. Пришлось приделать вручную:
template <class X>
void std::atomic<X *>::store(X* x, std::memory_order order) volatile
{
    /*this->*/atomic_address::store(/*implicit_cast<void *>*/(x), order);
}

После этих танцев с бубном можно написать тестовую программу, скомпилировать ее с -pthread (иначе std::thread падает с SIGSEGV), после -D_GLIBCXX_USE_NANOSLEEP=1 даже заработает std::this_thread::sleep_for, и очередь вроде работает как надо.



А теперь может ли уважаемый All ответить на несколько вопросов:

1. Как теперь заставить читателя блокироваться на пустой очереди? По-видимому, надо организовать семафор (и опять писать свой велосипед вокруг <semaphore.h>, ибо из Boost семафоры выбросили уже давно), только когда его обновлять? У писателя не наблюдается надежного способа определить, остались ли еще непрочитанные элементы в очереди. Каждый раз делать семафору post очень не хочется, иначе зачем нужна такая чудесная очередь без атомарных операций?

2.
    void*
    /*std::atomic_address::*/load(memory_order __m = memory_order_seq_cst) const volatile
    {
      __glibcxx_assert(__m != memory_order_release);
      __glibcxx_assert(__m != memory_order_acq_rel);

      __sync_synchronize();
      void* __ret = _M_i;
      __sync_synchronize();
      return __ret;
    }
Зачем здесь два забора?

3. Как приделать сюда move semantics? По-видимому, надо сделать variadic templates (и std::forward) в аргументе push и разделить void pop(X &) на X& front() и void pop(), или как-то иначе?

4. Может, список нужно замкнуть? То, что back()->m_next == nullptr, играет роль только в деструкторе, да и там это не особенно нужно. Будет ли лучше, если избавиться от указателя zombie — чтобы на этот элемент указывал back()->m_next?

5. Вообще, как лучше организовывать такие очереди — с помощью списков или с помощью circular buffers?

6. А, может, вообще очередь в топку, и пусть рабочие потоки/процессы разговаривают через pipe(2)?
До последнего не верил в пирамиду Лебедева.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.