Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 05.01.17 09:09
Оценка: 15 (3)
Привет!

Наверняка вы знакомы или даже работали с Simulink (или подобными инструментами). Симуляция там реализована как пошаговое вычисление состояния модели, т.е. на каждом шаге значения выходов блоков (состояние) с прошлого шага "передаётся" на входы блоков, затем все блоки вычисляются, в результате чего получаем состояния для следующей итерации (видео о блоках: https://www.youtube.com/watch?v=qoj8Pa2wi0A).

Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.

Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию. Что вероятно позволит симулировать более сложные модели.

Что думаете вы?

Здесь
Автор: C.A.B
Дата: 05.01.17
можно попинать мою скромную попытку реализовать это, и здесь почитать больше и посмотреть код.

Спасибо!
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Отредактировано 09.01.2017 23:18 VladD2 . Предыдущая версия .
Re: Симуляция на реактивных потоках
От: Ikemefula Беларусь http://blogs.rsdn.org/ikemefula
Дата: 06.01.17 08:41
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.


CAB>Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию. Что вероятно позволит симулировать более сложные модели.


CAB>Что думаете вы?


Непонятно, чем это отличается от обычных акторов.
Re[2]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 06.01.17 13:16
Оценка:
Здравствуйте, Ikemefula, Вы писали:

I>Здравствуйте, C.A.B, Вы писали:


CAB>>...на реактивных потоках в частности...


I>Непонятно, чем это отличается от обычных акторов.


Образно можно представить что модель реактивных потоков накладывает некоторые ограничения на акторы. Больше можно узнать здесь.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[3]: Симуляция на реактивных потоках
От: so5team https://stiffstream.com
Дата: 06.01.17 14:42
Оценка: +1
Здравствуйте, C.A.B, Вы писали:

I>>Непонятно, чем это отличается от обычных акторов.


CAB>Образно можно представить что модель реактивных потоков накладывает некоторые ограничения на акторы. Больше можно узнать здесь.


Главная проблема акторов (да и взаимодействия на асинхронных сообщениях вообще) при обработке потоков данных -- это отсутствие механизма back-pressure. Но, если вы пишете про пошаговую симуляцию ("Симуляция там реализована как пошаговое вычисление состояния модели, т.е. на каждом шаге значения выходов блоков (состояние) с прошлого шага "передаётся" на входы блоков, затем все блоки вычисляются, в результате чего получаем состояния для следующей итерации"), то зачем здесь вообще нужен back-pressure? Ведь у вас следующий шаг не начнется, пока предыдущий не закончится.
Re[4]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 06.01.17 17:38
Оценка:
Здравствуйте, so5team, Вы писали:

S>Здравствуйте, C.A.B, Вы писали:


CAB>>... Больше можно узнать здесь.


S>... то зачем здесь вообще нужен back-pressure? Ведь у вас следующий шаг не начнется, пока предыдущий не закончится.


Это в случае если все блоки (из которых построена модель) вычисляются синхронно (на каждом шаге вся симуляция ждёт пока все блоки будут вычислены).

Но идея в том, чтобы вычислять их асинхронно (каждый блок ждёт только те блоки от которых зависит), тут-то нам и пригодится back-pressure

К тому-же пошаговость не всегда нужна. Если к примеру, вы симулируете как-то логическую схему и можете пренебречь временем её работы то достаточно вычислять только отдельные её элементы (в которых происходят изменяя).
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[5]: Симуляция на реактивных потоках
От: so5team https://stiffstream.com
Дата: 06.01.17 18:34
Оценка: 4 (1)
Здравствуйте, C.A.B, Вы писали:

CAB>Но идея в том, чтобы вычислять их асинхронно (каждый блок ждёт только те блоки от которых зависит), тут-то нам и пригодится back-pressure


Тогда это банально делается на CSP-каналах (вроде каналов в Go). Тут единственный фокус -- это так распределить блоки модели по рабочим потокам (сопрограммам), чтобы блок, который пишет в канал, не оказался заблокирован при переполнении канала, если блок-читатель из этого канала работает на том же потоке.
Re[3]: Симуляция на реактивных потоках
От: Ikemefula Беларусь http://blogs.rsdn.org/ikemefula
Дата: 06.01.17 19:32
Оценка:
Здравствуйте, C.A.B, Вы писали:

I>>Непонятно, чем это отличается от обычных акторов.


CAB>Образно можно представить что модель реактивных потоков накладывает некоторые ограничения на акторы. Больше можно узнать здесь.


Я посмотрел

public class InfiniteIncrementNumberPublisher extends AsyncIterablePublisher<Integer> {
    public InfiniteIncrementNumberPublisher(final Executor executor) {
        super(new Iterable<Integer>() {
          @Override public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
              private int at = 0;
              @Override public boolean hasNext() { return true; }
              @Override public Integer next() { return at++; } // Wraps around on overflow
              @Override public void remove() { throw new UnsupportedOperationException(); }
            };
          }
        }, executor);
    }
}


Адъский адъ эта ваша джава, чисто блудняк, куда ни посмотри С таким уровнем абстракций до внятной асинхронной модели как до луны раком.

Столько мусора ради одной простой идеи:
@Override public Iterator<Integer> iterator() {
        int i = 0;

        while(true) {
           yield i++;
        }
    }


На крайняк можно было бы сделать примерно так
   ...
   @Override public void iterator(Generator generator) {
    while(true) {
       generator.next(i++);
    }
   }
