RxJava, 1 поток генератора, n потоков обработки
От: Тёмчик Австралия жж
Дата: 28.07.17 21:22
Оценка:
Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?
Re: RxJava, 1 поток генератора, n потоков обработки
От: bzig  
Дата: 29.07.17 03:06
Оценка:
Здравствуйте, Тёмчик, Вы писали:

Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?


Refcount
Re: RxJava, 1 поток генератора, n потоков обработки
От: StanislavK Великобритания  
Дата: 29.07.17 15:00
Оценка:
Здравствуйте, Тёмчик, Вы писали:

Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?


По теме — пример бы, а то вариантов много — можно refcount (но это страшно, так как трудно контролируемо) можно subscribe вызывать только на первом и у него не делать unsubscribe. На вот, это реактор, но оно все одна фигня, считай что Flux это Observable:

    @Test
    public void connectableFluxDispose() throws Exception {
        CountDownLatch stoppedLatch = new CountDownLatch(1);
        Flux<String> stringFlux = Flux.create(emmiter -> {
            Thread thread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    emmiter.next(System.currentTimeMillis()+"");
                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.NANOSECONDS);
                }
                stoppedLatch.countDown();
            });
            thread.start();
            emmiter.onDispose(thread::interrupt);
        });

        ConnectableFlux<String> connectableFlux = stringFlux.publish();

        AtomicLong receivedEvents = new AtomicLong(0);
        CountDownLatch firstEventLatch = new CountDownLatch(10);
        Disposable subscribeDisposable = connectableFlux.subscribe(s -> {
            receivedEvents.incrementAndGet();
            firstEventLatch.countDown();
        });
        Disposable connectDisposable = connectableFlux.connect();

        firstEventLatch.await();
        // this will stop only receiving events in the first 'subscribe'
        subscribeDisposable.dispose();
        long counterAfterFirstDispose = receivedEvents.get();

        CountDownLatch secondEventLatch = new CountDownLatch(10);
        subscribeDisposable = connectableFlux.subscribe(s -> secondEventLatch.countDown());
        secondEventLatch.await();
        subscribeDisposable.dispose();

        connectDisposable.dispose();
        stoppedLatch.await();

        // asserts that first 'subscribe' haven't received events since subscribeDisposable.dispose();
        assertEquals(counterAfterFirstDispose, receivedEvents.get());
    }


Не по теме — забей на rxjava — глюкалово страшное, написанное непонятно кем и для кого. На java лучше использовать https://projectreactor.io/, там с качеством (производительность, более правильная backpressure и тд) получше.
Re[2]: RxJava, 1 поток генератора, n потоков обработки
От: Тёмчик Австралия жж
Дата: 30.07.17 15:06
Оценка:
Здравствуйте, StanislavK, Вы писали:

Тё>>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?


SK>По теме — пример бы, а то вариантов много



AtomicInteger counter = new AtomicInteger();
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(20));
Subscription subscription = Observable.<List<String>>create(subscriber -> {
    while(true) {
        if (counter.incrementAndGet() % 3) {
            System.out.println("xxxxxxxxxxx");
        } else {
            subscriber.onNext(Arrays.asList("1", "2"));
            try { Thread.sleep(200L); } catch (InterruptedException e) {}
        }
    }
}, Emitter.BackpressureMode.DROP).subscribe(scheduler).flatMap(batch -> Observable.from(batch).subscribeOn(scheduler))
.doOnNext(s -> System.out.println(Thread.currentThread().getId() + " : " + s))
.subscribe();

System.out.println("Started observable!");
Thread.sleep(10000L);
subscription.unsubscribe();


10 секунд наблюдаем
18 : 1
18 : 2
19 : 1
19 : 2
xxxxxxxxxxx
...

и потом после unsubscribe — только
xxxxxxxxxxx
xxxxxxxxxxx
xxxxxxxxxxx

Получается, что мой первый бесконечный цикл не дохнет, хотя subscriber.onNext уходит в никуда. Как в цикле узнать, что этот subscriber — уже отписался?

  Скрытый текст
- можно refcount (но это страшно, так как трудно контролируемо) можно subscribe вызывать только на первом и у него не делать unsubscribe. На вот, это реактор, но оно все одна фигня, считай что Flux это Observable:

SK>
SK>    @Test
SK>    public void connectableFluxDispose() throws Exception {
SK>        CountDownLatch stoppedLatch = new CountDownLatch(1);
SK>        Flux<String> stringFlux = Flux.create(emmiter -> {
SK>            Thread thread = new Thread(() -> {
SK>                while (!Thread.currentThread().isInterrupted()) {
SK>                    emmiter.next(System.currentTimeMillis()+"");
SK>                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.NANOSECONDS);
SK>                }
SK>                stoppedLatch.countDown();
SK>            });
SK>            thread.start();
SK>            emmiter.onDispose(thread::interrupt);
SK>        });

SK>        ConnectableFlux<String> connectableFlux = stringFlux.publish();

SK>        AtomicLong receivedEvents = new AtomicLong(0);
SK>        CountDownLatch firstEventLatch = new CountDownLatch(10);
SK>        Disposable subscribeDisposable = connectableFlux.subscribe(s -> {
SK>            receivedEvents.incrementAndGet();
SK>            firstEventLatch.countDown();
SK>        });
SK>        Disposable connectDisposable = connectableFlux.connect();

