Re[2]: SynchronizationContext a-la node.js
От: SuhanovSergey  
Дата: 01.10.16 10:04
Оценка:
Здравствуйте, Serginio1, Вы писали:

Мой прототип

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static int sharedStateUseConcurrencyLevel = 0;

        static void WorkWithSharedState()
        {
            var concurrencyLevel = Interlocked.Increment(ref sharedStateUseConcurrencyLevel);
            if (concurrencyLevel > 1)
                Console.WriteLine("concurrencyLevel > 1");
            Thread.Sleep(10);
            Interlocked.Decrement(ref sharedStateUseConcurrencyLevel);
        }

        static async Task Test()
        {
            Func<int, Task> step = async delay =>
            {
                await Task.Delay(delay);
                WorkWithSharedState();
                await Task.Delay(delay);
                WorkWithSharedState();
                await Task.Delay(delay);
                WorkWithSharedState();
            };
            var r = new Random();
            await Task.WhenAll(Enumerable.Range(0, 10).Select(i => step(r.Next(100))));
        }

        class MySynchronizationContext : SynchronizationContext
        {
            ConcurrentQueue<KeyValuePair<SendOrPostCallback, object>> callbacksQueue =
                new ConcurrentQueue<KeyValuePair<SendOrPostCallback, object>>();
            int workitemSubmitted;

            public override void Post(SendOrPostCallback d, object state)
            {
                callbacksQueue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
                ScheduleThreadPoolWorkItem();
            }

            void ScheduleThreadPoolWorkItem()
            {
                if (callbacksQueue.Count > 0)
                {
                    if (Interlocked.Exchange(ref workitemSubmitted, 1) == 0)
                    {
                        ThreadPool.QueueUserWorkItem(_ =>
                        {
                            SetSynchronizationContext(this);
                            for (int itemsToProcess = callbacksQueue.Count; itemsToProcess > 0; --itemsToProcess)
                            {
                                KeyValuePair<SendOrPostCallback, object> item;
                                if (callbacksQueue.TryDequeue(out item))
                                    item.Key(item.Value);
                            }
                            Debug.Assert(Interlocked.Exchange(ref workitemSubmitted, 0) == 1);
                            ScheduleThreadPoolWorkItem();
                        });
                    }
                }
            }
        }

        static void Main(string[] args)
        {
            SynchronizationContext.SetSynchronizationContext(null);
            Console.WriteLine("Default context set");
            Test().Wait();

            Console.WriteLine("Custom context set");
            SynchronizationContext.SetSynchronizationContext(new MySynchronizationContext());
            Test().Wait();

            Console.ReadKey();
        }
    }
}
Re[2]: SynchronizationContext a-la node.js
От: TK Лес кывт.рф
Дата: 01.10.16 13:27
Оценка: 24 (1) +1
Здравствуйте, Serginio1, Вы писали:

SS>>Хочется иметь SynchronizationContext

SS>>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

SS>>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


S> Может я не понял вопроса, но вот здесь Consuming the Task-based Asynchronous Pattern


Не проще было ConcurrentExclusiveSchedulerPair использовать?
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[3]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 01.10.16 19:55
Оценка:
Здравствуйте, TK, Вы писали:

TK>Здравствуйте, Serginio1, Вы писали:


SS>>>Хочется иметь SynchronizationContext

SS>>>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>>>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

SS>>>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


S>> Может я не понял вопроса, но вот здесь Consuming the Task-based Asynchronous Pattern


TK>Не проще было ConcurrentExclusiveSchedulerPair использовать?

Спасибо Тимофей! Буду мотать на ус. Просто подумалось, что ему хватит асинхронной очереди BufferBlock, для последовательного выполнения калбеков.
Но так как я мало пишу на C£, то видно многое упускаю. Буду разбираться.
и солнце б утром не вставало, когда бы не было меня
Re[3]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 01.10.16 19:59
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

Наверное, то что предлагают Sinix и ТК более подходит для решения твоей задачи.
Тебе же не обязательно, что бы выполнялось в одном потоке? Главное, что бы последовательно?
и солнце б утром не вставало, когда бы не было меня
Отредактировано 01.10.2016 20:04 Serginio1 . Предыдущая версия .
Re[3]: SynchronizationContext a-la node.js
От: pilgrim_ Россия  
Дата: 01.10.16 21:07
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Мой прототип


Пара замечаний, 1-е за что зацепился взгляд

SS>
SS> if (callbacksQueue.Count > 0)
SS>


Для проверки что concurrent коллекция пустая/не-пустая рекомендуется использовать спец.метод IsEmpty, Count "тяжелее" по перформансу.

SS>
SS> for (int itemsToProcess = callbacksQueue.Count; itemsToProcess > 0; --itemsToProcess)
SS>


