Коллеги, возникло непреодолимое желание поделиться своим велосипедом, получить комментарии, может кому пригодится Готового на .нет не нашел, вот сделал свое, с пятой попытки.
Основная задача, была сделать tread-safe пул буферов для сервера, первоначально реализовал это дело через lock, скорость работы lock подхода вынудила копать дальше, в сторону Interlocked, потом узал что правильно такой подход называется lock-free, а так же, что часто возникающая проблема в таких алгоритмах называется ABA (я с ней столкнулся в 3-4 попытке).
В процессе работы над пулом и появился ниже приведенный Stack. В последствии, находил ссылки на pdf-ы с алгоритмами LIFO, FIFO, но не разбирался, вероятно там тоже самое. Читал про .нет 4.0, но там, по описанию, алгоритмы основаны на SpinLock, так что возможно мое будет чуть быстрее (хотя и со своими ограничениями)
class SafeStackItem<T>
{
public T Value;
public volatile Int32 Next;
}
class SafeStack<T>
where T : class
{
private volatile Int32 head;
private volatile Int32 count;
private SafeStackItem<T>[] array;
public SafeStack(SafeStackItem<T>[] array, int pushFrom, int pushCount)
{
this.head = -1;
this.array = array;
count = pushCount;
head = pushFrom;
for (int i = 0; i < pushCount - 1; i++)
array[i + pushFrom].Next = pushFrom + i + 1;
array[pushFrom + pushCount - 1].Next = -1;
}
#pragma warning disable 0420
public Int32 Pop()
{
while (count > 1)
{
Int32 head1 = head;
Int32 next1 = Interlocked.Exchange(ref array[head1].Next, -1);
if (next1 >= 0)
{
if (Interlocked.CompareExchange(ref head, next1, head1) == head1)
{
Interlocked.Decrement(ref count);
return head1;
}
else
Interlocked.Exchange(ref array[head1].Next, next1);
}
}
return -1;
}
public void Push(Int32 index)
{
Int32 head1, head2;
do
{
head1 = head;
array[index].Next = head1;
head2 = Interlocked.CompareExchange(ref head, index, head1);
} while (head1 != head2);
Interlocked.Increment(ref count);
}
#pragma warning restore 0420
}
Здравствуйте, vf, Вы писали:
vf>Коллеги, возникло непреодолимое желание поделиться своим велосипедом, получить комментарии, может кому пригодится Готового на .нет не нашел, вот сделал свое, с пятой попытки.
vf>Основная задача, была сделать tread-safe пул буферов для сервера, первоначально реализовал это дело через lock, скорость работы lock подхода вынудила копать дальше, в сторону Interlocked, потом узал что правильно такой подход называется lock-free, а так же, что часто возникающая проблема в таких алгоритмах называется ABA (я с ней столкнулся в 3-4 попытке). vf>В процессе работы над пулом и появился ниже приведенный Stack. В последствии, находил ссылки на pdf-ы с алгоритмами LIFO, FIFO, но не разбирался, вероятно там тоже самое. Читал про .нет 4.0, но там, по описанию, алгоритмы основаны на SpinLock, так что возможно мое будет чуть быстрее (хотя и со своими ограничениями)
vf>
vf>class SafeStackItem<T>
vf>{
vf> public T Value;
vf> public volatile Int32 Next;
vf>}
vf>class SafeStack<T>
vf> where T : class
vf>{
vf> private volatile Int32 head;
vf> private volatile Int32 count;
vf> private SafeStackItem<T>[] array;
vf> public SafeStack(SafeStackItem<T>[] array, int pushFrom, int pushCount)
vf> {
vf> this.head = -1;
vf> this.array = array;
vf> count = pushCount;
vf> head = pushFrom;
vf> for (int i = 0; i < pushCount - 1; i++)
vf> array[i + pushFrom].Next = pushFrom + i + 1;
vf> array[pushFrom + pushCount - 1].Next = -1;
vf> }
vf>#pragma warning disable 0420
vf> public Int32 Pop()
vf> {
vf> while (count > 1)
vf> {
vf> Int32 head1 = head;
vf> Int32 next1 = Interlocked.Exchange(ref array[head1].Next, -1);
vf> if (next1 >= 0)
vf> {
vf> if (Interlocked.CompareExchange(ref head, next1, head1) == head1)
vf> {
vf> Interlocked.Decrement(ref count);
vf> return head1;
vf> }
vf> else
vf> Interlocked.Exchange(ref array[head1].Next, next1);
vf> }
vf> }
vf> return -1;
vf> }
vf> public void Push(Int32 index)
vf> {
vf> Int32 head1, head2;
vf> do
vf> {
vf> head1 = head;
vf> array[index].Next = head1;
vf> head2 = Interlocked.CompareExchange(ref head, index, head1);
vf> } while (head1 != head2);
vf> Interlocked.Increment(ref count);
vf> }
vf>#pragma warning restore 0420
vf>}
vf>
Мне не понятно где и когда идёт обращение к SafeStackItem<T>.Value
Здравствуйте, Caracrist, Вы писали:
C>Мне не понятно где и когда идёт обращение к SafeStackItem<T>.Value
Снаружи, после Pop никакой другой поток к этому элементу уже обращаться не будет. Можно было конечно и функцию завернуть, но мне показалось так удобнее.
А>воистину из исходника ровным счетом ничего не понятно. А>храним элементы пользователского типа Т, а "пушим" и "попим" инты
Согласен, че то в исходном сообщении одной лирики без конкретики понаписал.
А>как минимум пример использования нужен (как его протетсить то?)
У себя я использую таким образом, создаю два таких стека empty full где входной аргумент один и тот же массив, и в последствии перекидываю из одного в другой элементы. Но я с Вами согласен, получается масло маслянное (класс который все это обертывает тоже по сути стек), поэтому перепешу все это дело, и спрячу индексы массива внутри. Делов не много, завести еще один массив для хранения свободных индексов ед-но манипулировать через интерлокед. Тогда должно все получиться совсем красиво. Тут довольно очевидное решение.
Но основная находка именно в том коде который я уже выложил, с push все более-менее понятно, а вот pop имхо интересный получился. По поводу проверок, я этот класс мучал в тестах — несколько потоков выполняю pop-push и проверяют. И плюс, в процессе поиска решения наткнулся на МС-оский CHESS — тоже запускал — не знаю насколько я правильно это сделал — но он проблем тоже не выявил.
Ну во-первых, твой стек не lock-free, а lock-based. В функции Pop() у тебя записано не что иное, как spin-lock. Т.е. используются мелко-гранулярные спинлоки на каждый элемент. Это будет очевидно, если переписать функцию как:
При этом спинлок реализован, мягко-говоря, не особо хорошо. Хороший спинлок должен вызывать инстркцию PAUSE (или аналог) на каждой итерации, плюс переключаться на пассивное ожидание при очень большом кол-ве неудач захвата. Иначе это может очень негативно сказываться на производительности.
Во-вторых, я сомневаюсь, что он будет быстрее стека, защищенного спинлоком. Почему? Потому что при использовании спинлока функции Push()/Pop() будут выполнять по 1 атомарной RMW операции. В твоём же стеке функция Pop() выполняет 3, а функиця Push() — 2. Т.е. Грубо говоря твой стек в 2.5 раза медленнее спинлока.
В принципе, твоему стеку можно провести небольшую липосакцию. Известно, что lock-free стек реализуется с 1 атомарной RMW операцией в Push() и Pop().
Первое, можно избавиться от переменной count. Для этого необходимо положить в Next последнего узла -2, тогда поток, поток снимающий элемент со стека, будет понимать, что элемент временно заблокирован (Next=-1) или стек пустой (Next=-2).
Далее, в функции Pop() можно разблокировать элемент с помощью обычного сохранения, а не Interlocked.Exchange (точно так же как элемент разблокируется в функции Push() перед помещением в стек).
В-третьих, в алгоритме содержится гонка, которая может приводить к падениям, зависаниям или выдаче некорректных результатов. В целом, это та же ABA проблема.
Вот конкретный сценарий:
1.Поток 0 считывает head=0 в Pop().
2.Поток 1 считывает head=0 в Pop().
3.Поток 0 снимает элемент 0 со стека, теперь head=1.
4.Поток 0 начинает добавлять элемент 0 в стек.
5.Поток 0 считывает head=1, и записывает 1 в поле Next элемента 0.
6.Поток 1 делает Exchange для поля Next элемента 0. Он записывает туда -1 и получает значение 1.
7.Поток 2 снимает элемент 1 со стека, теперь head=2.
8.Поток 0 делает неудачный CompareExchange, перечитывает head=2, и записывает 2 в поле Next элемента 0.
9.Поток 0 делает удачный CompareExchange (добавялет элемент 0 в стек), теперь head=0.
10. Поток 1 делает удачный CompareExchange (снимает элемент 0 со стека), теперь head=1, однако должен быть 2, т.к. Поле Next элемента 0 было равно 2, когда он добавлялся в стек.
11. Теперь любой поток (для определенности 0) снимает элемент 1 со стека. И получаем, что поток 0 и поток 2 одновременно получают элемент 1 из стека. Это может привести к любым негативным последствиям. В хорошем случае программа сразу упадёт. В случае похуже один или несколько потоков зависнут. В самом плохом случае программа выдаст некорректные результаты (переведёт деньги на некорректный счёт).
Добро пожаловать в мир lock-free
Я бы рекомендовал просто реализовать классический lock-free стек. Тут есть 2 варианта. Первый, динамически выделять на каждый элемент дополнительный вспомогательный элемент узла. Второй — добавить т.н. IBM ABA-tag рядом с каждым полем Next. Любой из этих вариантов будет быстрее (1 atomic RMW на операцию), давать гарантии lock-free, и главное будет надёжно работать.
Верификация алгоритмов синхронизации — это экстремально сложная задача. Если ситуация кажется простой, скорее всего ты просто что-то упускаешь. Советую не пренебрегать чтением pdf'ок по теме, или просто использовать готовые библиотеки; хотя и там и там бывают ошибки (особенно в самопальных библиотеках, которых в интернете разводится всё больше и больше). Или просто заказать необходимые компоненты у экспертов.
Здравствуйте, Аноним, Вы писали:
А>воистину из исходника ровным счетом ничего не понятно. А>храним элементы пользователского типа Т, а "пушим" и "попим" инты А>либо "контейнер" незаконченный (% на 50), либо примененение уж очень узкое и известное только автору. А>как минимум пример использования нужен (как его протетсить то?)
По-моему, всё прозрачно. Получаешь со стека индекс. Дальше этим индексом индексируешь массив, который передаётся в конструктор, т.о. получаешь необходимые данные. Дальше возвращаешь этот индекс обратно на стек.
Здравствуйте, vf, Вы писали:
vf>И плюс, в процессе поиска решения наткнулся на МС-оский CHESS — тоже запускал — не знаю насколько я правильно это сделал — но он проблем тоже не выявил.
Любительская поделка
Мой Relacy Race Detector находит гонку в алгоритме слёту
Если есть желание серьёзно верифицировать, то могу проконсультировать здесь или тут.
Здравствуйте, Jolly Roger, Вы писали:
JR>По-моему, SafeStackItem<T> логичнее было-бы сделать структурой. JR>Посмотрите это и это(pdf) JR>Возможно, будет любопытно.
Той проблемы, которую пытается решить автор, просто нет в C#, т.к. GC её решает.
з.ы. и back-off elimination стек *не* wait-free.
Здравствуйте, remark, Вы писали:
JR>>По-моему, SafeStackItem<T> логичнее было-бы сделать структурой. JR>>Посмотрите это и это(pdf) JR>>Возможно, будет любопытно.
Только если ради любопытства, там та же ABA. Это то, что я имел в виду касательно самопального кода в интернет. Сделать *почти* работающий алгоритм легко, сделать же надёжно работающий, да ещё и быстрый, алгоритм крайне сложно.
Автор решил ABA для основного стека с помощью вспомогательного, но вот во вспомогательном та же самая ABA, которая никак не решена (ввести третьий стек? ).
Более конкретно.
void FreeMemory()
{
for(;;) {
TNode *current = FreePtr;
if (!current)
break;
if (atomic_add(DequeueCount, 0) == 0) {
// node current is in free list, no dequeus were active at least once since thenif (cas(&FreePtr, (TNode*)0, current))
EraseList(current);
} else
break;
}
}
void EraseList(TNode * volatile p)
{
while (p) {
TNode *next = p->Next;
delete p;
p = next;
}
}
Первый поток приходит в FreeMemory(), считывает какой-то current, потом успешно проверяет, что DequeueCount==0, потом засыпает.
Приходит второй поток и освобождает все элементы из freelist.
Далее эти же элементы выделяются повторно, добавляются в стек, удаляются из стека, и попадает опять во freelist.
Далее просыпается первый поток и успешно делает cas(&FreePtr, (TNode*)0, current), и удаляет все элементы, хотя DequeueCount сейчас может быть != 0. Возвращаемся к нашему разбитому корыту.
Здравствуйте, remark, Вы писали:
R>Первый поток приходит в FreeMemory(), считывает какой-то current, потом успешно проверяет, что DequeueCount==0, потом засыпает. R>Приходит второй поток и освобождает все элементы из freelist. R>Далее эти же элементы выделяются повторно, добавляются в стек, удаляются из стека, и попадает опять во freelist. R>Далее просыпается первый поток и успешно делает cas(&FreePtr, (TNode*)0, current), и удаляет все элементы, хотя DequeueCount сейчас может быть != 0. Возвращаемся к нашему разбитому корыту.
Не удивляйтесь, если Яндекс у вас иногда выдаёт 500 Internal Server Error
Здравствуйте, remark, Вы писали:
R>Той проблемы, которую пытается решить автор, просто нет в C#, т.к. GC её решает. R>з.ы. и back-off elimination стек *не* wait-free.
Проблему обращения к освобождённой памяти — да, но проблема создания нового элемента на том-же адресе по-моему всё равно остаётся, разве нет?
Я просто подумал, что автору всё равно переделывать свой алгоритм, и будет полезно ознакомиться с примерами рассуждений на эту тему
Здравствуйте, Jolly Roger, Вы писали:
R>>Той проблемы, которую пытается решить автор, просто нет в C#, т.к. GC её решает. R>>з.ы. и back-off elimination стек *не* wait-free.
JR>Проблему обращения к освобождённой памяти — да, но проблема создания нового элемента на том-же адресе по-моему всё равно остаётся, разве нет?
Нет. Объект-то может создаться по тому же адресу, но это не будет проблемой, т.к. он может там создаться только тогда, когда ни у одного потока не осталось указателей на старый объект (соотв. он не может быть аргументом для CAS), поэтому ABA быть не может.
JR>Я просто подумал, что автору всё равно переделывать свой алгоритм, и будет полезно ознакомиться с примерами рассуждений на эту тему
Это не то, с чем следует ознакамливаться, особенно без комментариев о неработоспособности.
Здравствуйте, remark, Вы писали:
R>Нет. Объект-то может создаться по тому же адресу, но это не будет проблемой, т.к. он может там создаться только тогда, когда ни у одного потока не осталось указателей на старый объект (соотв. он не может быть аргументом для CAS), поэтому ABA быть не может.
Да, посидев и подумав, я уже начал это подозревать — просто не смог придумать сценарий когда-бы это стало проблемой
R>Это не то, с чем следует ознакамливаться, особенно без комментариев о неработоспособности.
Ну зато Вы прокомментировали — всё равно польза
Набравшись наглости, рискну обратиться с просьбой — Вы не могли бы раскритиковать ещё один пример
public class LockFreeStack<T> where T: class
{
class StackItem<T1> where T1 : class
{
public T1 Value;
public volatile StackItem<T1> Next = null;
public StackItem(T1 data) { Value = data; }
}
private volatile int count = 0;
private volatile StackItem<T> Head = null;
public int Count { get { return count; } }
public T Pop()
{
StackItem<T> tmpHead;
do
{
tmpHead = Head;
Thread.MemoryBarrier();
if (tmpHead == null) return null;
if (Interlocked.CompareExchange<StackItem<T>>(
ref Head, tmpHead.Next, tmpHead) == tmpHead)
{
Interlocked.Decrement(ref count);
return tmpHead.Value;
}
}
while (true);
}
public void Push(T data)
{
StackItem<T> item = new StackItem<T>(data);
StackItem<T> tmpHead;
do
{
tmpHead = Head;
item.Next = tmpHead;
Thread.MemoryBarrier();
if (Interlocked.CompareExchange<StackItem<T>>(
ref Head, item, tmpHead) == tmpHead)
{
Interlocked.Increment(ref count);
return;
}
}
while (true);
}
}
Здравствуйте, Jolly Roger, Вы писали:
JR>Набравшись наглости, рискну обратиться с просьбой — Вы не могли бы раскритиковать ещё один пример
Вроде выглядит нормально. Это классический стек, про который известно, что он работает... если тут, конечно, нет никаких глупых опечаток, но вроде я не вижу.
Единственное, вызовы Thread.MemoryBarrier() излишни, но очень дороги. Их надо убрать.
Плюс, я бы убрал переменную count, т.к. пользы от неё обычно мало, а стоимость Push/Pop она удваивает. Кстати, она тут может возвращать -1, что может быть весьма удивительно для вызывающего кода. Если она *действительно* нужна, то я бы перенёс Interlocked.Increment(ref count) в начало метода — обычно лучше вернуть на 1 больше, чем на 1 меньше.
Если уж совсем придираться, то лучше использовать возвращаемое значение из CompareExchange(), т.к. это гарантированно актуальное значение, а дополнительное перечитывание разделяемой переменной может вносить негативный эффект при большой нагрузке.
Но это всё исключительно по производительности, а не по корректности.
В итоге получается что-то типа такого, я думаю это должно работать видимо пошустрее на синтетическом тесте:
public T Pop()
{
StackItem<T> tmpHead = Head;
StackItem<T> tmpHead2;
for (;;)
{
if (tmpHead == null)
return null;
tmpHead2 = Interlocked.CompareExchange<StackItem<T>>(ref Head, tmpHead.Next, tmpHead);
if (tmpHead2 == tmpHead)
return tmpHead.Value;
tmpHead = tmpHead2;
}
}
public void Push(T data)
{
StackItem<T> item = new StackItem<T>(data);
StackItem<T> tmpHead = Head;
StackItem<T> tmpHead2;
for (;;)
{
item.Next = tmpHead;
tmpHead2 = Interlocked.CompareExchange<StackItem<T>>(ref Head, item, tmpHead);
if (tmpHead2 == tmpHead)
return;
tmpHead = tmpHead2;
}
}
}
Здравствуйте, remark, Вы писали:
R>В-третьих, в алгоритме содержится гонка, которая может приводить к падениям, зависаниям или выдаче некорректных результатов. В целом, это та же ABA проблема.
Да... я резко приземлился Спасибо, о таком подвыверте я не подумал.
Вы как ошибку нашли, спомощью своего инструмента или вручную?
R>Добро пожаловать в мир lock-free
Все равно его не брошу, потому что он х.. что то в нем есть
R>Верификация алгоритмов синхронизации — это экстремально сложная задача.
Я поверил, контр-пример отличный. Возвращаясь к своей задаче, где можно взять готовый lock free для .net 3.5, стек или очередь, мне абсолютно не важен порядок (мне нужен pool буферов)? Так чтобы без выделения памяти, не хочеться переносить эту задачу на GC.
Здравствуйте, vf, Вы писали:
R>>В-третьих, в алгоритме содержится гонка, которая может приводить к падениям, зависаниям или выдаче некорректных результатов. В целом, это та же ABA проблема.
vf>Да... я резко приземлился Спасибо, о таком подвыверте я не подумал. vf>Вы как ошибку нашли, спомощью своего инструмента или вручную?
С помощью инструмента. У меня не получается досконально проверять 10^5 перекрытий потоков в секунду. Бесполезно соревноваться
Хотя часть работы остаётся всё равно на пользователе — создание теста, подбор параметров, интерпретация результатов.
R>>Добро пожаловать в мир lock-free
vf>Все равно его не брошу, потому что он х.. что то в нем есть
Здравствуйте, vf, Вы писали:
vf>Я поверил, контр-пример отличный. Возвращаясь к своей задаче, где можно взять готовый lock free для .net 3.5, стек или очередь, мне абсолютно не важен порядок (мне нужен pool буферов)? Так чтобы без выделения памяти, не хочеться переносить эту задачу на GC.
И добавить к нему ABA-counter вместо постоянно создания и разрушения вспомогательных объектов.
У меня получилось что-то типа такого, набирал наскорую руку, поэтому могут быть опечатки, но идея рабочая, и надеюсь понятна — в переменную head встраивается монотонно инкрементируемый при каждом изменении счётчик, и он участвует в CAS, поэтому ABA быть не может — даже если "указатель" (тот, который тут кодируется int'ом) получается одинаковый, ABA-счётчик уже будет отличаться:
struct SafeStackItem<T>
{
public volatile UInt32 Next;
public T Value;
}
class SafeStack<T> where T : class
{
private Int64 head;
private SafeStackItem<T>[] array;
public SafeStack(SafeStackItem<T>[] array, UInt32 pushFrom, UInt32 pushCount)
{
this.array = array;
head = (Int64)(UInt64)pushFrom;
for (UInt32 i = 0; i < pushCount - 1; i++)
array[i + pushFrom].Next = pushFrom + i + 1;
array[pushFrom + pushCount - 1].Next = 0xFFFFFFFF;
}
public UInt32 Pop()
{
UInt64 head1 = (UInt64)head;
for (;;)
{
UInt32 next = (UInt32)(head1 & 0xFFFFFFFF);
UInt64 counter = (head1 & 0xFFFFFFFF00000000);
if (next == 0xFFFFFFFF)
return 0xFFFFFFFF;
UInt32 nextnext = array[next].Next;
UInt64 xchg = (UInt64)nextnext | counter;
UInt64 head2 = (UInt64)Interlocked.CompareExchange(ref head, (Int64)xchg, (Int64)head1);
if (head1 == head2)
return next;
head1 = head2;
}
}
public void Push(UInt32 index)
{
UInt64 head1 = (UInt64)head;
for (;;)
{
UInt32 next = (UInt32)(head1 & 0xFFFFFFFF);
UInt64 counter = (head1 & 0xFFFFFFFF00000000) + (1 << 32);
array[index].Next = next;
UInt64 xchg = (UInt64)index | counter;
UInt64 head2 = (UInt64)Interlocked.CompareExchange(ref head, (Int64)xchg, (Int64)head1);
if (head1 == head2)
return;
head1 = head2;
}
}
}