SK>        firstEventLatch.await();
SK>        // this will stop only receiving events in the first 'subscribe'
SK>        subscribeDisposable.dispose();
SK>        long counterAfterFirstDispose = receivedEvents.get();

SK>        CountDownLatch secondEventLatch = new CountDownLatch(10);
SK>        subscribeDisposable = connectableFlux.subscribe(s -> secondEventLatch.countDown());
SK>        secondEventLatch.await();
SK>        subscribeDisposable.dispose();

SK>        connectDisposable.dispose();
SK>        stoppedLatch.await();

SK>        // asserts that first 'subscribe' haven't received events since subscribeDisposable.dispose();
SK>        assertEquals(counterAfterFirstDispose, receivedEvents.get());
SK>    }
SK>


SK>Не по теме — забей на rxjava — глюкалово страшное, написанное непонятно кем и для кого. На java лучше использовать https://projectreactor.io/, там с качеством (производительность, более правильная backpressure и тд) получше.
Re[3]: RxJava, 1 поток генератора, n потоков обработки
От: StanislavK Великобритания  
Дата: 14.08.17 14:45
Оценка:
Здравствуйте, Тёмчик, Вы писали:

А вот и я из отпуска

  Скрытый текст
Тё>
Тё>AtomicInteger counter = new AtomicInteger();
Тё>Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(20));
Тё>Subscription subscription = Observable.<List<String>>create(subscriber -> {
Тё>    while(true) {
Тё>        if (counter.incrementAndGet() % 3) {
Тё>            System.out.println("xxxxxxxxxxx");
Тё>        } else {
Тё>            subscriber.onNext(Arrays.asList("1", "2"));
Тё>            try { Thread.sleep(200L); } catch (InterruptedException e) {}
Тё>        }
Тё>    }
Тё>}, Emitter.BackpressureMode.DROP).subscribe(scheduler).flatMap(batch -> Observable.from(batch).subscribeOn(scheduler))
Тё>.doOnNext(s -> System.out.println(Thread.currentThread().getId() + " : " + s))
Тё>.subscribe();

Тё>System.out.println("Started observable!");
Тё>Thread.sleep(10000L);
Тё>subscription.unsubscribe();
Тё>

Стоит в заглянуть в мой пример. Само никогда не дохнет, надо у subscriber спросить все уже или еще нет или попросить его об этом оповестить.

Тё>Получается, что мой первый бесконечный цикл не дохнет, хотя subscriber.onNext уходит в никуда. Как в цикле узнать, что этот subscriber — уже отписался?

Правильный вопрос. Не знаю какая rxjava у тебя, но во вротой есть вот такое (в примере у меня тоже примерно так):
AtomicBoolean unsubscribeDetected = new AtomicBoolean(false);
Observable<String> observable = Observable.create(emmiter -> {
    new Thread(() -> {
        while (!emmiter.isDisposed()) {
            emmiter.onNext("next");
            sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
        }
    }).start();
    emmiter.setDisposable(new SimpleDisposable(() -> unsubscribeDetected.set(true)));
});


см. isDisposed() & setDisposable()
Re: RxJava, 1 поток генератора, n потоков обработки
От: x76AdF1  
Дата: 12.10.17 15:17
Оценка:
Здравствуйте, Тёмчик, Вы писали:

Тё>Делаю так: 1 поток порождает пачки сообщений (<List<Msg>>), которые далее параллелятся в n-стримов <Msg> и потом subscribe. После ожидания по subscription.unsubscribe. В результате самый первый поток продолжает рождать сообщения, они просто не уходят дальше. Как правильно делать unsubscribe чтобы все пртоки остановились?


Попробую объяснить, в чем подвох.
Параллелизация в рамках Rx нарушает т.н. Observable contract, который прямо говорит, что onNext() должны вызываться последовательно, но не параллельно. Если проще — rx не подразумевает параллельного многопотоного выполнения.

Однако, добиться распараллеливания через Rx все же можно. Рассмотрим простой пример потока, который выполняет некую долгую работу для каждой своей эмиссии.
Observable<Integer> emitter = Observable.range(50,100);
emitter.map(i -> longRunningJob(i))
.subscribe(emission -> System.out.println(emission);)


Для того, чтобы запараллелить выполнение наших долгих работ не нарушая основных контрактов, мы должны
1) для каджой эмиссии создать новый Observable
2) для каждого Observable из п.1 указать свой поток эмиссии (subscribeOn)
3) определить выполнение нашей длительной работы/операции уже в рамках окружения из п.1 и п.2
ну и, конечно, делать пп.1-3 мы будем в рамках flatMap-оператора, который и будет следить за соблюдением всех контрактов, собирая последовательно результаты выполнения длительных работ, и обеспечивая именно последовательное выполнение onNext() для каждой из эмиссий

Observable<Integer> emitter = Observable.range(50,100);
emitter.flatMap(emission -> Observable.just(emission)
                            .subscribeOn(Schedulers.computation())
                            .map(i -> longRunningJob(i))
        ).subscribe(emission -> System.out.println(emission);)

В любом случае, более стабильную работу для параллелизации дадут стандартные средства из java.util.concurrency — ThreadPoolExecutor, ExecutorServices, и прочее.
Отредактировано 12.10.2017 15:20 x76AdF1 . Предыдущая версия .
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.