тут аналогично, но также непонятно зачем тут for, с очередью обычно работат так:

while (callbacksQueue.TryDequeue(out item))
{
//code
}



SS>Debug.Assert(Interlocked.Exchange(ref workitemSubmitted, 0) == 1);


Такой код будет выкинут в релизной сборке, Exchange нужно делать вне Assert.
Re: SynchronizationContext a-la node.js
От: Danchik Украина  
Дата: 03.10.16 14:18
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Хочется иметь SynchronizationContext

SS>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

SS>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


Может на Rx.NET это сделать?
Re[9]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 03.10.16 15:02
Оценка: +1
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>> Там пример на ConfigureAwait(false) для библиотек для того, что бы они не вызываличь в потоке UI.

S>Эта ошибка с любым лимитированным шедулером возможна, просто понадобится больше ожидающих задач.
S>Лечить надо причину, а не симптомы.

S>>Лучше бы сделали наоборот, сто по умолчанию ConfigureAwait(false)

S>Как обычно: предлагаем что-то — не останавливаемся,а думаем "а что, если моё пожелание сбудется?". Дальше продолжать?


S>>Так все await последовательно будут выполняться. Только после того когда они все закончатся будет вызван Take()

S>>Другое дело если внутри есть ContinueWith или Task.Run
S>Речь про произвольный код — по определению на это надо закладываться.


S>> Давай конкретный пример разберем. И в каком месте здесь дедлок


S>ну это ж классика. Самый простой способ:

S>
  что не так-то?
S>
S>public class AsyncProducerConsumerCollection<T>
S>{
S>    private readonly Queue<T> m_collection = new Queue<T>();

S>    private readonly Queue<TaskCompletionSource<T>> m_waiting =
S>        new Queue<TaskCompletionSource<T>>();

S>    public void Add(T item)
S>    {
S>        TaskCompletionSource<T> tcs = null;
S>        lock (m_collection)
S>        {
S>            if (m_waiting.Count > 0)
S>                tcs = m_waiting.Dequeue();
S>            else
S>                m_collection.Enqueue(item);
S>        }
S>        if (tcs != null)
S>            tcs.TrySetResult(item);
S>    }

S>    public Task<T> Take()
S>    {
S>        lock (m_collection)
S>        {
S>            if (m_collection.Count > 0)
S>            {
S>                return Task.FromResult(m_collection.Dequeue());
S>            }
S>            else
S>            {
S>                var tcs = new TaskCompletionSource<T>();
S>                m_waiting.Enqueue(tcs);
S>                return tcs.Task;
S>            }
S>        }
S>    }
S>}

S>static class Program
S>{
S>    private static AsyncProducerConsumerCollection<Action> m_data = new AsyncProducerConsumerCollection<Action>();

S>    private static async Task ConsumerAsync()
S>    {
S>        while (true)
S>        {
S>            var nextItem = await m_data.Take();
S>            nextItem.Invoke();
S>        }
S>    }

S>    private static void Produce(Action data)
S>    {
S>        m_data.Add(data);
S>    }

S>    static void Main(string[] args)
S>    {
S>        var local1 = 0;
S>        Action a = null;
S>        a = () =>
S>        {
S>            while (local1 == 0)
S>            {
S>                Produce(
S>                    () =>
S>                    {
S>                        Console.WriteLine(">local1++");
S>                        local1++;
S>                        Produce(a);
S>                    });
S>            }
S>            Console.WriteLine("!!!" + local1);
S>        };

S>        Produce(a);
S>        ConsumerAsync().Wait();

S>        Console.WriteLine("Done.");
S>        Console.ReadKey();
S>    }
S>}
S>




Посмотрел.

Правильнее по алгоритму сначала запустить чтение

ConsumerAsync();

А затем вставку
Produce(a);



while (local1 == 0)
            {
// Просто запускает делегат
// А он не выполнится пока ConsumerAsync() не сработает
// А он в твоем алгоритме стоит поле Produce
// И цикл крутится бесконечно
                Produce(
                () =>
                    {
                        Console.WriteLine(">local1++");
                        local1++;
                        Produce(a);
                    });
            }


Но и в случае с ConsumerAsync() он вызывает Produce с тем же бесконечным циклом и ждет выполнения.
При этом до Console.WriteLine(">local1++"); он никогда не добирается.

А исклю
и солнце б утром не вставало, когда бы не было меня
Re[10]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 03.10.16 15:19
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Правильнее по алгоритму сначала запустить чтение

Не спасёт.

S>// И цикл крутится бесконечно

Угу. Всего-то один умный человек решил, что if (local1 == 0) недостаточно кавайно, while понадёжней будет. Ну и последний Produce(a); убрал — while же есть.

