SynchronizationContext a-la node.js
От: SuhanovSergey  
Дата: 29.09.16 19:04
Оценка:
Хочется иметь SynchronizationContext
— который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
— не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.
Re: SynchronizationContext a-la node.js
От: Sinix  
Дата: 29.09.16 19:42
Оценка: +1
Здравствуйте, SuhanovSergey, Вы писали:

SS>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


Task<T> + limited scheduler, где-то тут был. Или любой из вариантов отсюда.

Очень рекомендую переписать без изменяемого shared-состояния. И проще, и дешевле выйдет.
Re: SynchronizationContext a-la node.js
От: Vladek Россия Github
Дата: 30.09.16 02:14
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Хочется иметь SynchronizationContext

SS>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

foreach (var item in items)
{
    await DoTaskAsync(item).ConfigureAwait(false);
}
Re[2]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 05:50
Оценка:
Здравствуйте, Vladek, Вы писали:

V> await DoTaskAsync(item).ConfigureAwait(false);

Работать будет. Только слегка не так, как задумано ; )
Re: SynchronizationContext a-la node.js
От: TK Лес кывт.рф
Дата: 30.09.16 05:54
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Хочется иметь SynchronizationContext

SS>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

SS>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


Например, есть SynchronizationAttribute который будет сериализовать доступ к инстансу.
Ещё есть orleans в котором реализован шедулер с подобным функционалом.
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re[2]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 06:29
Оценка:
Здравствуйте, TK, Вы писали:

TK>Например, есть SynchronizationAttribute который будет сериализовать доступ к инстансу.

[MethodImpl(MethodImplOptions.Synchronized)] тогда уж, SynchronizationAttribute под дремучее легаси в виде ContextBoundObject заточен, насколько помню.

Вот такие штуки лучше не советовать вообще. А то видел я, как люди писали код типа
if (Shared.A > Shared.B)
  Shared.A = Shared.B
и потом доказывали, что в фреймворке баг, т.к. оно не работает


TK>Ещё есть orleans в котором реализован шедулер с подобным функционалом.

А не будет ли orleans тут оверкиллом? Причём реально оверкиллом, он как-никак на рои из агентов заточен. TPL DataFlow тогда уж, но он тоже эффективностью не блещет — тонны аллокаций на каждый вызов.
Re[3]: SynchronizationContext a-la node.js
От: TK Лес кывт.рф
Дата: 30.09.16 08:28
Оценка: +1
Здравствуйте, Sinix, Вы писали:

TK>>Ещё есть orleans в котором реализован шедулер с подобным функционалом.

S>А не будет ли orleans тут оверкиллом? Причём реально оверкиллом, он как-никак на рои из агентов заточен. TPL DataFlow тогда уж, но он тоже эффективностью не блещет — тонны аллокаций на каждый вызов.

Так весь Orleans брать не надо. Нужно посмотреть на шедулер и по мотивам написать свой
Если у Вас нет паранойи, то это еще не значит, что они за Вами не следят.
Re: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 30.09.16 09:10
Оценка:
Здравствуйте, SuhanovSergey, Вы писали:

SS>Хочется иметь SynchronizationContext

SS>- который бы сериализовал колбэки, так что только один колбэк испольняется в момент времени
SS>- не являлся бю UI-ным SynchronizationContext-ом, а испльзовал бы тред пул. Никакой тред не должен спать, ожидая колбэков.

SS>Нужно для IO-bound фоновый задачи, которая запускает параллельные составные IO таски и не хочет синзронизировать доступ к общим переменным между await-ами.


Может я не понял вопроса, но вот здесь Consuming the Task-based Asynchronous Pattern

Есть пример асинхронной очереди


public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> m_waiting = 
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0) 
            {
                return Task.FromResult(m_collection.Dequeue()); 
            }
            else 
            {
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}

Используя эту структуру данных, можно написать следующий код:

private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async  Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.Take();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Add(data);
}


Там же

Пространство имен System.Threading.Tasks.Dataflow включает тип BufferBlock<T>, который можно использовать таким же образом, но без построения пользовательского типа коллекции:


private static BufferBlock<int> m_data = …;
…
private static async  Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.ReceiveAsync();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Post(data);
}
и солнце б утром не вставало, когда бы не было меня
Отредактировано 30.09.2016 9:13 Serginio1 . Предыдущая версия .
Re[2]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 09:35
Оценка: 2 (1)
Здравствуйте, Serginio1, Вы писали:

S>Есть пример асинхронной очереди

Для тасков так делать нельзя, дедлок | забитую очередь отхватить можно. Нужен шедулер с учётом TaskCreationOptions.PreferFairness и TaskCreationOptions.LongRunning. В идеале — с приоритетами, типа такого.
Re[3]: SynchronizationContext a-la node.js
От: Sharov Россия  
Дата: 30.09.16 10:33
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, TK, Вы писали:


TK>>Например, есть SynchronizationAttribute который будет сериализовать доступ к инстансу.

S>[MethodImpl(MethodImplOptions.Synchronized)] тогда уж, SynchronizationAttribute под дремучее легаси в виде ContextBoundObject заточен, насколько помню.

S>Вот такие штуки лучше не советовать вообще. А то видел я, как люди писали код типа

S>
S>if (Shared.A > Shared.B)
S>  Shared.A = Shared.B
S>
и потом доказывали, что в фреймворке баг, т.к. оно не работает


А что не так с этим кодом в контексте аттрибута MethodImplOptions.Synchronized? При условии, что Shared.A , Shared.B больше нигде не меняются.
Кодом людям нужно помогать!
Re[4]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 10:45
Оценка:
Здравствуйте, Sharov, Вы писали:

S>А что не так с этим кодом в контексте аттрибута MethodImplOptions.Synchronized? При условии, что Shared.A , Shared.B больше нигде не меняются.

Ты знал
Re[3]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 30.09.16 11:25
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>>Есть пример асинхронной очереди

S>Для тасков так делать нельзя, дедлок | забитую очередь отхватить можно. Нужен шедулер с учётом TaskCreationOptions.PreferFairness и TaskCreationOptions.LongRunning. В идеале — с приоритетами, типа такого.

Там нет очереди тасков.
Там таск только на ожидании и получении результата

public Task<T> Take()
    {
        lock (m_collection)
        {

// Если в очереди есть значения то сразу возвращаем значение
// Task.FromResult(m_collection.Dequeue()); 
            if (m_collection.Count > 0) 
            {
                return Task.FromResult(m_collection.Dequeue()); 
            }
            else 
            {
// Если нет сообщений то возвращаем TaskCompletionSource
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }



public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
// Если есть ожидание то получим TaskCompletionSource
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }

// И установим добавляемое значение в результат ожидаемой задачи
        if (tcs != null) tcs.TrySetResult(item);
    }


Понятно, что очередь может заполняться пока памяти хватит. Но вот где дедлок?
и солнце б утром не вставало, когда бы не было меня
Re[4]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 11:41
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>Там нет очереди тасков.

S> Там таск только на ожидании и получении результата
Там — нет Но топикстартеру надо "SynchronizationContext который бы сериализовал колбэки"...
Вызывать калбэки напрямую или ч/з await тут роли не играет, проблема с самими задачами.
При большой нагрузке или увеличивается средняя задержка (если продолжения в конец очереди пихаются), или начинаются таймауты (если LIFO делать).


S> Понятно, что очередь может заполняться пока памяти хватит. Но вот где дедлок?


Простейший вариант с task.Result брать не будем, это скучно и все на это дело уже наступали. А вот такое вот с однопоточным LIFO-планировщиком
var t = SetFlagAsync();
while (await CheckFlag())
{ ... }
await t;

не хотите?
Re[5]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 30.09.16 12:01
Оценка:
Здравствуйте, Sinix, Вы писали:



S>Простейший вариант с task.Result брать не будем, это скучно и все на это дело уже наступали. А вот такое вот с однопоточным LIFO-планировщиком

А можно поподробнее? Я еще не успел наступить. И что не так с примером.
S>
S>var t = SetFlagAsync();
S>while (await CheckFlag())
S>{ ... }
S>await t;
S>

S>не хотите?



Ему как раз await t не нужен. Ему нужен Func или Action и что бы они выполнялись последовательно и неважно в каком потоке.

не хочет синзронизировать доступ к общим переменным между await-ами


То есть


while (true)
{
  var t=await ВозьмемИзFIFOЗамыкание();

   t();

 }
и солнце б утром не вставало, когда бы не было меня
Re[6]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 12:50
Оценка:
Здравствуйте, Serginio1, Вы писали:

S>>Простейший вариант с task.Result брать не будем, это скучно и все на это дело уже наступали. А вот такое вот с однопоточным LIFO-планировщиком

S> А можно поподробнее?
http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html


S> И что не так с примером.

var t = SetFlagAsync(); // Кладём в очередь задачу "установить флаг"
while (true)
{ 
  var task = CheckFlag(); // Кладём в начало очереди задачу "проверить флаг"
  var result = await CheckFlag(); 
  //запускаем очередь на выполнение, выполняется CheckFlag, флаг ещё не установлен, кладём в начало очереди задачу "продолжить, результат - false".
  if (result) break;
}
await t; // сюда не доходим.

S> Ему как раз await t не нужен. Ему нужен Func или Action и что бы они выполнялись последовательно и неважно в каком потоке.
Которые в свою очередь будут запускать новые Func или Action. И поскольку await топикстартер использует, то см пример выше.
Re[7]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 30.09.16 13:11
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>>>Простейший вариант с task.Result брать не будем, это скучно и все на это дело уже наступали. А вот такое вот с однопоточным LIFO-планировщиком

