await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 16.02.15 12:50
Оценка:
Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений.
Так почему бы не добавить await для IObservable, раз уж он есть для тасков?

Что-то наподобие этого:

IObservable<string> strings = GetStringsObservable();
await foreach(var str in strings) {
    // Здесь у нас OnNext обработчик
} catch (Exception e) {
   // OnError
}

//Всё что после "цикла" - это OnCompleted


Как и в случае с IEnumerable<T>, мы можем комбинировать и модифицировать источник:

await foreach(var str in strings.Where(s => !string.IsNullOrWhitespace(s)) {
  Debug.WriteLine(str);
}


Проблемы контекста не стоит, так как эта функциональность уже встроена в RX:
await foreach(var str in strings.ObserveOn(RxMain.MainThreadScheduler)) {
}


В добавок было бы круто иметь yield для генерации IObservable:

IObservable<int> GetNumbers()
{
   yield 0;
   yield 1;
   yield 2;
}

==>

IObservable<int> GetNumbers()
{
   return Observable.Create(obs => {
      obs.OnNext(0);
      obs.OnNext(1);
      obs.OnNext(2);
   });
}


На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re: await для IObservable<T>
От: agat50  
Дата: 16.02.15 13:31
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений.

I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?

Вы не одиноки в желаниях=)

https://roslyn.codeplex.com/discussions/541638
Re[2]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 16.02.15 14:06
Оценка: 1 (1) +1
Здравствуйте, agat50, Вы писали:

A>Вы не одиноки в желаниях=)


A>https://roslyn.codeplex.com/discussions/541638


Странное обсуждение. Единственный человек, который там по делу говорит, это Dave Sexton, что неудивительно. К сожалению, собеседники его не слышат.

В большинстве своём народ ещё не до конца осознал мощи IObservable, вот и противятся. RX — офигенная штука, но барьер вхождения у неё всё-таки высоковат. Может если добавить сахарка, то люди бы и подтянулись.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re: await для IObservable<T>
От: Jack128  
Дата: 16.02.15 18:59
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений.

I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?

I>Что-то наподобие этого:


I>
I>IObservable<string> strings = GetStringsObservable();
I>await foreach(var str in strings) {
I>    // Здесь у нас OnNext обработчик
I>} catch (Exception e) {
I>   // OnError
I>}


Я небольшой спец по Rx, но вроде такой синтаксис никаких проблем реализовать нет:

IObservable<string> strings = GetStringsObservable();
try
{
  for(var task in strings.AsEnumerableOfTask()) 
  {
    string s = await task;
    ... 
  }
}
catch(Exception e) 
{
   Console.WriteLine(e);
}
Отредактировано 16.02.2015 19:20 Jack128 . Предыдущая версия . Еще …
Отредактировано 16.02.2015 19:00 Jack128 . Предыдущая версия .
Re: await для IObservable<T>
От: gandjustas Россия http://blog.gandjustas.ru/
Дата: 16.02.15 19:56
Оценка: 41 (3)
Здравствуйте, ionoy, Вы писали:

I>Для Task'ов есть удобный сахар в виде async/await. IObservable<T> — это фактически тот же Task, но для множества значений.

I>Так почему бы не добавить await для IObservable, раз уж он есть для тасков?

I>Что-то наподобие этого:


I>
I>IObservable<string> strings = GetStringsObservable();
I>await foreach(var str in strings) {
I>    // Здесь у нас OnNext обработчик
I>} catch (Exception e) {
I>   // OnError
I>}

I>//Всё что после "цикла" - это OnCompleted
I>




I>Как и в случае с IEnumerable<T>, мы можем комбинировать и модифицировать источник:


I>
I>await foreach(var str in strings.Where(s => !string.IsNullOrWhitespace(s)) {
I>  Debug.WriteLine(str);
I>}
I>


http://blogs.msdn.com/b/flaviencharlon/archive/2012/08/15/recreating-a-foreach-statement-for-iobservable.aspx

В принципе подобное есть и сейчас, только без сахара.


I>
I>IObservable<int> GetNumbers()
I>{
I>   yield 0;
I>   yield 1;
I>   yield 2;
I>}

I>==>

I>IObservable<int> GetNumbers()
I>{
I>   return Observable.Create(obs => {
I>      obs.OnNext(0);
I>      obs.OnNext(1);
I>      obs.OnNext(2);
I>   });
I>}
I>


Ты и сейчас можешь сделать
GetNumbers().ToObservable()


I>На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется

И не зря такое ощущение. Очень мало задач, на которые естественным образом ложится Rx, обычно приходится делать кучу костылей.

Я вот делал одно приложение на Rx и немного офигел. Было устройство, которое работало по COM порту (могло быть и Tcp\Udp, в принципе неважно).
В COM порт можно отправить данные и можно постоянно слушать входящий поток.
Требовалось реализовать простой request-response протокол. Но запросы могли отправляться как внутренним таймером приложения, так и действиями пользователей.

Казалось бы должно быть все ОК, но это только казалось.
Первая проблема — реализовать сам request-response. Получается один IObservable на request, второй на response. И нужно на второй подписаться раньше, чем на первый (иначе подписка может не успеть сработать до прихода ответа) и тут же отписаться после получения данных. SelectMany не помогает, пытался курить When\Then, но не осилил.

И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.

Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).

