Возможно ли реализовать MailboxProcessor на C#?
От: varenikAA  
Дата: 27.02.21 07:26
Оценка:
В C# есть BlockingCollection<T>
в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)
по аналогии с F#:

  F# MailBox
type msg =
    | Incr of int
    | Fetch of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Incr(x) -> return! loop(n + x)
                    | Fetch(replyChannel) ->
                        replyChannel.Reply(n)
                        return! loop(n) }
        loop 0)

The msg union wraps two types of messages: we can tell the MailboxProcessor to increment, or have it send its contents to a reply channel. The type AsyncReplyChannel<'a> exposes a single method, member Reply : 'reply -> unit. We can use this class in fsi as follows:

> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: ChannelReader<T>
От: Qbit86 Кипр
Дата: 27.02.21 09:01
Оценка: 12 (1) +1
Здравствуйте, varenikAA, Вы писали:

AA>В C# есть BlockingCollection<T>


Возможно, для этих целей лучше подходит Channel&lt;T&gt;, из которого можно читать через ChannelReader&lt;T&gt;.
Глаза у меня добрые, но рубашка — смирительная!
Re[2]: ChannelReader<T>
От: varenikAA  
Дата: 27.02.21 10:09
Оценка:
Здравствуйте, Qbit86, Вы писали:

Q>Возможно, для этих целей лучше подходит Channel&lt;T&gt;

Спасибо. но это немножко не то.
Идея сделать асинхронный последовательный доступ к общему хранилищу.
а эти классы удаляют полученное значение из коллекции при чтении.
Может быть конечно, достаточно ConcurrentBag<T>, но я не уверен, есть ли гарантия что например, я читаю объект, а его заменили на новый или удалили.
Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.
по идее BufferedBlock.Post можно отправить коллбэк который вернет нужные данные. Но все равно странно что нет простой реализации в базовой библиотеке.
в той же кложе аж несколько штук и агент и атом и еще забыл название.
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re[2]: ChannelReader<T>
От: varenikAA  
Дата: 27.02.21 10:13
Оценка:
Здравствуйте, Qbit86, Вы писали:

https://blog.jayway.com/2013/11/15/an-actor-model-implementation-in-c-using-tpl-dataflow/

вот тут что-то похожее, но это разрыв промежности в сравнении с mailboxprocessor по мне.
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: Возможно ли реализовать MailboxProcessor на C#?
От: varenikAA  
Дата: 27.02.21 11:43
Оценка:
Здравствуйте, varenikAA, Вы писали:

AA>В C# есть BlockingCollection<T>

AA>в которую можно асинхронно постить (Post) данные

Пока остановился на акке, хотя хотелось бы поменьше зависимостей.
А так почти идентично фаршу получилось:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;

namespace MailBox
{

    public class GetList { }
    public class Add
    {
        public Add(long item)
        {
            Item = item;
        }
        public long Item { get; }
    }

    public class Storage : ReceiveActor
    {
        private readonly ConcurrentBag<long> items = new ConcurrentBag<long>();
        public Storage()
        {

            Task.Run(async () =>
            {
                while (true)
                {
                    await Task.Delay(TimeSpan.FromSeconds(5));

                    while (items.Count > 100)
                    {
                        if (items.TryTake(out var x))
                        {
                            Console.WriteLine(x + " удален");
                        }
                    }
                }
            });
            Receive<Add>(add => items.Add(add.Item));
            Receive<GetList>(a => Sender.Tell(items.ToArray()));

        }
    }
    static class Program
    {
        static void Main()
        {
            var random = new Random();
            var system = ActorSystem.Create("MySystem");
            var greeter = system.ActorOf<Storage>("greeter");

            Task.Run(async () =>
            {

                while (true)
                {
                    greeter.Tell(new Add(random.Next(1, 100)));
                    await Task.Delay(random.Next(10, 20));
                }
            });

            Task.Run(async () =>
            {
                while (true)
                {
                    var items = (long[])(await greeter.Ask(new GetList()));
                    Console.WriteLine($"Sum = {items.Sum()}");
                    await Task.Delay(random.Next(10, 20));
                }
            });

            Console.WriteLine("done");
            Console.ReadLine();
        }

    }
}
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: Возможно ли реализовать MailboxProcessor на C#?
От: Farsight СССР  
Дата: 27.02.21 19:23
Оценка:
Здравствуйте, varenikAA, Вы писали:

Может проще Akka или Orleans взять? Ну или юзать эфшарповый из сишарпа.
</farsight>
Re[2]: Возможно ли реализовать MailboxProcessor на C#?
От: varenikAA  
Дата: 28.02.21 01:04
Оценка:
Здравствуйте, Farsight, Вы писали:

F>Здравствуйте, varenikAA, Вы писали:


F>Может проще Akka или Orleans взять? Ну или юзать эфшарповый из сишарпа.


вчера нашел
https://proto.actor/

