Помогите с многопоточным кодом
От: vsb Казахстан  
Дата: 05.09.22 17:36
Оценка:
Функция должна возвращать два значения: long x, int y. x это текущее время в миллисекундах. y это случайное число. Если вызывать функцию два раза подряд, то должно соблюдаться условие (x1, y1) < (x2, y2). Это, конечно, очевидно, если вызовы идут из одного потока. Если из двух потоков, то нужно, чтобы это условие сочеталось со стандартным java happens before (т.е. если потоки никак не связаны друг с другом, то как получится, если есть какие-то синхронизации между потоками, то должно условие соблюдаться). Ну и понятно, что оно должно работать как можно быстрей.

Я написал 3 версии функции. Первая очевидно правильная, но через sychronized, что слишком сильно для данных требований. Вторая и третья через volatile и atomic соответственно. Хочется понять, правильно ли я их написал и какой вариант лучше.


    record Pair(long x, int y) {
    }

    private static final Random random = new Random();

    private static long lastSynchronizedX;
    private static int lastSynchronizedY;

    static synchronized Pair synchronizedNext() {
        while (true) {
            int y = random.nextInt();
            long x = System.currentTimeMillis();
            if (x > lastSynchronizedX || y > lastSynchronizedY) {
                lastSynchronizedX = x;
                lastSynchronizedY = y;
                return new Pair(x, y);
            }
        }
    }

    private static volatile long lastVolatileX;
    private static volatile int lastVolatileY;

    static Pair volatileNext() {
        while (true) {
            int y = random.nextInt();
            long x = System.currentTimeMillis();
            if (x > lastVolatileX || y > lastVolatileY) {
                lastVolatileX = x;
                lastVolatileY = y;
                return new Pair(x, y);
            }
        }
    }

    private static final AtomicLong lastAtomicX = new AtomicLong();
    private static final AtomicInteger lastAtomicY = new AtomicInteger();

    static Pair atomicNext() {
        while (true) {
            long x = System.currentTimeMillis();
            int y = random.nextInt();
            long lx = lastAtomicX.get();
            int ly = lastAtomicY.get();
            if (x > lx || y > ly) {
                if (lastAtomicX.compareAndSet(lx, x)) {
                    if (lastAtomicY.compareAndSet(ly, y)) {
                        return new Pair(x, y);
                    }
                }
            }
        }
    }
Re: Помогите с многопоточным кодом
От: · Великобритания  
Дата: 05.09.22 18:00
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Функция должна возвращать два значения: long x, int y. x это текущее время в миллисекундах. y это случайное число. Если вызывать функцию два раза подряд, то должно соблюдаться условие (x1, y1) < (x2, y2). Это, конечно, очевидно, если вызовы идут из одного потока. Если из двух потоков, то нужно, чтобы это условие сочеталось со стандартным java happens before (т.е. если потоки никак не связаны друг с другом, то как получится, если есть какие-то синхронизации между потоками, то должно условие соблюдаться). Ну и понятно, что оно должно работать как можно быстрей.


vsb>Я написал 3 версии функции. Первая очевидно правильная, но через sychronized, что слишком сильно для данных требований. Вторая и третья через volatile и atomic соответственно. Хочется понять, правильно ли я их написал и какой вариант лучше.

В третьей я вроде не обнаружил проблему, вроде эквивалентно первой.
Во второй что-то подозрительно

// lastVolatileX = 99, lastVolatileY=5
 while (true) {
            int y = random.nextInt(); // thread1.y=10, thread2.y=5
            long x = System.currentTimeMillis(); //thread1.x=99, thread2.x = 100
            if (x > lastVolatileX || y > lastVolatileY) {// x > lastVolatileX == true in thread2  || y > lastVolatileY == true in thread1
                lastVolatileX = x;//race
                lastVolatileY = y;//race
                return new Pair(x, y);//thread1 returns (99, 10), thread2 returns (100, 5)
            }
        }