А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).

В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).
А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.
Re[2]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 05:57
Оценка:
Здравствуйте, gandjustas, Вы писали:

G>Здравствуйте, ionoy, Вы писали:


I>>На практике, я примерно одинаково много пользуюсь как тасками, так и IObservable<T>. В последнее время появилось ощущение, что с IObservable работать менее удобно, так как нет такого же сахара. А хочется

G>И не зря такое ощущение. Очень мало задач, на которые естественным образом ложится Rx, обычно приходится делать кучу костылей.

Если видишь, что Rx не упрощает, а напротив, усложняет твой код, то в конкретном месте его использовать не надо. Это на правах капитана очевидности

G>Я вот делал одно приложение на Rx и немного офигел. Было устройство, которое работало по COM порту (могло быть и Tcp\Udp, в принципе неважно).

G>В COM порт можно отправить данные и можно постоянно слушать входящий поток.
G>Требовалось реализовать простой request-response протокол. Но запросы могли отправляться как внутренним таймером приложения, так и действиями пользователей.

G>Казалось бы должно быть все ОК, но это только казалось.

G>Первая проблема — реализовать сам request-response. Получается один IObservable на request, второй на response. И нужно на второй подписаться раньше, чем на первый (иначе подписка может не успеть сработать до прихода ответа) и тут же отписаться после получения данных. SelectMany не помогает, пытался курить When\Then, но не осилил.

Не понимаю, зачем тебе IObservable на реквест. У меня на работе точно такая же задача. Надо отправлять пакет с неким sequenceId, а потом ждать ответ от "сервера", где этот sequenceId присутствует.

Упрощённо так:

private async Task<Packet> SendAndWaitForAnswer(int retries, int timeout, Packet packet, byte sequenceId) 
{
    try {
        return await RetryOnTimeout(async () => {
            _dataToWrite.Enqueue(Tuple.Create(packet.Data, sequenceId));
            
            return await _packetStream.Where(p => p.SequenceId == sequenceId)
                                      .Take(1)
                                      .Timeout(TimeSpan.FromMilliseconds(timeout))
                                      .ToTask();
        }, retries);
    } catch (TimeoutException e) {
    // No response
    }
}


Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо.

G>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.


Утечки памяти сами по себе в Rx не возникают.

G>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).


Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта?

G>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).


Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:

   Subject<T> _proxy;
    
   public IObservable<T> Data { get { return _proxy.AsObservable(); } }
  
   void AddStream(IObservable<T> stream) 
   {
       stream.Subscribe(_proxy);
   }


Отписку добавить по вкусу.

Можно без Subject:

   ReactiveList<IObservable<T>> _streams;
   public IObservable<T> Data { 
       get { 
         return _streams.Changed
                        .Select(_ => Observable.Merge(_streams))
                        .Switch(); 
       } 
   }


G>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).

Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.

G>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.

На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[2]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 05:58
Оценка:
Здравствуйте, Jack128, Вы писали:

J>Я небольшой спец по Rx, но вроде такой синтаксис никаких проблем реализовать нет:


J>
J>IObservable<string> strings = GetStringsObservable();
J>try
J>{
J>  for(var task in strings.AsEnumerableOfTask()) 
J>  {
J>    string s = await task;
J>    ... 
J>  }
J>}
J>catch(Exception e) 
J>{
J>   Console.WriteLine(e);
J>}
J>


Не думал о таком варианте. Выглядит неплохо, но без накладных расходов не представляю как такое реализовать.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re: await для IObservable<T>
От: SuhanovSergey  
Дата: 17.02.15 08:22
Оценка: 15 (2)
В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.

Недавно смотрел на такую библиотеку: https://asyncenum.codeplex.com/

Ваш желаемый код

I>
I>IObservable<string> strings = GetStringsObservable();
I>await foreach(var str in strings) {
I>    // Здесь у нас OnNext обработчик
I>} catch (Exception e) {
I>   // OnError
I>}
I>


будет выглядеть так

    IAsyncEnumerable<string> strings = GetStringsAsyncEnumerable();
    try {
         await strings.ForEachAsync(async str => await MyOnNext(str));
         MyOnCompleted();
    } catch (Exception e) {
         MyOnError(e);
    }


