Здравствуйте, TK, Вы писали:
TK>Здравствуйте, _NN_, Вы писали:
TK>>>Зачем там поток? Должно хватить и Observable.Defer с семафором.
_NN>>Кстати а тормозить очередь обработчиков это не будет если сделать задержку внутри через Delay ?
TK>Смотря как код написать.
TK>Есть поменять на:
| | Скрытый текст |
| | TK>TK>IObservable<Y> getY(X x)
TK>{
TK> if (x.Value == X.ABC)
TK> return Observable.Return(new Y(1));
TK> else
TK> if (x.SomethingElse == true)
TK> {
TK> return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
TK> }
TK>}
TK>
TK>Либо, можно Delay и не использовать:
TK>TK>IObservable<Y> getY(X x)
TK>{
TK> return Observable.Create<Y>(async (obs, token) =>
TK> {
TK> if (x.Value == X.ABC) {
TK> obs.OnNext(new Y(1));
TK> }
TK> else
TK> if (x.SomethingElse == true)
TK> {
TK> await Task.Delay(timeout, token);
TK> if (x.Value == X.ABC) {
TK> obs.OnNext(new Y(1));
TK> }
TK> else {
TK> obs.OnNext(new Y(2));
TK> }
TK> }
TK> }
TK>
|
| | |
Теперь более понятно. Вернуть IObservable<Y> вместо Y и это использовать.
TK>Это все источник событий тормозить не будет и все сильно будет зависить от того, как обрабатывать результат. можно написать someObservable.Select(x => getY(x)).Concat(), можно someObservable.Select(x => getY(x)).Merge() — с разными последствиями на результат. Вам какие именно нужны?
Мне главное, чтобы был порядок.
Т.е. если приходит событие пока я жду пусть подождёт в сторонке и обработается когда придёт его очередь.
При этом хотелось бы дать другим подписчикам работать дальше пока я жду.
Потому как там дальше идёт
someObservable
.Select(x => getY(x))
.Where(y => y.IsValid)
.Scan(new { Current=abc, Prev=null }, (prev, cur) => new { Current=cur, Prev=prev.Current })
.Select(makeResult)
TK>Что-бы очередь тормозить надо воткнуть ожидание семафора кто-то ближе к входу в getY() либо не втыкать если, ее образование не страшно...