Привет!
Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.
После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
Несколько дней ушло на обдумывание и написание пробного варианта.
Возможно я изобретал колесо, тогда посьба сильно не пинать
Идея состоит в следующем.
Постановка задачи:
Есть два потока. Один пишет в очередь, другой — читает из нее.
Реализовать очередь без использования механизма блокировок.
Реализация:
В классе очереди реализовать 3 указателя на цепочки сообщений:
1. Цепочка для записи
2. Промежуточная цепочка
3. Цепочка для чтения
и флаг состояния указателя промежуточной цепочки.
Код:
template <class TYPE> class QueueNoMutex
{
public:
QueueNoMutex();
~QueueNoMutex();
// Запись в очередь
void Enqueue(const TYPE& data);
// Сброс в промежуточный указатель, если пишущий поток завершился раньше.
bool Flush();
// Прочитать из очереди следующий елемент
bool Dequeue(TYPE& data);
private:
struct Element
{
TYPE data;
Element* next;
};
void DeleteChain(Element* el);
volatile bool isTempQueueSet;
Element *readerTop;
Element *tempTop;
Element *writerTop, *writerBottom;
};
template<class TYPE>
QueueNoMutex<TYPE>::QueueNoMutex()
{
readerTop = tempTop = writerTop = writerBottom = NULL;
isTempQueueSet = false;
}
template<class TYPE>
QueueNoMutex<TYPE>::~QueueNoMutex()
{
DeleteChain(readerTop);
DeleteChain(tempTop);
DeleteChain(writerTop);
}
//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
QueueNoMutex<TYPE>::Enqueue(const TYPE& data)
{
// создаем новый елемент цепочки и помещаем в него данные
Element* el = new Element;
el->data = data;
el->next = NULL;
// если цепочка сообщений для записи не пуста, добавляем елемент в ее конец
if (writerTop)
{
writerBottom->next = el;
writerBottom = el;
}
else
{
// иначе это будет первый елемент цепочки
writerTop = writerBottom = el;
}
// проверяем свободна ли промежуточная цепочка
if (!isTempQueueSet)
{
// перекидываем в нее цепочку записи
tempTop = writerTop;
// освобождаем указатель вершины цепочки записи
writerTop = NULL;
// и устанавливаем флаг готовности промежуточной цепочки
isTempQueueSet = true;
}
}
template<class TYPE>
bool
QueueNoMutex<TYPE>::Flush()
{
// если цепочка записи пуста, возвращаем успешность сброса
if (!writerTop) return true;
// проверяем свободна ли промежуточная цепочка
if (!isTempQueueSet)
{
// перекидываем в нее цепочку записи
tempTop = writerTop;
// освобождаем указатель вершины цепочки записи
writerTop = NULL;
// и устанавливаем флаг готовности промежуточной цепочки
isTempQueueSet = true;
// возвращаем успешность сброса
return true;
}
// цепочка все еще не сброшена
return false;
}
template<class TYPE>
bool
QueueNoMutex<TYPE>::Dequeue(TYPE& data)
{
// если цепочка для чтения пуста
if (!readerTop)
{
// проверяем, есть ли данные в промежуточной цепочке
if (isTempQueueSet)
{
// перебрасываем промежуточную цепочку в цепочку для чтения
readerTop = tempTop;
// обнуляем промежуточную цепочку (если используется Flush(), то не нужно)
tempTop = NULL;
// сообщаем, что промежуточная цепочка пуста
isTempQueueSet = false;
}
else
{
// нет данных для чтения
return false;
}
}
// вычитываем данные из верхнего елемента цепочки
data = readerTop->data;
// удаляем и смещаем вершину цепочки для чтения
Element* cur = readerTop;
readerTop = cur->next;
delete cur;
return true;
}
template<class TYPE>
void
QueueNoMutex<TYPE>::DeleteChain(Element* el)
{
while (el)
{
Element* cur = el;
el = cur->next;
delete cur;
}
}
Код потоков для целочисленного варианта даных:
typedef QueueNoMutex<int> MyQueue;
class WriterThread: public Thread
{
public:
void DoWork()
{
int data = 100;
// основной цикл
while (data >= 0)
{
queue->Enqueue(data--);
// эмуляция работы реального потока
Sleep(100);
}
// подчищаем хвосты, если последняя запись не перебросила в промежуточную цепочку
while (!queue->Flush())
{
Sleep(100);
}
}
// ...
private:
MyQueue* queue;
};
class ReaderThread: public Thread
{
public:
void DoWork()
{
int curVal = 1, data = 1;
bool found = false;
// основной цикл чтения
while (data > 0)
{
while (queue->Dequeue(data))
{
if (curVal != data)
{
printf("%d != %d\n", curVal, data);
}
curVal = data - 1;
found = true;
}
if (found)
{
printf("Last read: %d\n", data);
found = false;
}
// эмуляция работы реального потока
Sleep(200);
}
printf("Done.\n");
}
// ...
private:
MyQueue* queue;
};
Еще раз уточню, что эта очередь предназначена только для одного потока записи и одного чтения.
Проверял в тестовом режиме с эмуляцией различной загруженности потоков. Работало без проблем.
Также надо будет сравнить с очередью с блокировками по скорости.
В ближайшем будуще попробую применить в реальном приложении.
Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.