Только чур не рассказывать, что такого не бывает, этим летом барабашку ловил. Чтоб было веселее — шедулер был не однопоточный, и оно таки со временем себя прожёвывало. Если никто больше проц не отжирал.

В общем, не надо изобретать велосипед, когда есть await. Проверено сложным способом
Отредактировано 03.10.2016 15:21 Sinix . Предыдущая версия . Еще …
Отредактировано 03.10.2016 15:20 Sinix . Предыдущая версия .
Re[11]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 03.10.16 19:27
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>>Правильнее по алгоритму сначала запустить чтение

S>Не спасёт.

S>>// И цикл крутится бесконечно

S>Угу. Всего-то один умный человек решил, что if (local1 == 0) недостаточно кавайно, while понадёжней будет. Ну и последний Produce(a); убрал — while же есть.

S>Только чур не рассказывать, что такого не бывает, этим летом барабашку ловил. Чтоб было веселее — шедулер был не однопоточный, и оно таки со временем себя прожёвывало. Если никто больше проц не отжирал.


S>В общем, не надо изобретать велосипед, когда есть await. Проверено сложным способом


Прошу прощения писал в цктноте. Не до конца не разобрался

while (local1 == 0)
            {
// Будет крутится в цикле пока не вызовется
// Этот делегат

                Produce(
                () =>
                    {
                        Console.WriteLine(">local1++");
                        local1++;
                        Produce(a);
                    });
            }


Но он не выполнится ибо
Ждет выполнения првого, который крутит бесконечный цикл и ставит делегаты в очередь

while (true)
        {
            var nextItem = await m_data.Take();
            nextItem.Invoke(); // Будет выполняться пока очередь не вызовет OutOfMemmory
        }


Это по сути аналогично в одном потоке

event.WaitOne();
event.Set();

По сути это и есть дэдлок.

Аналогично будет и при использовании SynchronizationContext

А в данном примере задача будет использована только 1 раз
var nextItem = await m_data.Take();


Другое дело, что задачу топикастера, можно решать параллельные задачи с шедулером
и солнце б утром не вставало, когда бы не было меня
Re[12]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 03.10.16 19:44
Оценка:
Здравствуйте, Serginio1, Вы писали:

S> По сути это и есть дэдлок.

S> Аналогично будет и при использовании SynchronizationContext

Бинго!

Про это и писал выше по ветке,

Для тасков так делать нельзя, дедлок | забитую очередь отхватить можно
...
Но топикстартеру надо "SynchronizationContext который бы сериализовал колбэки"...
Вызывать калбэки напрямую или ч/з await тут роли не играет, проблема с самими задачами.


Т.е. как только появляется необходимость запихнуть в одну очередь произвольный код — начинаются грабли.


S> Другое дело, что задачу топикастера, можно решать параллельные задачи с шедулером

Ну да. Или сами себе создаём проблемы с узким местом — очередью сообщений, или городим синхронизацию (и снова рискуем огрести из-за логической ошибки) или отказываемся от изменяемого разделяемого состояния вообще. Последнее как-то проще в итоге выходит.
Re[13]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 03.10.16 21:04
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>> По сути это и есть дэдлок.

S>> Аналогично будет и при использовании SynchronizationContext

S>Бинго!


S>Про это и писал выше по ветке,

S>

S>Для тасков так делать нельзя, дедлок | забитую очередь отхватить можно
S>...
S>Но топикстартеру надо "SynchronizationContext который бы сериализовал колбэки"...
S>Вызывать калбэки напрямую или ч/з await тут роли не играет, проблема с самими задачами.


S>Т.е. как только появляется необходимость запихнуть в одну очередь произвольный код — начинаются грабли.

Ну дык куча приложений работает с SynchronizationContext и никаких граблей не отхватывает. Та же 1С по такому принципу работает и дедлоков не отхватывает.
Просто для каждой задачи свой подход. Вплоть до разруливания дедлок аналогично как в SQL (Кодт помню выкладывал алгоритм).


S>> Другое дело, что задачу топикастера, можно решать параллельные задачи с шедулером

S>Ну да. Или сами себе создаём проблемы с узким местом — очередью сообщений, или городим синхронизацию (и снова рискуем огрести из-за логической ошибки) или отказываемся от изменяемого разделяемого состояния вообще. Последнее как-то проще в итоге выходит.

Ну на самом деле у меня была задача при работе с Вацапом. Куча народу отправляли сообщения через один номер.
Отправлять нужно с задержками. В очереди может скапливаться по нескольку сообщений, но их недостаточно, что бы забить очередь до отказа.
и солнце б утром не вставало, когда бы не было меня
Отредактировано 03.10.2016 21:06 Serginio1 . Предыдущая версия .
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.