c# array-based lock-free queue
От: vf  
Дата: 31.10.10 17:50
Оценка: :)
c# реализации алгоритма очереди из Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms, Maged M. Michael, Michael L. Scott. Довольно полное описание с различными тестами, замерами и теоретическими выкладками.

Реализация array-based, то есть в данном случае — работает на массиве и оперерует индексами вместо указателей. Если нужно поместить данные в очередь берем пустой элемент из массива, кладем туда данные и передаем индекс элемента в LockFreeQueue.Enqueue, соответсвенно LockFreeQueue.Dequeue возвращает индекс элемента полученного из очереди. В конструкторе необходимо указать массив и диапазон элементов, которые необходимо изначально поместить в очередь. Не буду углубляться почему сделано именно так — под свои задачи.

Где брать/хранить пустые элементы массива? Все в той же очереди, или можно локфри стэке (там на один CAS меньше). То есть если необходима очередь с такими методами T Dequeue() и Enqueue(T data), можно сделать что-то вроде:

    public class Queue<T>
    {
        private LockFreeItem<T>[] array;
        private LockFreeStack<T> empty;
        private LockFreeQueue<T> full;

        internal LockFreePool(int size)
        {
            array = new LockFreeItem<T>[size + 1];

            full = new LockFreeQueue<T>(array, 0, 1);
            empty = new LockFreeStack<T>(array, 1, size);
        }

        public T Dequeue()
        {
            T result = default(T);

            int index = full.Dequeue();
            if (index >= 0)
            {
                result = array[index].Value;
                array[index].Value = default(T);

                empty.Push(index);
            }

            return result;
        }

        public void Put(T value)
        {
            int index = empty.Pop();
            if (index >= 0)
            {
                array[index].Value = value;
                full.Enqueue(index);
            }
        }
    }


Ну и сама очередь, делалось все строго по алгоритму, с двумя исключениями вот эти две операции как бы вынесены за тело методов (на откуп пользователю) node = new node() и free(head.ptr), как решается этот вопрос я описал выше:

    struct LockFreeItem<T>
    {
        public Int64 Next;
        public T Value;

        public new string ToString()
        {
            return string.Format("Next: {0}, Count: {1}, Value: {2}", (Int32)Next, (UInt32)(Next >> 32), (Value == null) ? @"null" : "full");
        }
    }

    [StructLayout(LayoutKind.Explicit)]
    struct LockFreeQueueVars
    {
        [FieldOffset(0)]
        public Int64 Head;
        [FieldOffset(64)]
        public Int64 Tail;
        [FieldOffset(128)]
        public Int32 padding;
    }

    /// <summary>
    /// Non-blocking queue implementation from:
    ///        Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
    ///        Maged M. Michael, Michael L. Scott
    ///        http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
    /// </summary>
    class LockFreeQueue<T>
    {
        private LockFreeQueueVars q;
        private LockFreeItem<T>[] array;

        public LockFreeQueue(LockFreeItem<T>[] array1, Int32 enqueueFromDummy, Int32 enqueueCount)
        {
            if (enqueueCount <= 0)
                throw new ArgumentOutOfRangeException(@"enqueueCount", @"Queue must include at least one dummy element");

            array = array1;
            q.Head = enqueueFromDummy;
            q.Tail = enqueueFromDummy + enqueueCount - 1;

            for (Int32 i = 0; i < enqueueCount - 1; i++)
                array[i + enqueueFromDummy].Next = enqueueFromDummy + i + 1;
            array[q.Tail].Next = 0xFFFFFFFFL;
        }

        public void Enqueue(Int32 index)
        {
            UInt64 tail1, next1, next2, xchg;

            unchecked
            {
                array[index].Next |= 0xFFFFFFFFL;

                for (; ; )
                {
                    tail1 = (UInt64)Interlocked.Read(ref q.Tail);
                    next1 = (UInt64)Interlocked.Read(ref array[tail1 & 0xFFFFFFFFUL].Next);

                    if (tail1 == (UInt64)q.Tail)
                    {
                        if ((next1 & 0xFFFFFFFFUL) == 0xFFFFFFFFUL)
                        {
                            xchg = ((next1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | ((UInt64)(UInt32)index);
                            next2 = (UInt64)Interlocked.CompareExchange(ref array[tail1 & 0xFFFFFFFFUL].Next, (Int64)xchg, (Int64)next1);
                            if (next2 == next1)
                                break;
                        }
                        else
                        {
                            xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
                            Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
                        }
                    }
                }

                xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | ((UInt64)(UInt32)index);
                Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
            }
        }

        public Int32 Dequeue()
        {
            UInt64 head1, head2, tail1, next1, xchg;
            Int32 index;

            unchecked
            {
                for (; ; )
                {
                    head1 = (UInt64)Interlocked.Read(ref q.Head);
                    tail1 = (UInt64)Interlocked.Read(ref q.Tail);
                    next1 = (UInt64)Interlocked.Read(ref array[head1 & 0xFFFFFFFFUL].Next);

                    if (head1 == (UInt64)q.Head)
                    {
                        if ((head1 & 0xFFFFFFFFUL) == (tail1 & 0xFFFFFFFFUL))
                        {
                            if ((next1 & 0xFFFFFFFFUL) == 0xFFFFFFFFUL)
                                return -1;

                            xchg = ((tail1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
                            Interlocked.CompareExchange(ref q.Tail, (Int64)xchg, (Int64)tail1);
                        }
                        else
                        {
                            T value = array[next1 & 0xFFFFFFFFUL].Value;

                            xchg = ((head1 + 0x100000000UL) & 0xFFFFFFFF00000000UL) | (next1 & 0xFFFFFFFFUL);
                            head2 = (UInt64)Interlocked.CompareExchange(ref q.Head, (Int64)xchg, (Int64)head1);
                            if (head2 == head1)
                            {
                                index = (Int32)(head1 & 0xFFFFFFFFUL);
                                array[index].Value = value;
                                return index;
                            }
                        }
                    }
                }
            }
        }
    }
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.