Признаюсь, реализацию asyncenum я нашёл чрезмерно усложнённой. В результате написал свой более лучший велосипед.
Re[2]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 08:46
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.


var a = await someTask это тоже push, а работаем мы с ним как с pull.

SS>Недавно смотрел на такую библиотеку: https://asyncenum.codeplex.com/


SS>будет выглядеть так


SS>
SS>    IAsyncEnumerable<string> strings = GetStringsAsyncEnumerable();
SS>    try {
SS>         await strings.ForEachAsync(async str => await MyOnNext(str));
SS>         MyOnCompleted();
SS>    } catch (Exception e) {
SS>         MyOnError(e);
SS>    }
SS>


Точно такой же код я могу написать и с IObservable<T>. Хочется отказаться от лямбд для OnNext.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[3]: await для IObservable<T>
От: gandjustas Россия http://blog.gandjustas.ru/
Дата: 17.02.15 10:02
Оценка: +1
Здравствуйте, ionoy, Вы писали:

I>Здравствуйте, gandjustas, Вы писали:


G>>Здравствуйте, ionoy, Вы писали:



I>Не понимаю, зачем тебе IObservable на реквест. У меня на работе точно такая же задача. Надо отправлять пакет с неким sequenceId, а потом ждать ответ от "сервера", где этот sequenceId присутствует.

Во-первых этот код был написан когда про await еще ничего не было слышно.


I>Упрощённо так:


I>
I>private async Task<Packet> SendAndWaitForAnswer(int retries, int timeout, Packet packet, byte sequenceId) 
I>{
I>    try {
I>        return await RetryOnTimeout(async () => {
I>            _dataToWrite.Enqueue(Tuple.Create(packet.Data, sequenceId));
            
I>            return await _packetStream.Where(p => p.SequenceId == sequenceId)
I>                                      .Take(1)
I>                                      .Timeout(TimeSpan.FromMilliseconds(timeout))
I>                                      .ToTask();
I>        }, retries);
I>    } catch (TimeoutException e) {
I>    // No response
I>    }
I>}
I>


I>Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо.

1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать.
2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется
3) Отдельный поток для отправки — странное решение при наличии TPL или Rx
4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.
5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.

G>>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.

I>Утечки памяти сами по себе в Rx не возникают.
Конечно сами не возникают, но ты в своем коде уже допустил её. Именно на таком коде у меня память текла.

G>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).


I>Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта?

Еще раз:
1) есть несколько потоков событий от таймера и от UI.
2) Все они возвращают разные объекты
3) На каждый отдельно подписываться нельзя, это ломает синхронизацию.
То есть нужно все потоки смержить и подписаться. Технически несложно Observable.Merge вызвать, но подписчик при этом ничего сделать не сможет. В итоге я через Do сделал обработку, а подписчик не делал ничего.

G>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).


I>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:

Можно, но после пары тыщ строк таких костылей надоело.

I>Отписку добавить по вкусу.


I>Можно без Subject:


I>
I>   ReactiveList<IObservable<T>> _streams;
I>   public IObservable<T> Data { 
I>       get { 
I>         return _streams.Changed
I>                        .Select(_ => Observable.Merge(_streams))
I>                        .Switch(); 
I>       } 
I>   }
I>


А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.

G>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).

I>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.
Я пока не увидел зачем тебе Rx

G>>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.

I>На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными.
Да не просто "некоторые", а почти все. Rx — хорошая либа с точки зрения теории, но довольно неудобная на практике.
Re[3]: await для IObservable<T>
От: SuhanovSergey  
Дата: 17.02.15 10:17
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Здравствуйте, SuhanovSergey, Вы писали:


SS>>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.


I>var a = await someTask это тоже push, а работаем мы с ним как с pull.


Да, это можно рассматиривать как one shot push из реализации таски. С другой стороны обычно строчкой выше эта someTask создаётся, что является pull запросом. Сахар await foreach разворачивался бы в await x.MoveNextAsync(). По аналогии с обычным IEnumerable.MoveNext MoveNextAsync запускал бы производство нового элемента в момент вызова, что является pull моделью. Хотя, конечно, возможна реализация IEnumerableAsync, которая бы проихводила элементы самопроизвольно и кэшировали бы их.

Короче, с await-ими грань между моделями тут размыта.


I>Точно такой же код я могу написать и с IObservable<T>. Хочется отказаться от лямбд для OnNext.


можно и без лямбд. без сахара получается длинно.
IAsyncEnumerable<string> strings = GetStringsAsyncEnumerable();
using (var enumerator = await strings.GetEnumerator())
{
   while (await enumerator.MoveNext())
       MyOnNext(enumerator.Current);
}


