2D-Linq и оптимизация цифровых фильтров - 4
От: Sinclair Россия https://github.com/evilguest/
Дата: 03.08.20 06:08
Оценка: 141 (9)
Обещанного три года ждут, а прошло
Автор: Sinclair
Дата: 10.08.18
всего лишь два.
Столкнувшись с некоторыми трудностями, я проект подзабросил, и почти не вспоминал о нём до этой весны.
В самоизоляции я решил-таки откопать стюардессу, и продолжить с того места, где остановился в прошлый раз
Автор: Sinclair
Дата: 29.06.18
.

Итак, что у нас с тех пор?
Предыдущие жалкие, ничтожные попытки манипулировать IL-кодом были безжалостно выброшены.
С нуля был начат проект linq2d на основе expression trees, и доведён до некоторой минимальной работоспособности.
Подробности — https://github.com/evilguest/linq2d/blob/master/README.md
Функционально, по сравнению с прошлым разом, добавилось:
1. Рекурсивные обращения
2. Поддержка одновременного вычисления нескольких результатов
3. Кэширование трансформаций, чтобы для повторных вычислений не заниматься трансляцией linq-выражений

С этими расширениями можно наконец-то замутить адаптивную бинаризацию по Савола
Автор: Pavel Dvorkin
Дата: 29.06.18
:
public static byte[,] SauvolaBinarize(byte[,] grayScale, int WHalf, double K)
{
        var integrate = 
            from g in grayScale
            from ri in Result.SubstBy(0)    // сумма элементов - int32
            from rq in Result.SubstBy(0L)    // сумма квадратов - int64. Можно обойтись и int32, если площадь окна просмотра не более 2^16, но так честнее
            select ValueTuple.Create(
                ri[-1, 0] + ri[0, -1] - ri[-1, -1] + g,
                rq[-1, 0] + rq[0, -1] - rq[-1, -1] + g * g);
        var (p, sq) = integrate.ToArrays();    // вычисляем оба интеграла сразу

    var detect = 
            from g in grayScale
            from i in p.With(OutOfBoundsStrategy.Integral(0))    // специальная стратегия подстановки для значений за пределами массива - 0 для слева-сверху,
            from q in sq.With(OutOfBoundsStrategy.Integral(0L)) // ближайший сосед для справа-снизу.
            let tl = i.Offset(-WHalf - 1, -WHalf - 1)        // самая громоздкая часть: вычисляем эффективное окно просмотра, смещая текущую точку на +-Whalf
            let tr = i.Offset(-WHalf - 1, WHalf)        // нам нужны 8 точек, т.к. мы используем оба предынтегрированных массива
            let bl = i.Offset(WHalf, -WHalf - 1)
            let br = i.Offset(WHalf, WHalf)
            let tlq = q.Offset(-WHalf - 1, -WHalf - 1)
            let trq = q.Offset(-WHalf - 1, WHalf)
            let blq = q.Offset(WHalf, -WHalf - 1)
            let brq = q.Offset(WHalf, WHalf)
            let area = (br.X - tl.X) * (br.Y - tl.Y)        // площадь окна разная, в зависимости от того, куда попала точка; все поправки в X и Y учитываются OutOfBoundsStrategy.Integral
            let diff = br.Value + tl.Value - tr.Value - bl.Value
            let sqdiff = brq.Value + tlq.Value - trq.Value - blq.Value
            let mean = (double)diff / area            // вообще, здесь можно и не приводить к double. Но это мешает векторизации, т.к. целочисленный div есть только в AVX-512. 
            let std = Math.Sqrt((sqdiff - diff * mean) / (area - 1)) // А вот double делятся уже в avx2.

            let threshold = mean * (1 + K * ((std / 128) - 1))

            select g > threshold ? byte.MaxValue : byte.MinValue;
}

Все приседания вокруг границ массива спрятаны в SubstBy и OutOfBoundsStrategy.Integral.
Два прохода по исходному массиву для интегрирования заменены на один — так быстрее.
В бенчмарке этот код разрезан на две части, чтобы проще было кэшировать.
В честных LinqSauvola*() эти методы вызываются один за другим.
В CachedLinqSauvola* вызываются заранее подготовленные Transform-делегаты.

