[Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 03.12.16 11:25
Оценка:
Есть примерно такой код:

someObservable.Select(x => getY(x));

Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
    return new Y(2);
}


В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.
Т.е. что-то вроде:
Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
  if (x.SomethingElse == true)
  {
     Thread.Sleep(timeout);
     if (x.Value == X.ABC)
       return new Y(1);
     else
       return new Y(2);
  }
}


Как правильно избавиться от Thread.Sleep ?
http://rsdn.nemerleweb.com
http://nemerleweb.com
rx
Re: [Rx] Задержать событие при условии
От: Sinix  
Дата: 03.12.16 11:52
Оценка:
Здравствуйте, _NN_, Вы писали:


_NN>В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.


Я тот ещё сварщик, по памяти — выставить getY как второй Observable + join. Если не подходит, то лучше спросить на stackoverflow, тут емнип только ув. ganjustas по Rx регулярно отвечает. Если кого забыл — звиняйте
Re[2]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 03.12.16 12:13
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, _NN_, Вы писали:



_NN>>В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.


S>Я тот ещё сварщик, по памяти — выставить getY как второй Observable + join. Если не подходит, то лучше спросить на stackoverflow, тут емнип только ув. ganjustas по Rx регулярно отвечает. Если кого забыл — звиняйте


А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[3]: [Rx] Задержать событие при условии
От: Sinix  
Дата: 03.12.16 12:24
Оценка:
Здравствуйте, _NN_, Вы писали:

_NN>А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?

Вроде бы тот же, но могу врать.
Re[4]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 03.12.16 12:35
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, _NN_, Вы писали:


_NN>>А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?

S>Вроде бы тот же, но могу врать.

Попробую на SO, посмотрим что подскажут.
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 03.12.16 19:16
Оценка:
Здравствуйте, _NN_, Вы писали:

_NN>Есть примерно такой код:

_NN>Как правильно избавиться от Thread.Sleep ?

Перепишите все без Observable

А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером.
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[3]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 03.12.16 19:24
Оценка:
Здравствуйте, _NN_, Вы писали:

_NN>А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?


Ваш Thread.Sleep лочит источник. Если очередь не пугает, можно просто Observable.Delay использовать
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[2]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 03.12.16 19:27
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


_NN>>Есть примерно такой код:

_NN>>Как правильно избавиться от Thread.Sleep ?

TK>Перепишите все без Observable

Да думаю придётся скорее всего, чтобы работало, а потом думать как красиво написать.

TK>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером.

Я в TPL Dataflow совсем не силён.
Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[3]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 03.12.16 19:41
Оценка:
Здравствуйте, _NN_, Вы писали:

TK>>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером.

_NN>Я в TPL Dataflow совсем не силён.
_NN>Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?

А что там описывать? TransformBlock принимает на вход метод типа Func<A, Task<B>> (ваш getY)
дальше примерно следующее:

return Observable.Using(() => someObservable.Subscribe(block.AsObserver()), r => block.AsObservable().Finally(() => r.Dispose()).Finally(() => block.Complete());
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[4]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 03.12.16 21:02
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


TK>>>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером.

_NN>>Я в TPL Dataflow совсем не силён.
_NN>>Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?

TK>А что там описывать? TransformBlock принимает на вход метод типа Func<A, Task<B>> (ваш getY)

TK>дальше примерно следующее:

TK>return Observable.Using(() => someObservable.Subscribe(block.AsObserver()), r => block.AsObservable().Finally(() => r.Dispose()).Finally(() => block.Complete());


Мне бы ещё на .NET 3.5
Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[5]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 03.12.16 21:22
Оценка:
Здравствуйте, _NN_, Вы писали:

_NN>Мне бы ещё на .NET 3.5

_NN>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.

Зачем там поток? Должно хватить и Observable.Defer с семафором.
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[6]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 04.12.16 06:20
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


_NN>>Мне бы ещё на .NET 3.5

_NN>>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.

TK>Зачем там поток? Должно хватить и Observable.Defer с семафором.


Да как-то ни разу не было возможности использовать Defer так я как-то и не думал о нём.
Можно попробовать.
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[6]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 04.12.16 06:29
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


_NN>>Мне бы ещё на .NET 3.5

_NN>>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.

TK>Зачем там поток? Должно хватить и Observable.Defer с семафором.


Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[7]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 04.12.16 08:58
Оценка: 20 (1)
Здравствуйте, _NN_, Вы писали:

TK>>Зачем там поток? Должно хватить и Observable.Defer с семафором.

_NN>Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?

Смотря как код написать.
Есть поменять на:

IObservable<Y> getY(X x)
{
  if (x.Value == X.ABC)
    return Observable.Return(new Y(1));
  else
  if (x.SomethingElse == true)
  {
     return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
  }
}

Либо, можно Delay и не использовать:
IObservable<Y> getY(X x)
{
  return Observable.Create<Y>(async (obs, token) =>
  {
    if (x.Value == X.ABC) {
        obs.OnNext(new Y(1));
    }
    else
    if (x.SomethingElse == true)
    {
        await Task.Delay(timeout, token);
        if (x.Value == X.ABC) {
           obs.OnNext(new Y(1));
        }
        else {
           obs.OnNext(new Y(2));
        }
     }
   }


Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?

Что-бы очередь тормозить надо воткнуть ожидание семафора кто-то ближе к входу в getY() либо не втыкать если, ее образование не страшно...
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[8]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 04.12.16 09:20
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


TK>>>Зачем там поток? Должно хватить и Observable.Defer с семафором.

_NN>>Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?

TK>Смотря как код написать.

TK>Есть поменять на:
  Скрытый текст
TK>
TK>IObservable<Y> getY(X x)
TK>{
TK>  if (x.Value == X.ABC)
TK>    return Observable.Return(new Y(1));
TK>  else
TK>  if (x.SomethingElse == true)
TK>  {
TK>     return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
TK>  }
TK>}
TK>

TK>Либо, можно Delay и не использовать:
TK>
TK>IObservable<Y> getY(X x)
TK>{
TK>  return Observable.Create<Y>(async (obs, token) =>
TK>  {
TK>    if (x.Value == X.ABC) {
TK>        obs.OnNext(new Y(1));
TK>    }
TK>    else
TK>    if (x.SomethingElse == true)
TK>    {
TK>        await Task.Delay(timeout, token);
TK>        if (x.Value == X.ABC) {
TK>           obs.OnNext(new Y(1));
TK>        }
TK>        else {
TK>           obs.OnNext(new Y(2));
TK>        }
TK>     }
TK>   }
TK>

Теперь более понятно. Вернуть IObservable<Y> вместо Y и это использовать.

TK>Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?

Мне главное, чтобы был порядок.
Т.е. если приходит событие пока я жду пусть подождёт в сторонке и обработается когда придёт его очередь.
При этом хотелось бы дать другим подписчикам работать дальше пока я жду.

Потому как там дальше идёт
someObservable
  .Select(x => getY(x))
  .Where(y => y.IsValid)
  .Scan(new { Current=abc, Prev=null }, (prev, cur) => new { Current=cur, Prev=prev.Current })
  .Select(makeResult)



TK>Что-бы очередь тормозить надо воткнуть ожидание семафора кто-то ближе к входу в getY() либо не втыкать если, ее образование не страшно...
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[9]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 04.12.16 12:02
Оценка:
Здравствуйте, _NN_, Вы писали:


TK>>Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?

_NN>Мне главное, чтобы был порядок.
_NN>Т.е. если приходит событие пока я жду пусть подождёт в сторонке и обработается когда придёт его очередь.
_NN>При этом хотелось бы дать другим подписчикам работать дальше пока я жду.

Не понятно...

1. Порядок даст Concat() но, пока идет ожидание будет копиться очередь из не обработанных событий (ну и с TimeOut будут заморочки). Merge() такую очередь копить не будет но, порядок следования событий будет "сбит".
2. Подписчик это кто? Он ваш или это подписчик на someObservable? Если someObservable то, они блокироваться не будут. Если ваш то см. пункт 1

someObservable
  .Select(x => getY(x))
  .Merge()|Concat()|MergeAndSort()
  .Where(y => y.IsValid)
  .Scan(new { Current=abc, Prev=null }, (prev, cur) => new { Current=cur, Prev=prev.Current })
  .Select(makeResult)
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Отредактировано 04.12.2016 12:05 TK . Предыдущая версия .
Re[10]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 04.12.16 12:12
Оценка:
Здравствуйте, TK, Вы писали:

TK>1. Порядок даст Concat() но, пока идет ожидание будет копиться очередь из не обработанных событий (ну и с TimeOut будут заморочки). Merge() такую очередь копить не будет но, порядок следования событий будет "сбит".

Тут то что надо.

Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.

Скажем такой пример:
using System;
using System.Reactive.Linq;

namespace RxTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var r = Observable.Range(0, 10);

            r.Select(x => x)
                .Select(x => getY(x))
                .Concat()
                .Subscribe(x => Console.WriteLine("a: " + x));
            

            Console.ReadKey();
        }

        private static IObservable<int> getY(int i, int tries = 0)
        {
            Console.WriteLine("In getY: i = {0}, tries = {1}", i, tries);

            if (i > 5)
            {
                int y;
                if (tryGetReal(i, out y))
                {
                    // No delay
                    return Observable.Return(y);
                }
                else
                {
                    if (tries < 3)
                    {
                        // Let's Try again
                        return Observable.Defer(() => getY(i, tries + 1)).Delay(TimeSpan.FromMilliseconds(100));
                    }
                    else
                    {
                        // No more tries, stop !
                        return Observable.Empty<int>();
                    }
                }
            }
            else
            {
                return Observable.Empty<int>();
            }
        }

        private static bool tryGetReal(int i, out int y)
        {
            if (i > 7)
            {
                y = i;
                return false;
            }
            else
            {
                y = i;
                return true;
            }
        }
    }
}


Насколько это правильно ?
http://rsdn.nemerleweb.com
http://nemerleweb.com
Отредактировано 04.12.2016 12:16 _NN_ . Предыдущая версия .
Re[11]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 04.12.16 13:06
Оценка: 10 (1)
Здравствуйте, _NN_, Вы писали:

_NN>Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.


_NN>Скажем такой пример:


Defer, рекурсивные вызовы и out параметры? скорей всего где-то есть засада. Если нужно просто n коротких повторов то:
var query = from counter in Observable.Timer(TimeSpan.FromMsec(0), TimeSpan.FromMsec(100)).Take(repeatCount)
  let result = tryGetReal(x)
  where result.Success
  select result.Value;
return query.Take(1);


_NN>Насколько это правильно ?


Слишком оно у вас замороченно... Возможно, что для вашей задачи проще всего будет убрать вообще IObservable:
async Task RunProcessing<T>(IObservable<T> source)
{
     foreach (var sample in source.AsEnumerable())
     {
        var output = await ProcessItem(sample);
        Console.WriteLine("a: " + output);
     }
}


Рабоче-крестьянские foreach не только проще понять/отлаживать, но и работать скорей всего будет не в пример быстрее.
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[12]: [Rx] Задержать событие при условии
От: _NN_ www.nemerleweb.com
Дата: 04.12.16 13:20
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, _NN_, Вы писали:


_NN>>Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.


_NN>>Скажем такой пример:


TK>Defer, рекурсивные вызовы и out параметры? скорей всего где-то есть засада. Если нужно просто n коротких повторов то:

Возможно и есть но работает
TK>
TK>var query = from counter in Observable.Timer(TimeSpan.FromMsec(0), TimeSpan.FromMsec(100)).Take(repeatCount)
TK>  let result = tryGetReal(x)
TK>  where result.Success
TK>  select result.Value;
TK>return query.Take(1);
TK>

Такого варианта с таймером я не предвидел.

_NN>>Насколько это правильно ?


TK>Слишком оно у вас замороченно... Возможно, что для вашей задачи проще всего будет убрать вообще IObservable:

TK>
TK>async Task RunProcessing<T>(IObservable<T> source)
TK>{
TK>     foreach (var sample in source.AsEnumerable())
TK>     {
TK>        var output = await ProcessItem(sample);
TK>        Console.WriteLine("a: " + output);
TK>     }
TK>}
TK>


TK>Рабоче-крестьянские foreach не только проще понять/отлаживать, но и работать скорей всего будет не в пример быстрее.

Эх.. у нас .NET 3.5.
http://rsdn.nemerleweb.com
http://nemerleweb.com
Re[13]: [Rx] Задержать событие при условии
От: TK Лес кывт.рф
Дата: 04.12.16 14:06
Оценка:
Здравствуйте, _NN_, Вы писали:

TK>>Рабоче-крестьянские foreach не только проще понять/отлаживать, но и работать скорей всего будет не в пример быстрее.

_NN>Эх.. у нас .NET 3.5.

https://www.nuget.org/packages/TaskParallelLibrary/ это не работает?
В любом случае, если число ваших обработчиков счетно — сделайте выделенный тред. у AsEnumerable своя очередь внутри и исходный поток не блокируется и при использовании Thread.Sleep
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.