TPL or Go
From: TK Лес кывт.рф
Date: 07.11.16 18:43
Hi all,

A small test of TPL and similar libraries: what happens when there are too few agents, and what happens when there are far too many...
The idea is simple: items are pushed into a queue, fanned out across n workers, and after each worker finishes processing an item (5 ms of simulated work) the results are merged back together.
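(In the output below, "Estimated" is just the ideal time, processingTime × iterations / workers: e.g. 1000 items at 5 ms across 10 workers gives 1000 × 5 ms / 10 = 500 ms. "Loss" is the overhead the runtime adds on top of that ideal.)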

  TPL
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal class Program
{
    const int processingTime = 5;

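    // Reduce: one worker. An ActionBlock processes items one at a time
    // (processingTime ms of simulated work each); when its input completes, the
    // running total is pushed into a BufferBlock, and Encapsulate exposes the
    // pair as a single propagator block.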
    private static IPropagatorBlock<int, int> Reduce(int seed)
    {
        var total = seed;
        var input = new ActionBlock<int>(async arg =>
        {
            await Task.Delay(processingTime);
            total += arg;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

        var output = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        input.Completion.ContinueWith(done =>
        {
            output.SendAsync(total).ContinueWith(task => output.Complete());
        });

        return DataflowBlock.Encapsulate(input, output);
    }

    private static IReadOnlyCollection<ISourceBlock<int>> CreateWorkers(ISourceBlock<int> source, int totalWorkers, Func<int, IPropagatorBlock<int, int>> workerFactory)
    {
        var output = new List<ISourceBlock<int>>();
        for (var i = 0; i < totalWorkers; ++i)
        {
            var block = workerFactory(0);
            source.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
            output.Add(block);
        }

        return output;
    }

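    // MergeResults: funnels all worker outputs into one buffer feeding a final
    // Reduce stage. Completion is not auto-propagated from many sources to one
    // target, so the last worker to complete closes the buffer manually.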
    private static ISourceBlock<int> MergeResults(IReadOnlyCollection<ISourceBlock<int>> workers, Func<int, IPropagatorBlock<int, int>> factory)
    {
        var localdata = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = workers.Count });
        var output = factory(0);
        var totalWorkers = workers.Count;

        foreach (var worker in workers)
        {
            worker.LinkTo(localdata);
            worker.Completion.ContinueWith(task =>
            {
                if (Interlocked.Decrement(ref totalWorkers) == 0)
                    localdata.Complete();
            });
        }
        localdata.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

        return output;
    }

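    // Test: the producer. With BoundedCapacity = 1 on the buffer, SendAsync only
    // completes once a worker has taken the item, mirroring Go's unbuffered channel.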
    private static async Task<int> Test(int iterations, int totalWorkers)
    {
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        var workers = CreateWorkers(buffer, totalWorkers, Reduce);
        var results = MergeResults(workers, Reduce);
        for (var i = 0; i < iterations; ++i)
        {
            await buffer.SendAsync(1);
        }
        buffer.Complete();
        return await results.ReceiveAsync();
    }

    private static void Measure(TextWriter writer, int iterations, int workers)
    {
        var estimated = TimeSpan.FromMilliseconds(processingTime * iterations / workers).TotalSeconds;
        var timer = Stopwatch.StartNew();

        var total = Test(iterations, workers).GetAwaiter().GetResult();
        if (total != iterations)
            throw new ApplicationException($"{total} != {iterations}");

        var elapsed = timer.Elapsed.TotalSeconds;
        writer.WriteLine($"Iterations: {iterations}, Workers: {workers}, Elapsed: {elapsed}s, Estimated: {estimated}s, Loss: {elapsed - estimated}s");
    }

    static void Main(string[] args)
    {
        Measure(TextWriter.Null, 1, 1);

        Console.Out.WriteLine($"Processing time: {processingTime}ms");

        var data = new Dictionary<int, int[]>
        {
            [1000] = new[] { 1, 10, 100 },
            [10000] = new[] { 50, 100, 200 },
            [100000] = new[] { 50, 100, 500, 1000 },
            [1000000] = new[] { 100, 200, 500, 1000 },
        };

        var keys = data.OrderBy(item => item.Key);
        for (var i = 1; i <= 10; ++i)
        {
            var timer = Stopwatch.StartNew();
            foreach (var item in keys)
            {
                foreach (var workers in item.Value)
                {
                    Measure(Console.Out, item.Key, workers);
                }
            }
            var elapsed = timer.Elapsed.TotalSeconds;
            Console.Out.WriteLine($"Iteration: {i}, Elapsed: {elapsed}s");
        }
    }
}


  GoLang
package main

import (
    "io/ioutil"
    "log"
    "os"
    "sort"
    "sync"
    "time"
)

const processingTime = time.Duration(time.Millisecond * 5)

type workerFactory func(int, chan int) chan int

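// reduce drains the input channel, simulating processingTime of work per item,
// and emits the accumulated total on its own output channel once the input is closed.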
func reduce(seed int, c chan int) chan int {
    output := make(chan int)
    process := func() {
        total := seed
        for i := range c {
            time.Sleep(processingTime)
            total = total + i
        }
        output <- total
        close(output)
    }

    go process()
    return output
}

func createWorkers(buffer chan int, totalWorkers int, factory workerFactory) []chan int {
    output := make([]chan int, totalWorkers)
    for i := 0; i < totalWorkers; i++ {
        output[i] = factory(0, buffer)
    }
    return output
}

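// mergeResults fans all worker outputs into a single channel that feeds a final
// reduce stage; a WaitGroup closes that channel after every worker is drained.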
func mergeResults(workers []chan int, factory workerFactory) chan int {
    localdata := make(chan int, len(workers))
    output := factory(0, localdata)
    wd := sync.WaitGroup{}

    for _, worker := range workers {
        wd.Add(1)
        go func(w chan int) {
            for data := range w {
                localdata <- data
            }

            wd.Done()
        }(worker)
    }

    go func() {
        wd.Wait()
        close(localdata)
    }()

    return output
}

func test(iterations int, totalWorkers int) int {
    buffer := make(chan int)
    workers := createWorkers(buffer, totalWorkers, reduce)
    results := mergeResults(workers, reduce)

    for i := 0; i < iterations; i++ {
        buffer <- 1
    }

    close(buffer)

    return <-results
}

func measure(writer *log.Logger, iterations int, totalWorkers int) {
    estimated := time.Duration(iterations * int(processingTime) / totalWorkers)
    startTime := time.Now()

    total := test(iterations, totalWorkers)
    if total != iterations {
        log.Fatalf("%d != %d", total, iterations)
    }

    elapsed := time.Since(startTime)
    writer.Printf("Iterations: %d, Workers: %d, Elapsed: %s, Estimated: %s, Loss: %s\n", iterations, totalWorkers, elapsed, estimated, elapsed-estimated)
}

func main() {
    null := log.New(ioutil.Discard, "", 0)
    std := log.New(os.Stdout, "", 0)
    measure(null, 1, 1)

    std.Printf("Processing time: %s\n", processingTime)
    data := map[int][]int{
        1000:    {1, 10, 100},
        10000:   {50, 100, 200},
        100000:  {50, 100, 200, 500, 1000},
        1000000: {100, 200, 500, 1000},
    }

    var keys []int
    for key := range data {
        keys = append(keys, key)
    }

    sort.Ints(keys)

    for i := 1; i <= 10; i++ {
        start := time.Now()
        for _, iterations := range keys {
            for _, workers := range data[iterations] {
                measure(std, iterations, workers)
            }
        }

        elapsed := time.Since(start)
        std.Printf("Iteration %d, Elapsed %s\n", i, elapsed)
    }
}


Surprisingly, the two versions match in line count to within the margin of error. By character count, though, the TPL version is almost 40% longer.

  And the results
GoLang:
Processing time: 5ms
Iterations: 1000, Workers: 1, Elapsed: 5.3568382s, Estimated: 5s, Loss: 356.8382ms
Iterations: 1000, Workers: 10, Elapsed: 583.9921ms, Estimated: 500ms, Loss: 83.9921ms
Iterations: 1000, Workers: 100, Elapsed: 590.8429ms, Estimated: 50ms, Loss: 540.8429ms
Iterations: 10000, Workers: 50, Elapsed: 1.3435108s, Estimated: 1s, Loss: 343.5108ms
Iterations: 10000, Workers: 100, Elapsed: 1.063587s, Estimated: 500ms, Loss: 563.587ms
Iterations: 10000, Workers: 200, Elapsed: 1.3624499s, Estimated: 250ms, Loss: 1.1124499s
Iterations: 100000, Workers: 50, Elapsed: 11.0128041s, Estimated: 10s, Loss: 1.0128041s
Iterations: 100000, Workers: 100, Elapsed: 5.9047845s, Estimated: 5s, Loss: 904.7845ms
Iterations: 100000, Workers: 200, Elapsed: 3.7495542s, Estimated: 2.5s, Loss: 1.2495542s
Iterations: 100000, Workers: 500, Elapsed: 3.7843446s, Estimated: 1s, Loss: 2.7843446s
Iterations: 100000, Workers: 1000, Elapsed: 5.911478s, Estimated: 500ms, Loss: 5.411478s
Iterations: 1000000, Workers: 100, Elapsed: 54.2715363s, Estimated: 50s, Loss: 4.2715363s
Iterations: 1000000, Workers: 200, Elapsed: 27.8255456s, Estimated: 25s, Loss: 2.8255456s
Iterations: 1000000, Workers: 500, Elapsed: 13.5996723s, Estimated: 10s, Loss: 3.5996723s
Iterations: 1000000, Workers: 1000, Elapsed: 11.1325319s, Estimated: 5s, Loss: 6.1325319s
Iteration 1, Elapsed 2m27.4934724s

Processing time: 100ms
Iterations: 1000, Workers: 10, Elapsed: 11.0497247s, Estimated: 10s, Loss: 1.0497247s
Iterations: 1000, Workers: 100, Elapsed: 11.0446942s, Estimated: 1s, Loss: 10.0446942s
Iterations: 10000, Workers: 50, Elapsed: 25.1061292s, Estimated: 20s, Loss: 5.1061292s
Iterations: 10000, Workers: 100, Elapsed: 20.0803274s, Estimated: 10s, Loss: 10.0803274s
Iterations: 10000, Workers: 200, Elapsed: 25.1038125s, Estimated: 5s, Loss: 20.1038125s
Iterations: 100000, Workers: 50, Elapsed: 3m25.7957s, Estimated: 3m20s, Loss: 5.7957s
Iterations: 100000, Workers: 100, Elapsed: 1m50.4477704s, Estimated: 1m40s, Loss: 10.4477704s
Iterations: 100000, Workers: 200, Elapsed: 1m10.3049015s, Estimated: 50s, Loss: 20.3049015s
Iterations: 100000, Workers: 500, Elapsed: 1m10.3096703s, Estimated: 20s, Loss: 50.3096703s
TPL:
Processing time: 5ms
Iterations: 1000, Workers: 1, Elapsed: 15.6350619s, Estimated: 5s, Loss: 10.6350619s
Iterations: 1000, Workers: 10, Elapsed: 1.7193998s, Estimated: 0.5s, Loss: 1.2193998s
Iterations: 1000, Workers: 100, Elapsed: 1.7178575s, Estimated: 0.05s, Loss: 1.6678575s
Iterations: 10000, Workers: 50, Elapsed: 3.9055687s, Estimated: 1s, Loss: 2.9055687s
Iterations: 10000, Workers: 100, Elapsed: 3.1233519s, Estimated: 0.5s, Loss: 2.6233519s
Iterations: 10000, Workers: 200, Elapsed: 3.9079247s, Estimated: 0.25s, Loss: 3.6579247s
Iterations: 100000, Workers: 50, Elapsed: 32.0285062s, Estimated: 10s, Loss: 22.0285062s
Iterations: 100000, Workers: 100, Elapsed: 17.2048496s, Estimated: 5s, Loss: 12.2048496s
Iterations: 100000, Workers: 500, Elapsed: 15.1436388s, Estimated: 1s, Loss: 14.1436388s
Iterations: 100000, Workers: 1000, Elapsed: 28.7291761s, Estimated: 0.5s, Loss: 28.2291761s
Iterations: 1000000, Workers: 100, Elapsed: 157.9577418s, Estimated: 50s, Loss: 107.9577418s
Iterations: 1000000, Workers: 200, Elapsed: 81.5264742s, Estimated: 25s, Loss: 56.5264742s
Iterations: 1000000, Workers: 500, Elapsed: 80.9878838s, Estimated: 10s, Loss: 70.9878838s
Iterations: 1000000, Workers: 1000, Elapsed: 144.7310845s, Estimated: 5s, Loss: 139.7310845s
Iteration: 1, Elapsed: 588.3252783s
Processing time: 100ms
Iterations: 1000, Workers: 10, Elapsed: 12.0203535s, Estimated: 10s, Loss: 2.0203535s
Iterations: 1000, Workers: 100, Elapsed: 12.0300487s, Estimated: 1s, Loss: 11.0300487s
Iterations: 10000, Workers: 50, Elapsed: 27.3427199s, Estimated: 20s, Loss: 7.3427199s
Iterations: 10000, Workers: 100, Elapsed: 21.8628653s, Estimated: 10s, Loss: 11.8628653s
Iterations: 10000, Workers: 200, Elapsed: 27.3254114s, Estimated: 5s, Loss: 22.3254114s
Iterations: 100000, Workers: 50, Elapsed: 224.1880851s, Estimated: 200s, Loss: 24.1880851s
Iterations: 100000, Workers: 100, Elapsed: 119.6893727s, Estimated: 100s, Loss: 19.6893727s

The alternative variant with Thread.Sleep and TPL:
Processing time: 5ms
Iterations: 1000, Workers: 1, Elapsed: 5.6992209s, Estimated: 5s, Loss: 0.699220899999999s
Iterations: 1000, Workers: 10, Elapsed: 0.7892746s, Estimated: 0.5s, Loss: 0.2892746s
Iterations: 1000, Workers: 100, Elapsed: 1.2913144s, Estimated: 0.05s, Loss: 1.2413144s
Iterations: 10000, Workers: 50, Elapsed: 7.5742033s, Estimated: 1s, Loss: 6.5742033s
Iterations: 10000, Workers: 100, Elapsed: 7.9033241s, Estimated: 0.5s, Loss: 7.4033241s
Iterations: 10000, Workers: 200, Elapsed: 8.4574322s, Estimated: 0.25s, Loss: 8.2074322s
Iterations: 100000, Workers: 50, Elapsed: 41.7972214s, Estimated: 10s, Loss: 31.7972214s
Iterations: 100000, Workers: 100, Elapsed: 13.6094305s, Estimated: 5s, Loss: 8.6094305s
Iterations: 100000, Workers: 500, Elapsed: 12.7404464s, Estimated: 1s, Loss: 11.7404464s
Iterations: 100000, Workers: 1000, Elapsed: 13.8424686s, Estimated: 0.5s, Loss: 13.3424686s
Iterations: 1000000, Workers: 100, Elapsed: 77.8382217s, Estimated: 50s, Loss: 27.8382217s
Iterations: 1000000, Workers: 200, Elapsed: 66.6656733s, Estimated: 25s, Loss: 41.6656733s
Iterations: 1000000, Workers: 500, Elapsed: 57.0298951s, Estimated: 10s, Loss: 47.0298951s
Iterations: 1000000, Workers: 1000, Elapsed: 53.365894s, Estimated: 5s, Loss: 48.365894s
Iteration: 1, Elapsed: 368.6105906s
Processing time: 100ms
Iterations: 1000, Workers: 10, Elapsed: 11.4807278s, Estimated: 10s, Loss: 1.4807278s
Iterations: 1000, Workers: 100, Elapsed: 16.9978477s, Estimated: 1s, Loss: 15.9978477s
Iterations: 10000, Workers: 50, Elapsed: 36.6479652s, Estimated: 20s, Loss: 16.6479652s
Iterations: 10000, Workers: 100, Elapsed: 24.3926649s, Estimated: 10s, Loss: 14.3926649s
Iterations: 10000, Workers: 200, Elapsed: 31.9125948s, Estimated: 5s, Loss: 26.9125948s
Iterations: 100000, Workers: 50, Elapsed: 206.0061483s, Estimated: 200s, Loss: 6.00614829999998s
Iterations: 100000, Workers: 100, Elapsed: 111.0950787s, Estimated: 100s, Loss: 11.0950787s
Iterations: 100000, Workers: 500, Elapsed: 119.0079956s, Estimated: 20s, Loss: 99.0079956s
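
The post doesn't show the code for the Thread.Sleep variant. A minimal sketch of what it presumably changes, assuming it only swaps the awaited Task.Delay for a blocking Thread.Sleep inside the worker (ReduceBlocking is a hypothetical name; since every busy worker then pins a ThreadPool thread, raising the pool minimum, e.g. ThreadPool.SetMinThreads(workers, workers), would be needed before runs with many workers):

    private static IPropagatorBlock<int, int> ReduceBlocking(int seed)
    {
        var total = seed;
        var input = new ActionBlock<int>(arg =>
        {
            Thread.Sleep(processingTime); // block the ThreadPool thread instead of awaiting a timer
            total += arg;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

        var output = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        input.Completion.ContinueWith(done =>
        {
            output.SendAsync(total).ContinueWith(task => output.Complete());
        });

        return DataflowBlock.Encapsulate(input, output);
    }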


Bottom line:
1. On small asynchronous tasks, TPL flat-out loses.
2. On relatively large ones it does better, but still nothing to write home about.
3. The OS, it seems, was not written by fools: it finds it easier to spin up a pile of threads than TPL does to deal with Task.Delay (a quick probe below illustrates why).
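
On point 3: the single-worker TPL run averages ~15.6 ms per 5 ms Task.Delay (15.64 s / 1000 items), consistent with Task.Delay being rounded up to the default ~15.6 ms Windows timer tick, while Go's time.Sleep lands near 5.36 ms per item. A quick probe to check the actual granularity on a given machine (just a sketch):

    var sw = Stopwatch.StartNew();
    for (var i = 0; i < 100; ++i)
        Task.Delay(processingTime).GetAwaiter().GetResult();
    Console.Out.WriteLine($"Avg Task.Delay({processingTime}ms): {sw.Elapsed.TotalMilliseconds / 100:F1}ms");
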
Just because you're not paranoid doesn't mean they aren't watching you.