Здравствуйте, sergunok, Вы писали:
S>Код обрабатывает ситуации типа "если минимум проапдейтился, но сейчас есть активная заявка, значит снять ее и по окончании выставить по новой цене" S>Буду признателен, если подскажите как нечто подобное реализуется с помощью Rx и (или) в псевдосинхронном варианте с помощью yield.
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, sergunok, Вы писали:
S>>Код обрабатывает ситуации типа "если минимум проапдейтился, но сейчас есть активная заявка, значит снять ее и по окончании выставить по новой цене" S>>Буду признателен, если подскажите как нечто подобное реализуется с помощью Rx и (или) в псевдосинхронном варианте с помощью yield.
U>На TaskPulling'е
Чето я не понял где здесь асинхронность? Как использовать такой метод?
U>Требуемую бизнес-логику я не совсем понял, но вроде в такой алгоритм можно встроить все что душе угодно.
На самом деле такой алгоритм в линейной форме не выражается. Там два потока входящих событий и один поток исходящих, кореллирующих между собой.
Re[8]: Good practice по алгоритмизации в условиях асинхронно
Здравствуйте, gandjustas, Вы писали:
U>>Требуемую бизнес-логику я не совсем понял, но вроде в такой алгоритм можно встроить все что душе угодно. G>На самом деле такой алгоритм в линейной форме не выражается. Там два потока входящих событий и один поток исходящих, кореллирующих между собой.
В чем смысл мыслить в терминах событий? Естественно на событиях такие задачи решаются очень сложно. А просто они решаются в терминах синхронизации того состояния которое есть с тем, которое должно быть.
Re[6]: Good practice по алгоритмизации в условиях асинхронно
Здравствуйте, sergunok, Вы писали:
S>Algorithm реализует стратегию покупки в момент изменения минимума цены. S>Код обрабатывает ситуации типа "если минимум проапдейтился, но сейчас есть активная заявка, значит снять ее и по окончании выставить по новой цене"
Повторил насколько хватило моих знаний Rx, а их, как выяснилось, крайне мало.
public class Algorithm : IDisposable
{
// Состояние алгоритмаprivate enum StateEn
{
// ничего не делаем, ожидаем события
Idle,
// покупка
Buying,
// снятие заявки на покупку
CancellingBuy
}
private class State
{
public State(Order order, StateEn state, double price)
{
Order = order;
StateEn = state;
Price = price;
}
public Order Order { get; private set; }
public StateEn StateEn { get; private set; }
public double Price { get; private set; }
}
private ITrade trade;
// Лимит - объем выставляемых заявок.private int limit;
IDisposable cookie;
public Algorithm(ITrade trade, int limit)
{
this.trade = trade;
this.limit = limit;
//Последовательность состояний
IConnectableObservable<State> states = null;
//Создаем ленивость вычисленийvar deferedStates = Observable.Defer(() => states);
//Используем Subject чтобы сделать из событий IObservablevar orderChanged = new Subject<Order>();
var ticks = new Subject<double>();
this.trade.OrderChanged += o => orderChanged.OnNext(o);
this.trade.NewTick += v => ticks.OnNext(v);
//Последовательность минимальных ценvar minPrices = ticks.Scan(double.MaxValue, (min, v) => min > v ? v : min).DistinctUntilChanged();
//Разделяем потоки событий чтобы легче было писатьvar orderMatched = orderChanged.Where(o => o.Status == OrderStatusEn.Matched);
var orderCancelled = orderChanged.Where(o => o.Status == OrderStatusEn.Cancelled);
var idleState = deferedStates.Where(s => s.StateEn == StateEn.Idle);
var buyState = deferedStates.Where(s => s.StateEn == StateEn.Buying);
var cancellingState = deferedStates.Where(s => s.StateEn == StateEn.CancellingBuy);
//Объединяем несколько результирующих потоков в один
states =
Observable.Join(
//Если новая минимальная цена и ожидаем события, то покупаем
minPrices.And(idleState).Then((p, _) => Buy(p)),
//Если новая минимальная цена и покупаем, то отменям
minPrices.And(buyState)
.Then((p, s) =>
{
this.trade.CancelOrder(s.Order);
return new State(null, StateEn.CancellingBuy, p);
}),
//Если статус заявки изменился на matched и покупаем, то переходим в ожидание
orderMatched.And(buyState).Then((o, s) => new State(null, StateEn.Idle, s.Price)),
//Если статус заявки изменился и отменяем, то покупаем по новой цене
orderChanged.And(cancellingState).Then((_, s) => Buy(s.Price))
).Publish(new State(null, StateEn.Idle, 0));//Создается асинхронность
cookie = states.Subscribe(s =>
{
//Получили забесплатно логирование в одном месте#if DEBUG
Console.WriteLine("{0} {1} {2}", DateTime.Now, s.StateEn, s.Price);
#endif
});
states.Connect();
}
private State Buy(double price)
{
var order = new Order() { Price = price, Volume = this.limit };
this.trade.RegisterOrder(order);
return new State(order, StateEn.Buying, price);
}
public void Dispose()
{
cookie.Dispose();
}
}
В местах, выделенных жирным можно подсовывать начальные значения в случае перезапуска.
По сути ничем не лучше вариантов с обработчиками событий, кроме того что вся логика записана в одном месте.
Re[9]: Good practice по алгоритмизации в условиях асинхронно
Undying и gandjustas, спасибо за примеры кода!
Перевариваю их сейчас..
По поводу: U>В чем смысл мыслить в терминах событий? Естественно на событиях такие задачи решаются очень сложно. А просто они решаются в терминах синхронизации того состояния которое есть с тем, которое должно быть.
Очередь выходных событий (не очень нравится в данном случае термин "очередь выходных событий", скорее назвал бы очередью действий) ИМХО — это хороший способ решать широкий круг подобных задач. (Хотя для моего примера — это возможно "из пушки по воробьям").
Думаю, что при использовании очереди здорово разрешать действиям из очереди подписываться на входные события.
Для моего примера действиями могут быть:
WaitingAction
BuyAction
CancelingBuyAction
У каждого Action есть свой метод Do.
Подписывание на события:
WaitingAсtion подписывается на событие новый min
BuyAction на удовлетворение заявки и новый min
CancelingBuyAction на удовлетворение заявки или снятие
(В данном примере действия по сути совпали с состояниями)
Обработчикам доступны любые методы по модификации очереди действий и функция старта обработки след."действия" из очереди.
Re[10]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, sergunok, Вы писали:
S>Здравствуйте, Ikemefula, Вы писали:
I>>А какие с этим проблемы если ты автомат уже выделил ?
S>Проблемы буду скоро когда, буду трансформировать более сложный алгоритм в автомат.
S>Согласен с gandjustas, что для перевода синхронного алгоритма а автоматный вид требуются усилия (пускай и автоматизируемые).
S>Пока только не совсем понимаю как все это решается RX'ом или вот даже просто yield, последнее в Питоновском Twisted Framework S>решал сам фреймворк:
S>@inlineCallback S>def login(self, login, pwd): S> success = yield loginRemotely(login, pwd) S> self.data = yield loadData(login)
IObservable<Data> Login(string login, string pwd)
{
return from _ in LoginRemotely(login, pwd) //LoginRemotely возвращает IObservable<ЧТОНИТЬ>from d in LoadData(login) //LoadData возвращает IObservable<Data>select d;
}
Re[10]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, sergunok, Вы писали:
S>Очередь выходных событий (не очень нравится в данном случае термин "очередь выходных событий", скорее назвал бы очередью действий) ИМХО — это хороший способ решать широкий круг подобных задач. (Хотя для моего примера — это возможно "из пушки по воробьям").
Я не совсем понимаю, зачем в таких задачах вообще строить логику на событиях. Как я понимаю задача у тебя примерно такая. Есть некий биржевой товар, у которого время от времени изменяется цена. При изменениях цены и выполнение определенных условий надо либо удалить старую заявку, либо поставить новую заявку, либо вначале удалить старую, а затем поставить новую заявку. Соответственно на каждый биржевой товар заводим Task, пока цена не меняется Task спит, как только поменялась, Task просыпает и проверяет условия — нужно ли удалять существующую заявку (если она есть) и нужно ли ставить новую заявку. Проблема может быть в том, что добавление и удаление заявок являются асинхронным операциями, но эту проблему TaskPulling позволяет легко решить.
Достоинства. Все изменения производятся из одного места и в предсказуемом порядке (к примеру, вначале производится удаление старой заявки, затем добавление новой), поэтому легко гарантировать, что в конкретный момент времени может существовать только одна заявка, не нужно думать о том, что произойдет если вначале придет событие на удаление, тут же событие на добавление и тут же снова на удаление и т.п. Соответственно код получается простым и надежным.
Re[11]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, sergunok, Вы писали:
S>>Очередь выходных событий (не очень нравится в данном случае термин "очередь выходных событий", скорее назвал бы очередью действий) ИМХО — это хороший способ решать широкий круг подобных задач. (Хотя для моего примера — это возможно "из пушки по воробьям").
U>Я не совсем понимаю, зачем в таких задачах вообще строить логику на событиях. Как я понимаю задача у тебя примерно такая. Есть некий биржевой товар, у которого время от времени изменяется цена. При изменениях цены и выполнение определенных условий надо либо удалить старую заявку, либо поставить новую заявку, либо вначале удалить старую, а затем поставить новую заявку. Соответственно на каждый биржевой товар заводим Task, пока цена не меняется Task спит, как только поменялась, Task просыпает и проверяет условия — нужно ли удалять существующую заявку (если она есть) и нужно ли ставить новую заявку. Проблема может быть в том, что добавление и удаление заявок являются асинхронным операциями, но эту проблему TaskPulling позволяет легко решить.
Честно говоря не понимаю терминов, которые ты используешь. Можешь пример рабочего кода привести и рассказать что такое Task и TaskPulling.
Если Task это System.Threading.Tasks.Task, то то что ты описываешь ничуть не лучше Rx и потоков событий (асинхронных коллекций), а даже хуже, так как потребует большого количества водопроводного кода.
Re[12]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
G>Честно говоря не понимаю терминов, которые ты используешь.
Что именно непонятно? Сам принцип замены логики на событиях на логику синхронизации текущего состояния с требуемым? Или принцип понятен, но непонятно как реализуется синхронизация состояний при наличии асинхронных операций на TaskPulling'е?
G>Можешь пример рабочего кода привести и рассказать что такое Task и TaskPulling.
G>Если Task это System.Threading.Tasks.Task, то то что ты описываешь ничуть не лучше Rx и потоков событий (асинхронных коллекций), а даже хуже, так как потребует большого количества водопроводного кода.
Имеется в виду Task из TaskPulling'а.
Re[13]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, gandjustas, Вы писали:
G>>Честно говоря не понимаю терминов, которые ты используешь.
U>Что именно непонятно? Сам принцип замены логики на событиях на логику синхронизации текущего состояния с требуемым? Или принцип понятен, но непонятно как реализуется синхронизация состояний при наличии асинхронных операций на TaskPulling'е?
Непонятно как реализуется асинхронность в данном случае. Выполнение кода должно как-то прерываться и потом возвращаться назад.
G>>Можешь пример рабочего кода привести и рассказать что такое Task и TaskPulling. U>Описание здесь: http://rsdn.ru/forum/dotnet/3037674.1.aspx
Ну если все опирается на таймер, то так делать не стоит.
G>>Если Task это System.Threading.Tasks.Task, то то что ты описываешь ничуть не лучше Rx и потоков событий (асинхронных коллекций), а даже хуже, так как потребует большого количества водопроводного кода. U>Имеется в виду Task из TaskPulling'а.
Тогда уж лучше взять таски из TPL и ParallelExtensionExtras
Re[14]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
U>>Что именно непонятно? Сам принцип замены логики на событиях на логику синхронизации текущего состояния с требуемым? Или принцип понятен, но непонятно как реализуется синхронизация состояний при наличии асинхронных операций на TaskPulling'е? G>Непонятно как реализуется асинхронность в данном случае. Выполнение кода должно как-то прерываться и потом возвращаться назад.
Для этого и существуют yield'ы. Например, у нас изменилась цена товара, из-за чего старую заявку нужно удалить, удаление заявки операция асинхронная. Соответственно таска, заметив изменение цены, активируется и начинает удаление заявки, пока заявка не удалена таска ждет (с помощью yield return new WaitStep()). Пока заявка не будет удалена новые изменения цены будут либо игнорироваться, либо складываться в очередь, в зависимости от того, что требуется по условию задачи. Как только заявка удалилась таска продолжает свою работу.
G>>>Можешь пример рабочего кода привести и рассказать что такое Task и TaskPulling. U>>Описание здесь: http://rsdn.ru/forum/dotnet/3037674.1.aspx
G>Ну если все опирается на таймер, то так делать не стоит.
Таймер это решение для бедных, если псевдосинхронная запись решение задачи облегачает, а использовать TaskPulling по каким-то причинам не хочется. Собственно TaskPulling никакие таймеры не использует.
Re[15]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, gandjustas, Вы писали:
U>>>Что именно непонятно? Сам принцип замены логики на событиях на логику синхронизации текущего состояния с требуемым? Или принцип понятен, но непонятно как реализуется синхронизация состояний при наличии асинхронных операций на TaskPulling'е? G>>Непонятно как реализуется асинхронность в данном случае. Выполнение кода должно как-то прерываться и потом возвращаться назад.
U>Для этого и существуют yield'ы. Например, у нас изменилась цена товара, из-за чего старую заявку нужно удалить, удаление заявки операция асинхронная. Соответственно таска, заметив изменение цены, активируется и начинает удаление заявки, пока заявка не удалена таска ждет (с помощью yield return new WaitStep()). Пока заявка не будет удалена новые изменения цены будут либо игнорироваться, либо складываться в очередь, в зависимости от того, что требуется по условию задачи. Как только заявка удалилась таска продолжает свою работу.
Это я в курсе. Меня другое интересует. Вот есть асинхронный вызов, типа BeginRead, на нем я хочу прервать выполнение с помощью yield и вернутся когда вызов завершится. Значит мне надо что-то передать в callback этого самого BeginRead, что вызовет продолжение работы с места прерывания.
G>>>>Можешь пример рабочего кода привести и рассказать что такое Task и TaskPulling. U>>>Описание здесь: http://rsdn.ru/forum/dotnet/3037674.1.aspx
G>>Ну если все опирается на таймер, то так делать не стоит.
U>Таймер это решение для бедных, если псевдосинхронная запись решение задачи облегачает, а использовать TaskPulling по каким-то причинам не хочется. Собственно TaskPulling никакие таймеры не использует.
А что использует?
Кстати с помощью Rx итераторы делаются очень легко.
Пример (чтение заданного количества байт из потока):
public static IObservable<Unit> ReadExact(this Stream stream, byte[] buffer, int offset, int count)
{
return ReadExactInternal(stream, buffer, offset, count).Concat().TakeLast(1).Select(_ => new Unit());
}
public static IEnumerable<IObservable<int>> ReadExactInternal(Stream stream, byte[] buffer, int offset, int count)
{
var read = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
while(count>0)
{
int bytesRead = 0;
yield return read(buffer, offset, count).Do(r => bytesRead = r);
count -= bytesRead;
offset += bytesRead;
}
}
Re[13]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
G>Это я в курсе. Меня другое интересует. Вот есть асинхронный вызов, типа BeginRead, на нем я хочу прервать выполнение с помощью yield и вернутся когда вызов завершится. Значит мне надо что-то передать в callback этого самого BeginRead, что вызовет продолжение работы с места прерывания.
U>>Таймер это решение для бедных, если псевдосинхронная запись решение задачи облегачает, а использовать TaskPulling по каким-то причинам не хочется. Собственно TaskPulling никакие таймеры не использует. G>А что использует?
TaskPulling ставит таску в очередь указанного потока. Далее поток просыпается берет первую таску из очереди, делает очередной шаг таски, после чего таска сдвигается в конец очереди (реально посложнее немного, но не суть).
Re[14]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
G>Посмотрел код, там потоки запускаются, не пойдет. G>Примерно в тоже время в MSDN Magazine появилась статья про AsyncEnumerator, который без дополнительных потоков работает.
Написать вариант TaskPulling'а базирующийся на WinForms.Timer и соответственно работающий в том же потоке дело 15 минут, только на большинстве задач смысла в этом никакого, исключение, наверное, некоторые задачи связанные с гуи.
Re[15]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, gandjustas, Вы писали:
G>>Посмотрел код, там потоки запускаются, не пойдет. G>>Примерно в тоже время в MSDN Magazine появилась статья про AsyncEnumerator, который без дополнительных потоков работает.
U>Написать вариант TaskPulling'а базирующийся на WinForms.Timer и соответственно работающий в том же потоке дело 15 минут, только на большинстве задач смысла в этом никакого, исключение, наверное, некоторые задачи связанные с гуи.
Таймер слишком редко срабатывает для таких задач, а плодить потоки — быстро придет белый пушной зверек под нагрузкой.
Нужно полностью асинхронный механизм работы (на коллбеках), в TPL и Rx такой есть, зачем что-то еще использовать?
Re[16]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
G>Таймер слишком редко срабатывает для таких задач, а плодить потоки — быстро придет белый пушной зверек под нагрузкой.
Зачем плодить потоки? Кто мешает ставить произвольное количество тасок в один и тот же поток?
Re[16]: Good practice по алгоритмизации в условиях асинхронн
Здравствуйте, gandjustas, Вы писали:
G>Нужно полностью асинхронный механизм работы (на коллбеках), в TPL и Rx такой есть, зачем что-то еще использовать?
Здравствуйте, Undying, Вы писали:
U>Здравствуйте, gandjustas, Вы писали:
G>>Таймер слишком редко срабатывает для таких задач, а плодить потоки — быстро придет белый пушной зверек под нагрузкой.
U>Зачем плодить потоки? Кто мешает ставить произвольное количество тасок в один и тот же поток?
Да никто не мешает, только поток висит в ожидании после выполнения шага. При большом количестве задач в очереди такая схема начинает сосать со страшной силой, поэтому надо плодить потоки.