Библиотека для параллельного обхода графа
От: elmal  
Дата: 31.05.22 05:11
Оценка:
Есть направленный ациклический граф. Каждый узел этого графа — какая то ресурсоемкая задача, которая зависит от другой. Соответственно
нужна библиотека, в которой есть алгоритм, который позволяет обходить этот граф таким образом, чтобы подветки можно было выполнять параллельно в разных потоках.

Я быстренько написал алгоритм, который является комбинацией обхода в ширину, далее эти ноды сгруппировал по максимальной дистанции от начала, и соответственно каждая группа следующей дистанции может выполняться только когда есть результаты предыдущей.

Однако тут есть некоторые недостатки и этот алгоритм не оптимален. А более сложную реализацию уже велосипедить не хочется. Есть какое готовое простое решение, или все таки буду вынужден городить свой лисапед?
Re: Библиотека для параллельного обхода графа
От: · Великобритания  
Дата: 31.05.22 21:55
Оценка:
Здравствуйте, elmal, Вы писали:

e> Однако тут есть некоторые недостатки и этот алгоритм не оптимален. А более сложную реализацию уже велосипедить не хочется. Есть какое готовое простое решение, или все таки буду вынужден городить свой лисапед?

Вроде это topological sort зовётся, используется в dependency resolution. Гугли подходящие либы/примеры кода.

Разбивка на группы звучит как-то неправильно, т.к. задачи могут выполняться разное время и ожидать завершения задач одной группы имхо неоптимально. Просто засовывать в thread pool задачи, готовые к исполнению.
avalon/3.0.0
но это не зря, хотя, может быть, невзначай
гÅрмония мира не знает границ — сейчас мы будем пить чай
Re[2]: Библиотека для параллельного обхода графа
От: cppguard  
Дата: 31.05.22 22:56
Оценка:
Здравствуйте, ·, Вы писали:

·>Разбивка на группы звучит как-то неправильно, т.к. задачи могут выполняться разное время и ожидать завершения задач одной группы имхо неоптимально. Просто засовывать в thread pool задачи, готовые к исполнению.


Весь нюанс состоит в том, как определить, что все зависимости уже вычислены, не блокируя при этом поток. То есть, вот у нас есть пул потоков. Чтобы добавить туда очередную задачу на исполнение, нам нужно передать вместе с задачей результаты вычисления зависимотей. А как мы узнаем, что они готовы? Простой вариант — обернуть результаты в Future и заблокироваться, пока все не будут готовы. Но тогда и поток блокируется и проистаивает.

Решением в лоб я вижу такой подход. Когда вычисление очередной зависимости заканчивается, смотрим, что от неё зависит и пытаемся перейти в тот узел и начать его вычислять. Если при переходе выясняется, что не все зависимости готовы, то сразу выходим. Тогда вычисление последней зависимости уже, наконец, запустит вычисление зависимого узла.
Re: Библиотека для параллельного обхода графа
От: vsb Казахстан  
Дата: 31.05.22 23:41
Оценка:
Не очень понял, что мешает построить соответствующий граф из объектов вроде Future и подать его на исполнение. И оно там само разберется естественным образом, чего в каком порядке выполнять.
Re[3]: Библиотека для параллельного обхода графа
От: vsb Казахстан  
Дата: 31.05.22 23:42
Оценка:
Здравствуйте, cppguard, Вы писали:

C>Весь нюанс состоит в том, как определить, что все зависимости уже вычислены, не блокируя при этом поток. То есть, вот у нас есть пул потоков. Чтобы добавить туда очередную задачу на исполнение, нам нужно передать вместе с задачей результаты вычисления зависимотей. А как мы узнаем, что они готовы? Простой вариант — обернуть результаты в Future и заблокироваться, пока все не будут готовы. Но тогда и поток блокируется и проистаивает.


Не понял, а чем CompletableFuture.allOf не устраивает?
Re[2]: Библиотека для параллельного обхода графа
От: vsb Казахстан  
Дата: 31.05.22 23:49
Оценка:
Вот интереса ради накидал код.

package test;