но это не зря, хотя, может быть, невзначай
гÅрмония мира не знает границ — сейчас мы будем пить чай
Отредактировано 05.09.2022 18:01 · . Предыдущая версия .
Re: Помогите с многопоточным кодом
От: StanislavK Великобритания  
Дата: 05.09.22 19:32
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Функция должна возвращать два значения: long x, int y. x это текущее время в миллисекундах. y это случайное число. Если вызывать функцию два раза подряд, то должно соблюдаться условие (x1, y1) < (x2, y2). Это, конечно, очевидно, если вызовы идут из одного потока. Если из двух потоков, то нужно, чтобы это условие сочеталось со стандартным java happens before (т.е. если потоки никак не связаны друг с другом, то как получится, если есть какие-то синхронизации между потоками, то должно условие соблюдаться). Ну и понятно, что оно должно работать как можно быстрей.

Не понятно как это может работать как можно быстрее и ваще хоть как-то работать. Учитывая условие задачи это функция все равно не будет возвращать значения быстрее чем раз в миллисекунду, так как условие y > prevY будет истинно раз 20-30, если повезет. Так что ставьте synchronized.
Re[2]: Помогите с многопоточным кодом
От: vsb Казахстан  
Дата: 05.09.22 19:33
Оценка:
Что-то у меня вариант с synchronized в итоге работает в полтора раза быстрей варианта с атомиками (на 8 потоках Apple M1). Моя мультипоточная интуиция вообще не сработала.
Re[2]: Помогите с многопоточным кодом
От: vsb Казахстан  
Дата: 05.09.22 19:33
Оценка:
Здравствуйте, StanislavK, Вы писали:

SK>Не понятно как это может работать как можно быстрее и ваще хоть как-то работать. Учитывая условие задачи это функция все равно не будет возвращать значения быстрее чем раз в миллисекунду, так как условие y > prevY будет истинно раз 20-30, если повезет. Так что ставьте synchronized.


Не, у меня там в реальном коде немного по-другому, в это не упирается.

Если интересно — полный код, но там специфика UUID формата. Генерацию случайного числа я заменил на счетчик.

public class AscUuidGen {
    private static final ThreadLocal<SecureRandom> secureRandom = ThreadLocal.withInitial(SecureRandom::new);
    private static final AtomicLong timeMillis = new AtomicLong();
    private static final AtomicInteger counter = new AtomicInteger();

    public static UUID next() {
        long timeMillis2;
        int counter2;

        while (true) {
            long timeMillis1 = timeMillis.get();
            timeMillis2 = currentTimeMillis();
            if (timeMillis2 < timeMillis1) {
                continue;
            }

            int counter1 = counter.get();
            counter2 = timeMillis1 == timeMillis2 ? counter1 + 1 : 0;
            if (counter2 >= 0x10_00) {
                continue;
            }

            if (!timeMillis.compareAndSet(timeMillis1, timeMillis2)) {
                continue;
            }
            if (!counter.compareAndSet(counter1, counter2)) {
                continue;
            }

            break;
        }

        long lsb1 = secureRandom.get().nextLong(0x40_00_00_00_00_00_00_00L);

        long msb = timeMillis2 << 16 | 0x40_00 | counter2;
        long lsb = 0x80_00_00_00_00_00_00_00L | lsb1;
        return new UUID(msb, lsb);
    }
}


import java.security.SecureRandom;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Runtime.getRuntime;
import static java.lang.System.currentTimeMillis;

public class AscUuidGen {
    private static final ThreadLocal<SecureRandom> secureRandom = ThreadLocal.withInitial(SecureRandom::new);
    private static long timeMillis;
    private static int counter;

    public static synchronized UUID next() {
        long currentTimeMillis;
        int currentCounter;

        do {
            currentTimeMillis = currentTimeMillis();
            assert currentTimeMillis >= timeMillis;
            currentCounter = timeMillis == currentTimeMillis ? counter + 1 : 0;
        } while (currentCounter >= 0x10_00);

        timeMillis = currentTimeMillis;
        counter = currentCounter;

        long lsb1 = secureRandom.get().nextLong(0x40_00_00_00_00_00_00_00L);

        long msb = currentTimeMillis << 16 | 0x40_00 | currentCounter;
        long lsb = 0x80_00_00_00_00_00_00_00L | lsb1;
        return new UUID(msb, lsb);
    }

    // test code