Отредактировано 06.01.2017 19:33 Pauel . Предыдущая версия .
Re[6]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 06.01.17 20:04
Оценка:
Здравствуйте, so5team, Вы писали:

S>Здравствуйте, C.A.B, Вы писали:


S>Тогда это банально делается на CSP-каналах ...


Спасибо интересная штука, даже есть реализация для Scala.

Да это очень похоже на реактивные потоки, возможно с той разницей что концептуально здесь на «первом плане» каналы. Тогда как в РП это блоки «трансформирующие» поток сообщений, а каналы как «само собой разумеющееся».
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[4]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 06.01.17 20:15
Оценка:
Здравствуйте, Ikemefula, Вы писали:

I>Здравствуйте, C.A.B, Вы писали:


I> Адъский адъ эта ваша джава, чисто блудняк, куда ни посмотри С таким уровнем абстракций до внятной асинхронной модели как до луны раком.


Лучше смотреть как это выглядит на Scala

Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re: Симуляция на реактивных потоках
От: dsorokin Россия  
Дата: 07.01.17 13:28
Оценка: 2 (1)
Здравствуйте, C.A.B, Вы писали:

CAB>Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.


CAB>Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию. Что вероятно позволит симулировать более сложные модели.


CAB>Что думаете вы?


Я думаю, что потоки и реактивное функциональное программирование идут отдельно, а распараллеливание и распределенность идут тоже отдельно. Они фактически мало связаны, даже ортогональны друг другу, но правильная организация позволяет их красиво интегрировать друг с другом для задач моделирования, прежде всего, дискретно-событийного моделирования
Re: Симуляция на реактивных потоках
От: vdimas Россия  
Дата: 09.01.17 16:49
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.


Такая схема хорошо работает только в случае однонаправленного распространения сигналов. В случае любой обратной связи процесс распространения сигнала грозит стать бесконечным. Причем, наивный трюк в остановкой распространения сигнала в случае, если новое вычисленное значение не отличается от старого, может не сработать в случае чисел с плавающей запятой — запросто можно получить бесконечный колебательный процесс даже в случае "сходящихся" колебаний сугубо из-за точности вычислений (получить бесконечные колебания на младших разрядах).

Тоже экспериментировал с этим не раз.


CAB>Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию.


"Пошаговое" исполнение легко можно распараллелить, т.е. вычислять новое значение каждого функционального узла независимо от остальных. Каждый блок в такой схеме — это триггер, который запоминает результат предыдущих вычислений и этот результат считается неизменным в течении "такта".

Ну и при пошаговом исполнении можно юзать такой параметр как "время", т.е., при эмуляции, например, простой RC-цепи dy интеграла по накапливаемому заряду будет зависеть от виртуальной "длительности" такта. Ну или можно брать реальную длительность такта (для обработки звука иногда балуюсь примерно тем же).
Re[2]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 09.01.17 20:13
Оценка:
Здравствуйте, vdimas, Вы писали:

V>... В случае любой обратной связи процесс распространения сигнала грозит стать бесконечным...


Да, эффект имеет место быть. Но от этого не избавится в силу дискретной природы чисел с плавающей запятой и компьютеров в целом. Нужно искать компромисс между точностью, устойчивостью, скоростью и т.п. Это не серебряная пуля.

CAB>>Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию.


V>"Пошаговое" исполнение легко можно распараллелить, т.е. вычислять новое значение каждого функционального узла независимо от остальных...


В общем-то в этом и фишка Чтобы распараллелитьи и сделать вычисление асинхронным, на сколько это возможно.
К тому же пошаговая симуляция не всегда нужна, по крайней мере мне. В Scala я могу вынести это в отельный трайт и подключать по необходимости.

V>Ну и при пошаговом исполнении можно юзать такой параметр как "время"...


О, это самое интересное. Параметр "время" можно юзать и без пошагового вычиления! Нужно только следить за согласованностью этого параметра с остальными параметрами модели, в то время как вычисления могут асинхронными.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[4]: Симуляция на реактивных потоках
От: VladD2 Российская Империя www.nemerle.org
Дата: 09.01.17 23:24
Оценка:
Здравствуйте, so5team, Вы писали:

S>Главная проблема акторов (да и взаимодействия на асинхронных сообщениях вообще) при обработке потоков данных -- это отсутствие механизма back-pressure. Но, если вы пишете про пошаговую симуляцию ("Симуляция там реализована как пошаговое вычисление состояния модели, т.е. на каждом шаге значения выходов блоков (состояние) с прошлого шага "передаётся" на входы блоков, затем все блоки вычисляются, в результате чего получаем состояния для следующей итерации"), то зачем здесь вообще нужен back-pressure? Ведь у вас следующий шаг не начнется, пока предыдущий не закончится.


А что такое back-pressure в данном случае?
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re: Симуляция на реактивных потоках
От: VladD2 Российская Империя www.nemerle.org
Дата: 09.01.17 23:35
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.


CAB>Думаю, это даст возможность ускорить (через распараллеливание/распределение вычислений) симуляцию. Что вероятно позволит симулировать более сложные модели.


CAB>Что думаете вы?


Это одна из реализаций диаграммы потока данных. Более формальной (и правильной, на мой взгляд) реализацией является явное построение графа зависимостей. По нему уже можно делать разные реализации. Это может быть и реактивная схема (как предлагаешь ты), и прямая интерпретация графа зависимостей, и генерация кода.

