Сообщение Re: RxJava, 1 поток генератора, n потоков обработки от 12.10.2017 15:17
Изменено 12.10.2017 15:20 x76AdF1
Re: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, Тёмчик, Вы писали:
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.
Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Observable<Integer> emitter = Observable.range(50,100);
emitter.map(i -> longRunningJob(i))
.subscribe(emission -> System.out.println(emission)
Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий
Observable<Integer> emitter = Observable.range(50,100);
emitter.flatMap(emission -> Observable.just(emission)
.subscribeOn(Schedulers.computation())
.map(i -> longRunningJob(i))
).subscribe(emission -> System.out.println(emission)
В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.
Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Observable<Integer> emitter = Observable.range(50,100);
emitter.map(i -> longRunningJob(i))
.subscribe(emission -> System.out.println(emission)
Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий
Observable<Integer> emitter = Observable.range(50,100);
emitter.flatMap(emission -> Observable.just(emission)
.subscribeOn(Schedulers.computation())
.map(i -> longRunningJob(i))
).subscribe(emission -> System.out.println(emission)
В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.
Re: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, Тёмчик, Вы писали:
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.
Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий
В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.
Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Observable<Integer> emitter = Observable.range(50,100);
emitter.map(i -> longRunningJob(i))
.subscribe(emission -> System.out.println(emission);)Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий
Observable<Integer> emitter = Observable.range(50,100);
emitter.flatMap(emission -> Observable.just(emission)
.subscribeOn(Schedulers.computation())
.map(i -> longRunningJob(i))
).subscribe(emission -> System.out.println(emission);)В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.