Информация об изменениях

Сообщение Re: Красивое решение для задачи с потоками от 13.03.2015 9:21

Изменено 13.03.2015 10:50 Sinix

Здравствуйте, Cynic, Вы писали:

C>Требуется найти "красивое" решение для следующей задачи.

C>Есть четыре потока: один "управляющий", и три "вычислительных". Задача "управляющего потока" раздавать задачи "вычислительным потокам", а задача "вычислительных" выполнять эти задачи. При этом "вычислительные потоки" должны работать по следующему алгоритму:

Таски наше всё. Пишется за 5 минут, работает с первого раза.
  код
    enum State
    {
        Idle, GetResult, Process
    }

    static volatile State state;

    static async Task<int> GetNextResult(int i)
    {
        Debug.Assert(state == State.GetResult, "Fail");
        Console.WriteLine("Geting result #{0}", i);

        var delayMs = (i % 5) * 200;
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs));

        Debug.Assert(state == State.GetResult, "Fail");
        Console.WriteLine("Result #{0} ready, moving on", i);

        return i;
    }

    static async Task<int> Process(int worker, int i)
    {
        Debug.Assert(state == State.Process, "Fail");
        Console.WriteLine("Worker{0}: processing result #{1}", worker, i);

        var delayMs = (i % 7) * 150;
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs));

        Debug.Assert(state == State.Process, "Fail");
        Console.WriteLine("Worker{0}: result #{1} processed", worker, i);
        return i;
    }

    static async Task Run(CancellationToken ct)
    {
        int num = 0;
        int taskCount = 3;
        while (!ct.IsCancellationRequested)
        {
            state = State.GetResult;
            var tasks = Enumerable.Range(0, taskCount).Select(i => GetNextResult(num + i));

            await Task.WhenAll(tasks);

            state = State.Process;
            tasks = Enumerable.Range(0, taskCount).Select(i => Process(i, num + i));

            await Task.WhenAll(tasks);

            state = State.Idle;
            num += taskCount;
        }
    }

    static void Main()
    {
        Console.WriteLine("Starting, any key to exit");
        var cts = new CancellationTokenSource();
        var task = Run(cts.Token);

        Console.ReadKey();

        cts.Cancel();
        task.Wait();
    }


Это на случай, если получение-обработку надо разделить. Если не надо, всё тоже несложно — producer-consumer и ConcurrentQueue.
Здравствуйте, Cynic, Вы писали:

C>Требуется найти "красивое" решение для следующей задачи.

C>Есть четыре потока: один "управляющий", и три "вычислительных". Задача "управляющего потока" раздавать задачи "вычислительным потокам", а задача "вычислительных" выполнять эти задачи. При этом "вычислительные потоки" должны работать по следующему алгоритму:

Таски наше всё. Пишется за 5 минут, работает с первого раза.
UPD: НЕ работает с первого раза, см ниже.
  код
    enum State
    {
        Idle, GetResult, Process
    }

    static volatile State state;

    static async Task<int> GetNextResult(int i)
    {
        Debug.Assert(state == State.GetResult, "Fail");
        Console.WriteLine("Geting result #{0}", i);

        var delayMs = (i % 5) * 200;
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs));

        Debug.Assert(state == State.GetResult, "Fail");
        Console.WriteLine("Result #{0} ready, moving on", i);

        return i;
    }

    static async Task<int> Process(int worker, int i)
    {
        Debug.Assert(state == State.Process, "Fail");
        Console.WriteLine("Worker{0}: processing result #{1}", worker, i);

        var delayMs = (i % 7) * 150;
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs));

        Debug.Assert(state == State.Process, "Fail");
        Console.WriteLine("Worker{0}: result #{1} processed", worker, i);
        return i;
    }

    static async Task Run(CancellationToken ct)
    {
        int num = 0;
        int taskCount = 3;
        while (!ct.IsCancellationRequested)
        {
            state = State.GetResult;
            var tasks = Enumerable.Range(0, taskCount).Select(i => GetNextResult(num + i));

            await Task.WhenAll(tasks);

            state = State.Process;
            tasks = Enumerable.Range(0, taskCount).Select(i => Process(i, num + i));

            await Task.WhenAll(tasks);

            state = State.Idle;
            num += taskCount;
        }
    }

    static void Main()
    {
        Console.WriteLine("Starting, any key to exit");
        var cts = new CancellationTokenSource();
        var task = Run(cts.Token);

        Console.ReadKey();

        cts.Cancel();
        task.Wait();
    }


Это на случай, если получение-обработку надо разделить. Если не надо, всё тоже несложно — producer-consumer и ConcurrentQueue.

UPD: Позорище и ещё раз позорище

Вот это
        while (!ct.IsCancellationRequested)
        {
            state = State.GetResult;
            var tasks = Enumerable.Range(0, taskCount).Select(i => GetNextResult(num + i)); // (1)

            await Task.WhenAll(tasks);

            state = State.Process;
            tasks = Enumerable.Range(0, taskCount).Select(i => Process(i, num + i)); // (2)

            await Task.WhenAll(tasks);

            state = State.Idle;
            num += taskCount;
        }

надо заменить на
        while (!ct.IsCancellationRequested)
        {
            state = State.GetResult;
            var tasks = Enumerable.Range(0, taskCount).Select(i => GetNextResult(num + i)).ToArray(); // (1)

            await Task.WhenAll(tasks);

            state = State.Process;
            var tasks2 = tasks.Select((t, i) => Process(i, t.Result)).ToArray(); // (2)

            await Task.WhenAll(tasks2);

            num += taskCount;
            state = State.Idle;
        }


С (2) всё понятно — надо обрабатывать результат, а не "Enumerable.Range(0, taskCount)".

С (1) всё чуть сложнее. Кто сообразит, почему там нужен .ToArray() (подчеркнул) — тому пряник.

Мораль: пишите ассерты.