Информация об изменениях

Сообщение Re[4]: Библиотека для параллельного обхода графа от 01.06.2022 5:54

Изменено 01.06.2022 5:57 vsb

Re[4]: Библиотека для параллельного обхода графа
Здравствуйте, elmal, Вы писали:

vsb>>Вот интереса ради накидал код.

E>Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом
E>(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.

Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.

package test;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class Test {
    public static void main(String[] args1) throws Exception {
        Node node1 = new Node(
                args -> {
                    doWork("1", 100);
                    return "a";
                }
        );

        Node node2 = new Node(
                args -> {
                    doWork("2", 200);
                    return args[0] + " b";
                },
                node1
        );

        Node node3 = new Node(
                args -> {
                    doWork("3", 300);
                    return args[0] + " c";
                },
                node1
        );

        Node node4 = new Node(
                args -> {
                    doWork("4", 400);
                    return args[0] + " d " + args[1];
                },
                node2, node3
        );

        var future = nodeToFuture(new IdentityHashMap<>(), node4);
        var o = future.get();
        System.out.println(o);
    }

    private static void doWork(String work, int delay) {
        System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + ": " + work + " (" + delay + ")");
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static CompletableFuture<Object> nodeToFuture(IdentityHashMap<Node, CompletableFuture<Object>> cache, Node n) {
        return cache.computeIfAbsent(n, node -> {
            if (node.dependencies.isEmpty()) {
                return CompletableFuture.supplyAsync(() -> node.action.apply(new Object[0]));
            }
            List<CompletableFuture<Object>> futureList = node.dependencies.stream().map(n1 -> nodeToFuture(cache, n1)).toList();
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
            return allFutures.thenApplyAsync(Function.identity()).thenApply(unused -> {
                Object[] args = futureList.stream().map(f -> {
                    try {
                        return f.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).toArray();
                return node.action.apply(args);
            });
        });

    }

    static class Node {
        final Function<Object[], Object> action;
        final List<Node> dependencies;

        Node(Function<Object[], Object> action, Node... dependencies) {
            this.action = action;
            this.dependencies = List.of(dependencies);
        }
    }
}


11:53:29.900550200 ForkJoinPool.commonPool-worker-1: 1 (100)
11:53:30.027538300 ForkJoinPool.commonPool-worker-1: 2 (200)
11:53:30.027538300 ForkJoinPool.commonPool-worker-2: 3 (300)
11:53:30.342080 ForkJoinPool.commonPool-worker-2: 4 (400)
Re[4]: Библиотека для параллельного обхода графа
Здравствуйте, elmal, Вы писали:

vsb>>Вот интереса ради накидал код.

E>Оно конечно красиво, только у тебя дерево, а не граф, за счет чего у тебя получилось естественным образом обойти через рекурсию. С графом
E>(когда ветки сначала расходятся и их выполнение идет параллельно, а далее ранее параллельные ветки смыкаются в одну) уже не факт что получится так красиво.

Ну вот пример, когда ветви расходятся и смыкаются. Уже не дерево.

package test;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class Test {
    public static void main(String[] args1) throws Exception {
        Node node1 = new Node(
                args -> {
                    doWork("1", 100);
                    return "a";
                }
        );

        Node node2 = new Node(
                args -> {
                    doWork("2", 200);
                    return args[0] + " b";
                },
                node1
        );

        Node node3 = new Node(
                args -> {
                    doWork("3", 300);
                    return args[0] + " c";
                },
                node1
        );

        Node node4 = new Node(
                args -> {
                    doWork("4", 400);
                    return args[0] + " d " + args[1];
                },
                node2, node3
        );

        var future = nodeToFuture(new IdentityHashMap<>(), node4);
        var o = future.get();
        System.out.println(o);
    }

    private static void doWork(String work, int delay) {
        System.out.println(LocalTime.now() + " " + Thread.currentThread().getName() + ": " + work + " (" + delay + ")");
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static CompletableFuture<Object> nodeToFuture(IdentityHashMap<Node, CompletableFuture<Object>> cache, Node n) {
        return cache.computeIfAbsent(n, node -> {
            if (node.dependencies.isEmpty()) {
                return CompletableFuture.supplyAsync(() -> node.action.apply(new Object[0]));
            }
            List<CompletableFuture<Object>> futureList = node.dependencies.stream().map(n1 -> nodeToFuture(cache, n1)).toList();
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
            return allFutures.thenApplyAsync(Function.identity()).thenApply(unused -> {
                Object[] args = futureList.stream().map(f -> {
                    try {
                        return f.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).toArray();
                return node.action.apply(args);
            });
        });

    }

    static class Node {
        final Function<Object[], Object> action;
        final List<Node> dependencies;

        Node(Function<Object[], Object> action, Node... dependencies) {
            this.action = action;
            this.dependencies = List.of(dependencies);
        }
    }
}


11:53:29.900550200 ForkJoinPool.commonPool-worker-1: 1 (100)
11:53:30.027538300 ForkJoinPool.commonPool-worker-1: 2 (200)
11:53:30.027538300 ForkJoinPool.commonPool-worker-2: 3 (300)
11:53:30.342080 ForkJoinPool.commonPool-worker-2: 4 (400)
a b d a c