В C# есть BlockingCollection<T>
в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)
по аналогии с F#:
F# MailBox
type msg =
| Incr of int
| Fetch of AsyncReplyChannel<int>
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Incr(x) -> return! loop(n + x)
| Fetch(replyChannel) ->
replyChannel.Reply(n)
return! loop(n) }
loop 0)
The msg union wraps two types of messages: we can tell the MailboxProcessor to increment, or have it send its contents to a reply channel. The type AsyncReplyChannel<'a> exposes a single method, member Reply : 'reply -> unit. We can use this class in fsi as follows:
> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57
Здравствуйте, Qbit86, Вы писали:
Q>Возможно, для этих целей лучше подходит Channel<T>
Спасибо. но это немножко не то.
Идея сделать асинхронный последовательный доступ к общему хранилищу.
а эти классы удаляют полученное значение из коллекции при чтении.
Может быть конечно, достаточно ConcurrentBag<T>, но я не уверен, есть ли гарантия что например, я читаю объект, а его заменили на новый или удалили.
Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.
по идее BufferedBlock.Post можно отправить коллбэк который вернет нужные данные. Но все равно странно что нет простой реализации в базовой библиотеке.
в той же кложе аж несколько штук и агент и атом и еще забыл название.
Здравствуйте, varenikAA, Вы писали:
AA>В C# есть BlockingCollection<T> AA>в которую можно асинхронно постить (Post) данные
Пока остановился на акке, хотя хотелось бы поменьше зависимостей.
А так почти идентично фаршу получилось:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
namespace MailBox
{
public class GetList { }
public class Add
{
public Add(long item)
{
Item = item;
}
public long Item { get; }
}
public class Storage : ReceiveActor
{
private readonly ConcurrentBag<long> items = new ConcurrentBag<long>();
public Storage()
{
Task.Run(async () =>
{
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(5));
while (items.Count > 100)
{
if (items.TryTake(out var x))
{
Console.WriteLine(x + " удален");
}
}
}
});
Receive<Add>(add => items.Add(add.Item));
Receive<GetList>(a => Sender.Tell(items.ToArray()));
}
}
static class Program
{
static void Main()
{
var random = new Random();
var system = ActorSystem.Create("MySystem");
var greeter = system.ActorOf<Storage>("greeter");
Task.Run(async () =>
{
while (true)
{
greeter.Tell(new Add(random.Next(1, 100)));
await Task.Delay(random.Next(10, 20));
}
});
Task.Run(async () =>
{
while (true)
{
var items = (long[])(await greeter.Ask(new GetList()));
Console.WriteLine($"Sum = {items.Sum()}");
await Task.Delay(random.Next(10, 20));
}
});
Console.WriteLine("done");
Console.ReadLine();
}
}
}
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: Возможно ли реализовать MailboxProcessor на C#?
да похоже на то. хотя тут пишут что это антипаттерн.
я просто хочу в публичный мини чат добавить рассылку новом клиентам последних 10-50 сообщений.
и вот думаю что эффективней акторы или обычная конкурентная коллекция?
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: Возможно ли реализовать MailboxProcessor на C#?
Здравствуйте, varenikAA, Вы писали:
AA>В C# есть BlockingCollection<T> AA>в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)
BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.
Для ответов, в такой модели, заводятся другие очереди и уже в них шлется ответ. При этом не нужна синхронизация между потоками и гарантированно отсутствие дедлоков.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[2]: Возможно ли реализовать MailboxProcessor на C#?
Здравствуйте, VladD2, Вы писали:
AA>>в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)
VD>BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.
Попробовал так(правда запрос и ответ пришлось разделить, не получилось в одном PostAndReply)
BlockingCollection не сильно отличается от BufferBlock, но все равно нужно следить за данными(например список сообщений все равно нужно копировать иначе гарантированно падаем.
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re[3]: Возможно ли реализовать MailboxProcessor на C#?
Здравствуйте, varenikAA, Вы писали:
AA>BlockingCollection не сильно отличается от BufferBlock, но все равно нужно следить за данными(например список сообщений все равно нужно копировать иначе гарантированно падаем.
Здравствуйте, VladD2, Вы писали:
VD>BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.
Она в современных реалиях вполне хороша и в обратной ситуации: один продюсер и много потоков-разгребателей, при чем, при желании, даже через async/await — внутри реализация знаком с этим случаем тоже.
Re[3]: Возможно ли реализовать MailboxProcessor на C#?
Здравствуйте, Mystic Artifact, Вы писали:
MA> Она в современных реалиях вполне хороша и в обратной ситуации: один продюсер и много потоков-разгребателей, при чем, при желании, даже через async/await — внутри реализация знаком с этим случаем тоже.
А что в этом подходе хорошего то? Много потов лезут к общему ресурсу толкаясь и создавая проблемы. Надо дать каждому такому потоку по BlockingCollection и пусть разгребают каждый свою очередь. А продюсер или менеджер будет им эти задачи накидывать.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[4]: Возможно ли реализовать MailboxProcessor на C#?
Здравствуйте, VladD2, Вы писали:
VD>А что в этом подходе хорошего то?
Простота. Оно создано работать одинаково хорошо (или одинаково плохо) во всех этих режимах.
VD>Много потов лезут к общему ресурсу толкаясь и создавая проблемы. Надо дать каждому такому потоку по BlockingCollection и пусть разгребают каждый свою очередь. А продюсер или менеджер будет им эти задачи накидывать.
В зависимости от натурального характера задач (например, 100-кратная разница в размере/времени обработки разных задач) менеджеру надо будет озаботиться тем, что бы все потоки были заняты работой. В случае выше рабочие потоки "толкаясь" в одной коллекции — получают это бесплатно. Понятно, что это всё решаемо.
Второе, если просто раскидать BlockingCollection каждому потоку, то желательно не забыть вместо дефолтной коллекции (ConcurrentQueue, которая MPMC (multiple producer multiple consumer)) — подсунуть какую-то более простую / подходящую (SPSC).
В общем в любом случае получится сложнее. Если это действительно обоснованно/необходимо — то конечно так и надо делать. Думаю каждой задаче своё решение, тем более нюансов в каждом конкретном случае своя масса. Кому-то надо за каждый лишний лок бороться, а кому-то это не будет видно даже под микроскоп.
А, вообще, я с тобой согласен. Я просто обратил внимание, что BlockingCollection работает в любых режимах, и для меня — это его основное достоинство.
Здравствуйте, varenikAA, Вы писали:
Q>>Возможно, для этих целей лучше подходит Channel<T> AA>Спасибо. но это немножко не то.
Как раз то.
Каждая заинтересованная в получении писем сущность начинает работу с асинхронного запроса письма из почтового ящика.
Происходящее с т.з. активности — блокирующий вызов, т.е. подписка на событие готовности.
С т.з. "физического потока" (т.е. потока уровня ОС) — асинхронный вызов.
AA>Идея сделать асинхронный последовательный доступ к общему хранилищу.
Строго говоря, последовательный доступ всегда синхронный, ну да ладно, терминология современного донета малость загоняет, разводит путаницу на ровном месте.
AA>Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.
Блокировка/синхронизация никуда не делись.
Просто раньше очередь ожидающих активностей (процессов, потоков) была на уровне механизмов ОС, а через async/await эта очередь живёт на юзверском уровне, но по-сути она в точности такая же.
Я имел не множко другое, типа маленькой БД в памяти, но без lock(). т.е. чтобы отправить в БД данные и потом когда надо сделать запрос.
А эти примитивы все основаны на удалении прочтенного сообщения. В C# полный аналог или совсем не возможен я про PostAndReply либо очень сложен в реализации.
Немножко похожее можно сделать в виде акторов, когда в актор-запрос помещается актор из которого будет извлечен ответ.
AA>>Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.
V>Блокировка/синхронизация никуда не делись.
А как же C[ompare]A[nd]S[wap]? В этом приеме блокировка разве есть?
Здравствуйте, varenikAA, Вы писали:
V>>Как раз то. AA>Я имел не множко другое, типа маленькой БД в памяти, но без lock(). т.е. чтобы отправить в БД данные и потом когда надо сделать запрос.
А какая разница с т.з. логики кода, как именно выполняется сериализация доступа — через блокировку потоков уровня ОС, или через блокировку кооперативных потоков асинхронной системы .Net?
AA>А эти примитивы все основаны на удалении прочтенного сообщения. В C# полный аналог или совсем не возможен я про PostAndReply либо очень сложен в реализации.
Во-первых, тип MailBoxProcessor можно использовать из C# (либо напрямую, либо сделав соотв. фасад).
Во-вторых, а чего там сложного? В крайнем случае можно взять исходники класса и портировать на C#, чтобы не тянуть ядерную DLL языка F# в зависимости.
V>>Блокировка/синхронизация никуда не делись. AA>А как же C[ompare]A[nd]S[wap]? В этом приеме блокировка разве есть?
В этом месте блокируется неудачливый поток (заходит на новую попытку), но система в целом продвигается, поэтому lock-free.
Но асинхронщина дотнета вовсе не lock-free с т.з. асинхронных потоков, она асинхронна только с т.з. потоков уровня ОС.
А с т.з. исполняемой логики в асинхронных методах — происходящее очень даже синхронно. ))
Тут единственное что происходит — вытесняющая многозадачность уровня ядра ОС подменяется кооперативной многозадачностью юзверского уровня.