    private static final AtomicLong speed = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < getRuntime().availableProcessors(); i++) {
            new Thread(AscUuidGen::main1).start();
        }
        long speed1 = 0;
        while (true) {
            Thread.sleep(1000);
            long speed2 = speed.get();
            System.out.println(speed2 - speed1);
            speed1 = speed2;
        }
    }

    public static void main1() {
        UUID uuid1 = next();
        while (true) {
            UUID uuid2 = next();
            if (uuid1.compareTo(uuid2) >= 0) {
                System.out.println("error");
            }
            speed.incrementAndGet();
        }
    }
}


Первый вариант выдаёт 2 млн значений в секунду, второй вариант выдаёт около 3, формат ограничивает 4 млн в секунду.
Отредактировано 05.09.2022 19:46 vsb . Предыдущая версия . Еще …
Отредактировано 05.09.2022 19:37 vsb . Предыдущая версия .
Отредактировано 05.09.2022 19:36 vsb . Предыдущая версия .
Re[3]: Помогите с многопоточным кодом
От: StanislavK Великобритания  
Дата: 05.09.22 19:36
Оценка:
SK>>Не понятно как это может работать как можно быстрее и ваще хоть как-то работать. Учитывая условие задачи это функция все равно не будет возвращать значения быстрее чем раз в миллисекунду, так как условие y > prevY будет истинно раз 20-30, если повезет. Так что ставьте synchronized.

vsb>Не, у меня там в реальном коде немного по-другому, в это не упирается.


тогда и надо реальный код постить. С такими задачами важно, что именно делается.
Re[3]: Помогите с многопоточным кодом
От: maxkar  
Дата: 06.09.22 12:28
Оценка: 18 (1)
Здравствуйте, vsb, Вы писали:

vsb>Здравствуйте, StanislavK, Вы писали:


vsb>Если интересно — полный код, но там специфика UUID формата.