Самым быстрым способом будет — генерация кода. Код, при этом, можно автоматически распараллеливать (ведь заранее известны все участки которые могут выполняться параллельно).

Проблемой всех таких решений будет то, что ты будешь распараллеливать и то, что не выгодно распараллеливать. Скажем глупо распараллеливать сложение двух числе, даже если оно делается 10 рез. Переключение контентов, ожидания и последующая синхронизация создадут куда большие непроизводительные затраты, чем получится выигрыш от распараллеливания.

Еще одна проблема — как получается схема. Если она получается статически, то генерация кода становится предпочтительной. Если динамически, то решения вроде реактивной модели будут более приемлемыми

Но в любом случае сначала лучше строить граф зависимостей.

По сути, реактивная модель — это один из вариантов его реализации. Причем довольно не эффетивный, так как узлы строятся в виде замыканий (функцональных).
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[3]: Симуляция на реактивных потоках
От: dsorokin Россия  
Дата: 10.01.17 05:12
Оценка: 103 (2)
Здравствуйте, C.A.B, Вы писали:

V>>Ну и при пошаговом исполнении можно юзать такой параметр как "время"...


CAB>О, это самое интересное. Параметр "время" можно юзать и без пошагового вычиления! Нужно только следить за согласованностью этого параметра с остальными параметрами модели, в то время как вычисления могут асинхронными.


При распараллеливании и/или создании распределенной имитации модельное время — это самый главный фактор. Возникает так называемый "парадокс времени", когда время входящего сообщения может оказаться меньше текущего модельного времени. С этим нужно как-то бороться. Для этого существуют "консервативные" и "оптимистичные" алгоритмы. Тут целая теория развита. На эту тему есть хорошая книга "Parallel and Distributed Simulation Systems" за авторством Richard M. Fujimoto.

Но как я написал выше, это мало связано с функциональным программированием в том смысле, что все это прекрасно сочетается вместе. Например, мне это удалось сделать в своей Айвике: http://hackage.haskell.org/packages/search?terms=aivika. Там есть и упомянутые потоки, и элементы FRP, и монады, и продолжения, и распределенность с параллельностью. Я реализовал оптимистичную стратегию деформации времени с одним известным алгоритмом синхронизации глобального модельного времени между "локальными процессами" узлов распределенного кластера. В общем, можно запускать полноценные дискретно-событийные имитационные модели на суперкомпьютере.

И добро пожаловать в клуб любителей моделирования!
Re[4]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 10.01.17 11:12
Оценка:
Здравствуйте, dsorokin, Вы писали:

D>...хорошая книга "Parallel and Distributed Simulation Systems" за авторством Richard M. Fujimoto.


D>...сделать в своей Айвике: http://hackage.haskell.org/packages/search?terms=aivika.


Мегакруть! Большое спасибо!

D>И добро пожаловать в клуб любителей моделирования!


Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[2]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 10.01.17 11:47
Оценка:
Здравствуйте, VladD2, Вы писали:

VD>Самым быстрым способом будет — генерация кода...


И самый сложный в реализации. Не думаю что я буду трогать это в ближайшее время. Хотя идея интересная да и в Скале есть макросы для кодогенерации.

VD>Еще одна проблема — как получается схема. Если она получается статически, то генерация кода становится предпочтительной. Если динамически, то решения вроде реактивной модели будут более приемлемыми


Пока что статическая, но в планах сделать динамической.

VD>Но в любом случае сначала лучше строить граф зависимостей.


VD>По сути, реактивная модель — это один из вариантов его реализации. Причем довольно не эффетивный, так как узлы строятся в виде замыканий (функцональных).


Этого я честно говоря не понял. Но реактивные потоки построены на обмене сообщениями (без замыкания, насколько я знаю), когда одни узел (блок) шлёт сообщения другому узлу где они ставятся в очередь и затем обрабатываются в отдельном потоке (как в акторах). И относительно вопроса выше, back-pressure это алгоритм балансировки вычислений, предотвращающий переполнения очередей сообщений.

PS: Пользуясь случаем хочу передать привет вашей компании и тому чуваку который пилит плагин для Скалы. Спасибо вам за отличный инструмент, но блин не пора ли заняться его оптимизацией, я ещё могу жить с инспектором выводящем типы по 2-3 минуты, но подвисание UI при редактирования... WTF?
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[3]: Симуляция на реактивных потоках
От: VladD2 Российская Империя www.nemerle.org
Дата: 10.01.17 13:57
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>И самый сложный в реализации.


Зависит от опыта и используемого подхода.

CAB>Не думаю что я буду трогать это в ближайшее время.


Тебе решать. Я просто описал возможные подходы и их преимущества/недостатки. По скорости исполнения и минимальности утилизации ресурсов у генерации кода конкурентов нет. А нужно тебе или нет решай сам.

CAB>Хотя идея интересная да и в Скале есть макросы для кодогенерации.


Могу похвастаться тем, что они их появление было не без моего участия.
Несколько лет назад я предложил их автору модель приемлемую для Скалы. Рад, что он смог их реализовать.

CAB>Пока что статическая, но в планах сделать динамической.


Не стоит делать что-то без необходимости. Сложность увеличиться, а вот преимуществ может и не появиться.

VD>>По сути, реактивная модель — это один из вариантов его реализации. Причем довольно не эффетивный, так как узлы строятся в виде замыканий (функцональных).


CAB>Этого я честно говоря не понял.


