Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 19.12.07 15:46
Оценка:
Привет!

Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.
После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
Несколько дней ушло на обдумывание и написание пробного варианта.

Возможно я изобретал колесо, тогда посьба сильно не пинать

Идея состоит в следующем.

Постановка задачи:
Есть два потока. Один пишет в очередь, другой — читает из нее.
Реализовать очередь без использования механизма блокировок.

Реализация:
В классе очереди реализовать 3 указателя на цепочки сообщений:
1. Цепочка для записи
2. Промежуточная цепочка
3. Цепочка для чтения
и флаг состояния указателя промежуточной цепочки.

Код:

template <class TYPE> class QueueNoMutex
{
public:
    QueueNoMutex();
    ~QueueNoMutex();

    // Запись в очередь
    void Enqueue(const TYPE& data);

    // Сброс в промежуточный указатель, если пишущий поток завершился раньше.
    bool Flush();

    // Прочитать из очереди следующий елемент
    bool Dequeue(TYPE& data);

private:
    struct Element
    {
        TYPE data;
        Element* next;
    };

    void DeleteChain(Element* el);

    volatile bool isTempQueueSet;

    Element *readerTop;
    Element *tempTop;
    Element *writerTop, *writerBottom;
};

template<class TYPE>
QueueNoMutex<TYPE>::QueueNoMutex()
{
    readerTop = tempTop = writerTop = writerBottom = NULL;
    isTempQueueSet = false;
}

template<class TYPE>
QueueNoMutex<TYPE>::~QueueNoMutex()
{
    DeleteChain(readerTop);
    DeleteChain(tempTop);
    DeleteChain(writerTop);
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
QueueNoMutex<TYPE>::Enqueue(const TYPE& data)
{
    // создаем новый елемент цепочки и помещаем в него данные
    Element* el = new Element;
    el->data = data;
    el->next = NULL;

    // если цепочка сообщений для записи не пуста, добавляем елемент в ее конец
    if (writerTop)
    {
        writerBottom->next = el;
        writerBottom = el;
    }
    else
    {
        // иначе это будет первый елемент цепочки
        writerTop = writerBottom = el;
    }

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;
    }
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Flush()
{
    // если цепочка записи пуста, возвращаем успешность сброса
    if (!writerTop) return true;

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;

        // возвращаем успешность сброса
        return true;
    }

    // цепочка все еще не сброшена
    return false;
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Dequeue(TYPE& data)
{
    // если цепочка для чтения пуста
    if (!readerTop)
    {
        // проверяем, есть ли данные в промежуточной цепочке
        if (isTempQueueSet)
        {
            // перебрасываем промежуточную цепочку в цепочку для чтения
            readerTop = tempTop;
            // обнуляем промежуточную цепочку (если используется Flush(), то не нужно)
            tempTop = NULL;
            // сообщаем, что промежуточная цепочка пуста
            isTempQueueSet = false;
        }
        else
        {
            // нет данных для чтения
            return false;
        }
    }

    // вычитываем данные из верхнего елемента цепочки
    data = readerTop->data;

    // удаляем и смещаем вершину цепочки для чтения
    Element* cur = readerTop;
    readerTop = cur->next;
    delete cur;

    return true;
}

template<class TYPE>
void
QueueNoMutex<TYPE>::DeleteChain(Element* el)
{
    while (el)
    {
        Element* cur = el;
        el = cur->next;
        delete cur;
    }
}


Код потоков для целочисленного варианта даных:


typedef QueueNoMutex<int> MyQueue;

class WriterThread: public Thread
{
public:
    void DoWork()
    {
        int data = 100;

        // основной цикл
        while (data >= 0)
        {
            queue->Enqueue(data--);

            // эмуляция работы реального потока
            Sleep(100);
        }

        // подчищаем хвосты, если последняя запись не перебросила в промежуточную цепочку
        while (!queue->Flush())
        {
            Sleep(100);
        }
    }

    // ...

private:
    MyQueue* queue;
};

class ReaderThread: public Thread
{
public:
    void DoWork()
    {
        int curVal = 1, data = 1;
        bool found = false;

        // основной цикл чтения
        while (data > 0)
        {
            while (queue->Dequeue(data))
            {
            if (curVal != data)
        {
            printf("%d != %d\n", curVal, data);
        }
        curVal = data - 1;
                found = true;
            }

            if (found)
            {
        printf("Last read: %d\n", data);
                found = false;
            }

            // эмуляция работы реального потока
            Sleep(200);
        }
        printf("Done.\n");
    }

    // ...

private:
    MyQueue* queue;
};


Еще раз уточню, что эта очередь предназначена только для одного потока записи и одного чтения.

Проверял в тестовом режиме с эмуляцией различной загруженности потоков. Работало без проблем.
Также надо будет сравнить с очередью с блокировками по скорости.
В ближайшем будуще попробую применить в реальном приложении.

Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.