Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 19.12.07 15:46
Оценка:
Привет!

Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.
После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
Несколько дней ушло на обдумывание и написание пробного варианта.

Возможно я изобретал колесо, тогда посьба сильно не пинать

Идея состоит в следующем.

Постановка задачи:
Есть два потока. Один пишет в очередь, другой — читает из нее.
Реализовать очередь без использования механизма блокировок.

Реализация:
В классе очереди реализовать 3 указателя на цепочки сообщений:
1. Цепочка для записи
2. Промежуточная цепочка
3. Цепочка для чтения
и флаг состояния указателя промежуточной цепочки.

Код:

template <class TYPE> class QueueNoMutex
{
public:
    QueueNoMutex();
    ~QueueNoMutex();

    // Запись в очередь
    void Enqueue(const TYPE& data);

    // Сброс в промежуточный указатель, если пишущий поток завершился раньше.
    bool Flush();

    // Прочитать из очереди следующий елемент
    bool Dequeue(TYPE& data);

private:
    struct Element
    {
        TYPE data;
        Element* next;
    };

    void DeleteChain(Element* el);

    volatile bool isTempQueueSet;

    Element *readerTop;
    Element *tempTop;
    Element *writerTop, *writerBottom;
};

template<class TYPE>
QueueNoMutex<TYPE>::QueueNoMutex()
{
    readerTop = tempTop = writerTop = writerBottom = NULL;
    isTempQueueSet = false;
}

