Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений.
Так почему бы не добавить await для IObservable, раз уж он есть для тасков?
Что-то наподобие этого:
IObservable<string> strings = GetStringsObservable();
await foreach(var str in strings) {
// Здесь у нас OnNext обработчик
} catch (Exception e) {
// OnError
}
//Всё что после "цикла" - это OnCompleted
Как и в случае с IEnumerable<T>, мы можем комбинировать и модифицировать источник:
await foreach(var str in strings.Where(s => !string.IsNullOrWhitespace(s)) {
Debug.WriteLine(str);
}
Проблемы контекста не стоит, так как эта функциональность уже встроена в RX:
await foreach(var str in strings.ObserveOn(RxMain.MainThreadScheduler)) {
}
В добавок было бы круто иметь yield для генерации IObservable:
На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется
Здравствуйте, ionoy, Вы писали:
I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений. I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?
Странное обсуждение. Единственный человек, который там по делу говорит, это Dave Sexton, что неудивительно. К сожалению, собеседники его не слышат.
В большинстве своём народ ещё не до конца осознал мощи IObservable, вот и противятся. RX — офигенная штука, но барьер вхождения у неё всё-таки высоковат. Может если добавить сахарка, то люди бы и подтянулись.
Здравствуйте, ionoy, Вы писали:
I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений. I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?
I>Что-то наподобие этого:
I>
I>IObservable<string> strings = GetStringsObservable();
I>await foreach(var str in strings) {
I> // Здесь у нас OnNext обработчик
I>} catch (Exception e) {
I> // OnError
I>}
Я небольшой спец по Rx, но вроде такой синтаксис никаких проблем реализовать нет:
Здравствуйте, ionoy, Вы писали:
I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений. I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?
I>Что-то наподобие этого:
I>
I>IObservable<string> strings = GetStringsObservable();
I>await foreach(var str in strings) {
I> // Здесь у нас OnNext обработчик
I>} catch (Exception e) {
I> // OnError
I>}
I>//Всё что после "цикла" - это OnCompleted
I>
I>Как и в случае с IEnumerable<T>, мы можем комбинировать и модифицировать источник:
I>
I>На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется
И не зря такое ощущение. Очень мало задач, на которые естественным образом ложится Rx, обычно приходится делать кучу костылей.
Я вот делал одно приложение на Rx и немного офигел. Было устройство, которое работало по COM порту (могло быть и Tcp\Udp, в принципе неважно).
В COM порт можно отправить данные и можно постоянно слушать входящий поток.
Требовалось реализовать простой request-response протокол. Но запросы могли отправляться как внутренним таймером приложения, так и действиями пользователей.
Казалось бы должно быть все ОК, но это только казалось.
Первая проблема — реализовать сам request-response. Получается один IObservable на request, второй на response. И нужно на второй подписаться раньше, чем на первый (иначе подписка может не успеть сработать до прихода ответа) и тут же отписаться после получения данных. SelectMany не помогает, пытался курить When\Then, но не осилил.
И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.
Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).
А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).
В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).
А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.
Здравствуйте, gandjustas, Вы писали:
G>Здравствуйте, ionoy, Вы писали:
I>>На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется G>И не зря такое ощущение. Очень мало задач, на которые естественным образом ложится Rx, обычно приходится делать кучу костылей.
Если видишь, что Rx не упрощает, а напротив, усложняет твой код, то в конкретном месте его использовать не надо. Это на правах капитана очевидности
G>Я вот делал одно приложение на Rx и немного офигел. Было устройство, которое работало по COM порту (могло быть и Tcp\Udp, в принципе неважно). G>В COM порт можно отправить данные и можно постоянно слушать входящий поток. G>Требовалось реализовать простой request-response протокол. Но запросы могли отправляться как внутренним таймером приложения, так и действиями пользователей.
G>Казалось бы должно быть все ОК, но это только казалось. G>Первая проблема — реализовать сам request-response. Получается один IObservable на request, второй на response. И нужно на второй подписаться раньше, чем на первый (иначе подписка может не успеть сработать до прихода ответа) и тут же отписаться после получения данных. SelectMany не помогает, пытался курить When\Then, но не осилил.
Не понимаю, зачем тебе IObservable на реквест. У меня на работе точно такая же задача. Надо отправлять пакет с неким sequenceId, а потом ждать ответ от "сервера", где этот sequenceId присутствует.
Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо.
G>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.
Утечки памяти сами по себе в Rx не возникают.
G>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).
Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта?
G>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).
Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:
Subject<T> _proxy;
public IObservable<T> Data { get { return _proxy.AsObservable(); } }
void AddStream(IObservable<T> stream)
{
stream.Subscribe(_proxy);
}
Отписку добавить по вкусу.
Можно без Subject:
ReactiveList<IObservable<T>> _streams;
public IObservable<T> Data {
get {
return _streams.Changed
.Select(_ => Observable.Merge(_streams))
.Switch();
}
}
G>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).
Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.
G>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.
На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными.
Здравствуйте, SuhanovSergey, Вы писали:
SS>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.
var a = await someTask это тоже push, а работаем мы с ним как с pull.
SS>Недавно смотрел на такую библиотеку: https://asyncenum.codeplex.com/
SS>будет выглядеть так
SS>
Здравствуйте, ionoy, Вы писали:
I>Здравствуйте, gandjustas, Вы писали:
G>>Здравствуйте, ionoy, Вы писали:
I>Не понимаю, зачем тебе IObservable на реквест. У меня на работе точно такая же задача. Надо отправлять пакет с неким sequenceId, а потом ждать ответ от "сервера", где этот sequenceId присутствует.
Во-первых этот код был написан когда про await еще ничего не было слышно.
I>Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо.
1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать.
2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется
3) Отдельный поток для отправки — странное решение при наличии TPL или Rx
4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.
5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.
G>>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти. I>Утечки памяти сами по себе в Rx не возникают.
Конечно сами не возникают, но ты в своем коде уже допустил её. Именно на таком коде у меня память текла.
G>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).
I>Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта?
Еще раз:
1) есть несколько потоков событий от таймера и от UI.
2) Все они возвращают разные объекты
3) На каждый отдельно подписываться нельзя, это ломает синхронизацию.
То есть нужно все потоки смержить и подписаться. Технически несложно Observable.Merge вызвать, но подписчик при этом ничего сделать не сможет. В итоге я через Do сделал обработку, а подписчик не делал ничего.
G>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).
I>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:
Можно, но после пары тыщ строк таких костылей надоело.
I>Отписку добавить по вкусу.
I>Можно без Subject:
I>
I> ReactiveList<IObservable<T>> _streams;
I> public IObservable<T> Data {
I> get {
I> return _streams.Changed
I> .Select(_ => Observable.Merge(_streams))
I> .Switch();
I> }
I> }
I>
А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.
G>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами). I>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.
Я пока не увидел зачем тебе Rx
G>>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой. I>На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными.
Да не просто "некоторые", а почти все. Rx — хорошая либа с точки зрения теории, но довольно неудобная на практике.
Здравствуйте, ionoy, Вы писали:
I>Здравствуйте, SuhanovSergey, Вы писали:
SS>>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.
I>var a = await someTask это тоже push, а работаем мы с ним как с pull.
Да, это можно рассматиривать как one shot push из реализации таски. С другой стороны обычно строчкой выше эта someTask создаётся, что является pull запросом. Сахар await foreach разворачивался бы в await x.MoveNextAsync(). По аналогии с обычным IEnumerable.MoveNext MoveNextAsync запускал бы производство нового элемента в момент вызова, что является pull моделью. Хотя, конечно, возможна реализация IEnumerableAsync, которая бы проихводила элементы самопроизвольно и кэшировали бы их.
Короче, с await-ими грань между моделями тут размыта.
I>Точно такой же код я могу написать и с IObservable<T>. Хочется отказаться от лямбд для OnNext.
можно и без лямбд. без сахара получается длинно.
IAsyncEnumerable<string> strings = GetStringsAsyncEnumerable();
using (var enumerator = await strings.GetEnumerator())
{
while (await enumerator.MoveNext())
MyOnNext(enumerator.Current);
}
Сктати в asyncenum, можно писать продюсеров таким образом:
I>>Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо. G>1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать.
В любом случае где-то должен быть код, который будет:
1. Проверять sequenceId
2. Ждать таймаута
Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.
G>2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется
У тебя же устройство работало через COM порт, а теперь вдруг наивно... Может я чего-то не понял в твоей задаче
G>3) Отдельный поток для отправки — странное решение при наличии TPL или Rx
Дело вкуса.
G>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.
В моём случае это неважно, так как траффик не очень большой.
G>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.
Непонятно, с чего тут вдруг память должна течь.
G>>>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти. I>>Утечки памяти сами по себе в Rx не возникают. G>Конечно сами не возникают, но ты в своем коде уже допустил её. Именно на таком коде у меня память текла.
У меня память не течёт, да и непонятно с чего бы ей течь.
G>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).
I>>Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта? G>Еще раз: G>1) есть несколько потоков событий от таймера и от UI. G>2) Все они возвращают разные объекты G>3) На каждый отдельно подписываться нельзя, это ломает синхронизацию. G>То есть нужно все потоки смержить и подписаться. Технически несложно Observable.Merge вызвать, но подписчик при этом ничего сделать не сможет. В итоге я через Do сделал обработку, а подписчик не делал ничего.
Всё равно не понял. Если источники у тебя "горячие", зачем тебе синхронизировать подписку? Твои Do ничем от раздельных подписок не отличаются, во всяком случае на первый взгляд.
G>>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).
I>>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject: G>Можно, но после пары тыщ строк таких костылей надоело.
А через TPL там прямо красота, да?
I>>Отписку добавить по вкусу.
I>>Можно без Subject:
I>>
I>> ReactiveList<IObservable<T>> _streams;
I>> public IObservable<T> Data {
I>> get {
I>> return _streams.Changed
I>> .Select(_ => Observable.Merge(_streams))
I>> .Switch();
I>> }
I>> }
I>>
G>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.
Почему не изменит?
G>>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами). I>>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются. G>Я пока не увидел зачем тебе Rx
Мне то он много зачем, я ведь только про твою задачу отвечал.
G>>>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой. I>>На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными. G>Да не просто "некоторые", а почти все.
Это явное преувеличение.
Здравствуйте, SuhanovSergey, Вы писали:
SS>Здравствуйте, ionoy, Вы писали:
I>>Здравствуйте, SuhanovSergey, Вы писали:
SS>>>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.
I>>var a = await someTask это тоже push, а работаем мы с ним как с pull.
SS>Да, это можно рассматиривать как one shot push из реализации таски. С другой стороны обычно строчкой выше эта someTask создаётся, что является pull запросом.
С IObservable абсолютно то же самое.
SS>Сахар await foreach разворачивался бы в await x.MoveNextAsync(). По аналогии с обычным IEnumerable.MoveNext MoveNextAsync запускал бы производство нового элемента в момент вызова, что является pull моделью. Хотя, конечно, возможна реализация IEnumerableAsync, которая бы проихводила элементы самопроизвольно и кэшировали бы их.
SS>Короче, с await-ими грань между моделями тут размыта.
Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.
I>>Точно такой же код я могу написать и с IObservable<T>. Хочется отказаться от лямбд для OnNext.
SS>можно и без лямбд. без сахара получается длинно. SS>
Здравствуйте, ionoy, Вы писали:
I>Здравствуйте, gandjustas, Вы писали:
G>>Здравствуйте, ionoy, Вы писали:
G>>1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать. I>В любом случае где-то должен быть код, который будет: I> 1. Проверять sequenceId I> 2. Ждать таймаута
Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.
I>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.
Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках.
G>>2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется I>У тебя же устройство работало через COM порт, а теперь вдруг наивно... Может я чего-то не понял в твоей задаче
На самом деле было несколько похожих задач и несколько устройств. В первом случае никаких sequenceid не было, ответы приходили в том же порядке, что и команды, но устройство могло игнорировать часть команд, не отправляя ответов. Из-за этого было еще больше костылей, чем хотелось бы.
Потом была аналогичная задача, где можно было по sequenceid кореллировать запросы и ответы, работало по UDP, сделал сначала как ты, через where — получилось медленно и утекала память. Пришлось плодить костыли. Потом оказалось, что переписав на TPL можно выиграть по скорости и объему кода.
G>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет. I>В моём случае это неважно, так как траффик не очень большой.
А в моем случае 100 раз в секунду.
G>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро. I>Непонятно, с чего тут вдруг память должна течь.
Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь.
После убирания замыканий проблема исчезла.
G>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).
I>Всё равно не понял. Если источники у тебя "горячие", зачем тебе синхронизировать подписку? Твои Do ничем от раздельных подписок не отличаются, во всяком случае на первый взгляд.
Ответы — горячие, а запросы — холодные (не выполняются пока подписка не выполнилась), надо чтобы запрос уходил только после получения ответа. Поэтому надо синхронизировать все IObservable.
Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь.
G>>>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).
I>>>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject: G>>Можно, но после пары тыщ строк таких костылей надоело. I>А через TPL там прямо красота, да?
Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.
I>>>Отписку добавить по вкусу.
I>>>Можно без Subject:
I>>>
I>>> ReactiveList<IObservable<T>> _streams;
I>>> public IObservable<T> Data {
I>>> get {
I>>> return _streams.Changed
I>>> .Select(_ => Observable.Merge(_streams))
I>>> .Switch();
I>>> }
I>>> }
I>>>
G>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge. I>Почему не изменит?
Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями.
G>>>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами). I>>>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются. G>>Я пока не увидел зачем тебе Rx I>Мне то он много зачем, я ведь только про твою задачу отвечал.
Прекрасно, ты сам показал что для работы с устройствами Rx как-то не сильно подходит, ибо композиция сделана тасками.
Можешь привести пример где Rx таки нужен будет?
SS>>Короче, с await-ими грань между моделями тут размыта. I>Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.
Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций.
Всё конечно можно сделать и на Rx, но у него есть проблемы с читабельностью и зацикленностью на коллекциях.
SS>>Сктати в asyncenum, можно писать продюсеров таким образом: SS>>
Здравствуйте, gandjustas, Вы писали:
G>Здравствуйте, ionoy, Вы писали:
I>>Здравствуйте, gandjustas, Вы писали:
G>>>Здравствуйте, ionoy, Вы писали: G>Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.
У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо.
I>>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно. G>Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках.
Потоки удобнее делать на IObservable, если есть какая-то логика обработки.
G>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет. I>>В моём случае это неважно, так как траффик не очень большой. G>А в моем случае 100 раз в секунду.
100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать.
G>>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро. I>>Непонятно, с чего тут вдруг память должна течь. G>Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь. G>После убирания замыканий проблема исчезла.
Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание.
G>>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями). G>Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь.
Ты просто объединил отсылку и получение в одной подписке. Из-за этого получилась большая каша кода, что тебе не понравилось. Если бы ты эти понятия разделил, всё было бы значительно красивее, и может быть, ты бы увидел преимущества Rx.
I>>А через TPL там прямо красота, да? G>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.
Тут одна
I>>>>Отписку добавить по вкусу.
I>>>>Можно без Subject:
I>>>>
I>>>> ReactiveList<IObservable<T>> _streams;
I>>>> public IObservable<T> Data {
I>>>> get {
I>>>> return _streams.Changed
I>>>> .Select(_ => Observable.Merge(_streams))
I>>>> .Switch();
I>>>> }
I>>>> }
I>>>>
G>>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge. I>>Почему не изменит? G>Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями.
Там ничего не уходит на уровень выше. Это полностью рабочий код.
G>Можешь привести пример где Rx таки нужен будет?
Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно.
SS>>>Короче, с await-ими грань между моделями тут размыта. I>>Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.
SS>Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций.
Я про то, что код указанный выше можно строчка в строчку переписать на Rx.
SS>Всё конечно можно сделать и на Rx, но у него есть проблемы с читабельностью и зацикленностью на коллекциях.
Код ничем не будет отличаться. Какая зацикленность на коллекциях?
SS>>>Сктати в asyncenum, можно писать продюсеров таким образом: SS>>>
I>>Опять же калька с Observable.Create();
SS>Observable.Create не позволит так же просто и эффективно блокировать продюсера пока консьюмер не готов.
Последовательность обработки — один из принципов Rx. Это, как раз, иногда добавляет проблем.
Ещё раз говорю, весь код который тут приведён можно переписать на Rx практически без изменений.
Здравствуйте, ionoy, Вы писали:
I>Здравствуйте, gandjustas, Вы писали:
G>>Здравствуйте, ionoy, Вы писали:
I>>>Здравствуйте, gandjustas, Вы писали:
G>>>>Здравствуйте, ionoy, Вы писали: G>>Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.
I>У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо.
Я как раз и хотел использовать Rx, чтобы вручную этими очередями не заниматься. Если писать очереди вручную, то можно хоть Rx, хоть TPL разницы вообще никакой не будет (только стилистическая и то не везде)
I>>>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно. G>>Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках. I>Потоки удобнее делать на IObservable, если есть какая-то логика обработки.
Делать то удобнее, а обрабатывать — не всегда. увы.
G>>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет. I>>>В моём случае это неважно, так как траффик не очень большой. G>>А в моем случае 100 раз в секунду. I>100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать.
100 раз в секунду достаточно чтобы проблемы повылезали. Вообще в Rx многие проблемы вылезают только под нагрузкой. Не знаю с чем связано, не копал так глубоко.
G>>>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро. I>>>Непонятно, с чего тут вдруг память должна течь. G>>Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь. G>>После убирания замыканий проблема исчезла. I>Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание.
Скорее всего отписка происходила поздно, от этого объект успевал дожить до второго поколения, что давало большу нагрузку на GC, а из-за больших нагрузок на GC отписка еще сильнее задерживалась итд.
G>>>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями). G>>Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь. I>Ты просто объединил отсылку и получение в одной подписке. Из-за этого получилась большая каша кода, что тебе не понравилось. Если бы ты эти понятия разделил, всё было бы значительно красивее, и может быть, ты бы увидел преимущества Rx.
Дык суть Rx в композируемости потоков данных. Если ты руками в очереди перекладываешь объекты, то суть Rx теряется.
I>>>А через TPL там прямо красота, да? G>>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.
I>Тут одна
Да не одна, потому что далеко не весь код, чтобы заработало.
I>>>>>Можно без Subject:
I>>>>>
I>>>>> ReactiveList<IObservable<T>> _streams;
I>>>>> public IObservable<T> Data {
I>>>>> get {
I>>>>> return _streams.Changed
I>>>>> .Select(_ => Observable.Merge(_streams))
I>>>>> .Switch();
I>>>>> }
I>>>>> }
I>>>>>
G>>>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge. I>>>Почему не изменит? G>>Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями. I>Там ничего не уходит на уровень выше. Это полностью рабочий код.
А в ReacliveList кто и как добавляет\удаляет элементы?
G>>Можешь привести пример где Rx таки нужен будет?
I>Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно.
Да, я тоже так писал. Потом оказалось что TPL ничуть не хуже (после появления async\await), а во многих случаях еще более читаемый код дает.
Здравствуйте, gandjustas, Вы писали:
I>>У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо. G>Я как раз и хотел использовать Rx, чтобы вручную этими очередями не заниматься. Если писать очереди вручную, то можно хоть Rx, хоть TPL разницы вообще никакой не будет (только стилистическая и то не везде)
А там заниматься то нечем. Поэтому и удобнее без TPL и Rx.
G>>>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет. I>>>>В моём случае это неважно, так как траффик не очень большой. G>>>А в моем случае 100 раз в секунду. I>>100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать. G>100 раз в секунду достаточно чтобы проблемы повылезали. Вообще в Rx многие проблемы вылезают только под нагрузкой. Не знаю с чем связано, не копал так глубоко.
Да вроде есть толковые ребята, которые используют его в нагрузке.
I>>Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание. G>Скорее всего отписка происходила поздно, от этого объект успевал дожить до второго поколения, что давало большу нагрузку на GC, а из-за больших нагрузок на GC отписка еще сильнее задерживалась итд.
Отписка должна происходить моментально после обработки.
I>>>>А через TPL там прямо красота, да? G>>>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы. I>>Тут одна G>Да не одна, потому что далеко не весь код, чтобы заработало.
Для той функциональности, которую ты попросил — весь.
I>>>>>>
I>>>>>> ReactiveList<IObservable<T>> _streams;
I>>>>>> public IObservable<T> Data {
I>>>>>> get {
I>>>>>> return _streams.Changed
I>>>>>> .Select(_ => Observable.Merge(_streams))
I>>>>>> .Switch();
I>>>>>> }
I>>>>>> }
I>>>>>>
I>>Там ничего не уходит на уровень выше. Это полностью рабочий код. G>А в ReacliveList кто и как добавляет\удаляет элементы?
Да кто хочет. Там обычный интерфейс, как у List<T>.
G>>>Можешь привести пример где Rx таки нужен будет?
I>>Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно. G>Да, я тоже так писал. Потом оказалось что TPL ничуть не хуже (после появления async\await), а во многих случаях еще более читаемый код дает.
Так я и не предлагаю всё на Rx делать. Для меня отлично работает правило "один ответ — Таск, больше — Observable".
Здравствуйте, ionoy, Вы писали:
I>Здравствуйте, SuhanovSergey, Вы писали:
SS>>Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций. I>Последовательность обработки — один из принципов Rx. Это, как раз, иногда добавляет проблем. I>Ещё раз говорю, весь код который тут приведён можно переписать на Rx практически без изменений. I>Поэтому и вопрос — зачем?
Приведённый ниже код никогда не блокирует потоки на время IO. Считаем, что все awaitable методы делают тот или иной IO.
Попробуте переписать код на Rx с сохранением лаконичности и читабильности.
(код выдуманный, к смыслу не придираться)
var userId = await GetUserId(name);
IAsyncEnumerable<Invoice> invoices =
(await dataAccess.ReadInvoicesFromDB(userId))
.Where(invoice => invoice.Total == 0) // filter by synchronous predicate
.Where(async invoice => !(await IsInvoiceProcessed(invoice.Id))); // filter by asynchronous predicate
await invoces.ForEachAsync(async invoice => // async invoice handler
{
var total = CalculateTotal(invoice); // CPU-bound procedure
await UpdateInvoiceTotal(invoice.Id, total); // IO - request to DB
await ProcessInvoice(invoice); // IO - request to other service
});
async IAsyncEnumerable<Invoice> IDataAccess.ReadInvoicesFromDB(string userId)
{
var connection = await ConnectToDB();
return EnumerableAsync.Produce(async yieldAsync =>
{
for (;;)
{
var batch = await connection.Query(...);
if (batch.IsEmpty)
return;
foreach (var invoice in batch)
await yieldAsync.Yield(invoice); // producer waits for consumer here wo blocking any thread
}
});
}