import java.time.LocalTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class Test {
    public static void main(String[] args1) throws Exception {
        Node node = new Node(
                args -> {
                    doWork("1");
                    return args[0].toString() + " a " + args[1].toString();
                },
                new Node(args -> {
                    doWork("2");
                    return "b";
                }),
                new Node(args -> {
                    doWork("3");
                    return args[0].toString() + " c " + args[1].toString();
                },
                        new Node(args -> {
                            doWork("4");
                            return "d";
                        }),
                        new Node(args -> {
                            doWork("5");
                            return "e";
                        })
                ));
        var future = nodeToFuture(node);
        var o = future.get();
        System.out.println(o);
    }

    private static void doWork(String work) {
        System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + ": " + work);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static CompletableFuture<Object> nodeToFuture(Node node) {
        if (node.dependencies.isEmpty()) {
            return CompletableFuture.supplyAsync(() -> node.action.apply(new Object[0]));
        }
        List<CompletableFuture<Object>> futureList = node.dependencies.stream().map(Test::nodeToFuture).toList();
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
        return allFutures.thenApply(unused -> {
            Object[] args = futureList.stream().map(f -> {
                try {
                    return f.get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).toArray();
            return node.action.apply(args);
        });
    }

    static class Node {
        final Function<Object[], Object> action;
        final List<Node> dependencies;

        Node(Function<Object[], Object> action, Node... dependencies) {
            this.action = action;
            this.dependencies = List.of(dependencies);
        }
    }
}


Вывод:
05:48:21.967978400 ForkJoinPool.commonPool-worker-3: 5
05:48:21.967978400 ForkJoinPool.commonPool-worker-1: 2
05:48:21.967978400 ForkJoinPool.commonPool-worker-2: 4
05:48:22.997875500 ForkJoinPool.commonPool-worker-2: 3
05:48:24.017700600 ForkJoinPool.commonPool-worker-2: 1
b a d c e
Re[4]: Библиотека для параллельного обхода графа
От: cppguard  
Дата: 01.06.22 01:37
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Не понял, а чем CompletableFuture.allOf не устраивает?


Заблокирует поток, пока всё не будет вычисленно? Можно конечно, использовать пул размера больше чем количество исполнительных ядер. Но тогда и накладные расходы вырастут.
Re[3]: Библиотека для параллельного обхода графа
От: elmal  
Дата: 01.06.22 04:49
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Вот интереса ради накидал код.

Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом
(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.
Re[3]: Библиотека для параллельного обхода графа
От: cppguard  
Дата: 01.06.22 05:27
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Вот интереса ради накидал код.


Без данных о времени выполнения тут не о чем разговаривать. А вот если бы данные были, стало бы понятно, что решение неоптимальное.
Re[5]: Библиотека для параллельного обхода графа
От: vsb Казахстан  
Дата: 01.06.22 05:38
Оценка:
Здравствуйте, cppguard, Вы писали:

vsb>>Не понял, а чем CompletableFuture.allOf не устраивает?


C>Заблокирует поток, пока всё не будет вычисленно?


Нет. Я же вывел имена потоков. Грубо говоря — поток, который вычислил последнее из оставшихся значений, будет использован для вычисления "суммы".

> Можно конечно, использовать пул размера больше чем количество исполнительных ядер. Но тогда и накладные расходы вырастут.


В данном случае пул автоматический, но можно указывать любой, настроенный как душе угодно. Хоть на один поток.
Re: Библиотека для параллельного обхода графа
От: cserg  
Дата: 01.06.22 05:49
Оценка:
Здравствуйте, elmal, Вы писали:

E>Есть направленный ациклический граф. Каждый узел этого графа — какая то ресурсоемкая задача, которая зависит от другой. Соответственно

E>нужна библиотека, в которой есть алгоритм, который позволяет обходить этот граф таким образом, чтобы подветки можно было выполнять параллельно в разных потоках.
Не понял про параллельный обход. Вроде параллельный обход вершин(ребер) графа — это когда элементы графа обрабатываются несколькими потоками. Например, граф содержит некоторый набор сильно связных компонент и каждая компонента обрабатывается отдельным потоком. А у вас больше похоже на проблему распределения зависимых задач по потокам, т.е. составление расписания.
Re[4]: Библиотека для параллельного обхода графа
От: vsb Казахстан  
Дата: 01.06.22 05:54
Оценка: 11 (2)
Здравствуйте, elmal, Вы писали:

vsb>>Вот интереса ради накидал код.

E>Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом
E>(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.

Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.

package test;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class Test {
    public static void main(String[] args1) throws Exception {
        Node node1 = new Node(
                args -> {
                    doWork("1", 100);
                    return "a";
                }
        );

        Node node2 = new Node(
                args -> {
                    doWork("2", 200);
                    return args[0] + " b";
                },
                node1
        );

        Node node3 = new Node(
                args -> {
                    doWork("3", 300);
                    return args[0] + " c";
                },
                node1
        );

        Node node4 = new Node(
                args -> {
                    doWork("4", 400);
                    return args[0] + " d " + args[1];
                },
                node2, node3
        );

        var future = nodeToFuture(new IdentityHashMap<>(), node4);
        var o = future.get();
        System.out.println(o);
    }

    private static void doWork(String work, int delay) {
        System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + ": " + work + " (" + delay + ")");
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static CompletableFuture<Object> nodeToFuture(IdentityHashMap<Node, CompletableFuture<Object>> cache, Node n) {
        return cache.computeIfAbsent(n, node -> {
            if (node.dependencies.isEmpty()) {
                return CompletableFuture.supplyAsync(() -> node.action.apply(new Object[0]));
            }
            List<CompletableFuture<Object>> futureList = node.dependencies.stream().map(n1 -> nodeToFuture(cache, n1)).toList();
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
            return allFutures.thenApplyAsync(Function.identity()).thenApply(unused -> {
                Object[] args = futureList.stream().map(f -> {
                    try {
                        return f.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).toArray();
                return node.action.apply(args);
            });
        });

    }

    static class Node {
        final Function<Object[], Object> action;
        final List<Node> dependencies;

        Node(Function<Object[], Object> action, Node... dependencies) {
            this.action = action;
            this.dependencies = List.of(dependencies);
        }
    }
}


11:53:29.900550200 ForkJoinPool.commonPool-worker-1: 1 (100)
11:53:30.027538300 ForkJoinPool.commonPool-worker-1: 2 (200)
11:53:30.027538300 ForkJoinPool.commonPool-worker-2: 3 (300)
11:53:30.342080 ForkJoinPool.commonPool-worker-2: 4 (400)
a b d a c
Отредактировано 01.06.2022 5:57 vsb . Предыдущая версия .
Re[5]: Библиотека для параллельного обхода графа
От: elmal  
Дата: 01.06.22 06:09
Оценка:
Здравствуйте, vsb, Вы писали:

vsb>Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.

Да ужжж. Не, ну мне в принципе нравится, практически то, что мне нужно. В эту сторону я вообще изначально не думал, ибо предполагал что обход в глубину не подойдет. А так по идее действительно должно сработать, если вызывать не с первой ноды, а с последней. На перфоманс по памяти мне по большому счету пофиг.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.