Матмодель там одна. Граф зависимостей выражает ее явно. Реактивная модель неявно создает аналог графа зависимости. Но как и все не явное при этом появляются непроизводительные расходы.

CAB>Но реактивные потоки построены на обмене сообщениями (без замыкания, насколько я знаю),


Это в рекламных булшитах, может быть. А на практике в каком-нибудь RX-е или в Скале эта модель реализуется за счет функциональных замыканий. Практически вся обработка данных упирается в работу со списками. А реактивный список отличается от обычного тем, что из него тянут данные, вместо того, чтобы толкать их туда.

Ну, и что такое события в той же Скале? Это все те же функциональные типы и их вызовы.

Надо просто понимать, что один и тот же алгоритм будучи реализованным на низком уровне будет значительно эффективнее. Вот почему в Склае так тормозять обычные циклы? Да потому что реализованы через функциональные типы. Там в итоге получается замыкание которое и вызванивается для каждого элемента. Как не оптимизируй вызовы, но это не может сравниться с реальным циклом.

CAB>когда одни узел (блок) шлёт сообщения другому узлу где они ставятся в очередь и затем обрабатываются в отдельном потоке (как в акторах).


А ты задумывался, что означает "шлет сообщение" и "очередь"? Вот это ваше сообщение и есть вызов функционального объекта. При этом получаются косвенные вызовы и замыкания разных переменных. Все это создает кучу говна в памяти и является менее эффективным.

CAB>И относительно вопроса выше, back-pressure это алгоритм балансировки вычислений, предотвращающий переполнения очередей сообщений.


Ну, вот используя явный граф зависимостей очередей не будет в прицепи.

CAB>PS: Пользуясь случаем хочу передать привет вашей компании и тому чуваку который пилит плагин для Скалы.


Я в этой "конторе" уже год как не работаю. Но ребята от туда этот форум читают, думаю.

CAB>Спасибо вам за отличный инструмент, но блин не пора ли заняться его оптимизацией, я ещё могу жить с инспектором выводящем типы по 2-3 минуты, но подвисание UI при редактирования... WTF?


Насколько я знаю плагин для Салы пишется по остаточному принципу. Вот плагин к Котлину — другое дело. Да и язык этот более IDE-пригодный. Так что возможно стоит не него посмотреть. Хотя Скале он, конечно, сильно проигрывает (для опытного пользователя).
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[2]: Симуляция на реактивных потоках
От: dsorokin Россия  
Дата: 11.01.17 05:48
Оценка:
Здравствуйте, VladD2, Вы писали:

VD>По сути, реактивная модель — это один из вариантов его реализации. Причем довольно не эффетивный, так как узлы строятся в виде замыканий (функцональных).


Другие способы нисколько не лучше.

Особенно прикалывает неэффективная и крайне наивная реализация имитации через системные нативные или зеленые потоки (нити). Вроде бы гонятся за эффективностью, а как в дело вступают ресурсы, то тут сразу возникают системные блокировки, и все, полный абзац. Да и неточный ужасно метод, который сильно зависит от системного планировщика. Да еще и масштабировать модельное время надо. Ну, нафиг такое счастье!

На Си++ еще пытаются использовать со-программы (OMNeT++), играя с системным стеком, а чем это лучше монад и продолжений? Вопрос крайне неоднозначный.

Тут тема такая, что нет явного победителя. Как ни реализуй, везде где-то проигрываем, где-то выигрываем. Да и нынче на питончике во всю моделируют (SimPy), а там с эффективностью выполнения, вообще, не очень.
Re[5]: Симуляция на реактивных потоках
От: WolfHound  
Дата: 20.01.17 00:06
Оценка:
Здравствуйте, VladD2, Вы писали:

VD>А что такое back-pressure в данном случае?

Остановка поставщика если получатель не успевает обрабатывать сообщения.
Самый простой вариант ограничить размер очереди сообщений и при достижении порога блокировать добавление сообщения пока из очереди старое не заберут.
... << RSDN@Home 1.0.0 alpha 5 rev. 0>>
Пусть это будет просто:
просто, как только можно,
но не проще.
(C) А. Эйнштейн
Re: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 20.01.17 07:01
Оценка:
Здравствуйте, C.A.B, Вы писали:
CAB>Идея в том, чтобы реализовать подобный инструмент, но построенный на асинхронном обмене сообщениями (на реактивных потоках в частности) вместо пошагового выселения состояния. Т.е. в целом это будет работать также: значения с выходов блоков будут передаватся на входы, но в виде посылки сообщений, а вычисления блоков будет выполнятся асинхронно.
А как вы делаете синхронизацию?
Ну, то есть если у нас два блока генерируют одинаковые последовательности из (1, -1, 1, -1, ...), а третий их складвает, то в пошаговой модели мы имеем умножение на 2: (2, -2, 2, -2, ...).
А в асинхронной — всё, что угодно, если нет гарантии порядка прихода сообщений: {0, -2, -2, 0, 2, -2, 0, ...)
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re[2]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 20.01.17 18:35
Оценка:
Здравствуйте, Sinclair, Вы писали:

S>А как вы делаете синхронизацию?

S>Ну, то есть если у нас два блока генерируют одинаковые последовательности из (1, -1, 1, -1, ...), а третий их складвает, то в пошаговой модели мы имеем умножение на 2: (2, -2, 2, -2, ...).