Сктати в asyncenum, можно писать продюсеров таким образом:
IAsyncEnumerable<int> numbers = Produce(async yieldAsync =>
{
   await yieldAsync.Yield(1);
   await yieldAsync.Yield(2);
});


что коррелирует с вопросом про "yield для генерации IObservable".
Re[4]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 10:39
Оценка:
Здравствуйте, gandjustas, Вы писали:

G>Здравствуйте, ionoy, Вы писали:


I>>Упрощённо так:


I>>
I>>private async Task<Packet> SendAndWaitForAnswer(int retries, int timeout, Packet packet, byte sequenceId) 
I>>{
I>>    try {
I>>        return await RetryOnTimeout(async () => {
I>>            _dataToWrite.Enqueue(Tuple.Create(packet.Data, sequenceId));
            
I>>            return await _packetStream.Where(p => p.SequenceId == sequenceId)
I>>                                      .Take(1)
I>>                                      .Timeout(TimeSpan.FromMilliseconds(timeout))
I>>                                      .ToTask();
I>>        }, retries);
I>>    } catch (TimeoutException e) {
I>>    // No response
I>>    }
I>>}
I>>


I>>Запросы я просто складываю в очередь, а отдельный поток постоянно за ней следит и по мере возможности отсылает данные куда надо.

G>1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать.
В любом случае где-то должен быть код, который будет:
1. Проверять sequenceId
2. Ждать таймаута

Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.

G>2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется

У тебя же устройство работало через COM порт, а теперь вдруг наивно... Может я чего-то не понял в твоей задаче

G>3) Отдельный поток для отправки — странное решение при наличии TPL или Rx

Дело вкуса.

G>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.

В моём случае это неважно, так как траффик не очень большой.

G>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.

Непонятно, с чего тут вдруг память должна течь.

G>>>И это простой вариант, так как ответы приходят в той же последовательности, кто и команды. А для более сложного случая (на похожем проекте), когда ответ может прийти в другом порядке (сопоставление по id в запросе) я даже не стал с Rx связываться, потому что прототип оказался раза в три больше по объему кода, чем TPL и постоянно приходилось бороться с утечками памяти.

I>>Утечки памяти сами по себе в Rx не возникают.
G>Конечно сами не возникают, но ты в своем коде уже допустил её. Именно на таком коде у меня память текла.
У меня память не течёт, да и непонятно с чего бы ей течь.

G>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).


I>>Тут я совсем не понимаю. С одной стороны ты хочешь обрабатывать объекты в одной подписке, с другой стороны у тебя нет общего интерфейса. Почему не сделать по одному подписчику на каждый тип объекта?

G>Еще раз:
G>1) есть несколько потоков событий от таймера и от UI.
G>2) Все они возвращают разные объекты
G>3) На каждый отдельно подписываться нельзя, это ломает синхронизацию.
G>То есть нужно все потоки смержить и подписаться. Технически несложно Observable.Merge вызвать, но подписчик при этом ничего сделать не сможет. В итоге я через Do сделал обработку, а подписчик не делал ничего.
Всё равно не понял. Если источники у тебя "горячие", зачем тебе синхронизировать подписку? Твои Do ничем от раздельных подписок не отличаются, во всяком случае на первый взгляд.

G>>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).


I>>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:

G>Можно, но после пары тыщ строк таких костылей надоело.
А через TPL там прямо красота, да?

I>>Отписку добавить по вкусу.


I>>Можно без Subject:


I>>
I>>   ReactiveList<IObservable<T>> _streams;
I>>   public IObservable<T> Data { 
I>>       get { 
I>>         return _streams.Changed
I>>                        .Select(_ => Observable.Merge(_streams))
I>>                        .Switch(); 
I>>       } 
I>>   }
I>>


G>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.

Почему не изменит?

G>>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).

I>>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.
G>Я пока не увидел зачем тебе Rx
Мне то он много зачем, я ведь только про твою задачу отвечал.

G>>>А на Rx можно делать банальный stream processing, когда потоки данных фиксированы, приходят извне и не порождаются самой программой.

I>>На Rx можно сделать всё что угодно, просто некоторые вещи получатся переусложнёнными.
G>Да не просто "некоторые", а почти все.
Это явное преувеличение.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[4]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 10:43
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Здравствуйте, ionoy, Вы писали:


I>>Здравствуйте, SuhanovSergey, Вы писали:


SS>>>В данном случае правильнее говорить об await для IEnumerable. Из IEnumerable клиент поллит. IObservable пушит в клиента. await foreach — это полл.


I>>var a = await someTask это тоже push, а работаем мы с ним как с pull.


SS>Да, это можно рассматиривать как one shot push из реализации таски. С другой стороны обычно строчкой выше эта someTask создаётся, что является pull запросом.

