Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Re: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, Тёмчик, Вы писали:
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Refcount
Re: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, Тёмчик, Вы писали:
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
По теме — пример бы, а то вариантов много — можно refcount (но это страшно, так как трудно контролируемо) можно subscribe вызывать только на первом и у него не делать unsubscribe. На вот, это реактор, но оно все одна фигня, считай что Flux это Observable:
@Test
public void connectableFluxDispose() throws Exception {
CountDownLatch stoppedLatch = new CountDownLatch(1);
Flux<String> stringFlux = Flux.create(emmiter -> {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
emmiter.next(System.currentTimeMillis()+"");
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.NANOSECONDS);
}
stoppedLatch.countDown();
});
thread.start();
emmiter.onDispose(thread::interrupt);
});
ConnectableFlux<String> connectableFlux = stringFlux.publish();
AtomicLong receivedEvents = new AtomicLong(0);
CountDownLatch firstEventLatch = new CountDownLatch(10);
Disposable subscribeDisposable = connectableFlux.subscribe(s -> {
receivedEvents.incrementAndGet();
firstEventLatch.countDown();
});
Disposable connectDisposable = connectableFlux.connect();
firstEventLatch.await();
// this will stop only receiving events in the first 'subscribe'
subscribeDisposable.dispose();
long counterAfterFirstDispose = receivedEvents.get();
CountDownLatch secondEventLatch = new CountDownLatch(10);
subscribeDisposable = connectableFlux.subscribe(s -> secondEventLatch.countDown());
secondEventLatch.await();
subscribeDisposable.dispose();
connectDisposable.dispose();
stoppedLatch.await();
// asserts that first 'subscribe' haven't received events since subscribeDisposable.dispose();
assertEquals(counterAfterFirstDispose, receivedEvents.get());
}
Не по теме — забей на rxjava — глюкалово страшное, написанное непонятно кем и для кого. На java лучше использовать https://projectreactor.io/, там с качеством (производительность, более правильная backpressure и тд) получше.
Re[2]: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, StanislavK, Вы писали: Тё>>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились? SK>По теме — пример бы, а то вариантов много
и потом после unsubscribe — только
xxxxxxxxxxx
xxxxxxxxxxx
xxxxxxxxxxx
Получается, что мой первый бесконечный цикл не дохнет, хотя subscriber.onNext уходит в никуда. Как в цикле узнать, что этот subscriber — уже отписался?
Скрытый текст
- можно refcount (но это страшно, так как трудно контролируемо) можно subscribe вызывать только на первом и у него не делать unsubscribe. На вот, это реактор, но оно все одна фигня, считай что Flux это Observable:
SK>
SK> @Test
SK> public void connectableFluxDispose() throws Exception {
SK> CountDownLatch stoppedLatch = new CountDownLatch(1);
SK> Flux<String> stringFlux = Flux.create(emmiter -> {
SK> Thread thread = new Thread(() -> {
SK> while (!Thread.currentThread().isInterrupted()) {
SK> emmiter.next(System.currentTimeMillis()+"");
SK> Uninterruptibles.sleepUninterruptibly(1, TimeUnit.NANOSECONDS);
SK> }
SK> stoppedLatch.countDown();
SK> });
SK> thread.start();
SK> emmiter.onDispose(thread::interrupt);
SK> });
SK> ConnectableFlux<String> connectableFlux = stringFlux.publish();
SK> AtomicLong receivedEvents = new AtomicLong(0);
SK> CountDownLatch firstEventLatch = new CountDownLatch(10);
SK> Disposable subscribeDisposable = connectableFlux.subscribe(s -> {
SK> receivedEvents.incrementAndGet();
SK> firstEventLatch.countDown();
SK> });
SK> Disposable connectDisposable = connectableFlux.connect();
SK> firstEventLatch.await();
SK> // this will stop only receiving events in the first 'subscribe'
SK> subscribeDisposable.dispose();
SK> long counterAfterFirstDispose = receivedEvents.get();
SK> CountDownLatch secondEventLatch = new CountDownLatch(10);
SK> subscribeDisposable = connectableFlux.subscribe(s -> secondEventLatch.countDown());
SK> secondEventLatch.await();
SK> subscribeDisposable.dispose();
SK> connectDisposable.dispose();
SK> stoppedLatch.await();
SK> // asserts that first 'subscribe' haven't received events since subscribeDisposable.dispose();
SK> assertEquals(counterAfterFirstDispose, receivedEvents.get());
SK> }
SK>
SK>Не по теме — забей на rxjava — глюкалово страшное, написанное непонятно кем и для кого. На java лучше использовать https://projectreactor.io/, там с качеством (производительность, более правильная backpressure и тд) получше.
Re[3]: RxJava, 1 поток генератора, n потоков обработки
Стоит в заглянуть в мой пример. Само никогда не дохнет, надо у subscriber спросить все уже или еще нет или попросить его об этом оповестить. Тё>Получается, что мой первый бесконечный цикл не дохнет, хотя subscriber.onNext уходит в никуда. Как в цикле узнать, что этот subscriber — уже отписался?
Правильный вопрос. Не знаю какая rxjava у тебя, но во вротой есть вот такое (в примере у меня тоже примерно так):
AtomicBoolean unsubscribeDetected = new AtomicBoolean(false);
Observable<String> observable = Observable.create(emmiter -> {
new Thread(() -> {
while (!emmiter.isDisposed()) {
emmiter.onNext("next");
sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
}).start();
emmiter.setDisposable(new SimpleDisposable(() -> unsubscribeDetected.set(true)));
});
см. isDisposed() & setDisposable()
Re: RxJava, 1 поток генератора, n потоков обработки
Здравствуйте, Тёмчик, Вы писали:
Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.
Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий
В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.