someObservable.Select(x => getY(x));
Y getY(X x)
{
if (x.Value == X.ABC)
return new Y(1);
else
return new Y(2);
}
В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.
Т.е. что-то вроде:
Y getY(X x)
{
if (x.Value == X.ABC)
return new Y(1);
else
if (x.SomethingElse == true)
{
Thread.Sleep(timeout);
if (x.Value == X.ABC)
return new Y(1);
else
return new Y(2);
}
}
_NN>В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.
Я тот ещё сварщик, по памяти — выставить getY как второй Observable + join. Если не подходит, то лучше спросить на stackoverflow, тут емнип только ув. ganjustas по Rx регулярно отвечает. Если кого забыл — звиняйте
Здравствуйте, Sinix, Вы писали:
S>Здравствуйте, _NN_, Вы писали:
_NN>>В некоторых случаях проверку 'x.Value == X.ABC' нужно совершить повторно через некоторое время так как Value может поменяться.
S>Я тот ещё сварщик, по памяти — выставить getY как второй Observable + join. Если не подходит, то лучше спросить на stackoverflow, тут емнип только ув. ganjustas по Rx регулярно отвечает. Если кого забыл — звиняйте
А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?
Здравствуйте, _NN_, Вы писали:
_NN>А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ?
Вроде бы тот же, но могу врать.
Здравствуйте, Sinix, Вы писали:
S>Здравствуйте, _NN_, Вы писали:
_NN>>А порядок будет тот же или надо еще скинуть в окно и отсортировать на всякий случай ? S>Вроде бы тот же, но могу врать.
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
_NN>>Есть примерно такой код: _NN>>Как правильно избавиться от Thread.Sleep ?
TK>Перепишите все без Observable
Да думаю придётся скорее всего, чтобы работало, а потом думать как красиво написать.
TK>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером.
Я в TPL Dataflow совсем не силён.
Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?
Здравствуйте, _NN_, Вы писали:
TK>>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером. _NN>Я в TPL Dataflow совсем не силён. _NN>Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?
А что там описывать? TransformBlock принимает на вход метод типа Func<A, Task<B>> (ваш getY)
дальше примерно следующее:
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
TK>>>А так, надо заменить Thread.Sleep на Task.Delay и вместо Select отправить поток данных в TransformBlock с минимальным буфером. _NN>>Я в TPL Dataflow совсем не силён. _NN>>Можно хотя бы схематично описать идею , а там гляди и в Rx решить удастся?
TK>А что там описывать? TransformBlock принимает на вход метод типа Func<A, Task<B>> (ваш getY) TK>дальше примерно следующее:
TK>return Observable.Using(() => someObservable.Subscribe(block.AsObserver()), r => block.AsObservable().Finally(() => r.Dispose()).Finally(() => block.Complete());
Мне бы ещё на .NET 3.5
Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.
Здравствуйте, _NN_, Вы писали:
_NN>Мне бы ещё на .NET 3.5 _NN>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.
Зачем там поток? Должно хватить и Observable.Defer с семафором.
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
_NN>>Мне бы ещё на .NET 3.5 _NN>>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.
TK>Зачем там поток? Должно хватить и Observable.Defer с семафором.
Да как-то ни разу не было возможности использовать Defer так я как-то и не думал о нём.
Можно попробовать.
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
_NN>>Мне бы ещё на .NET 3.5 _NN>>Думаю сделаю сначала по старинке с очередью и потоком, а когда будет у меня рабочий код, пораскинем мозгами как улучшить.
TK>Зачем там поток? Должно хватить и Observable.Defer с семафором.
Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?
Здравствуйте, _NN_, Вы писали:
TK>>Зачем там поток? Должно хватить и Observable.Defer с семафором. _NN>Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?
Смотря как код написать.
Есть поменять на:
IObservable<Y> getY(X x)
{
if (x.Value == X.ABC)
return Observable.Return(new Y(1));
else
if (x.SomethingElse == true)
{
return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
}
}
Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?
Что-бы очередь тормозить надо воткнуть ожидание семафора кто-то ближе к входу в getY() либо не втыкать если, ее образование не страшно...
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Здравствуйте, TK, Вы писали: TK>Здравствуйте, _NN_, Вы писали: TK>>>Зачем там поток? Должно хватить и Observable.Defer с семафором. _NN>>Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ? TK>Смотря как код написать. TK>Есть поменять на:
Скрытый текст
TK>
TK>IObservable<Y> getY(X x)
TK>{
TK> if (x.Value == X.ABC)
TK> return Observable.Return(new Y(1));
TK> else
TK> if (x.SomethingElse == true)
TK> {
TK> return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
TK> }
TK>}
TK>
Теперь более понятно. Вернуть IObservable<Y> вместо Y и это использовать. TK>Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?
Мне главное, чтобы был порядок.
Т.е. если приходит событие пока я жду пусть подождёт в сторонке и обработается когда придёт его очередь.
При этом хотелось бы дать другим подписчикам работать дальше пока я жду.
TK>>Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны? _NN>Мне главное, чтобы был порядок. _NN>Т.е. если приходит событие пока я жду пусть подождёт в сторонке и обработается когда придёт его очередь. _NN>При этом хотелось бы дать другим подписчикам работать дальше пока я жду.
Не понятно...
1. Порядок даст Concat() но, пока идет ожидание будет копиться очередь из не обработанных событий (ну и с TimeOut будут заморочки). Merge() такую очередь копить не будет но, порядок следования событий будет "сбит".
2. Подписчик это кто? Он ваш или это подписчик на someObservable? Если someObservable то, они блокироваться не будут. Если ваш то см. пункт 1
Здравствуйте, TK, Вы писали:
TK>1. Порядок даст Concat() но, пока идет ожидание будет копиться очередь из не обработанных событий (ну и с TimeOut будут заморочки). Merge() такую очередь копить не будет но, порядок следования событий будет "сбит".
Тут то что надо.
Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.
Скажем такой пример:
using System;
using System.Reactive.Linq;
namespace RxTest
{
class Program
{
static void Main(string[] args)
{
var r = Observable.Range(0, 10);
r.Select(x => x)
.Select(x => getY(x))
.Concat()
.Subscribe(x => Console.WriteLine("a: " + x));
Console.ReadKey();
}
private static IObservable<int> getY(int i, int tries = 0)
{
Console.WriteLine("In getY: i = {0}, tries = {1}", i, tries);
if (i > 5)
{
int y;
if (tryGetReal(i, out y))
{
// No delayreturn Observable.Return(y);
}
else
{
if (tries < 3)
{
// Let's Try againreturn Observable.Defer(() => getY(i, tries + 1)).Delay(TimeSpan.FromMilliseconds(100));
}
else
{
// No more tries, stop !return Observable.Empty<int>();
}
}
}
else
{
return Observable.Empty<int>();
}
}
private static bool tryGetReal(int i, out int y)
{
if (i > 7)
{
y = i;
return false;
}
else
{
y = i;
return true;
}
}
}
}
Здравствуйте, _NN_, Вы писали:
_NN>Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.
_NN>Скажем такой пример:
Defer, рекурсивные вызовы и out параметры? скорей всего где-то есть засада. Если нужно просто n коротких повторов то:
var query = from counter in Observable.Timer(TimeSpan.FromMsec(0), TimeSpan.FromMsec(100)).Take(repeatCount)
let result = tryGetReal(x)
where result.Success
select result.Value;
return query.Take(1);
_NN>Насколько это правильно ?
Слишком оно у вас замороченно... Возможно, что для вашей задачи проще всего будет убрать вообще IObservable:
Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
_NN>>Хочется ещё сделать небольшую оптимизацию, чтобы не ждать большой промежуток а проверять короткими промежутками и выйти по истечению времени если ничего не получилось.
_NN>>Скажем такой пример:
TK>Defer, рекурсивные вызовы и out параметры? скорей всего где-то есть засада. Если нужно просто n коротких повторов то:
Возможно и есть но работает TK>
TK>var query = from counter in Observable.Timer(TimeSpan.FromMsec(0), TimeSpan.FromMsec(100)).Take(repeatCount)
TK> let result = tryGetReal(x)
TK> where result.Success
TK> select result.Value;
TK>return query.Take(1);
TK>
Такого варианта с таймером я не предвидел.
_NN>>Насколько это правильно ?
TK>Слишком оно у вас замороченно... Возможно, что для вашей задачи проще всего будет убрать вообще IObservable: TK>
Здравствуйте, _NN_, Вы писали:
TK>>Рабоче-крестьянские foreach не только проще понять/отлаживать, но и работать скорей всего будет не в пример быстрее. _NN>Эх.. у нас .NET 3.5.
https://www.nuget.org/packages/TaskParallelLibrary/ это не работает?
В любом случае, если число ваших обработчиков счетно — сделайте выделенный тред. у AsEnumerable своя очередь внутри и исходный поток не блокируется и при использовании Thread.Sleep
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.