Есть такая штука, о которой я узнал из мира CUDA, называется «barrier synchronization».
Суть в том, что третий блок ждёт пока будут получены оба сообщения, и только за тем складывает и ждёт следующие 2 сообщения.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[3]: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 23.01.17 05:25
Оценка:
Здравствуйте, C.A.B, Вы писали:
CAB>Есть такая штука, о которой я узнал из мира CUDA, называется «barrier synchronization».
CAB>Суть в том, что третий блок ждёт пока будут получены оба сообщения, и только за тем складывает и ждёт следующие 2 сообщения.
Это как бы убивает весь эффект от параллелизма. Плюс к этому, вам надо существенно переписывать логику блоков.
Потому, что блок B может генерировать просто константу. В обычном случае у него просто GetValue() переписывается в return 1;. В реактивном случае ему надо "не забыть" сгенерировать нужное количество событий.
Опять же — а что делать, если блоку C скормить на вход блок A дважды? Он будет генерировать вдвое меньше сообщений, чем блок А?
В общем, схема сама по себе не плоха — просто надо понимать, что сопоставление её с классическим образцом затруднено.
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re[4]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 23.01.17 22:31
Оценка:
Здравствуйте, Sinclair, Вы писали:

S>Это как бы убивает весь эффект от параллелизма...


С чего вдруг?

S>...В реактивном случае ему надо "не забыть" сгенерировать нужное количество событий.


Не обязательно, для подобных блоков мы можем ожидать только первого сообщения и далее хранить его пока не будет получено новое. Если система типов достаточно мощная, то выбор логики будет выполнятся прозрачно для пользователя. Если вы читаете Скалу, здесь как раз пример блоков где один вход "константный" а другий "ждущий": https://github.com/AlexCAB/MathAct/blob/master/mathact_examples/src/main/scala/examples/common/SimplePidExample.scala (Не работает вставка ссылок в MS Edge)

S>В общем, схема сама по себе не плоха — просто надо понимать, что сопоставление её с классическим образцом затруднено.


В целом, да, это другой подход к реализации симуляции, здесь есть свои проблемы и свои решения, свои преимущества и свои недостатки.

S>...сопоставление её с классическим образцом...


Дедушка Фрейд икнул
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[5]: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 24.01.17 10:05
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Не обязательно, для подобных блоков мы можем ожидать только первого сообщения и далее хранить его пока не будет получено новое.

Не понял, кто должен быть подобен? Т.е. где делается выбор — в блоке А или в блоке C?
CAB>Если система типов достаточно мощная, то выбор логики будет выполнятся прозрачно для пользователя. Если вы читаете Скалу, здесь как раз пример блоков где один вход "константный" а другий "ждущий": https://github.com/AlexCAB/MathAct/blob/master/mathact_examples/src/main/scala/examples/common/SimplePidExample.scala (Не работает вставка ссылок в MS Edge)
Недостаточно хорошо читаю, чтобы понять, что происходит.

S>>...сопоставление её с классическим образцом...

CAB>Дедушка Фрейд икнул
Не знаю, причём тут фрейд. Я просто лет 15 тому назад участвовал в проекте по переписыванию подобной системы с TurboPascal на Delphi. И весь смысл был в том, что собсно сами симуляции, написанные ещё в 80х, нужно было гонять на новой системе. Если мы меняем вычислительную модель, то нужно уметь как-то переписывать симуляции со старого формата в новый — причём так, чтобы поведение сохранялось.
Сходу не вполне понятно, как это сделать, с учётом изменения понятия "шаг".
Либо я не догоняю в вашу идею — может быть, у вас понятие шагов в каком-то виде есть, и запрос "дай мне значение с шага N" отличается от запроса "дай мне значение с шага N+1".

А если писать симуляцию с нуля, то и вопросов возникать не будет — ну да, вот такие блоки, вот они так работают. Хочешь потреблять значение блока одновременно в двух местах- втыкай мультиплексор, иначе будешь получать смешение времён.
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re[6]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 24.01.17 21:50
Оценка:
Здравствуйте, Sinclair, Вы писали:

S>Не понял, кто должен быть подобен? Т.е. где делается выбор — в блоке А или в блоке C?


Я имею ввиду блок который генерирует константу (пусть будет А, шлющий только 1 сообщение на старте), и блок с логикой оптимизированной для получения оной (пусть буде С, получающий сообщение от А и хранящий его).

S>...Я просто лет 15 тому назад участвовал в проекте по переписыванию подобной системы с TurboPascal на Delphi. И весь смысл был в том, что собсно сами симуляции, написанные ещё в 80х...


Я верю что никто в здравом уме и трезвой памяти не будет занимается переписыванием симуляций из 80-х Зачем их вообще переписывать? В моём понимании это вещь одноразовая: запилили -> посчитали -> выкинули. Но может быть и нет, спорить не буду.

S>А если писать симуляцию с нуля, то и вопросов возникать не будет — ну да, вот такие блоки, вот они так работают...


Ну да, так и есть. Вообще конкретно этот проект (что я написал) больше исследовательский, из разряда "а что если взять реактивные потоки запилить на них..."
Так что если вам это тоже интересно, можете скачать и поиграть.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[7]: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 25.01.17 05:15
Оценка:
Здравствуйте, C.A.B, Вы писали:
CAB>Я имею ввиду блок который генерирует константу (пусть будет А, шлющий только 1 сообщение на старте), и блок с логикой оптимизированной для получения оной (пусть буде С, получающий сообщение от А и хранящий его)
По-моему, это убивает всю идею на корню. Блок C не должен знать, получает ли он константу или синусоиду — иначе порушится вся модульность.