template<class TYPE>
QueueNoMutex<TYPE>::~QueueNoMutex()
{
    DeleteChain(readerTop);
    DeleteChain(tempTop);
    DeleteChain(writerTop);
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
QueueNoMutex<TYPE>::Enqueue(const TYPE& data)
{
    // создаем новый елемент цепочки и помещаем в него данные
    Element* el = new Element;
    el->data = data;
    el->next = NULL;

    // если цепочка сообщений для записи не пуста, добавляем елемент в ее конец
    if (writerTop)
    {
        writerBottom->next = el;
        writerBottom = el;
    }
    else
    {
        // иначе это будет первый елемент цепочки
        writerTop = writerBottom = el;
    }

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;
    }
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Flush()
{
    // если цепочка записи пуста, возвращаем успешность сброса
    if (!writerTop) return true;

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;

        // возвращаем успешность сброса
        return true;
    }

    // цепочка все еще не сброшена
    return false;
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Dequeue(TYPE& data)
{
    // если цепочка для чтения пуста
    if (!readerTop)
    {
        // проверяем, есть ли данные в промежуточной цепочке
        if (isTempQueueSet)
        {
            // перебрасываем промежуточную цепочку в цепочку для чтения
            readerTop = tempTop;
            // обнуляем промежуточную цепочку (если используется Flush(), то не нужно)
            tempTop = NULL;
            // сообщаем, что промежуточная цепочка пуста
            isTempQueueSet = false;
        }
        else
        {
            // нет данных для чтения
            return false;
        }
    }

    // вычитываем данные из верхнего елемента цепочки
    data = readerTop->data;

    // удаляем и смещаем вершину цепочки для чтения
    Element* cur = readerTop;
    readerTop = cur->next;
    delete cur;

    return true;
}

template<class TYPE>
void
QueueNoMutex<TYPE>::DeleteChain(Element* el)
{
    while (el)
    {
        Element* cur = el;
        el = cur->next;
        delete cur;
    }
}


Код потоков для целочисленного варианта даных:


typedef QueueNoMutex<int> MyQueue;

class WriterThread: public Thread
{
public:
    void DoWork()
    {
        int data = 100;

        // основной цикл
        while (data >= 0)
        {
            queue->Enqueue(data--);

            // эмуляция работы реального потока
            Sleep(100);
        }

        // подчищаем хвосты, если последняя запись не перебросила в промежуточную цепочку
        while (!queue->Flush())
        {
            Sleep(100);
        }
    }

    // ...

private:
    MyQueue* queue;
};

class ReaderThread: public Thread
{
public:
    void DoWork()
    {
        int curVal = 1, data = 1;
        bool found = false;

        // основной цикл чтения
        while (data > 0)
        {
            while (queue->Dequeue(data))
            {
            if (curVal != data)
        {
            printf("%d != %d\n", curVal, data);
        }
        curVal = data - 1;
                found = true;
            }

            if (found)
            {
        printf("Last read: %d\n", data);
                found = false;
            }

            // эмуляция работы реального потока
            Sleep(200);
        }
        printf("Done.\n");
    }

    // ...

private:
    MyQueue* queue;
};


Еще раз уточню, что эта очередь предназначена только для одного потока записи и одного чтения.

Проверял в тестовом режиме с эмуляцией различной загруженности потоков. Работало без проблем.
Также надо будет сравнить с очередью с блокировками по скорости.
В ближайшем будуще попробую применить в реальном приложении.

Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.
Re: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 19.12.07 15:53
Оценка:
Здравствуйте, HaronK, Вы писали:


HK>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.
---
С уважением,
Сергей Мухин
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 19.12.07 16:32
Оценка:
Здравствуйте, Сергей Мухин, Вы писали:

СМ>Здравствуйте, HaronK, Вы писали:



HK>>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


СМ>А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.


Что значит "устал"? Закончил посылать сообщения?
Читатель полностью обрабатывает все сообщения из своей цепочки и если писатель использует Flush() метод, как было в примере кода для записывающего потока, то читатель получит все сообщения.
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 19.12.07 18:39
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Здравствуйте, Сергей Мухин, Вы писали:


СМ>>Здравствуйте, HaronK, Вы писали:



HK>>>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


СМ>>А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.


HK>Что значит "устал"? Закончил посылать сообщения?

да

HK>Читатель полностью обрабатывает все сообщения из своей цепочки и если писатель использует Flush() метод, как было в примере кода для записывающего потока, то читатель получит все сообщения.


а! я и не заметил, но тогда очень странная ситуация, писатель вроде бы как все отдал, тем не менее он еще должен заниматься flush. Не удобно.Очень
---
С уважением,
Сергей Мухин
Re: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 02:27
Оценка: 1 (1)
Здравствуйте, HaronK, Вы писали:

HK>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


Вы полагаетесь на то, что порядок действий над памятью (все эти манипуляции с указателями) выглядит одинаково из обеих потоков.

Во-первых, надо понимать, что компилятор может просто взять, и сделать их в другом порядке. Просто потому, что ему так удобнее. С компиятором можно бороться словом volatile — есть негласное соглашение, что компилятор никогда не меняет между собой порядок действих над volatile переменными.

Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать. Да, InterlockedXXX() функции содержат внутри себя barrier, поэтому они на SMP-машинках безопасны. В отличии от обычных присваиваний. Но неизвестно, что дороже, несколько InterlockedXXX подряд, или одна честная CRITICAL_SECTION (ну или pthread_mutex, если говорить про юниксы-хрюндиксы).
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 03:49
Оценка: -1
Здравствуйте, Pzz, Вы писали:

Pzz>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.
Из Intell TBB книжки:

Some processor architectures, such as the Intel Itanium, IBM POWER and PowerPC,
and Alpha processors, have weak memory consistency, in which memory operations
on different addresses may be reordered by the hardware for the sake of efficiency.
[skip]
If you are programming only IA-32 and Intel 64/AMD64 processor platforms,
you can skip this section. These platforms have the strongest memory consistency
models.

Re[3]: Неблокируемая очередь сообщений для двух потоков
От: K13 http://akvis.com
Дата: 20.12.07 04:43
Оценка: 1 (1)
W>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.
W>Из Intell TBB книжки:

Some processor architectures, such as the Intel Itanium, IBM POWER and PowerPC,
and Alpha processors, have weak memory consistency, in which memory operations
on different addresses may be reordered by the hardware for the sake of efficiency.
[skip]
If you are programming only IA-32 and Intel 64/AMD64 processor platforms,
you can skip this section. These platforms have the strongest memory consistency
models.


Лучше всего обратиться в http://groups.google.com/group/comp.programming.threads?hl=en
Там все выскажут весьма обоснованно.
Причем насколько я помню, операции чтения процессор все-таки имеет право reorder даже на x86, но в каких случаях -- лучше к специалистам в этом вопросе.
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 06:53
Оценка:
Здравствуйте, K13, Вы писали:

W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.


Самая большая проблема не в этом а:
1. решение исключительно для одного читателя-писателя
2. писатель должен ждать! читателя
---
С уважением,
Сергей Мухин
Re[5]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 20.12.07 07:38
Оценка:
Сергей Мухин wrote:
> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
> такой проблемы нет.
>
> Самая большая проблема не в этом а:
> 1. решение исключительно для одного читателя-писателя
> 2. писатель должен ждать! читателя
Со второй легко справиться в этом-же стиле — достаточно установить
признак ушедшего писателя, и проверить его из читателя. И с
переупорядочиванием операций, на первый взгляд, тоже можно —
устанавливая барьер перед изменением isTempQueueSet (например, используя
аналог InterlockedExchange для ее установки и сброса).
Posted via RSDN NNTP Server 2.1 beta
Re[6]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 07:45
Оценка:
Здравствуйте, hexis, Вы писали:

H>Сергей Мухин wrote:

>> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
>> такой проблемы нет.
>>
>> Самая большая проблема не в этом а:
>> 1. решение исключительно для одного читателя-писателя
>> 2. писатель должен ждать! читателя
H>Со второй легко справиться в этом-же стиле — достаточно установить
H>признак ушедшего писателя, и проверить его из читателя. И с

а если он не ушел, а просто задумался?

Не получается ли, что с такими ограничениями, надо еще делать какие-то странные действия, что бы получить спорный выигрыш?

H>переупорядочиванием операций, на первый взгляд, тоже можно -

H>устанавливая барьер перед изменением isTempQueueSet (например, используя
H>аналог InterlockedExchange для ее установки и сброса).

в общем очередной кривой велосипед.
---
С уважением,
Сергей Мухин
Re[7]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 20.12.07 08:39
Оценка:
Сергей Мухин wrote:
>
> H>Сергей Мухин wrote:
>>> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
>>> такой проблемы нет.
>>>
>>> Самая большая проблема не в этом а:
>>> 1. решение исключительно для одного читателя-писателя
>>> 2. писатель должен ждать! читателя
> H>Со второй легко справиться в этом-же стиле — достаточно установить
> H>признак ушедшего писателя, и проверить его из читателя. И с
>
> а если он не ушел, а просто задумался?

Ну, тут есть и другие проблемы — например, будет проблема, если в
среднем пишется быстрее, чем читается. Просто для универсальных проблем
существуют универсальные решения. Для частных — частные. Можно выбирать
под конкретную задачу.

>

> Не получается ли, что с такими ограничениями, надо еще делать какие-то
> странные действия, что бы получить спорный выигрыш?

В последнее время lock-free и wait-free структуры стали очень
популярными. И в принципе, встречаются задачи, где их использование
весьма желательно, или вообще — единственный возможный вариант. Например
— hard real time или взаимодействие с обработчиком прерываний. По
скорости — я не думаю, что реализация очереди на блокировках будет
существенно медленнее. Если вообще будет. Но это при на нормальной
реализации потоков. Скажем, в случае с linux, собранным с linuxthreads
(жуткий тормоз в сравнении с NPTL), будет явное преимущество по скорости.
Posted via RSDN NNTP Server 2.1 beta
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 20.12.07 08:43
Оценка:
Здравствуйте, K13, Вы писали:

K13>Лучше всего обратиться в http://groups.google.com/group/comp.programming.threads?hl=en

K13>Там все выскажут весьма обоснованно.
K13>Причем насколько я помню, операции чтения процессор все-таки имеет право reorder даже на x86, но в каких случаях -- лучше к специалистам в этом вопросе.

Спасибо за ссылку. Закину туда вопрос тоже.
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 11:16
Оценка:
Здравствуйте, What, Вы писали:

W>Здравствуйте, Pzz, Вы писали:


Pzz>>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

W>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.

Я тоже так раньше думал, пока сам не нарвался. У меня, к счастию, не было Вашей уверенности, поэтому мне хватило часа, чтобы локализовать проблему, и полдня, чтобы сообразить, как переделать виноватое место. Подумайте на досуге, сколько времени это могло бы занять у Вас.
Re[8]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 11:24
Оценка:
Здравствуйте, hexis, Вы писали:

H>По

H>скорости — я не думаю, что реализация очереди на блокировках будет
H>существенно медленнее. Если вообще будет.


На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[9]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 11:31
Оценка:
Здравствуйте, remark, Вы писали:

H>>По

H>>скорости — я не думаю, что реализация очереди на блокировках будет
H>>существенно медленнее. Если вообще будет.


R>На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



кто она? Кто выиграет то?
---
С уважением,
Сергей Мухин
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 11:50
Оценка:
Здравствуйте, Сергей Мухин, Вы писали:

СМ>Здравствуйте, remark, Вы писали:


H>>>По

H>>>скорости — я не думаю, что реализация очереди на блокировках будет
H>>>существенно медленнее. Если вообще будет.


R>>На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



СМ>кто она? Кто выиграет то?



Очередь с использованием только неблокирующих примитивов (InterlockedXXX) на многопроцессорной/многоядерной машине может дать выигрыш на порядки по сравнению с очередью на мьютексе. Это можно частично вылечить, если использовать спин-мьютекс с активным ожиданием и с backoff'ом в блокировку, который (мьютекс, а не backoff) вызывает только одну InterlockedXXX операцию при захвате мьютекса.


Очередь, типа предложенной, которая использует только голые сохранения и загрузки и дружественна расслабленной модели памяти современных аппаратных платформ, может дать выигрышь ещё на порядок по сравнению с очередью, использующей InterlockedXXX.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[11]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 12:05
Оценка:
Здравствуйте, remark, Вы писали:


R>Очередь с использованием только неблокирующих примитивов (InterlockedXXX) на многопроцессорной/многоядерной машине может дать выигрыш на порядки по сравнению с очередью на мьютексе. Это можно частично вылечить, если использовать спин-мьютекс с активным ожиданием и с backoff'ом в блокировку, который (мьютекс, а не backoff) вызывает только одну InterlockedXXX операцию при захвате мьютекса.



R>Очередь, типа предложенной, которая использует только голые сохранения и загрузки и дружественна расслабленной модели памяти современных аппаратных платформ, может дать выигрышь ещё на порядок по сравнению с очередью, использующей InterlockedXXX.



спасибо.

это, как я понимаю, если не учитывать ограниченность предложенной реализации.

т.е. мы сравниваем ограниченную по числу читателей-писателей, по алгоритму использования, при идеальной (для данной реализации) нагрузке с общем решением. Тогда будем иметь вышеприведённые выгоды по скорости. т.е. может использоваться автором в ограниченных задачах.

если перевести: велосипед на одном квадратном колесе ездит по одной тропинке быстро.
---
С уважением,
Сергей Мухин
Re: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 12:10
Оценка: 1 (1) +1
Здравствуйте, HaronK, Вы писали:

HK>Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.

HK>После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
HK>Несколько дней ушло на обдумывание и написание пробного варианта.


В целом идея здравая и в правильном направлении.

По дизайну. Что касается Flush() и isTempQueueSet это, как уже заметил Сергей Мухин, не очень "красивое" решение. Во-первых, это усложняет использование для производителя, но это не очень серьёзная проблема — опустим её. Во-вторых, даже если ты введёшь некий флаг, что производитель "ушёл", поток производителя может быть заблокирован на неопределённое время в произвольном месте, т.е. он может не успеть вызвать Flush() или установить флаг. Тогда в очереди зависнут на неопределенное время часть элементов, которые как бы в очереди, но тем не менее не доступны для потребителя. Ну и в третьих, без этого можно обойтись без дополнительной платы в рантайм — об этом ниже

Далее, если ставить задачу добиться высокой производительности, элементы контейнера очень желательно сделать интрузивными. Т.е. пользователь контейнера будет обязан добавить в свой элемент указатель next. Тогда можно убрать new/delete в самой очереди. Ну и вобщем-то есть значительная вероятность, что new/delete блокирующие. А строить неблокирующий примитив из блокирующих бессмысленно.

Не знаю стоит ли у тебя задача делать блокировку на Dequeue(), т.е. что бы она всегда возврашала элемент, а если его нет, то блокировалась внутри. Добавление такой блокировки поверх такой очереди — задача далеко не тривиальная. Если такой задачи нет, то всё хорошо. Если есть, то смотри ниже.

Что касается реализации, это всё сильно не правильно. Нужно добавить volatile для подавления переупорядочивания компилятора, надо добавить барьеры памяти, для обеспечения корректного порядка операций. Надо убрать промежуточную очередь и очередь потребителя — это всё надо слить в одну очередь. Надо убрать isTempQueueSet — от него только одни проблемы.

Пример хорошей и работающей очереди смотри в библиотеке appcore:
http://appcore.home.comcast.net/~appcore/

Конкретно вот сама очередь ac_queue_spsc:
http://appcore.home.comcast.net/~appcore/appcore/include/ac_queue_spsc_h.html
http://appcore.home.comcast.net/~appcore/appcore/src/ac_queue_spsc_c.html

Если тебе нужно блокирование при пустой очереди, то смотри компонент ac_eventcount_algo1:
http://appcore.home.comcast.net/~appcore/appcore/include/ac_eventcount_algo1_h.html
http://appcore.home.comcast.net/~appcore/appcore/src/ac_eventcount_algo1_c.html
если блокирование не нужно, то просто удали вызовы ac_eventcount_algo1 из очереди.

Библиотека написана в hardcore C стиле, но переписать её на C++ достаточно не сложно. Однако при этом советую хорошо разобраться в принципе работы. Там учтены все моменты касательно переупорядочивания компилятором, порядка видимости изменений и дружественности к разделяемым данным в кэше.

з.ы. Библиотеку разрабатывает Chris Thomasson, который скорее всего уже пишет тебе ответ в comp.programming.threads, и скорее всего сошлёт на эту же реализацию Готов спорить
з.з.ы. Там у него есть небольшая ошибка в том, как соединены компоненты ac_queue_spsc и ac_eventcount_algo1. Если будешь использовать или разбираться в ac_eventcount_algo1, то скажи — я покажу ошибку. Или смотри здесь:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/00cf4a952ff5af7d
там я указал на ошибку.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 14:00
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>Здравствуйте, What, Вы писали:


W>>Здравствуйте, Pzz, Вы писали:


Pzz>>>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.

Pzz>Я тоже так раньше думал, пока сам не нарвался. У меня, к счастию, не было Вашей уверенности, поэтому мне хватило часа, чтобы локализовать проблему, и полдня, чтобы сообразить, как переделать виноватое место. Подумайте на досуге, сколько времени это могло бы занять у Вас.

Я нисколько не пропагандирую использование volatile и не считаю это хорошей практикой. Действительно, существует проблема с необходимостью использования барьеров на не x86/Intel64/AMD64 платформах.
Однако наличие ошибки где-то у Вас в коде вряд ли является убедительным опровержением того, что x86 & co используют strict memory model (это, кстати, можно погуглить).
неплохой пост про volatile, etc
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 20.12.07 14:06
Оценка:
Здравствуйте, remark, Вы писали:

R>По дизайну. Что касается Flush() и isTempQueueSet это, как уже заметил Сергей Мухин, не очень "красивое" решение. Во-первых, это усложняет использование для производителя, но это не очень серьёзная проблема — опустим её. Во-вторых, даже если ты введёшь некий флаг, что производитель "ушёл", поток производителя может быть заблокирован на неопределённое время в произвольном месте, т.е. он может не успеть вызвать Flush() или установить флаг. Тогда в очереди зависнут на неопределенное время часть элементов, которые как бы в очереди, но тем не менее не доступны для потребителя. Ну и в третьих, без этого можно обойтись без дополнительной платы в рантайм — об этом ниже


Да, то что производитель будет заблокирован и "хвост" очереди недоступен это не есть хорошо. Но все же это уже больше вопрос архитектуры приложения. Если производитель блокируется, не выполнив положенных действий, то это скорее проблема производителя и программиста, а не очереди.
Возможно моя реализация и "некрасивая", но я пытался, чтоб алгоритм работал быстро, а не "красиво".

R>Далее, если ставить задачу добиться высокой производительности, элементы контейнера очень желательно сделать интрузивными. Т.е. пользователь контейнера будет обязан добавить в свой элемент указатель next. Тогда можно убрать new/delete в самой очереди. Ну и вобщем-то есть значительная вероятность, что new/delete блокирующие. А строить неблокирующий примитив из блокирующих бессмысленно.


Это хорошая идея. Я в начале тоже планировал возложить создание/удаление, на потоки, но тогда я реализовал эту процедуру некорректно и отказался от нее.
Реализую твою версию. Спасибо.

R>Не знаю стоит ли у тебя задача делать блокировку на Dequeue(), т.е. что бы она всегда возврашала элемент, а если его нет, то блокировалась внутри. Добавление такой блокировки поверх такой очереди — задача далеко не тривиальная. Если такой задачи нет, то всё хорошо. Если есть, то смотри ниже.


Нет, у меня читающий поток не требует наличия данных в очереди. Если их нет, то выполняются остальные задачи.

R>Что касается реализации, это всё сильно не правильно. Нужно добавить volatile для подавления переупорядочивания компилятора, надо добавить барьеры памяти, для обеспечения корректного порядка операций. Надо убрать промежуточную очередь и очередь потребителя — это всё надо слить в одну очередь. Надо убрать isTempQueueSet — от него только одни проблемы.


volatile для каких переменных? Вершин цепочек?
Барьеры добавлю.
Но вообще, если поубирать указатели и флаг, как ты сказал, то это уже другой алгоритм получается.

R>Пример хорошей и работающей очереди смотри в библиотеке appcore:

...

Гляну. Правда судя по главной странице, эта библиотека не обновлялась еще с февраля 2005.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.