да похоже на то. хотя тут пишут что это антипаттерн.
я просто хочу в публичный мини чат добавить рассылку новом клиентам последних 10-50 сообщений.
и вот думаю что эффективней акторы или обычная конкурентная коллекция?
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re: Возможно ли реализовать MailboxProcessor на C#?
От: VladD2 Российская Империя www.nemerle.org
Дата: 11.03.21 23:21
Оценка: +1
Здравствуйте, varenikAA, Вы писали:

AA>В C# есть BlockingCollection<T>

AA>в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)

BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.

Для ответов, в такой модели, заводятся другие очереди и уже в них шлется ответ. При этом не нужна синхронизация между потоками и гарантированно отсутствие дедлоков.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[2]: Возможно ли реализовать MailboxProcessor на C#?
От: varenikAA  
Дата: 12.03.21 06:46
Оценка:
Здравствуйте, VladD2, Вы писали:

AA>>в которую можно асинхронно постить (Post) данные и последовательно вычитывать, но нет возможности запрашивать данные (PostAndReply)


VD>BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.



Попробовал так(правда запрос и ответ пришлось разделить, не получилось в одном PostAndReply)

https://gist.github.com/altbodhi/1799af4fd5f767b240b20e5b5192e819

BlockingCollection не сильно отличается от BufferBlock, но все равно нужно следить за данными(например список сообщений все равно нужно копировать иначе гарантированно падаем.
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re[3]: Возможно ли реализовать MailboxProcessor на C#?
От: VladD2 Российская Империя www.nemerle.org
Дата: 12.03.21 12:45
Оценка: +1
Здравствуйте, varenikAA, Вы писали:

AA>BlockingCollection не сильно отличается от BufferBlock, но все равно нужно следить за данными(например список сообщений все равно нужно копировать иначе гарантированно падаем.


BufferBlock не использовал, но, на первый взгляд, основное отличие в названии. В BlockingCollection главено Blocking. Список сообщений копировать никуда не нужно и ничего не падает. Проверено неоднократно. Но нужно чтобы они immutable были. Вот как он используется:
https://github.com/rsdn/nitra/blob/d80af0200012852b535d573030725305407a818b/Nitra/ClientServer/Nitra.ClientServer.Server/Router.n#L128
          foreach(msg in _mainQueue.GetConsumingEnumerable(_cts.Token))
          {
            | serverMsg is ClientMessage.Shutdown => OnClientMessage(serverMsg); return;
            | serverMsg is ClientMessage          => OnClientMessage(serverMsg);
            | serverMsg is RouterAnswerMessage    => OnRouterAnswerMessage(serverMsg);
            | _ => assert2(false)
          }

Все очень просто и элегантно. Ну, а добавляется через Add из любого другого потока.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[2]: Возможно ли реализовать MailboxProcessor на C#?
От: Mystic Artifact  
Дата: 12.03.21 22:06
Оценка:
Здравствуйте, VladD2, Вы писали:

VD>BlockingCollection — это подходящая вещь для организации работы когда один поток очередь разгребает, а другие набрасывают.


Она в современных реалиях вполне хороша и в обратной ситуации: один продюсер и много потоков-разгребателей, при чем, при желании, даже через async/await — внутри реализация знаком с этим случаем тоже.
Re[3]: Возможно ли реализовать MailboxProcessor на C#?
От: VladD2 Российская Империя www.nemerle.org
Дата: 13.03.21 06:09
Оценка:
Здравствуйте, Mystic Artifact, Вы писали:

MA> Она в современных реалиях вполне хороша и в обратной ситуации: один продюсер и много потоков-разгребателей, при чем, при желании, даже через async/await — внутри реализация знаком с этим случаем тоже.


А что в этом подходе хорошего то? Много потов лезут к общему ресурсу толкаясь и создавая проблемы. Надо дать каждому такому потоку по BlockingCollection и пусть разгребают каждый свою очередь. А продюсер или менеджер будет им эти задачи накидывать.
Есть логика намерений и логика обстоятельств, последняя всегда сильнее.
Re[4]: Возможно ли реализовать MailboxProcessor на C#?
От: Mystic Artifact  
Дата: 13.03.21 09:49
Оценка:
Здравствуйте, VladD2, Вы писали:

VD>А что в этом подходе хорошего то?


Простота. Оно создано работать одинаково хорошо (или одинаково плохо) во всех этих режимах.

VD>Много потов лезут к общему ресурсу толкаясь и создавая проблемы. Надо дать каждому такому потоку по BlockingCollection и пусть разгребают каждый свою очередь. А продюсер или менеджер будет им эти задачи накидывать.


В зависимости от натурального характера задач (например, 100-кратная разница в размере/времени обработки разных задач) менеджеру надо будет озаботиться тем, что бы все потоки были заняты работой. В случае выше рабочие потоки "толкаясь" в одной коллекции — получают это бесплатно. Понятно, что это всё решаемо.

Второе, если просто раскидать BlockingCollection каждому потоку, то желательно не забыть вместо дефолтной коллекции (ConcurrentQueue, которая MPMC (multiple producer multiple consumer)) — подсунуть какую-то более простую / подходящую (SPSC).

В общем в любом случае получится сложнее. Если это действительно обоснованно/необходимо — то конечно так и надо делать. Думаю каждой задаче своё решение, тем более нюансов в каждом конкретном случае своя масса. Кому-то надо за каждый лишний лок бороться, а кому-то это не будет видно даже под микроскоп.

А, вообще, я с тобой согласен. Я просто обратил внимание, что BlockingCollection работает в любых режимах, и для меня — это его основное достоинство.
Re[3]: ChannelReader<T>
От: vdimas Россия  
Дата: 06.04.21 03:30
Оценка:
Здравствуйте, varenikAA, Вы писали:

Q>>Возможно, для этих целей лучше подходит Channel&lt;T&gt;

AA>Спасибо. но это немножко не то.

Как раз то.
Каждая заинтересованная в получении писем сущность начинает работу с асинхронного запроса письма из почтового ящика.
Происходящее с т.з. активности — блокирующий вызов, т.е. подписка на событие готовности.
С т.з. "физического потока" (т.е. потока уровня ОС) — асинхронный вызов.


AA>Идея сделать асинхронный последовательный доступ к общему хранилищу.


Строго говоря, последовательный доступ всегда синхронный, ну да ладно, терминология современного донета малость загоняет, разводит путаницу на ровном месте.

В общем, требуется просто последовательный доступ.
Примерно так:
http://www.rsdn.org/forum/dotnet/7947025


AA>Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.


Блокировка/синхронизация никуда не делись.
Просто раньше очередь ожидающих активностей (процессов, потоков) была на уровне механизмов ОС, а через async/await эта очередь живёт на юзверском уровне, но по-сути она в точности такая же.
Отредактировано 06.04.2021 3:35 vdimas . Предыдущая версия . Еще …
Отредактировано 06.04.2021 3:34 vdimas . Предыдущая версия .
Отредактировано 06.04.2021 3:31 vdimas . Предыдущая версия .
Re[4]: ChannelReader<T>
От: varenikAA  
Дата: 06.04.21 05:19
Оценка:
Здравствуйте, vdimas, Вы писали:


V>Как раз то.



Я имел не множко другое, типа маленькой БД в памяти, но без lock(). т.е. чтобы отправить в БД данные и потом когда надо сделать запрос.
А эти примитивы все основаны на удалении прочтенного сообщения. В C# полный аналог или совсем не возможен я про PostAndReply либо очень сложен в реализации.
Немножко похожее можно сделать в виде акторов, когда в актор-запрос помещается актор из которого будет извлечен ответ.


AA>>Вообще почтовый ящик отличная абстракция позволяет закодить взаимодействие акторов без блокировок и синхронизации, т.к. все обращения обрабатываются в порядке очереди.


V>Блокировка/синхронизация никуда не делись.

А как же C[ompare]A[nd]S[wap]? В этом приеме блокировка разве есть?
☭ ✊ В мире нет ничего, кроме движущейся материи.
Re[5]: ChannelReader<T>
От: vdimas Россия  
Дата: 06.04.21 12:07
Оценка: 1 (1) +2
Здравствуйте, varenikAA, Вы писали:

V>>Как раз то.

AA>Я имел не множко другое, типа маленькой БД в памяти, но без lock(). т.е. чтобы отправить в БД данные и потом когда надо сделать запрос.

А какая разница с т.з. логики кода, как именно выполняется сериализация доступа — через блокировку потоков уровня ОС, или через блокировку кооперативных потоков асинхронной системы .Net?


AA>А эти примитивы все основаны на удалении прочтенного сообщения. В C# полный аналог или совсем не возможен я про PostAndReply либо очень сложен в реализации.


Во-первых, тип MailBoxProcessor можно использовать из C# (либо напрямую, либо сделав соотв. фасад).

Во-вторых, а чего там сложного? В крайнем случае можно взять исходники класса и портировать на C#, чтобы не тянуть ядерную DLL языка F# в зависимости.


V>>Блокировка/синхронизация никуда не делись.

AA>А как же C[ompare]A[nd]S[wap]? В этом приеме блокировка разве есть?

В этом месте блокируется неудачливый поток (заходит на новую попытку), но система в целом продвигается, поэтому lock-free.
Но асинхронщина дотнета вовсе не lock-free с т.з. асинхронных потоков, она асинхронна только с т.з. потоков уровня ОС.
А с т.з. исполняемой логики в асинхронных методах — происходящее очень даже синхронно. ))

Тут единственное что происходит — вытесняющая многозадачность уровня ядра ОС подменяется кооперативной многозадачностью юзверского уровня.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.