CAB>Я верю что никто в здравом уме и трезвой памяти не будет занимается переписыванием симуляций из 80-х Зачем их вообще переписывать? В моём понимании это вещь одноразовая: запилили -> посчитали -> выкинули. Но может быть и нет, спорить не буду.

Э-э, у вас какие-то одноразовые симуляции. Я не знаю точно, что за задачи решала та система, которую мы переписывали, но заказчики работали на железные дороги Великобритании.
То есть вот эти все абстрактные входы-выходы использовались где-то у них в системах автоматизации. Задача симуляции не сводится к тому, чтобы однажды получить число 42. Это же программа — просто написанная на очень специфичном языке программирования. Она преобразует некоторый вход в интересный выход.

CAB>Ну да, так и есть. Вообще конкретно этот проект (что я написал) больше исследовательский, из разряда "а что если взять реактивные потоки запилить на них..."

CAB>Так что если вам это тоже интересно, можете скачать и поиграть.
Увы, я уже от этого всего слишком далёк.
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re[8]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 25.01.17 08:46
Оценка:
Здравствуйте, Sinclair, Вы писали:

CAB>>Я имею ввиду блок который генерирует константу (пусть будет А, шлющий только 1 сообщение на старте), и блок с логикой оптимизированной для получения оной (пусть буде С, получающий сообщение от А и хранящий его)

S>По-моему, это убивает всю идею на корню. Блок C не должен знать, получает ли он константу или синусоиду — иначе порушится вся модульность.

Да, в какой-то степени модульность страдает, но определённо не "порушится вся". Конкретно в моей реализации входы и выходы блоков имеют тип (Double, Int, String, Object, so on), так что соединять можно только те из них которые имеют совместимый тип.
С другой стороны, образно говоря, благодаря типизированости блок знает что за значение он получает и что с ним делать.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[9]: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 26.01.17 09:50
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Да, в какой-то степени модульность страдает, но определённо не "порушится вся". Конкретно в моей реализации входы и выходы блоков имеют тип (Double, Int, String, Object, so on), так что соединять можно только те из них которые имеют совместимый тип.

Продолжаю непонимать, как это отвечает на вопрос про ожидание событий. В моём примере все типы — Int.
Дальше что?
Как мне написать блок C, чтобы результат его работы был понятен без 0.75 сорокаградусной? Напомню: работа блока C — складывать значения двух входов.
Блок заранее не знает, что к нему подключат — генератор константы или синусоиды.
Вот у меня есть блок типа A, который генерирует -1 в степени N.
Если я подключаю на вход к блоку C два идентичных блока A1 и A2, то ожидаю, что C выдаст на выходе 2*(-1)^N.
Того же самого я ожидаю, подключая к обоим входам блока С блок A1.
В "обычной схеме", где есть в явном виде "значения всех слотов на предыдущем шаге" и "значения всех слотов на новом шаге", которые меняются местами каждый такт, это интуитивно понятное ожидание выполняется без приседаний.
В "реактивной схеме" мы налетаем либо на то, что C генерирует мусор (при отказе от синхронизации), либо на генерацию 0, если блок C оборудован "ожиданием" каждого из сигналов.
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re: Симуляция на реактивных потоках
От: Nick Linker Россия lj://_lcr_
Дата: 28.01.17 10:20
Оценка:
C.A.B,

CAB>Что думаете вы?

CAB>Спасибо!

