c# реализации алгоритма очереди из
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms, Maged M. Michael, Michael L. Scott. Довольно полное описание с различными тестами, замерами и теоретическими выкладками.
Реализация array-based, то есть в данном случае — работает на массиве и оперерует индексами вместо указателей. Если нужно поместить данные в очередь берем пустой элемент из массива, кладем туда данные и передаем индекс элемента в LockFreeQueue.Enqueue, соответсвенно LockFreeQueue.Dequeue возвращает индекс элемента полученного из очереди. В конструкторе необходимо указать массив и диапазон элементов, которые необходимо изначально поместить в очередь. Не буду углубляться почему сделано именно так — под свои задачи.
Где брать/хранить пустые элементы массива? Все в той же очереди, или можно локфри стэке (там на один CAS меньше). То есть если необходима очередь с такими методами T Dequeue() и Enqueue(T data), можно сделать что-то вроде:
public class Queue<T>
{
private LockFreeItem<T>[] array;
private LockFreeStack<T> empty;
private LockFreeQueue<T> full;
internal LockFreePool(int size)
{
array = new LockFreeItem<T>[size + 1];
full = new LockFreeQueue<T>(array, 0, 1);
empty = new LockFreeStack<T>(array, 1, size);
}
public T Dequeue()
{
T result = default(T);
int index = full.Dequeue();
if (index >= 0)
{
result = array[index].Value;
array[index].Value = default(T);
empty.Push(index);
}
return result;
}
public void Put(T value)
{
int index = empty.Pop();
if (index >= 0)
{
array[index].Value = value;
full.Enqueue(index);
}
}
}
Ну и сама очередь, делалось все строго по алгоритму, с двумя исключениями вот эти две операции как бы вынесены за тело методов (на откуп пользователю) node = new node() и free(head.ptr), как решается этот вопрос я описал выше:
struct LockFreeItem<T>
{
public Int64 Next;
public T Value;
public new string ToString()
{
return string.Format("Next: {0}, Count: {1}, Value: {2}", (Int32)Next, (UInt32)(Next >> 32), (Value == null) ? @"null" : "full");
}
}
[StructLayout(LayoutKind.Explicit)]
struct LockFreeQueueVars
{
[FieldOffset(0)]
public Int64 Head;
[FieldOffset(64)]
public Int64 Tail;
[FieldOffset(128)]
public Int32 padding;
}
/// <summary>
/// Non-blocking queue implementation from:
/// Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
/// Maged M. Michael, Michael L. Scott
/// http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
/// </summary>
class LockFreeQueue<T>
{
private LockFreeQueueVars q;
private LockFreeItem<T>[] array;
public LockFreeQueue(LockFreeItem<T>[] array1, Int32 enqueueFromDummy, Int32 enqueueCount)
{
if (enqueueCount <= 0)
throw new ArgumentOutOfRangeException(@"enqueueCount", @"Queue must include at least one dummy element");
array = array1;
q.Head = enqueueFromDummy;
q.Tail = enqueueFromDummy + enqueueCount - 1;
for (Int32 i = 0; i < enqueueCount - 1; i++)
array[i + enqueueFromDummy].Next = enqueueFromDummy + i + 1;
array[q.Tail].Next = 0xFFFFFFFFL;
}
public void Enqueue(Int32 index)
{
UInt64 tail1, next1, next2, xchg;
unchecked
{
array[index].Next |= 0xFFFFFFFFL;
for (; ; )
{
tail1 = (UInt64)Interlocked.Read(ref q.Tail);
next1 = (UInt64)Interlocked.Read(ref array[tail1 & 0xFFFFFFFFUL].Next);
if (tail1 == (UInt64)q.Tail)
{
if ((next1 & 0xFFFFFFFFUL) == 0xFFFFFFFFUL)
{
xchg = ((next1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | ((UInt64)(UInt32)index);
next2 = (UInt64)Interlocked.CompareExchange(ref array[tail1 & 0xFFFFFFFFUL].Next, (Int64)xchg, (Int64)next1);
if (next2 == next1)
break;
}
else
{
xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
}
}
}
xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | ((UInt64)(UInt32)index);
Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
}
}
public Int32 Dequeue()
{
UInt64 head1, head2, tail1, next1, xchg;
Int32 index;
unchecked
{
for (; ; )
{
head1 = (UInt64)Interlocked.Read(ref q.Head);
tail1 = (UInt64)Interlocked.Read(ref q.Tail);
next1 = (UInt64)Interlocked.Read(ref array[head1 & 0xFFFFFFFFUL].Next);
if (head1 == (UInt64)q.Head)
{
if ((head1 & 0xFFFFFFFFUL) == (tail1 & 0xFFFFFFFFUL))
{
if ((next1 & 0xFFFFFFFFUL) == 0xFFFFFFFFUL)
return -1;
xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
}
else
{
T value = array[next1 & 0xFFFFFFFFUL].Value;
xchg = ((head1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
head2 = (UInt64)Interlocked.CompareExchange(ref q.Head, (Int64)xchg, (Int64)head1);
if (head2 == head1)
{
index = (Int32)(head1 & 0xFFFFFFFFUL);
array[index].Value = value;
return index;
}
}
}
}
}
}
}