Есть направленный ациклический граф. Каждый узел этого графа — какая то ресурсоемкая задача, которая зависит от другой. Соответственно
нужна библиотека, в которой есть алгоритм, который позволяет обходить этот граф таким образом, чтобы подветки можно было выполнять параллельно в разных потоках.
Я быстренько написал алгоритм, который является комбинацией обхода в ширину, далее эти ноды сгруппировал по максимальной дистанции от начала, и соответственно каждая группа следующей дистанции может выполняться только когда есть результаты предыдущей.
Однако тут есть некоторые недостатки и этот алгоритм не оптимален. А более сложную реализацию уже велосипедить не хочется. Есть какое готовое простое решение, или все таки буду вынужден городить свой лисапед?
Здравствуйте, elmal, Вы писали:
e> Однако тут есть некоторые недостатки и этот алгоритм не оптимален. А более сложную реализацию уже велосипедить не хочется. Есть какое готовое простое решение, или все таки буду вынужден городить свой лисапед?
Вроде это topological sort зовётся, используется в dependency resolution. Гугли подходящие либы/примеры кода.
Разбивка на группы звучит как-то неправильно, т.к. задачи могут выполняться разное время и ожидать завершения задач одной группы имхо неоптимально. Просто засовывать в thread pool задачи, готовые к исполнению.
Здравствуйте, ·, Вы писали:
·>Разбивка на группы звучит как-то неправильно, т.к. задачи могут выполняться разное время и ожидать завершения задач одной группы имхо неоптимально. Просто засовывать в thread pool задачи, готовые к исполнению.
Весь нюанс состоит в том, как определить, что все зависимости уже вычислены, не блокируя при этом поток. То есть, вот у нас есть пул потоков. Чтобы добавить туда очередную задачу на исполнение, нам нужно передать вместе с задачей результаты вычисления зависимотей. А как мы узнаем, что они готовы? Простой вариант — обернуть результаты в Future и заблокироваться, пока все не будут готовы. Но тогда и поток блокируется и проистаивает.
Решением в лоб я вижу такой подход. Когда вычисление очередной зависимости заканчивается, смотрим, что от неё зависит и пытаемся перейти в тот узел и начать его вычислять. Если при переходе выясняется, что не все зависимости готовы, то сразу выходим. Тогда вычисление последней зависимости уже, наконец, запустит вычисление зависимого узла.
Не очень понял, что мешает построить соответствующий граф из объектов вроде Future и подать его на исполнение. И оно там само разберется естественным образом, чего в каком порядке выполнять.
Здравствуйте, cppguard, Вы писали:
C>Весь нюанс состоит в том, как определить, что все зависимости уже вычислены, не блокируя при этом поток. То есть, вот у нас есть пул потоков. Чтобы добавить туда очередную задачу на исполнение, нам нужно передать вместе с задачей результаты вычисления зависимотей. А как мы узнаем, что они готовы? Простой вариант — обернуть результаты в Future и заблокироваться, пока все не будут готовы. Но тогда и поток блокируется и проистаивает.
Не понял, а чем CompletableFuture.allOf не устраивает?
package test;
import java.time.LocalTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class Test {
public static void main(String[] args1) throws Exception {
Node node = new Node(
args -> {
doWork("1");
return args[0].toString() + " a " + args[1].toString();
},
new Node(args -> {
doWork("2");
return"b";
}),
new Node(args -> {
doWork("3");
return args[0].toString() + " c " + args[1].toString();
},
new Node(args -> {
doWork("4");
return"d";
}),
new Node(args -> {
doWork("5");
return"e";
})
));
var future = nodeToFuture(node);
var o = future.get();
System.out.println(o);
}
private static void doWork(String work) {
System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + ": " + work);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static CompletableFuture<Object> nodeToFuture(Node node) {
if (node.dependencies.isEmpty()) {
return CompletableFuture.supplyAsync(() -> node.action.apply(new Object[0]));
}
List<CompletableFuture<Object>> futureList = node.dependencies.stream().map(Test::nodeToFuture).toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
return allFutures.thenApply(unused -> {
Object[] args = futureList.stream().map(f -> {
try {
return f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).toArray();
return node.action.apply(args);
});
}
static class Node {
final Function<Object[], Object> action;
final List<Node> dependencies;
Node(Function<Object[], Object> action, Node... dependencies) {
this.action = action;
this.dependencies = List.of(dependencies);
}
}
}
Вывод:
05:48:21.967978400 ForkJoinPool.commonPool-worker-3: 5
05:48:21.967978400 ForkJoinPool.commonPool-worker-1: 2
05:48:21.967978400 ForkJoinPool.commonPool-worker-2: 4
05:48:22.997875500 ForkJoinPool.commonPool-worker-2: 3
05:48:24.017700600 ForkJoinPool.commonPool-worker-2: 1
b a d c e
Здравствуйте, vsb, Вы писали:
vsb>Не понял, а чем CompletableFuture.allOf не устраивает?
Заблокирует поток, пока всё не будет вычисленно? Можно конечно, использовать пул размера больше чем количество исполнительных ядер. Но тогда и накладные расходы вырастут.
Здравствуйте, vsb, Вы писали:
vsb>Вот интереса ради накидал код.
Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом
(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.
Здравствуйте, cppguard, Вы писали:
vsb>>Не понял, а чем CompletableFuture.allOf не устраивает?
C>Заблокирует поток, пока всё не будет вычисленно?
Нет. Я же вывел имена потоков. Грубо говоря — поток, который вычислил последнее из оставшихся значений, будет использован для вычисления "суммы".
> Можно конечно, использовать пул размера больше чем количество исполнительных ядер. Но тогда и накладные расходы вырастут.
В данном случае пул автоматический, но можно указывать любой, настроенный как душе угодно. Хоть на один поток.
Здравствуйте, elmal, Вы писали:
E>Есть направленный ациклический граф. Каждый узел этого графа — какая то ресурсоемкая задача, которая зависит от другой. Соответственно E>нужна библиотека, в которой есть алгоритм, который позволяет обходить этот граф таким образом, чтобы подветки можно было выполнять параллельно в разных потоках.
Не понял про параллельный обход. Вроде параллельный обход вершин(ребер) графа — это когда элементы графа обрабатываются несколькими потоками. Например, граф содержит некоторый набор сильно связных компонент и каждая компонента обрабатывается отдельным потоком. А у вас больше похоже на проблему распределения зависимых задач по потокам, т.е. составление расписания.
Здравствуйте, elmal, Вы писали:
vsb>>Вот интереса ради накидал код. E>Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом E>(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.
Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.
11:53:29.900550200 ForkJoinPool.commonPool-worker-1: 1 (100)
11:53:30.027538300 ForkJoinPool.commonPool-worker-1: 2 (200)
11:53:30.027538300 ForkJoinPool.commonPool-worker-2: 3 (300)
11:53:30.342080 ForkJoinPool.commonPool-worker-2: 4 (400)
a b d a c
Здравствуйте, vsb, Вы писали:
vsb>Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.
Да ужжж. Не, ну мне в принципе нравится, практически то, что мне нужно. В эту сторону я вообще изначально не думал, ибо предполагал что обход в глубину не подойдет. А так по идее действительно должно сработать, если вызывать не с первой ноды, а с последней. На перфоманс по памяти мне по большому счету пофиг.