С IObservable абсолютно то же самое.

SS>Сахар await foreach разворачивался бы в await x.MoveNextAsync(). По аналогии с обычным IEnumerable.MoveNext MoveNextAsync запускал бы производство нового элемента в момент вызова, что является pull моделью. Хотя, конечно, возможна реализация IEnumerableAsync, которая бы проихводила элементы самопроизвольно и кэшировали бы их.


SS>Короче, с await-ими грань между моделями тут размыта.

Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.

I>>Точно такой же код я могу написать и с IObservable<T>. Хочется отказаться от лямбд для OnNext.


SS>можно и без лямбд. без сахара получается длинно.

SS>
SS>IAsyncEnumerable<string> strings = GetStringsAsyncEnumerable();
SS>using (var enumerator = await strings.GetEnumerator())
SS>{
SS>   while (await enumerator.MoveNext())
SS>       MyOnNext(enumerator.Current);
SS>}
SS>


Не, ну это неинтересно.

SS>Сктати в asyncenum, можно писать продюсеров таким образом:

SS>
SS>IAsyncEnumerable<int> numbers = Produce(async yieldAsync =>
SS>{
SS>   await yieldAsync.Yield(1);
SS>   await yieldAsync.Yield(2);
SS>});
SS>


Опять же калька с Observable.Create();
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[5]: await для IObservable<T>
От: gandjustas Россия http://blog.gandjustas.ru/
Дата: 17.02.15 11:02
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Здравствуйте, gandjustas, Вы писали:


G>>Здравствуйте, ionoy, Вы писали:


G>>1) Ты всю композицию кода сделал на таксах в итоге. Rx нужен только для таймаута, но и таймаут на тасках можно сделать.

I>В любом случае где-то должен быть код, который будет:
I> 1. Проверять sequenceId
I> 2. Ждать таймаута
Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.


I>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.

Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках.

G>>2) Предполагать что всегда sequenceid в пакете будет — наивно, для случая когда не будет все сильно усложняется

I>У тебя же устройство работало через COM порт, а теперь вдруг наивно... Может я чего-то не понял в твоей задаче
На самом деле было несколько похожих задач и несколько устройств. В первом случае никаких sequenceid не было, ответы приходили в том же порядке, что и команды, но устройство могло игнорировать часть команд, не отправляя ответов. Из-за этого было еще больше костылей, чем хотелось бы.
Потом была аналогичная задача, где можно было по sequenceid кореллировать запросы и ответы, работало по UDP, сделал сначала как ты, через where — получилось медленно и утекала память. Пришлось плодить костыли. Потом оказалось, что переписав на TPL можно выиграть по скорости и объему кода.


G>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.

I>В моём случае это неважно, так как траффик не очень большой.
А в моем случае 100 раз в секунду.

G>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.

I>Непонятно, с чего тут вдруг память должна течь.
Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь.
После убирания замыканий проблема исчезла.


G>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).



I>Всё равно не понял. Если источники у тебя "горячие", зачем тебе синхронизировать подписку? Твои Do ничем от раздельных подписок не отличаются, во всяком случае на первый взгляд.

Ответы — горячие, а запросы — холодные (не выполняются пока подписка не выполнилась), надо чтобы запрос уходил только после получения ответа. Поэтому надо синхронизировать все IObservable.
Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь.

G>>>>А еще напрягает, что структура потоков данных, создаваемая RX — статична. Например если ты сделал Observable.Merge(a,b), то для того чтобы сделать Observable.Merge(a,b,c) тебе придется убить первую подписку. Это усложняет архитектуру, требуя фактически "перезапускать" все потоки данных при каждом изменении конфигурации. А это само по себе является проблемой, так как за время перезапуска могут произойти важные события (прийти важные данные).


I>>>Я так понимаю, что дополнительный поток приходит извне, и сразу неизвестен. Тогда можно через прокси Subject:

G>>Можно, но после пары тыщ строк таких костылей надоело.
I>А через TPL там прямо красота, да?
Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.

I>>>Отписку добавить по вкусу.


I>>>Можно без Subject:


I>>>
I>>>   ReactiveList<IObservable<T>> _streams;
I>>>   public IObservable<T> Data { 
I>>>       get { 
I>>>         return _streams.Changed
I>>>                        .Select(_ => Observable.Merge(_streams))
I>>>                        .Switch(); 
I>>>       } 
I>>>   }
I>>>


G>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.

I>Почему не изменит?
Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями.

G>>>>В итоге для задачи работы с устройствами гораздо лучше подходит модель акторов или штука вроде TPL Dataflow (нечто среднее между Rx и акторами).