Может я ошибаюсь, но первая возникшая мысль была, что идея аналогична библиотеке pipes (http://hackage.haskell.org/package/pipes). Я ещё поковыряюсь в вашей реализации, попробую разобраться. Может позже что-нибудь умное напишу
quicksort =: (($:@(<#[),(=#[),$:@(>#[)) ({~ ?@#)) ^: (1<#)
Re[2]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 30.01.17 07:32
Оценка:
Здравствуйте, Nick Linker, Вы писали:

NL>Может я ошибаюсь, но первая возникшая мысль была, что идея аналогична библиотеке pipes (http://hackage.haskell.org/package/pipes).


Спасибо, интересная сриминговая библиотека.
В целом похоже, но я позаимсвовал стриминговую модель из этой библитеки.
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[10]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 30.01.17 10:27
Оценка: 33 (1)
Здравствуйте, Sinclair, Вы писали:

S>Как мне написать блок C, чтобы результат его работы был понятен без 0.75 сорокаградусной?


Ох, это очень плохая идея

S>Напомню: работа блока C — складывать значения двух входов.

S>Блок заранее не знает, что к нему подключат — генератор константы или синусоиды.

Он знает.

S>... либо на генерацию 0, если блок C оборудован "ожиданием" каждого из сигналов.


Если блок ожидает сигнала то он ничего не генерирует.

S>Вот у меня есть блок типа A, который генерирует -1 в степени N.

S>Если я подключаю на вход к блоку C два идентичных блока A1 и A2, то ожидаю, что C выдаст на выходе 2*(-1)^N.
S>Того же самого я ожидаю, подключая к обоим входам блока С блок A1.

Попробую объяснить на псевдокоде:
/* Определим блоки */

class A {                                                                     //Класс блока A имеет один выход 'out' который на старте последовательно посылает четыре значения на выход любого другого подключенного блока
    final Stream<Int> out = Stream{1, -1, 1, -1}                              //здесь это (-1)^N но может быть синусоида или что угодно. 
}                                                                        

class B {                                                                     //Класс блока B так же с одним выходом, посылает только одно значение (константу).
    final Unitary<Int> out = Unitary(2)
}

class Base {                                                                  //Базовый класс, скрывающий всю машинерию общую для блоков-матоператоров.

    protected abstract Int doCalc(Array<Int> input)                           //Операция которую должен реализовать субкласс 

    private Array<Queue<Int>> streamBuf = new Array()                         //Буфер для последовательностей входящих значений (для реализации пороговой синхронизации).
    private Array<Option<Int>> unitaryBuf = new Array()                       //Буфер для единичных входящих значений (констант).
                                                                              //Буферы изначально заполнены Queue() и None значениями, и имеющие количество элементов равное количеству подключений к соответсвующему входу.

    final Stream<Int> out = new Empty                                         //Выход, ничего не посылает на старте, "ожидает" вызова .send(Int) метода.

    private void chechIfAllValsReadyAndDoCalcIfSo(){
        while(streamBuf.notContainsEmptyQueue && unitaryBuf.notContainsNone){ //Если получены значения для всех входов,
            out.send(doCalc(                                                  //выполнить операцию и отправить результат на выход,
                streamBuf.map(_.dequeueFirst) ++ unitaryBuf.map(_.get)))      //и очистить буфер входа принимающего последовательность значений (не константу).                                              
        }
    }

    final Stream<Int> in = onReceive((index, value) -> {                      //Вход с логикой заточенной для получения последовательности значений,
        streamBuf[index].enqueue(value)                                       //index - номе подключения к данному входу.
        chechIfAllValsReadyAndDoCalcIfSo() 
    })
    final Unitary<Int> in = onReceive((index, value) -> {                     //Вход для получения единичных значений (констант). 
        unitaryBuf[index] = new Some(value)
        chechIfAllValsReadyAndDoCalcIfSo()
    })
}

class C extends Base {                                                        //Собственно операция суммирование
  protected Int doCalc(Array<Int> input) { return input.sum() } 
}

class P {                                                                     //Класс блока для вывода результата
    final Stream<Int> in = onReceive((_, value) -> { println(value) })
}


/* Создадим несколько блоков */

A a1 = new A() 
A a2 = new A() 
B b = new B() 
C c = new C()
P printer = new P()


/* Соберём из них пару примеров */

a1 ~> c                //Блоки 'a1', 'a2' генерирующие [1, -1, 1, -1], будут подключены с входу 'Stream<Int> in' (т.е. к входу с совместимым типом) блока 'c', 
a2 ~> c ~> printer     //выход блока 'c' будет подключен к 'printer'.
                       //Все четыре блока работают асинхронно, например:
                       // 1. Блок 'a1' получает первым свой квант процессорного времени и отправляет сразу все четыре значения.
                       // 2. Которые попадают в первую очередь в блок 'c' (streamBuf выглядит как [[1,-1,1,-1],[]]), 'c' не отправляет ничего на выход так как вторая очередь пуста.
                       // 3. Блок 'a2' выполняется затем и успевает отправить только 2 значения из 4-х.
                       // 4. Каждый раз при получении нового значения от 'a2', 'c' ставит его в очередь (streamBuf = [[1,-1,1,-1],[1]]) и так как нет пустых очередей 
                       //    забирает по одному элементу из каждой, выполняет операцию, и отправляет результат на свой выход.
                       // 5. Таким образом после обработки 2-х сообщений от 'a2': streamBuf = [[1,-1],[]], блоком 'printer' будет выведено: "2", "-2". 
                       // 6. Соответсвенно по получении оставшихся 2-х сообщений от 'a2': streamBuf = [[],[]], блоком 'printer' будет выведено: "2", "-2", "2", "-2". 
                       //PS: Такой прикольный синтаксис можно сделать в Scala, но в целом это примерно эквивалентно: 
                       //  a1.out.connectTo(c.in)
                       //  a2.out.connectTo(c.in)
                       //  c.out.connectTo(printer.in)

a1 ~> c                //Для блока 'b' генерирующего константу, всё примерно так-же, роме того что он будет подключен к входу 'Unitary<Int> in' с отличной логикой обработки значений.
b  ~> c ~> printer     //Например:
                       // 1. Блок 'a1' опять выполняется первым и отправляет сразу все четыре значения. 
                       // 2. streamBuf = [[1,-1,1,-1]], unitaryBuf = [None]
                       // 3. Блок 'b' выполняется и посылает своё единственное значение: streamBuf = [[1,-1,1,-1]], unitaryBuf = [Some(2)]
                       // 4. Блок 'c' выполняет операцию сложения для всех значений в очереди и отправляет полученные значения на выход.
                       // 5. Блок 'printer' выводит: "3", "1", "3", "1".


Как то так
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
Re[11]: Симуляция на реактивных потоках
От: Sinclair Россия https://github.com/evilguest/
Дата: 30.01.17 17:06
Оценка:
Здравствуйте, C.A.B, Вы писали:

CAB>Он знает.

По-моему, это — очень плохая идея.

S>>Вот у меня есть блок типа A, который генерирует -1 в степени N.

S>>Если я подключаю на вход к блоку C два идентичных блока A1 и A2, то ожидаю, что C выдаст на выходе 2*(-1)^N.
S>>Того же самого я ожидаю, подключая к обоим входам блока С блок A1.

CAB>Попробую объяснить на псевдокоде:

CAB>A a1 = new A()
CAB>A a2 = new A()
CAB>B b = new B()
CAB>C c = new C()
CAB>P printer = new P()


CAB>a1 ~> c //Блоки 'a1', 'a2' генерирующие [1, -1, 1, -1], будут подключены с входу 'Stream<Int> in' (т.е. к входу с совместимым типом) блока 'c',

CAB>a2 ~> c ~> printer //выход блока 'c' будет подключен к 'printer'.
CAB> //Все четыре блока работают асинхронно, например:
CAB> // 1. Блок 'a1' получает первым свой квант процессорного времени и отправляет сразу все четыре значения.
CAB> // 2. Которые попадают в первую очередь в блок 'c' (streamBuf выглядит как [1,-1,1,-1],[]]), 'c' не отправляет ничего на выход так как вторая очередь пуста.
CAB> // 3. Блок 'a2' выполняется затем и успевает отправить только 2 значения из 4-х.
CAB> // 4. Каждый раз при получении нового значения от 'a2', 'c' ставит его в очередь (streamBuf = [1,-1,1,-1],[1]]) и так как нет пустых очередей
CAB> // забирает по одному элементу из каждой, выполняет операцию, и отправляет результат на свой выход.
CAB> // 5. Таким образом после обработки 2-х сообщений от 'a2': streamBuf = [1,-1],[]], блоком 'printer' будет выведено: "2", "-2".
CAB> // 6. Соответсвенно по получении оставшихся 2-х сообщений от 'a2': streamBuf = [[],[]], блоком 'printer' будет выведено: "2", "-2", "2", "-2".
CAB> //PS: Такой прикольный синтаксис можно сделать в Scala, но в целом это примерно эквивалентно:
CAB> // a1.out.connectTo(c.in)
CAB> // a2.out.connectTo(c.in)
CAB> // c.out.connectTo(printer.in)

CAB>a1 ~> c //Для блока 'b' генерирующего константу, всё примерно так-же, роме того что он будет подключен к входу 'Unitary<Int> in' с отличной логикой обработки значений.

CAB>b ~> c ~> printer //Например:
CAB> // 1. Блок 'a1' опять выполняется первым и отправляет сразу все четыре значения.
CAB> // 2. streamBuf = [1,-1,1,-1]], unitaryBuf = [None]
CAB> // 3. Блок 'b' выполняется и посылает своё единственное значение: streamBuf = [1,-1,1,-1]], unitaryBuf = [Some(2)]
CAB> // 4. Блок 'c' выполняет операцию сложения для всех значений в очереди и отправляет полученные значения на выход.
CAB> // 5. Блок 'printer' выводит: "3", "1", "3", "1".
CAB>[/java]

CAB>Как то так

Вот смотрите.
1. В "традиционной" системе у нас есть блок типа C. Ему не надо вот эти 30 строк трудноотлаживаемой машинерии; вся его логика сводится к
Int Output1 { return Input1.GetValue() + Input2.GetValue()}
2. Что вы собираетесь делать с блоками, генерирующими бесконечные последовательности? У вас блок а1 собрался оптом запихать сразу всё. Если ему дать это сделать, то он напрочь забьёт очередь блока C, не дав шанса блоку а2.
3. По-прежнему не раскрыт вопрос о том, что делать с двукратным подключением блока a1 к блоку с. Оно вообще поддерживается?
4. Что делать со starvation? Где гарантия, что блок a2 вообще хоть когда-то получит свой квант процессорного времени?
Уйдемте отсюда, Румата! У вас слишком богатые погреба.
Re[12]: Симуляция на реактивных потоках
От: C.A.B LinkedIn
Дата: 30.01.17 19:42
Оценка: 33 (1)
Здравствуйте, Sinclair, Вы писали:

S>1. В "традиционной" системе у нас есть блок типа C. Ему не надо вот эти 30 строк трудноотлаживаемой машинерии; вся его логика сводится к

S>Int Output1 { return Input1.GetValue() + Input2.GetValue()}

Да, также вся эта возня с сообщения отъедает не мало процессорного времени. Используете подходящий инструмент.

S>2. Что вы собираетесь делать с блоками, генерирующими бесконечные последовательности? У вас блок а1 собрался оптом запихать сразу всё. Если ему дать это сделать, то он напрочь забьёт очередь блока C, не дав шанса блоку а2.

S>4. Что делать со starvation? Где гарантия, что блок a2 вообще хоть когда-то получит свой квант процессорного времени?

Для этого у нас есть back pressure алгоритм, который балансирует нагрузку (размеры очередей).

S>3. По-прежнему не раскрыт вопрос о том, что делать с двукратным подключением блока a1 к блоку с. Оно вообще поддерживается?


Да, просто два раза выполнить подключение:

a1 ~> c   
a1 ~> c ~> printer  //Результат: "2", "-2", "2", "-2".


Или три

a1 ~> c  
a1 ~> c ~> printer  //Результат: "3", "-3", "3", "-3". 
a1 ~> c
Между тем,что я думаю,тем,что я хочу сказать,тем,что я,как мне кажется,говорю,и тем,что вы хотите услышать,тем,что как вам кажется,вы слышите,тем,что вы понимаете,стоит десять вариантов возникновения непонимания.Но всё-таки давайте попробуем...(Э.Уэллс)
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.