Этот код может генерировать дубликаты в части (timeMillis, counter). Три потока. Пусть в глобальных переменных (tM = 125, c = 0). Два потока (A и B) начинают выполняться одновременно в t=125. Оба читают переменные, проходят tM.compareAndSet(125, 125). Поток A продолжает выполнение и (успешно) делает counter.compareAndSet(0, 1). А вот поток B вытесняется потоком C. Поток C выполняется уже в t=126 и после потока A. Он успешно видит (tM=125, c=1), полностью выполняется и приводит к состоянию (tM=126, c=0> С этой точки поток B продолжает выполнение counter.compareAndSet(0, 1). Условие соответствует, так что код выполняется дальше. В результате и поток A, и поток B сгенерировали (125, 0) в качестве результата цикла.

Чтобы поправить атомики, нужно сделать AtomicReference и объект с двумя полями — для времени и для счетчика. Вот тогда на целом объекте нужно делать Compare and Set, операция получится атомарная. Или в объекте сделать свой AtomicInt и использовать его при совпадении метки времени (для другой метки времени будет свой AtomicInteger).

vsb>Первый вариант выдаёт 2 млн значений в секунду, второй вариант выдаёт около 3.

Там слишком большой contention получается в случае continue (и особенно — переполнения счетчика). Ему бы Thread.sleep(1) вставить.
Re[4]: Помогите с многопоточным кодом
От: vsb Казахстан  
Дата: 06.09.22 12:55
Оценка:
Здравствуйте, maxkar, Вы писали:

M>Этот код может генерировать дубликаты в части (timeMillis, counter)


В принципе это не страшно, вторая часть заполняется случайными значениями, там совпадение практически невозможно. Но спасибо, всё же этот сценарий я хотел избежать и проглядел.

M>Чтобы поправить атомики, нужно сделать AtomicReference и объект с двумя полями — для времени и для счетчика. Вот тогда на целом объекте нужно делать Compare and Set, операция получится атомарная. Или в объекте сделать свой AtomicInt и использовать его при совпадении метки времени (для другой метки времени будет свой AtomicInteger).


AtomicReference это понятно, но это лишнее выделение объекта на каждой операции, этого хотелось избежать. С AtomicReference всё просто — через update пишешь и готово, насколько я понимаю. Хотя может тут я тоже не к месту заморачиваюсь, надо проверить для общего развития.

M>Там слишком большой contention получается в случае continue (и особенно — переполнения счетчика). Ему бы Thread.sleep(1) вставить.


Там счётчик не переполняется по факту, по крайней мере в моих экспериментах.

В общем сделал через synchronized, мне такой скорости за глаза. Почему-то думал, что будет на порядок медленней.

Наверное можно ещё сделать через ThreadLocal переменные и не заморачиваться тем, что в разных потоках будут получаться одинаковые (timeMillis, counter), попробую ради интереса этот вариант, сравнить скорость.
Отредактировано 06.09.2022 13:04 vsb . Предыдущая версия . Еще …
Отредактировано 06.09.2022 13:00 vsb . Предыдущая версия .
Отредактировано 06.09.2022 12:59 vsb . Предыдущая версия .
Re[5]: Помогите с многопоточным кодом
От: maxkar  
Дата: 06.09.22 18:35
Оценка: 21 (2)
Здравствуйте, vsb, Вы писали:

vsb>В общем сделал через synchronized, мне такой скорости за глаза. Почему-то думал, что будет на порядок медленней.

Я тоже сделал тесты. У меня для вас плохие новости. Все гораздо сложнее, чем кажется на первый взгляд. Давайте по-порядку.

Тесты на JMH (я хотел его освоить для других проектов, вы дали хороший повод) доступны здесь. Извините, sbt. Я его знаю лучше. А еще он у меня есть! (А вот gradle, maven и прочего — нет). Весь код на Java, находится в src. Параметры запуска и результаты — в random.txt/urandom.txt, еще небольшое описание в README.txt (можно не читать, там примерно то же, что в этом сообщении).

Собственно, я экспериментировал с различными расположениями synchronized а также с вариантами атомиков. На коротких тестах (я проверял, что алгоритмы вообще работают) Synchronized везде оказался в 1.5 раза лучше. При этом перемещение SecureRandom вверх давало небольшой прирост скорости (uid2RndFirst). Я еще игрался с nanoTime — оно может иметь разрешение лучше, чем currentTimeMillis(). И само по себе вроде бы быстрее в Linux, но скорость компенсируется необходимостью делать дополнительную математику. Отладил я тесты, запустил jmh с настройками по-умолчанию и отправился по своим делам. Вернулся, посмотрел на процесс и ужаснулся. Отдельные запуски провалились по скорости в 5 раз (с двух миллионов до четырехсот тысяч). При этом "отдельные запуски" — это вычисление в течение 10 секунд. На таком разбросе считать "среднюю прозиводительность" как-то не очень правильно. На машине ничего тяжелого запущено не было, списать на это не получится. И тут-то я понял, что мы измеряем что-то не то.

Фактически вы в ваших тестах (и я при запуске тестов по умолчанию) измеряем не скорость вашего алгоритма. Мы измеряем быстродействие (throughput) у SecureRandom. По-умолчанию он ходит в /dev/random, который может блокироватсья при недостатке энтропии. Поэтому-то и проваливалось быстродействие, когда я был не за компьютером — нет энтропии (нажатия клавиш, движения мышки). И вот здесь оказывается на руку тот факт, что synchronized — тяжелый. Что такое атомик? В сущности, это просто чтение/запись в память специальными инструкциями процессора. Synchrnoized делает гораздо больше. Да, в простом случае там тоже spin-lock, но потом он переделывается в мютекс операционной системы, планировщик ОС уведомляется о том, что поток в ожидании и т.д. В общем, движуха. А движуха — это энтропия, которой нам так не хватает! Решение synchronized побеждает не потому, что оно лучше. Нет! Оно побеждает как раз потому, что оно гораздо хуже!

Если переконфигурировать JVM на использование неблокирующего шума (/dev/urandom), ситуация в корне меняется. Атомики и некоторые (они короткие по коду, скорее всего разруливаются спинлоком без вызовов OS) варианты синхронизации приближаются к теоретическому максимуму в четыре миллиона. Ну и начинают есть 100% CPU (предыдущие итерации — нет, они блокируются на random и загрузка CPU где-то в районе 20-40% всего). И в целом атомики выигрывают у synchronized аж в 8 раз! И казалось бы, справедливость восторжествовала! Но и здесь не все так гладко...

У нас ведь раньше synchronized давал 3/4 теоретической пропускной способности. Так как мы его аж в 8 раз обогнали? Так вот, оказывается, synchronized-решения (которые с "большой" синхронизацией) теперь стали работать в 6 раз медленне . Т.е. мы вроде-бы заменили вызовы в SecureRandom на более быстрые, и все у нас замедлилось. Этот эффект я объяснять не умею. Может быть, какие-то эффекты, связанные с планировщиком ОС. Т.е. когда у нас SecureRandom блокируется, добавление потока в список ожидания (wait list) на мьютексе дает энтропию, разблокирует другой поток и все получается более-менее по-порядку. А когда там synchronized, все получается быстро и слишком много потоков встают в ожидание. В общем, я . Если кто-то знает — поделитесь соображениями.

Выводов не будет. Я бы по результам данных экспериментов добавил сбор метрик вокруг всего метода и SecureRandom. Кто его знает, что там у вас с энтропией будет на реальных серверах. А еще я бы поймал админа или девопса и спросил, что у них вообще происходит. В некоторых компаниях образы docker для Java уже содержат настройки по использованию /dev/urandom в качестве источника случайных чисел. В этом случае нужно брать атомики или "малую" синхронизацию. А вот если ничего нет — нужно собирать данные и думать дальше.

vsb>Наверное можно ещё сделать через ThreadLocal переменные и не заморачиваться тем, что в разных потоках будут получаться одинаковые (timeMillis, counter), попробую ради интереса этот вариант, сравнить скорость.

Гипотеза. Будет порядка двух миллионов. Может быть меньше. Вряд ли больше. Все из-за недостаточной энтропии для SecureRandom.
Отредактировано 07.09.2022 14:48 maxkar . Предыдущая версия .
Re[6]: Помогите с многопоточным кодом
От: StanislavK Великобритания  
Дата: 06.09.22 19:49
Оценка:
Здравствуйте, maxkar, Вы писали:

M>Гипотеза. Будет порядка двух миллионов. Может быть меньше. Вряд ли больше. Все из-за недостаточной энтропии для SecureRandom.

SecureRandom никто никогда не использует для потоковой (от слова stream) генерации, в этом нем никакого смысла, оно нужно только для seed.
Re[3]: Помогите с многопоточным кодом
От: StanislavK Великобритания  
Дата: 06.09.22 20:54
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Не, у меня там в реальном коде немного по-другому, в это не упирается.


vsb>Если интересно — полный код, но там специфика UUID формата. Генерацию случайного числа я заменил на счетчик.

А чем не устраивает то, что делается в randomUUID? Типа боитесь что генератор повторится?
Ну ок. В общем, тогда задача сводится к генерации long который не повторяется и у всех потоков разный. Сделать это проще всего если просто заранее решить сколько есть потоков и разделить для них диапазоны long-а. Потом просто в каждом их них делать инкремент — никаких синхронизаций и системных вызовов.

vsb>Первый вариант выдаёт 2 млн значений в секунду, второй вариант выдаёт около 3, формат ограничивает 4 млн в секунду.

Это ваще повезло. SecureRandom на сервере, скорее всего ваще ляжет. Его надо только для сида использовать.
Отредактировано 06.09.2022 21:01 StanislavK . Предыдущая версия . Еще …
Отредактировано 06.09.2022 20:55 StanislavK . Предыдущая версия .
Re[4]: Помогите с многопоточным кодом
От: vsb Казахстан  
Дата: 06.09.22 21:03
Оценка:
Здравствуйте, StanislavK, Вы писали:

vsb>>Если интересно — полный код, но там специфика UUID формата. Генерацию случайного числа я заменил на счетчик.

SK>А чем не устраивает то, что делается в randomUUID? Типа боитесь что генератор повторится?

Нет, мне нужны возрастающие UUID-ы.

vsb>>Первый вариант выдаёт 2 млн значений в секунду, второй вариант выдаёт около 3, формат ограничивает 4 млн в секунду.

SK>Это ваще повезло. SecureRandom на сервере, скорее всего ваще ляжет. Его надо только для сида использовать.

Спасибо, почитаю на эту тему.
Re[5]: Помогите с многопоточным кодом
От: StanislavK Великобритания  
Дата: 06.09.22 21:05
Оценка: +1
Здравствуйте, vsb, Вы писали:

vsb>>>Если интересно — полный код, но там специфика UUID формата. Генерацию случайного числа я заменил на счетчик.

SK>>А чем не устраивает то, что делается в randomUUID? Типа боитесь что генератор повторится?
vsb>Нет, мне нужны возрастающие UUID-ы.
тогда тупо AtmicLong.incrementAndGet()
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.