I>>>Я просто комбинирую TPL и Rx, тем более что они отлично вместе уживаются.
G>>Я пока не увидел зачем тебе Rx
I>Мне то он много зачем, я ведь только про твою задачу отвечал.
Прекрасно, ты сам показал что для работы с устройствами Rx как-то не сильно подходит, ибо композиция сделана тасками.
Можешь привести пример где Rx таки нужен будет?
Re[5]: await для IObservable<T>
От: SuhanovSergey  
Дата: 17.02.15 11:08
Оценка:
SS>>Короче, с await-ими грань между моделями тут размыта.
I>Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.

Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций.

Всё конечно можно сделать и на Rx, но у него есть проблемы с читабельностью и зацикленностью на коллекциях.

SS>>Сктати в asyncenum, можно писать продюсеров таким образом:

SS>>
SS>>IAsyncEnumerable<int> numbers = Produce(async yieldAsync =>
SS>>{
SS>>   await yieldAsync.Yield(1);
SS>>   await yieldAsync.Yield(2);
SS>>});
SS>>


I>Опять же калька с Observable.Create();


Observable.Create не позволит так же просто и эффективно блокировать продюсера пока консьюмер не готов.
Re[6]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 11:54
Оценка:
Здравствуйте, gandjustas, Вы писали:

G>Здравствуйте, ionoy, Вы писали:


I>>Здравствуйте, gandjustas, Вы писали:


G>>>Здравствуйте, ionoy, Вы писали:

G>Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.

У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо.

I>>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.

G>Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках.
Потоки удобнее делать на IObservable, если есть какая-то логика обработки.


G>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.

I>>В моём случае это неважно, так как траффик не очень большой.
G>А в моем случае 100 раз в секунду.
100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать.

G>>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.

I>>Непонятно, с чего тут вдруг память должна течь.
G>Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь.
G>После убирания замыканий проблема исчезла.
Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание.

G>>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).

G>Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь.
Ты просто объединил отсылку и получение в одной подписке. Из-за этого получилась большая каша кода, что тебе не понравилось. Если бы ты эти понятия разделил, всё было бы значительно красивее, и может быть, ты бы увидел преимущества Rx.

I>>А через TPL там прямо красота, да?

G>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.

Тут одна

I>>>>Отписку добавить по вкусу.


I>>>>Можно без Subject:


I>>>>
I>>>>   ReactiveList<IObservable<T>> _streams;
I>>>>   public IObservable<T> Data { 
I>>>>       get { 
I>>>>         return _streams.Changed
I>>>>                        .Select(_ => Observable.Merge(_streams))
I>>>>                        .Switch(); 
I>>>>       } 
I>>>>   }
I>>>>


G>>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.

I>>Почему не изменит?
G>Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями.
Там ничего не уходит на уровень выше. Это полностью рабочий код.

G>Можешь привести пример где Rx таки нужен будет?


Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно.
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[6]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 11:58
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:


SS>>>Короче, с await-ими грань между моделями тут размыта.

I>>Я всё это понимаю, но не понимаю зачем. Без поддержки синтаксиса это ровно ничем не отличается от IObservable, а преимуществ я не вижу.

SS>Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций.

Я про то, что код указанный выше можно строчка в строчку переписать на Rx.

SS>Всё конечно можно сделать и на Rx, но у него есть проблемы с читабельностью и зацикленностью на коллекциях.

Код ничем не будет отличаться. Какая зацикленность на коллекциях?

SS>>>Сктати в asyncenum, можно писать продюсеров таким образом:

SS>>>
SS>>>IAsyncEnumerable<int> numbers = Produce(async yieldAsync =>
SS>>>{
SS>>>   await yieldAsync.Yield(1);
SS>>>   await yieldAsync.Yield(2);
SS>>>});
SS>>>


I>>Опять же калька с Observable.Create();


SS>Observable.Create не позволит так же просто и эффективно блокировать продюсера пока консьюмер не готов.

Последовательность обработки — один из принципов Rx. Это, как раз, иногда добавляет проблем.
Ещё раз говорю, весь код который тут приведён можно переписать на Rx практически без изменений.

Поэтому и вопрос — зачем?
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[7]: await для IObservable<T>
От: gandjustas Россия http://blog.gandjustas.ru/
Дата: 17.02.15 12:15
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Здравствуйте, gandjustas, Вы писали:


G>>Здравствуйте, ionoy, Вы писали:


I>>>Здравствуйте, gandjustas, Вы писали:


G>>>>Здравствуйте, ionoy, Вы писали:

G>>Это как раз не самое важное. Самое важное как из батонкликов будет отправляться запрос. Rx в принципе позволяет легко и красиво это сделать (через Linq). Но именно что "в принципе", любая реальная задача обрастает костылями.

I>У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо.