S>> А можно поподробнее?
S>http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html

Там пример на ConfigureAwait(false) для библиотек для того, что бы они не вызываличь в потоке UI.
Лучше бы сделали наоборот, сто по умолчанию ConfigureAwait(false)

S>> И что не так с примером.

S>
S>var t = SetFlagAsync(); // Кладём в очередь задачу "установить флаг"
S>while (true)
S>{ 
S>  var task = CheckFlag(); // Кладём в начало очереди задачу "проверить флаг"
S>  var result = await CheckFlag(); 
S>  //запускаем очередь на выполнение, выполняется CheckFlag, флаг ещё не установлен, кладём в начало очереди задачу "продолжить, результат - false".
S>  if (result) break;
S>}
S>await t; // сюда не доходим.
S>

S>> Ему как раз await t не нужен. Ему нужен Func или Action и что бы они выполнялись последовательно и неважно в каком потоке.
S>Которые в свою очередь будут запускать новые Func или Action. И поскольку await топикстартер использует, то см пример выше.

Так все await последовательно будут выполняться. Только после того когда они все закончатся будет вызван Take()
Другое дело если внутри есть ContinueWith или Task.Run



Давай конкретный пример разберем. И в каком месте здесь дедлок


public Task<T> Take()
    {
        lock (m_collection)
        {

// Если в очереди есть значения то сразу возвращаем значение
// Task.FromResult(m_collection.Dequeue()); 
            if (m_collection.Count > 0) 
            {
                return Task.FromResult(m_collection.Dequeue()); 
            }
            else 
            {
// Если нет сообщений то возвращаем TaskCompletionSource
// и по сути там будет только 1 элемент в очереди

                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }



public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
// Если есть ожидание то получим TaskCompletionSource
// m_waiting.Count будет либо 0 либо 2
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }

// И установим добавляемое значение в результат ожидаемой задачи
// Очередь TaskCompletionSource пуста.
        if (tcs != null) tcs.TrySetResult(item);
    }
и солнце б утром не вставало, когда бы не было меня
Re[8]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 14:17
Оценка:
Здравствуйте, Serginio1, Вы писали:

S> Там пример на ConfigureAwait(false) для библиотек для того, что бы они не вызываличь в потоке UI.

Эта ошибка с любым лимитированным шедулером возможна, просто понадобится больше ожидающих задач.
Лечить надо причину, а не симптомы.

S>Лучше бы сделали наоборот, сто по умолчанию ConfigureAwait(false)

Как обычно: предлагаем что-то — не останавливаемся,а думаем "а что, если моё пожелание сбудется?". Дальше продолжать?


S>Так все await последовательно будут выполняться. Только после того когда они все закончатся будет вызван Take()

S>Другое дело если внутри есть ContinueWith или Task.Run
Речь про произвольный код — по определению на это надо закладываться.


S> Давай конкретный пример разберем. И в каком месте здесь дедлок


ну это ж классика. Самый простой способ:
  что не так-то?
public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0)
                tcs = m_waiting.Dequeue();
            else
                m_collection.Enqueue(item);
        }
        if (tcs != null)
            tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0)
            {
                return Task.FromResult(m_collection.Dequeue());
            }
            else
            {
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}

static class Program
{
    private static AsyncProducerConsumerCollection<Action> m_data = new AsyncProducerConsumerCollection<Action>();

    private static async Task ConsumerAsync()
    {
        while (true)
        {
            var nextItem = await m_data.Take();
            nextItem.Invoke();
        }
    }

    private static void Produce(Action data)
    {
        m_data.Add(data);
    }

    static void Main(string[] args)
    {
        var local1 = 0;
        Action a = null;
        a = () =>
        {
            while (local1 == 0)
            {
                Produce(
                    () =>
                    {
                        Console.WriteLine(">local1++");
                        local1++;
                        Produce(a);
                    });
            }
            Console.WriteLine("!!!" + local1);
        };

        Produce(a);
        ConsumerAsync().Wait();

        Console.WriteLine("Done.");
        Console.ReadKey();
    }
}
Re[9]: SynchronizationContext a-la node.js
От: Serginio1 СССР https://habrahabr.ru/users/serginio1/topics/
Дата: 30.09.16 14:30
Оценка:
Здравствуйте, Sinix, Вы писали:

S>Здравствуйте, Serginio1, Вы писали:


S>> Там пример на ConfigureAwait(false) для библиотек для того, что бы они не вызываличь в потоке UI.

S>Эта ошибка с любым лимитированным шедулером возможна, просто понадобится больше ожидающих задач.
Там вроде проблема, что с ConfigureAwait(true) метод должен выполниться в потоке UI.
Но в этом же потоке ожидает на await вызывающий метод.
S>Лечить надо причину, а не симптомы.