Дополнительные затраты памяти — только на промежуточные массивы p и sq, как и в оригинальном С++ коде.
Нынешняя производительность на моём стареньком T460:
Intel Core i7-6600U CPU 2.60GHz (Skylake), 1 CPU, 4 logical and 2 physical cores
.NET Core SDK=3.1.302
[Host]     : .NET Core 3.1.6 (CoreCLR 4.700.20.26901, CoreFX 4.700.20.31603), X64 RyuJIT
|                  Method | WHalf |   FileName |    Mean |    Error |   StdDev |  Median | Ratio | RatioSD |
|------------------------ |------ |----------- |--------:|---------:|---------:|--------:|------:|--------:|
|             SafeSauvola |     5 | p00743.bmp | 2.795 s | 0.0624 s | 0.1812 s | 2.744 s |  1.91 |    0.20 |
|     UnsafeSauvolaScalar |     5 | p00743.bmp | 1.475 s | 0.0494 s | 0.1455 s | 1.410 s |  1.00 |    0.00 |
|       LinqSauvolaVector |     5 | p00743.bmp | 2.433 s | 0.0485 s | 0.0664 s | 2.439 s |  1.47 |    0.09 |
|       LinqSauvolaScalar |     5 | p00743.bmp | 2.598 s | 0.1468 s | 0.4328 s | 2.607 s |  1.77 |    0.34 |
| CachedLinqSauvolaVector |     5 | p00743.bmp | 1.018 s | 0.0202 s | 0.0283 s | 1.014 s |  0.62 |    0.04 |
| CachedLinqSauvolaScalar |     5 | p00743.bmp | 1.140 s | 0.0226 s | 0.0338 s | 1.139 s |  0.69 |    0.04 |

За baseline взят
  простой unsafe метод
    public static unsafe byte[,] UnsafeSauvola(byte[,] grayScale, int whalf)
        {
            int height = grayScale.Height();
            int width = grayScale.Width();
            var p = new int[height, width];
            var sq = new long[height, width];
            byte[,] result = new byte[height, width];

            fixed (byte* gPtr = &grayScale[0, 0])
            fixed (int* pPtr = &p[0, 0])
            fixed (long* sqPtr = &sq[0, 0])
            fixed (byte* rPtr = &result[0, 0])
            {
                for (int i = 0; i < height; i++)
                    for (int j = 0; j < width; j++)
                    {
                        pPtr[i * width + j] = gPtr[i * width + j];
                        sqPtr[i * width + j] = gPtr[i * width + j] * gPtr[i * width + j];
                        if (i > 0)
                        {
                            pPtr[i * width + j] += pPtr[(i - 1) * width + j];
                            sqPtr[i * width + j] += sqPtr[(i - 1) * width + j];
                        }
                        if (j > 0)
                        {
                            pPtr[i * width + j] += pPtr[i * width + j - 1];
                            sqPtr[i * width + j] += sqPtr[i * width + j - 1];
                        }
                        if (i > 0 && j > 0)
                        {
                            pPtr[i * width + j] -= pPtr[(i - 1) * width + j - 1];
                            sqPtr[i * width + j] -= sqPtr[(i - 1) * width + j - 1];
                        }
                    }
                for (int i = 0; i < height; i++)
                    for (int j = 0; j < width; j++)
                    {
                        var xmin = Math.Max(0, i - whalf);
                        var ymin = Math.Max(0, j - whalf);
                        var xmax = Math.Min(height - 1, i + whalf);
                        var ymax = Math.Min(width - 1, j + whalf);

                        var area = (xmax - xmin + 1) * (ymax - ymin + 1);

                        var diff = pPtr[width * xmax + ymax];
                        var sqdiff = sqPtr[width * xmax + ymax];
                        if (xmin > 0)
                        {
                            diff -= -pPtr[width * (xmin - 1) + ymax];
                            sqdiff -= -sqPtr[width * (xmin - 1) + ymax];
                        }
                        if (ymin > 0)
                        {
                            diff -= pPtr[width * xmax + ymin - 1];
                            sqdiff -= sqPtr[width * xmax + ymin - 1];
                        }
                        if (xmin > 0 && ymin > 0)
                        {
                            diff += pPtr[width * (xmin - 1) + ymin - 1];
                            sqdiff += sqPtr[width * (xmin - 1) + ymin - 1];
                        }
                        var mean = (double)diff / area;
                        var std = Math.Sqrt((sqdiff - diff * mean) / (area - 1));

                        var threshold = mean * (1 + K * ((std / 128) - 1));

                        rPtr[i * width + j] = gPtr[i * width + j] > threshold ? byte.MaxValue : byte.MinValue;
                    }
                return result;
            }
        }

Идиоматический С#-код проигрывает ему почти в два (1.91) раза. Это — плата за контроль обращений к элементам массива.
Linq-код проигрывает в 1.4-1.7 раз. Но это — от небольшого количества повторов, так что стоимость рантайм-компиляции вносит существенный вклад.
А вот с кэшированием результат уже существенно лучше: "обычная" версия наигрывает 30% производительности за счёт более агрессивной оптимизации кода по сравнению с JIT.
Частичная AVX-векторизация выигрывает ещё 7%, итого "абстрактный" код выезжает за ~0.6 времени работы "конкретного".
Всё это — однопоточный код; прикрутить обратно параллелизацию руки пока не дошли. Но у неё основной недостаток — она бесполезна при массовом обслуживании, где важен throughput, а не response time.
С кодом на C++ пока не сравнивал, надо напилить внешнюю DLL и выполнить забег с ней.
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.