Я как раз и хотел использовать Rx, чтобы вручную этими очередями не заниматься. Если писать очереди вручную, то можно хоть Rx, хоть TPL разницы вообще никакой не будет (только стилистическая и то не везде)

I>>>Конечно всё можно сделать на тасках, но раз уж у меня есть интерфейс IObservable<Packet>, то так выходит удобнее. IObservable<T> я не возвращаю, потому что семантически это неверно.

G>>Дык у тебя почти все на тасках и сделано. Вместо оборачивания чего-либо в IObservable вполне можно и входдящий поток сделать на тасках.
I>Потоки удобнее делать на IObservable, если есть какая-то логика обработки.
Делать то удобнее, а обрабатывать — не всегда. увы.


G>>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.

I>>>В моём случае это неважно, так как траффик не очень большой.
G>>А в моем случае 100 раз в секунду.
I>100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать.
100 раз в секунду достаточно чтобы проблемы повылезали. Вообще в Rx многие проблемы вылезают только под нагрузкой. Не знаю с чем связано, не копал так глубоко.

G>>>>5) Неочевидная проблема, но я часто на нее нарывался. p => p.SequenceId == sequenceId создает замыкание, есть предположения когда GC сможет его убрать? У меня с таким кодом память текла очень быстро.

I>>>Непонятно, с чего тут вдруг память должна течь.
G>>Я вот тоже до конца не разобрался, но помоему суть в том, что Task освобождается поздно и держит лямбду с замыканием. При сотнях вызовов в секунду память начинает реально течь.
G>>После убирания замыканий проблема исчезла.
I>Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание.
Скорее всего отписка происходила поздно, от этого объект успевал дожить до второго поколения, что давало большу нагрузку на GC, а из-за больших нагрузок на GC отписка еще сильнее задерживалась итд.

G>>>>>>Далее я победил request-response. У меня появилось много IObservable<T> (где все T разные), и надо было их объединить в нечто и подписаться. Но так нельзя. Если приведешь все IObservable<object>, то потом придется на выходе через typetest разбирать поток. А на каждый в отдельности IObservable подписаться нельзя, потому что они начинают конкурентно слать команды и ломается логика обработки request-response (которая и так полна костылями).

G>>Попробуй написать свой пример когда у тебя нет sequenceid и быстро поймешь.
I>Ты просто объединил отсылку и получение в одной подписке. Из-за этого получилась большая каша кода, что тебе не понравилось. Если бы ты эти понятия разделил, всё было бы значительно красивее, и может быть, ты бы увидел преимущества Rx.
Дык суть Rx в композируемости потоков данных. Если ты руками в очереди перекладываешь объекты, то суть Rx теряется.

I>>>А через TPL там прямо красота, да?

G>>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.

I>Тут одна

Да не одна, потому что далеко не весь код, чтобы заработало.

I>>>>>Можно без Subject:


I>>>>>
I>>>>>   ReactiveList<IObservable<T>> _streams;
I>>>>>   public IObservable<T> Data { 
I>>>>>       get { 
I>>>>>         return _streams.Changed
I>>>>>                        .Select(_ => Observable.Merge(_streams))
I>>>>>                        .Switch(); 
I>>>>>       } 
I>>>>>   }
I>>>>>


G>>>>А вот так нельзя. добавление элемента в _streams не изменит результат в Observable.Merge.

I>>>Почему не изменит?
G>>Если лист у тебя — Enumerable, то не изменит. Если observable, то проблема уходит на уровень выше и все равно обрастает костылями.
I>Там ничего не уходит на уровень выше. Это полностью рабочий код.
А в ReacliveList кто и как добавляет\удаляет элементы?

G>>Можешь привести пример где Rx таки нужен будет?


I>Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно.

Да, я тоже так писал. Потом оказалось что TPL ничуть не хуже (после появления async\await), а во многих случаях еще более читаемый код дает.
Re[8]: await для IObservable<T>
От: ionoy Эстония www.ammyui.com
Дата: 17.02.15 12:24
Оценка:
Здравствуйте, gandjustas, Вы писали:

I>>У меня батонклик привязан к ViewModel через ReactiveUI (WinForms). Дальше запрос кладётся в очередь, так что Rx я использую только при обработке клика, если надо.

G>Я как раз и хотел использовать Rx, чтобы вручную этими очередями не заниматься. Если писать очереди вручную, то можно хоть Rx, хоть TPL разницы вообще никакой не будет (только стилистическая и то не везде)
А там заниматься то нечем. Поэтому и удобнее без TPL и Rx.

G>>>>>4) Если вызвать параллельно 100 таких функций, то сравнение p.SequenceId == sequenceId будет выполняться 100 раз, логично было бы сделать dictionary но в Rx так не выйдет.