S>>Лучше бы сделали наоборот, сто по умолчанию ConfigureAwait(false)

S>Как обычно: предлагаем что-то — не останавливаемся,а думаем "а что, если моё пожелание сбудется?". Дальше продолжать?


S>>Так все await последовательно будут выполняться. Только после того когда они все закончатся будет вызван Take()

S>>Другое дело если внутри есть ContinueWith или Task.Run
S>Речь про произвольный код — по определению на это надо закладываться.


S>> Давай конкретный пример разберем. И в каком месте здесь дедлок


S>ну это ж классика. Самый простой способ:

S>
  что не так-то?
S>
S>public class AsyncProducerConsumerCollection<T>
S>{
S>    private readonly Queue<T> m_collection = new Queue<T>();

S>    private readonly Queue<TaskCompletionSource<T>> m_waiting =
S>        new Queue<TaskCompletionSource<T>>();

S>    public void Add(T item)
S>    {
S>        TaskCompletionSource<T> tcs = null;
S>        lock (m_collection)
S>        {
S>            if (m_waiting.Count > 0)
S>                tcs = m_waiting.Dequeue();
S>            else
S>                m_collection.Enqueue(item);
S>        }
S>        if (tcs != null)
S>            tcs.TrySetResult(item);
S>    }

S>    public Task<T> Take()
S>    {
S>        lock (m_collection)
S>        {
S>            if (m_collection.Count > 0)
S>            {
S>                return Task.FromResult(m_collection.Dequeue());
S>            }
S>            else
S>            {
S>                var tcs = new TaskCompletionSource<T>();
S>                m_waiting.Enqueue(tcs);
S>                return tcs.Task;
S>            }
S>        }
S>    }
S>}

S>static class Program
S>{
S>    private static AsyncProducerConsumerCollection<Action> m_data = new AsyncProducerConsumerCollection<Action>();

S>    private static async Task ConsumerAsync()
S>    {
S>        while (true)
S>        {
S>            var nextItem = await m_data.Take();
S>            nextItem.Invoke();
S>        }
S>    }

S>    private static void Produce(Action data)
S>    {
S>        m_data.Add(data);
S>    }

S>    static void Main(string[] args)
S>    {
S>        var local1 = 0;
S>        Action a = null;
S>        a = () =>
S>        {
S>            while (local1 == 0)
S>            {
S>                Produce(
S>                    () =>
S>                    {
S>                        Console.WriteLine(">local1++");
S>                        local1++;
S>                        Produce(a);
S>                    });
S>            }
S>            Console.WriteLine("!!!" + local1);
S>        };

S>        Produce(a);
S>        ConsumerAsync().Wait();

S>        Console.WriteLine("Done.");
S>        Console.ReadKey();
S>    }
S>}
S>



В понедельник разбираться буду. Чего то под вечер мозги не работают.
и солнце б утром не вставало, когда бы не было меня
Re[10]: SynchronizationContext a-la node.js
От: Sinix  
Дата: 30.09.16 14:38
Оценка: 2 (1)
Здравствуйте, Serginio1, Вы писали:

S>>Эта ошибка с любым лимитированным шедулером возможна, просто понадобится больше ожидающих задач.

S> Там вроде проблема, что с ConfigureAwait(true) метод должен выполниться в потоке UI.
S>Но в этом же потоке ожидает на await вызывающий метод.

Ну представь ситуацию: используется шедулер о пяти потоках. Запускаем 5 задач, которые ждут шестую через task.Wait() и затем закидываем шестую в этот же шедулер.
Причина тут — блокирующее ожидание, а не количество потоков, в которых можно запустить таску. И лечить лучше именно его


S> В понедельник разбираться буду. Чего то под вечер мозги не работают.

Удачи! Там на самом деле довольно грубая логическая ошибка, но заметить её не так просто, тем более что на шедулере с несколькими потоками оно таки работает.
Re[2]: SynchronizationContext a-la node.js
От: SuhanovSergey  
Дата: 01.10.16 09:32
Оценка: 46 (1) +1
Здравствуйте, Sinix, Вы писали:

S>Очень рекомендую переписать без изменяемого shared-состояния. И проще, и дешевле выйдет.


В конечном итоге так и сделал — без изменяемого shared-состояния Благо есть точка, где результаты от всех задач собираются, и состояние задач можно проанализировать и смёржить.


Мой Tasks-based код, наслаждавшийся преимуществами UI-ного контекста, пеперь должен выполняться в сервисном контексте. Мой вывод такой, что изначальный код был плох за счёт shared-состояния, а желание повторить свойства UI-ого SynchronizationContext — потенциальный истчник wft тех, кто будет в этом разбираться.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.