[пост получился достаточно длинный, поэтому сразу скажу о бонусе — это простой и эффективный алгоритм очереди на основе буфера фиксированного размера — нетерпеливые могут сразу перемещаться к концу поста ]
Последние дни на RSDN шла своего рода распределенная игра по написанию lock-free контейнеров. Вначале vf запостил lock-free стек для .Net: http://www.rsdn.ru/forum/dotnet/3719883.aspx
К чему я это? Что не надо писать lock-free контейнеры? Это смотрите сами, мне всё равно. Это пусть говорят другие, те которые сами не умеют писать lock-free алгоритмы В целом lock-free алгоритмы писать вполне реалистично, единственное надо немного сноровки, ревью коллег, и соответствующие инструменты.
Кстати, по поводу инструментов, тут я не могу не упомянуть в качестве рекламы свой Relacy Race Detector. Если последние 2 ошибки удалось найти при просмотре кода, то первую смог вскрыть только Relacy Race Detector (кстати, если кто следит за такими тулзами, то CHESS от Microsoft не смог её найти — ограничение планировщика потоков).
Однако алгоритм получился достаточно сложный — 6 независимых переменных состояния, плюс достаточно не эффективный — 6 Interlocked инструкций на операцию. Для сравнения аналогичный алгоритм с применением спин-мьютекса будет исполнять по 1 Interlocked инструкций на операцию, и соотв. Будет в 6 раз быстрее в случае низкой конкуренции.
В чём сложность создания таких алгоритмов? В том, что последовательность атомарных операций сама по себе не является атомарной операций, это по-прежнему разрозненная последовательность (в отличие от алгоритмов, основанных на мьютексах). Наибольшую сложность представляют промежуточные состояния структуры данных, которые возникают в процессе выполнения последовательности. Каждое такое состояние должно быть полностью целостным. Целостным в том плане, что если придёт другой поток, то он должен посмотреть на структуру; определить, что это за состояние; понять, что ему делать дальше; и при этом не порушить планы первого потока. Даже хуже — прерываний потоков может быть несколько, т.е. первый поток перевёл структуру в какое промежуточное состояние; потом пришёл второй поток, и тоже выполнил часть операции; потом пришёл третий поток, выполнил ещё несколько незаконченных действий, и т.д. И после всего этого структура всё ещё должна оставаться в понятном консистентном состоянии.
В идеале поток целиком выполняет операцию за одну атомарную операцию, т.е. считывает состояние структуры, проверяет его, вычисляет новое состояние, и пытается атомарно перевести структуру из исходного состояния в новое с помощью CAS (InterlockedCompareExchange). Так работает известный алгоритм lock-free стека: http://www.rsdn.ru/forum/dotnet/3721740.1.aspx
В таком алгоритме нет промежуточных (читай — потенциально проблематичных) состояний, структура либо не модифицирована вообще, либо операция полностью выполнена.
Однако это применимо только для простейших случаев, в более сложных случаях невозможно выполнить операцию за одну атомарную операцию — как в случае с очередью на основе фиксированного буфера, тут надо и записать данные и сместить позицию записи.
Кстати, к вопросу о сложных операциях с множеством промежуточных состояний, есть такая интересная презентация Cliff Click (сейчас он работает в Azul Systems, ранее в Sun – и там и там он был ключевой фигурой в разработке JVM) о реализации полностью lock-free хэш-мап (код доступен в сети): http://www.azulsystems.com/events/javaone_2007/2007_LockFreeHash.pdf
Представьте себе хэш-мап, в котором несколько потоков одновременно добавляют один и тот же элемент, несколько пытаются его считать, несколько удалить, и плюс одновременно с этим происходит увеличение размера таблицы. Каково? Клиф подошёл к вопросу фундаментально — он нарисовал в явном виде стейт-машину для ячейки таблицы; для каждого состояния описал как его можно определить (в смысле понять, что ячейка сейчас находится именно в этом состоянии); для каждой пары (состояние, тип операции (чтение, добавление, удаление)) описал, что потоку делать дальше; и потом реализовал все переходы между состояниями с помощью атомарных CAS.
Это если поверхностно, в реальности там много деталей и есть некоторые негативные моменты, которые он сам же внёс дабы реализовать хэш полностью без блокировок. Например, элементы никогда полностью не удаляются из таблицы, они просто помечаются как удалённые. Как следствие, даже если количество элементов в таблице остаётся постоянным, но происходят вставки и удаления элементов, ему приходится делать периодические фиктивные ресайзы таблицы, что бы «подчистить мусор». Ещё там могут быть проблематичными рекурсивные ресайзы таблицы... ну ладно, это я уже удаляюсь от темы.
Теперь обещанный бонус — алгоритм bounded multi-producer/multi-consumer очереди без блокировок. Контейнер не использует динамического выделения/управления памятью в процессе работы (за исключением изначального выделения фиксированного буфера).
Каждая операция содержит всего по 1 Interlocked инструкции, и как следствие будет достаточно быстрой.
Тут ситуация существенно попроще, чем у Клифа с его таблицей (и это к лучшему). Каждая операция (чтение/запись) состоит из 2 атомарных действий. (1) поток проверяет и резервирует элемент для чтения/записи, и (2) после выполнения фактического чтения/записи, помечает элемент как доступный для следующей операции (записи/чтения соответственно). Тут есть только одно промежуточное состояние — между (1) и (2) — которое должным образом обрабатывается.
Далее собственно код с комментариями:
template<typename T>
class mpmc_bounded_queue
{
public:
mpmc_bounded_queue(size_t buffer_size)
: buffer_(new cell_t [buffer_size])
, buffer_mask_(buffer_size - 1)
{
typedef char assert_nothrow [__has_nothrow_assign(T) || __has_trivial_assign(T) || !__is_class(T) ? 1 : -1];
assert((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
for (size_t i = 0; i != buffer_size; i += 1)
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
enqueue_pos_.store(0, std::memory_order_relaxed);
dequeue_pos_.store(0, std::memory_order_relaxed);
}
~mpmc_bounded_queue()
{
delete [] buffer_;
}
bool enqueue(T const& data)
{
cell_t* cell;
// загружаем текущую позицию для добавления в очередь
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
// находим текущий элемент
cell = &buffer_[pos & buffer_mask_];
// загружаем статус (sequence) текущего элемента
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
// элемент готов для записиif (dif == 0)
{
// пытаемся сдвинуть позицию для добавленияif (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
// если не получилось, то начинаем сначала
}
// элемент ещё не готов для записи (очередь полна или типа того)else if (dif < 0)
return false;
// нас кто-то опередил
// перезагружаем текущий элемент и начинаем сначалаelse/* if (dif > 0) */
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
// в данной точке мы зарезервировали элемент для записи
// пишем данные
cell->data_ = data;
// помечаем элемент как готовый для потребления
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T& data)
{
cell_t* cell;
// загружаем текущую позицию для извлечения из очереди
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
// находим текущий элемент
cell = &buffer_[pos & buffer_mask_];
// загружаем статус (sequence) текущего элемента
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// элемент готов для извлеченияif (dif == 0)
{
// пытаемся сдвинуть позицию для извлеченияif (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
// если не получилось, то начинаем сначала
}
// элемент ещё не готов для потребления (очередь пуста или типа того)else if (dif < 0)
return false;
// нас кто-то опередил
// перезагружаем текущий элемент и начинаем сначалаelse/* if (dif > 0) */
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
// в данной точке мы зарезервировали элемент для чтения
// читаем данные
data = cell->data_;
// помечаем элемент как готовый для следующей записи
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
private:
struct cell_t
{
std::atomic<size_t> sequence_;
T data_;
};
static size_t const cacheline_size = 64;
typedef char cacheline_pad_t [cacheline_size];
cacheline_pad_t pad0_;
cell_t* const buffer_;
size_t const buffer_mask_;
cacheline_pad_t pad1_;
std::atomic<size_t> enqueue_pos_;
cacheline_pad_t pad2_;
std::atomic<size_t> dequeue_pos_;
cacheline_pad_t pad3_;
mpmc_bounded_queue(mpmc_bounded_queue const&);
void operator = (mpmc_bounded_queue const&);
};
И вот реализация подмножества C++0x std::atomic, необходимого для компиляции очереди (MSVC, x86-32):
раз пошла такая пьянка, можно я свой контейнер запощу?
суть его- один поток может только писать, а много потоков при этом могут читать, контейнер может только увеличиваеться со временем, максимальный размер ограничен параметрами шаблонов, при этом память выделяется блоками как в std::deque, все итераторы дествительные ранее- остаются действительными в дальнейшем, если end() — указывал на элемент за 10м, то и когда контейнер увеличится- он будет указывать на элемент за 10м.
Здравствуйте, Sni4ok, Вы писали:
S>раз пошла такая пьянка, можно я свой контейнер запощу?
S>суть его- один поток может только писать, а много потоков при этом могут читать, контейнер может только увеличиваеться со временем, максимальный размер ограничен параметрами шаблонов, при этом память выделяется блоками как в std::deque, все итераторы дествительные ранее- остаются действительными в дальнейшем, если end() — указывал на элемент за 10м, то и когда контейнер увеличится- он будет указывать на элемент за 10м.
Я ошибок не вижу, по-моему, всё корректно.
Тут достаточно простой паттерн синхронизации, т.к. поток, меняющий данные, только один; нет освобождения памяти во время работы; нет переиспользования объектов.
Единственное могу добавить только про параметры block_size и num_blocks, хотя это не имеет никакого отношения к корректности. Их выбор может быть достаточно проблематичен для пользователя, и вполне возможен существенный перерасход памяти на массив bulks. Такие контейнеры (типа concurrent_vector) зачастую делают следующим образом.
Первичный массив делается маленького размера, вторичные же массивы растут по экспоненте, как степени двойки. Таким образом можно (1) полностью освободить пользователя от подбора параметров, (2) обеспечить перерасход памяти не более N/2 (как это обычно бывает для std::vector), (3) не иметь ограничения на максимальное кол-во элементов (точнее иметь возможность заюзать весь диапазон size_t как для обычных контейнеров).
Более подробно: на 32-битной системе первичный массив имеет размер всего 32, вторичные массивы — 1, 2, 4, 8... На 64-битной системе первичный массив соотв. — 64. Для оптимизации так же обычно объединяют несколько первых массивов в единый блок; т.е. если объединить 3 первых массива, то размеры будут — 7, 8, 16, 32, 64...
Конечно, если речь идёт о единичном использовании и текущая реализация устраивает, то такое изменение не имеет смысла. Если же речь идёт о более широком переиспользовании, то рекомендую.
Здравствуйте, remark, Вы писали:
R>Первичный массив делается маленького размера, вторичные же массивы растут по экспоненте, как степени двойки.
и поллучить bad alloc'и на x86 при очень небольшом колличестве эллементов
вся прелесть вектор'а(это имхо) по сравнению с деком не то, что колличество аллокаций пропорциональна логорифму от размера(хотя это конечно само посебе неплохо, если не учитывать оверхед на неиспользуемую память), а то, что доступ к элементу просто косвенный, в отличае от двойной косвенности для дека, тут от двойной косвенности избавиться нереально(только если сразу выделить память для максимально возможного колличества элементов), поэтому экспоненциальный рост каждого следующего блока- просто вреден.
S>и поллучить bad alloc'и на x86 при очень небольшом колличестве эллементов S>вся прелесть вектор'а(это имхо) по сравнению с деком не то, что колличество аллокаций пропорциональна логорифму от размера(хотя это конечно само посебе неплохо, если не учитывать оверхед на неиспользуемую память), а то, что доступ к элементу просто косвенный, в отличае от двойной косвенности для дека, тут от двойной косвенности избавиться нереально(только если сразу выделить память для максимально возможного колличества элементов), поэтому экспоненциальный рост каждого следующего блока- просто вреден.
не используйте std::deque если у вас фрагментированная память -- обломится.
Как минимум в MS VS, каждый блок деки очень маленький, и объектов туда влазит немного.
Зато список блоков -- этот тот же самый std::vector, вид сбоку.
Я долго не мог понять, на кой хрен деке сотня метров одним куском... пока не залез в исходники.
Здравствуйте, remark, Вы писали:
R>Каждая операция (чтение/запись) состоит из 2 атомарных действий. (1) поток проверяет и резервирует элемент для чтения/записи, и (2) после выполнения фактического чтения/записи, помечает элемент как доступный для следующей операции (записи/чтения соответственно). Тут есть только одно промежуточное состояние — между (1) и (2) — которое должным образом обрабатывается. R>Далее собственно код с комментариями:
Кольцевой буфер -> ограничение на кол-во элементов. Ты же сам на каком-то из форумов, то ли гугловском, то ли ibm-овском, предложил использовать набор массивов, связанных в список:
— head и tail содержат пары (указатель на массив, индекс в массиве),
— head добавляет новый массив в голову списка, если его массив полностью записан,
— tail удаляет свой массив из хвоста списка, если тот полностью прочитан.
Еще ...
R> if (dif == 0)
R> {
R> // пытаемся сдвинуть позицию для добавления
R> if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
R> break;
R> // если не получилось, то начинаем сначалаА тут не надо перезачитать pos заново?
В этой ветке pos не перезачитывается.
R> }
Здравствуйте, remark, Вы писали:
R>[пост получился достаточно длинный, поэтому сразу скажу о бонусе — это простой и эффективный алгоритм очереди на основе буфера фиксированного размера — нетерпеливые могут сразу перемещаться к концу поста ] R> ........
А можно спросить по поводу atomic_uint, что там за "махинации" делаются с load/store и _ReadWriteBarrier? Это для чего, сам не могу пока понять.
Здравствуйте, Sni4ok, Вы писали:
R>>Первичный массив делается маленького размера, вторичные же массивы растут по экспоненте, как степени двойки.
S>и поллучить bad alloc'и на x86 при очень небольшом колличестве эллементов
Каким образом? Тут неиспользуемая память не более N/2, поэтому на небольшом кол-ве элементов нельзя получить bad_alloc.
S>вся прелесть вектор'а(это имхо) по сравнению с деком не то, что колличество аллокаций пропорциональна логорифму от размера(хотя это конечно само посебе неплохо, если не учитывать оверхед на неиспользуемую память), а то, что доступ к элементу просто косвенный, в отличае от двойной косвенности для дека, тут от двойной косвенности избавиться нереально(только если сразу выделить память для максимально возможного колличества элементов), поэтому экспоненциальный рост каждого следующего блока- просто вреден.
Ну почему же вреден. При экспоненциальном росте никогда не будет слишком частых аллокаций памяти, никогда не упрёмся в ограничение по кол-ву элементов, и никогда не будет есть слишком много памяти. В случае же с линейным ростом обязательно будет что-то из этого.
Это при индексации не избежать двойной косвенности, а в случае с итерированием для сегментированного контейнера тривиально сделать одинарную косвенность и общую стоимость итерирования равной итерированию по вектору.
Здравствуйте, AcidTheProgrammer, Вы писали:
R>>[пост получился достаточно длинный, поэтому сразу скажу о бонусе — это простой и эффективный алгоритм очереди на основе буфера фиксированного размера — нетерпеливые могут сразу перемещаться к концу поста ] R>> ........
ATP>А можно спросить по поводу atomic_uint, что там за "махинации" делаются с load/store и _ReadWriteBarrier? Это для чего, сам не могу пока понять.
.
В принципе я не могу сказать, что при удалении там любой конструкции программа может сломаться (потому что некоторые конструкции там избыточные, но тут как в поговорке — кашу маслом не испортишь), но в целом они все нужны, а как без них это сделать?
Здравствуйте, rus blood, Вы писали:
RB>Еще ... RB>
R>> if (dif == 0)
R>> {
R>> // пытаемся сдвинуть позицию для добавления
R>> if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
R>> break;
R>> // если не получилось, то начинаем сначала
RB>А тут не надо перезачитать pos заново?
RB>В этой ветке pos не перезачитывается.
R>> }
RB>
compare_exchange_weak/strong обновляют comparand актуальным значением при неудаче.
Другие интерфейсы теряют либо флаг, либо актуальное значение, а они всегда нужны оба. Это самый вразумительный интерфейс.
Здравствуйте, remark, Вы писали:
S>>и поллучить bad alloc'и на x86 при очень небольшом колличестве эллементов
R>Каким образом? Тут неиспользуемая память не более N/2, поэтому на небольшом кол-ве элементов нельзя получить bad_alloc.
"небольшое колличество" — понятие субьективное, скажем миллиона обьектов по 40 байт — 40 мб, вполне достаточно чтобы начались сыпаться bad alloc'и, при сильной фрагментации памяти.
R>Это при индексации не избежать двойной косвенности, а в случае с итерированием для сегментированного контейнера тривиально сделать одинарную косвенность и общую стоимость итерирования равной итерированию по вектору.
Здравствуйте, remark, Вы писали:
R>Здравствуйте, AcidTheProgrammer, Вы писали:
R>>>[пост получился достаточно длинный, поэтому сразу скажу о бонусе — это простой и эффективный алгоритм очереди на основе буфера фиксированного размера — нетерпеливые могут сразу перемещаться к концу поста ] R>>> ........
ATP>>А можно спросить по поводу atomic_uint, что там за "махинации" делаются с load/store и _ReadWriteBarrier? Это для чего, сам не могу пока понять.
R>Это реализация семантики std::atomic<> для MSVC, все эти махинации для обеспечения атомарности и упорядочивания
. R>В принципе я не могу сказать, что при удалении там любой конструкции программа может сломаться (потому что некоторые конструкции там избыточные, но тут как в поговорке — кашу маслом не испортишь), но в целом они все нужны, а как без них это сделать?
Да я в общем и не утверждаю что не нужны, я как раз интересуюсь в чем здесь тонкие моменты, зачем нужны короче .
Здравствуйте, Sni4ok, Вы писали:
S>>>и поллучить bad alloc'и на x86 при очень небольшом колличестве эллементов
R>>Каким образом? Тут неиспользуемая память не более N/2, поэтому на небольшом кол-ве элементов нельзя получить bad_alloc.
S>"небольшое колличество" — понятие субьективное, скажем миллиона обьектов по 40 байт — 40 мб, вполне достаточно чтобы начались сыпаться bad alloc'и, при сильной фрагментации памяти.
Здравствуйте, AcidTheProgrammer, Вы писали:
ATP>Да я в общем и не утверждаю что не нужны, я как раз интересуюсь в чем здесь тонкие моменты, зачем нужны короче .
Они нужны для обеспечения корректного взаимного упорядочивания обращений к памяти.
Основной паттерн такой — производитель записывает данные в объект, потом записывает указатель на этот объект в разделяемую переменную, потом потребитель считывает указатель на объект из разделяемой переменной, потом считывает данные из объекта. Этот паттерн используется и в очередях производитель-потребитель, и в ленивом многопоточном синглтоне, и в разделяемом хэш-мапе.
В многопоточном окружении *не* гарантируется последовательная консистентность (sequential consistency), т.е. другие потоки могут видеть обращения к памяти не так как они записаны в программе. Причин для неправильного упорядочивания есть 2: первая — переупорядочивания компилятором, вторая — переупорядочивания процессором.
А результате этих переупорядочиваний мы можем получить следующие неприятные ситуации: производитель может записать указатель на объект в разделяемую переменную ещё до записи в него данных и/или потребитель может считать данные из объекта ещё до считывания указателя на объект. В любом случае мы получаем UB.
В алгоритмах многопоточной синхронизации необходимо обеспечивать требуемый взаимный порядок обращений к памяти явно. Для этого служат барьеры памяти (memory barrier, memory fence), в C++0x будет стандартное АПИ для этого, пока же приходится довольствоваться компиляторо-зависимым средствами (в частности _ReadWriteBarrier() подавляет переупорядочивания компилятором вокруг себя).
Вот простой пример (в терминах C++0x):
std::memory_order_release гарантирует (не формально), что все предыдущие обращения к памяти будут завершены до данного сохранения.
std::memory_order_acquire гарантирует, что все последующие обращения к памяти будут начаты только после завершения данной загрузки.
Здравствуйте, remark, Вы писали:
R>[пост получился достаточно длинный ... ]
Дык, не мудренно! Это же почти статья.
Уважаемый! Надо было оформить ее в нашем шаблоне и прислать на сабмит. А то ведь потеряется через некоторое время. А в виде статьи ею будут еще долго люди пользоваться и тебя благодарными словами вспоминать.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Здравствуйте, VladD2, Вы писали:
R>>[пост получился достаточно длинный ... ]
VD>Дык, не мудренно! Это же почти статья.
VD>Уважаемый! Надо было оформить ее в нашем шаблоне и прислать на сабмит. А то ведь потеряется через некоторое время. А в виде статьи ею будут еще долго люди пользоваться и тебя благодарными словами вспоминать.
Здравствуйте, VladD2, Вы писали:
R>>[пост получился достаточно длинный ... ]
VD>Дык, не мудренно! Это же почти статья.
VD>Уважаемый! Надо было оформить ее в нашем шаблоне и прислать на сабмит. А то ведь потеряется через некоторое время. А в виде статьи ею будут еще долго люди пользоваться и тебя благодарными словами вспоминать.
В очередной раз скачал AuthPack, теперь Avira находит в нём некий ADSPY/MDH.A.60 — стрёмно...
К тому же не вижу заявленной поддержки OpenOffice, а покупать из-за этого MSOffice...
Здравствуйте, VladD2, Вы писали:
NB>>а это будет считаться публикацией в ВАК журнале?
VD>Публикацией конечно считаться будет. Вопрос только в том, что ценность у нее будет сугубо практическая (т.е. не очень научная).
VD>Вообще публикация есть публикация. Что значит считаться или нет?
ну я краем уха слышал, что для получения к.ф.м.н. нужно сколько-то публикаций.
так и для Дмитрия польза (если решит в аспирантуру пойти), и для народа.