I>>>>В моём случае это неважно, так как траффик не очень большой.
G>>>А в моем случае 100 раз в секунду.
I>>100 раз в секунду тоже не так много, т.к. у тебя запросы в любом случае последовательные. Когда приходит новый подписчик, старый уже отписался. Если же in-flight запросов много, то придётся немного переписать.
G>100 раз в секунду достаточно чтобы проблемы повылезали. Вообще в Rx многие проблемы вылезают только под нагрузкой. Не знаю с чем связано, не копал так глубоко.
Да вроде есть толковые ребята, которые используют его в нагрузке.

I>>Память могла утекать только если не происходила отписка. Ну или сверх-древняя версия Rx содержала баг и не освобождала замыкание.

G>Скорее всего отписка происходила поздно, от этого объект успевал дожить до второго поколения, что давало большу нагрузку на GC, а из-за больших нагрузок на GC отписка еще сильнее задерживалась итд.
Отписка должна происходить моментально после обработки.

I>>>>А через TPL там прямо красота, да?

G>>>Использовал для такой задачи dataflow — реально красота. Появился новый источник — создал объект и присоедил его к мультиплексеру, все. Две-три строки от силы.
I>>Тут одна
G>Да не одна, потому что далеко не весь код, чтобы заработало.
Для той функциональности, которую ты попросил — весь.

I>>>>>>
I>>>>>>   ReactiveList<IObservable<T>> _streams;
I>>>>>>   public IObservable<T> Data { 
I>>>>>>       get { 
I>>>>>>         return _streams.Changed
I>>>>>>                        .Select(_ => Observable.Merge(_streams))
I>>>>>>                        .Switch(); 
I>>>>>>       } 
I>>>>>>   }
I>>>>>>


I>>Там ничего не уходит на уровень выше. Это полностью рабочий код.

G>А в ReacliveList кто и как добавляет\удаляет элементы?

Да кто хочет. Там обычный интерфейс, как у List<T>.

G>>>Можешь привести пример где Rx таки нужен будет?


I>>Честно говоря, сейчас не хочется выбирать красивые примеры и потом защищать их от аргументов, мол можно сделать по-другому. Если есть конкретная задача, то можно написать реализацию на Rx и сравнить с другими решениями. У меня сейчас несколько desktop и winphone приложений в разработке и там всё пронизано IObservable. Как минимум мне самому это очень удобно.

G>Да, я тоже так писал. Потом оказалось что TPL ничуть не хуже (после появления async\await), а во многих случаях еще более читаемый код дает.

Так я и не предлагаю всё на Rx делать. Для меня отлично работает правило "один ответ — Таск, больше — Observable".
www.livexaml.com
www.ammyui.com
www.nemerleweb.com
Re[7]: await для IObservable<T>
От: SuhanovSergey  
Дата: 17.02.15 13:08
Оценка:
Здравствуйте, ionoy, Вы писали:

I>Здравствуйте, SuhanovSergey, Вы писали:



SS>>Зачем? async/await позволяет писать читабельный асинхронный (не блолирующий потоки во время IO) код. IEnumerableAsync добавляет к этому нативную поддержу коллекций.

I>Последовательность обработки — один из принципов Rx. Это, как раз, иногда добавляет проблем.
I>Ещё раз говорю, весь код который тут приведён можно переписать на Rx практически без изменений.
I>Поэтому и вопрос — зачем?
Приведённый ниже код никогда не блокирует потоки на время IO. Считаем, что все awaitable методы делают тот или иной IO.
Попробуте переписать код на Rx с сохранением лаконичности и читабильности.

(код выдуманный, к смыслу не придираться)
var userId = await GetUserId(name);
IAsyncEnumerable<Invoice> invoices = 
    (await dataAccess.ReadInvoicesFromDB(userId))
    .Where(invoice => invoice.Total == 0) // filter by synchronous predicate
    .Where(async invoice => !(await IsInvoiceProcessed(invoice.Id))); // filter by asynchronous predicate
await invoces.ForEachAsync(async invoice => // async invoice handler
{
    var total = CalculateTotal(invoice); // CPU-bound procedure
    await UpdateInvoiceTotal(invoice.Id, total); // IO - request to DB
    await ProcessInvoice(invoice); // IO - request to other service
});

async IAsyncEnumerable<Invoice> IDataAccess.ReadInvoicesFromDB(string userId)
{
    var connection = await ConnectToDB();
    return EnumerableAsync.Produce(async yieldAsync =>
    {
         for (;;)
         {
            var batch = await connection.Query(...);
            if (batch.IsEmpty)
                return;
            foreach (var invoice in batch)
                await yieldAsync.Yield(invoice); // producer waits for consumer here wo blocking any thread
         }
    });
}
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.