Покритикуйте пожалуйста, будет ли работать такая реализация, какие возможны проблемы?
По идее это multi-producer/multi-consumer очередь, но уж очень просто получилось, где подвох?
lock-free структуры — тема для меня совсем новая, заранее огромное спасибо за ответы!
Здравствуйте, afkos, Вы писали:
A>Покритикуйте пожалуйста, будет ли работать такая реализация, какие возможны проблемы? A>По идее это multi-producer/multi-consumer очередь, но уж очень просто получилось, где подвох? A>lock-free структуры — тема для меня совсем новая, заранее огромное спасибо за ответы!
Во-первых, очередь формально не lock-free. Если поток будет прерван между Decrement() и Increment(), то он может остановить системный прогресс:
A>
A> T * Pop()
A> {
A> if(m_Count.Decrement() < 0)
A> {
A> m_Count.Increment();
A> return 0;
A> }
A>
Например тут: A> unsigned long i = ((unsigned long)m_First.Increment() — 1) % m_BufferSize; A> return m_Buffer[i];
Представь потребитель получил какой-то индекс i, и дальше заблокировался. В это время очередь полностью освободилась и заполнилась заново. Теперь наш потребитель просыпается и считывает из m_Buffer[i] совсем не то, что он должен был вернуть.
Или опять же вот этот момент: A>
A> T * Pop()
A> {
A> if(m_Count.Decrement() < 0)
A> {
A> m_Count.Increment();
A> return 0;
A> }
A>
Допустим m_Count==0. Потребитель 1 выполняет декремент, соотв. m_Count==-1, теперь этот потребитель вытесняется ОС (т.е. инкремент он ещё не сделал). Теперь производитель кладёт элемент в очередь и увеличивает m_Count до 0. Теперь приходит потребитель 2 и выполняет декремент, m_Count опять становится равен -1, и потребитель 2 возвращает 0 вместо того, что бы вернуть элемент, который лежит в очереди. Теперь просыпается наш первый потребитель и инкрементирует счётчик до 1, но возвращает опять же 0. В общем случае это может привести к дедлоку системы, в очереди лежит элемент, но потребители не могут его вернуть.
Или допустим тут: A>
A> void Push(T * node)
A> {
A> unsigned long i = ((unsigned long)m_Last.Increment() - 1) % m_BufferSize;
A> m_Buffer[i] = node;
A> m_Count.Increment();
A> }
A>
Допустим изначально очередь пустая. Производитель 1 инкрементирует m_Last до 1, и засыпает (т.е. ещё не записал элемент в очередь). Теперь производитель 2 инкрементирует m_Last до 2, записывает свой элемент в позицию 1 очереди, и инкрементирует m_Count. Теперь приходит потребитель, видит, что в очереди лежит элемент (m_Count==1), но возвращает он элемент не из позиции 1 (где лежит реальный элемент), а из позиции 0 (где лежит мусор).
Я думаю, можно не продолжать. Да, всё действительно не так просто, как кажется на первый взгляд. Ты используешь AtomicLong, но он магическим образом не делает целые операции атомарными, он делает атомарным только одно изменение одной переменной. Тебе же нужно, что бы целые операции были атомарными, т.е. в одно неделимое действие инкрементировать m_Last, записать элемент в массив, и инкрементировать m_Count. Либо же все остальные потоки должны быть готовы к тому, что например, m_Last инкрементирован, а элемент в массив ещё не записан, и соотв. обрабатывать такие все такие ситуации.
Если ты действительно всерьёз хочешь этим заниматься и использовать такие алгоритмы в продакш-коде, то очень рекомендую использовать библиотеку Relacy Race Detector: http://groups.google.com/group/relacy
Я разработал её специально для верификации таких алгоритмов, она бы на раз вскрыла все эти гонки... и, я думаю, больше. В общем случае у меня не получается с ней соревноваться по качеству и скорости проверки алгоритмов.
Здравствуйте, remark, Вы писали:
R>Если ты действительно всерьёз хочешь этим заниматься и использовать такие алгоритмы в продакш-коде, то очень рекомендую использовать библиотеку Relacy Race Detector: R>http://groups.google.com/group/relacy R>Я разработал её специально для верификации таких алгоритмов, она бы на раз вскрыла все эти гонки... и, я думаю, больше. В общем случае у меня не получается с ней соревноваться по качеству и скорости проверки алгоритмов.
Здравствуйте, afkos, Вы писали:
A>Покритикуйте пожалуйста, будет ли работать такая реализация, какие возможны проблемы?
vector — потоко-небезопасен, следовательно при одновременном обращении из разных потоков не гарантирует консистентное состояние.
Выводы можешь сделать сам.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Здравствуйте, afkos, Вы писали:
A>>Покритикуйте пожалуйста, будет ли работать такая реализация, какие возможны проблемы?
ATP>vector — потоко-небезопасен, следовательно при одновременном обращении из разных потоков не гарантирует консистентное состояние. ATP>Выводы можешь сделать сам.
Я не думаю, что какая-либо здравая реализация вектора может создать тут проблемы. Его просто сложно реализовать т.ч. он мог создать тут проблемы. Фактически он тут используется просто как С-массив, т.е. "кусок памяти".
Например реализация MSVC явно гарантирует, что std::vector безопасен для чтения из разных потоков.
Здравствуйте, remark, Вы писали:
R>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>Здравствуйте, afkos, Вы писали:
A>>>Покритикуйте пожалуйста, будет ли работать такая реализация, какие возможны проблемы?
ATP>>vector — потоко-небезопасен, следовательно при одновременном обращении из разных потоков не гарантирует консистентное состояние. ATP>>Выводы можешь сделать сам.
R>Я не думаю, что какая-либо здравая реализация вектора может создать тут проблемы. Его просто сложно реализовать т.ч. он мог создать тут проблемы. Фактически он тут используется просто как С-массив, т.е. "кусок памяти". R>Например реализация MSVC явно гарантирует, что std::vector безопасен для чтения из разных потоков.
R>
Согласен, скорее всего все будет ок.
Но в приведенном коде, насколько я понял не учитывается максимальный размер данных, следовательно пихающий поток может обернуться несколько раз через массив, пока читающий ждет, а вот это уже точно потеря консистентности данных.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Согласен, скорее всего все будет ок. ATP>Но в приведенном коде, насколько я понял не учитывается максимальный размер данных, следовательно пихающий поток может обернуться несколько раз через массив, пока читающий ждет, а вот это уже точно потеря консистентности данных.
Это не проблема вектора. Там много чего не учитывается, и те же самые проблемы будут даже если вместо вектора использовать статически выделенный блок памяти.
Здравствуйте, remark, Вы писали:
R>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>Согласен, скорее всего все будет ок. ATP>>Но в приведенном коде, насколько я понял не учитывается максимальный размер данных, следовательно пихающий поток может обернуться несколько раз через массив, пока читающий ждет, а вот это уже точно потеря консистентности данных.
R>Это не проблема вектора. Там много чего не учитывается, и те же самые проблемы будут даже если вместо вектора использовать статически выделенный блок памяти.
R>
Да я понимаю, просто чего человек хотел добиться непонятно. Интерфейс неполон. Гадать без автора не хотелось бы.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Здравствуйте, remark, Вы писали:
R>>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>>Согласен, скорее всего все будет ок. ATP>>>Но в приведенном коде, насколько я понял не учитывается максимальный размер данных, следовательно пихающий поток может обернуться несколько раз через массив, пока читающий ждет, а вот это уже точно потеря консистентности данных.
R>>Это не проблема вектора. Там много чего не учитывается, и те же самые проблемы будут даже если вместо вектора использовать статически выделенный блок памяти.
ATP> Да я понимаю, просто чего человек хотел добиться непонятно. Интерфейс неполон. Гадать без автора не хотелось бы.
А что не понятно? Очередь fixed-size, вектору один раз в конструкторе установили размер, дальше он используется просто как кусок памяти.
В целом, конечно, было бы лучше закрыть компирование и использовать просто std::auto_ptr<T*>.
Здравствуйте, remark, Вы писали:
R>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>Здравствуйте, remark, Вы писали:
R>>>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>>>Согласен, скорее всего все будет ок. ATP>>>>Но в приведенном коде, насколько я понял не учитывается максимальный размер данных, следовательно пихающий поток может обернуться несколько раз через массив, пока читающий ждет, а вот это уже точно потеря консистентности данных.
R>>>Это не проблема вектора. Там много чего не учитывается, и те же самые проблемы будут даже если вместо вектора использовать статически выделенный блок памяти.
ATP>> Да я понимаю, просто чего человек хотел добиться непонятно. Интерфейс неполон. Гадать без автора не хотелось бы.
R>А что не понятно? Очередь fixed-size, вектору один раз в конструкторе установили размер, дальше он используется просто как кусок памяти. R>В целом, конечно, было бы лучше закрыть компирование и использовать просто std::auto_ptr<T*>.
R>
Непонятно как такою очередь можно использовать не задав максимально возможный размер. Очередь ведь не должна перезатирать ранее положенные в нее элементы.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Непонятно как такою очередь можно использовать не задав максимально возможный размер. Очередь ведь не должна перезатирать ранее положенные в нее элементы.
Хороший вопрос.
Раз очередь фиксированного размера, то вариантов не много. Можно либо блокировать производителя, пока не освободится место. Либо возвращать ошибку, как в функции Pop(). Либо перетирать старый элемент (самый старый/самый новый/произвольный).
В любом случае, так как оно сейчас реализованно — работать не будет:
1. Не синхронизованны инкремент/десремент и вставка/удаление.
2. Непонятно кто управляет временем жизни объектов.
Здравствуйте, remark, Вы писали:
R>Я думаю, можно не продолжать. Да, всё действительно не так просто, как кажется на первый взгляд. Ты используешь AtomicLong, но он магическим образом не делает целые операции атомарными, он делает атомарным только одно изменение одной переменной. Тебе же нужно, что бы целые операции были атомарными, т.е. в одно неделимое действие инкрементировать m_Last, записать элемент в массив, и инкрементировать m_Count. Либо же все остальные потоки должны быть готовы к тому, что например, m_Last инкрементирован, а элемент в массив ещё не записан, и соотв. обрабатывать такие все такие ситуации.
R>Если ты действительно всерьёз хочешь этим заниматься и использовать такие алгоритмы в продакш-коде, то очень рекомендую использовать библиотеку Relacy Race Detector: R>http://groups.google.com/group/relacy R>Я разработал её специально для верификации таких алгоритмов, она бы на раз вскрыла все эти гонки... и, я думаю, больше. В общем случае у меня не получается с ней соревноваться по качеству и скорости проверки алгоритмов.
Благодарю за столь развернутый ответ и отдельное спасибо за ссылки, очень интересный материал.
To AcidTheProgrammer:
Это не production код, только маленький эксперимент.
Конечно, std::vector используется только как buffer, и как уже заметил Remark, не будет никакой разницы, если его заменить на обычный массив
В целом на все вопросы Remark уже ответил
Тогда, раз уж так всё хорошо разрешилось у меня есть вопрос к remark-у:
дело в том что я по роду своей деятельности также активно занимаюсь созданием многопоточных примитивов. Может быть вы подскажите как на платформе Intel x86 — реализуются примитивы типа LL/SC?
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Тогда, раз уж так всё хорошо разрешилось у меня есть вопрос к remark-у: ATP>дело в том что я по роду своей деятельности также активно занимаюсь созданием многопоточных примитивов. Может быть вы подскажите как на платформе Intel x86 — реализуются примитивы типа LL/SC?
На x86 для выполнения атомарных RMW (read-modify-write) операций используется инструкция LOCK CMPXCHG, которая атомарно выполняет следующее:
В некотором смысле она мощнее LL/SC, т.к. возвращает текущее актуальное значение; но с другой стороны она и слабее, т.к. допускает ABA проблему (т.е. текущее значение могло измениться со времени загрузки, но так получилось что оно равно загруженному — CMPXCHG в этом случае успешно выполнится, тогда как LL/SC провалится).
Для частных случаев можно использовать частные команды — LOCK XADD (загрузить и прибавить), LOCK ADD (прибавить), LOCK OR (или), LOCK AND (и), LOCK XCHG (загрузить и сохранить) и др. В целом они несколько быстрее, чем цикл с CMPXCHG.
Но вообще для таких операций обычно есть интринсики компилятора. Например для CMPXCHG это _InterlockedCompareExchange() в MSVC (intrin.h), __sync_compare_exchange_val() в gcc (built-in), atomic_cas_32 в сс на Solaris (atomic.h).
Здравствуйте, remark, Вы писали:
R>Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>>Тогда, раз уж так всё хорошо разрешилось у меня есть вопрос к remark-у: ATP>>дело в том что я по роду своей деятельности также активно занимаюсь созданием многопоточных примитивов. Может быть вы подскажите как на платформе Intel x86 — реализуются примитивы типа LL/SC?
R>На x86 для выполнения атомарных RMW (read-modify-write) операций используется инструкция LOCK CMPXCHG, которая атомарно выполняет следующее: R>
R>В некотором смысле она мощнее LL/SC, т.к. возвращает текущее актуальное значение; но с другой стороны она и слабее, т.к. допускает ABA проблему (т.е. текущее значение могло измениться со времени загрузки, но так получилось что оно равно загруженному — CMPXCHG в этом случае успешно выполнится, тогда как LL/SC провалится).
Да, нет — все понятно. Если бы всё было бы так просто я бы и не спрашивал. То что вы описываете (CMPXCHG) это и есть в обобщенном виде СAS операция. Собственно с ABA проблеммой я и борюсь. Насчет мощьности с вами полностью не согласен. Опять же из-за той самой ABA-проблеммы и из-за того что через LL/SC примитивы CAS без проблем выражается, а вот наоборот... в этом и есть мой вопрос — как выразить LL/SC примитивы через те которые поддерживаются x86.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Да, нет — все понятно. Если бы всё было бы так просто я бы и не спрашивал. То что вы описываете (CMPXCHG) это и есть в обобщенном виде СAS операция. Собственно с ABA проблеммой я и борюсь. Насчет мощьности с вами полностью не согласен. Опять же из-за той самой ABA-проблеммы и из-за того что через LL/SC примитивы CAS без проблем выражается, а вот наоборот... в этом и есть мой вопрос — как выразить LL/SC примитивы через те которые поддерживаются x86.
А, ты это имеешь в виду. Ну вобщем-то... никак. В смысле никак, что нет никакой магической инструкции.
Самый простой способ это обойти — это использовать IBM-тэги (IBM ABA-prevention tags), т.е. к указателю прилепляется монотонно инкрементируемый при каждом изменении счётчик, который участвует в CAS операции. Но тут проблема, что не на всех ранних х86-64 процессорах был 128-битный CAS. Для обхода этого был разработан ряд техник, таких как pointer-packing или allocation from pool.
Но мне лично, честно говоря, вообще такой подход с "затыканием" ABA не очень нравится, т.к. основной проблемы он не решает. А основная проблема — это управление временем жизни объектов. Если она решена, то и ABA по-определению нет.
А для решения проблемы управления временем жизни есть ряд алгоритмов, таких как Hazard Pointers (SMR), Read-Copy-Update (RCU), Proxy-Collector, VZOOM, и различные модификации и комбинации методов (SMR+RCU).
В частности вот ряд алгоритмов.
Амортизированный Proxy-Collector: http://groups.google.com/group/lock-free/browse_frm/thread/18a9f43b93a6b3f0
Scalable distributed reference counting: http://groups.google.com/group/lock-free/browse_frm/thread/46d00a0f543a758e
В архиве hash_map.zip есть реализация ещё одного Proxy-Collector и пример импользования: http://groups.google.com/group/lock-free/files
Да я читал про все эти прибамбасы — уныние сплошное. Не так все это просто элегантно как с LL/SC.
А как на 32-х битной x86 правильно атомарно заменить сразу два слова?