Re[6]: Несколько потоков - быстрее вычичсление?
От: stump http://stump-workshop.blogspot.com/
Дата: 23.05.08 11:46
Оценка: 2 (1)
Здравствуйте, Димчанский, Вы писали:

Д>Здравствуйте, den123, Вы писали:


D>>Если в число таких задач поставить другие мои потоки, то в целом моя задача получит бОльшее количество квантов времени на выполнение.


Д>Ну мне лично это смахивает не некую попытку хака, чтобы заработать на этом копейку и то вопрос, заработаешь ли. С одной стороны да, твоим потокам больше времени будет уделятся, с другой стороны синхронизация на однопроцессорной машине сожрет это преимущество.

Д>Не проще ли тогда просто Real Time приоритет своему потоку сделать на однопроцессорной машине?

Д>Ну а глядя в будущее и не особо запариваясь об однопроцессорных машинах, лучше начать параллелить сейчас, осознавая, что на однопроцессроной будет потеря в производительности, зато на паре ядер можно ожидать прироста.


Кстати, я читал, что ParallelFX смотрит сколько ядер, и решает стоит ли рараллелить задачу.
Понедельник начинается в субботу
Re[7]: Несколько потоков - быстрее вычичсление?
От: Димчанский Литва http://dimchansky.github.io/
Дата: 23.05.08 12:10
Оценка: 9 (1)
Здравствуйте, stump, Вы писали:

S>Кстати, я читал, что ParallelFX смотрит сколько ядер, и решает стоит ли рараллелить задачу.


Да, если мне не изменяет память, то именно так и сделано в Parallel FX. Мне нужна была подобная либа под .NET 2.0, так я на коленке написал свою реализацию, пусть и не полный функционал, но то, что требуется. Там тоже распараллеливание происходит реально по числу процессоров.

Parallel.cs
using System;
using System.Collections.Generic;
using System.Threading;

namespace Dimchansky.Framework.Threading
{
    /// <summary>
    /// Provides support for parallel loops and regions.
    /// </summary>
    public static class Parallel
    {
        /// <summary>
        /// Number of workers thread used in parallel computations by default.
        /// </summary>
        private static readonly int idealNumberOfThreads = Environment.ProcessorCount;

        /// <summary>
        /// Executes each of the provided actions inside a discrete, asynchronous task. 
        /// </summary>
        /// <param name="actions">An array of actions to execute.</param>
        public static void Do(params Action[] actions)
        {
            Do(idealNumberOfThreads, actions);
        }

        /// <summary>
        /// Executes each of the provided actions inside a discrete, asynchronous task. 
        /// </summary>
        /// <param name="numberOfThreads">An array of actions to execute.</param>
        /// <param name="actions">An array of actions to execute.</param>
        public static void Do(int numberOfThreads, params Action[] actions)
        {
            if (actions == null || numberOfThreads < 1)
                return;

            if (actions.Length > 0)
            {
                ForEach(actions,
                        delegate(Action a)
                        {
                            if (a != null)
                                a();
                        },
                        numberOfThreads);
            }
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel. 
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The body to be invoked for each iteration.</param>
        public static void For(int fromInclusive, int toExclusive, Action<int> body)
        {
            For(fromInclusive, toExclusive, body, idealNumberOfThreads);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel. 
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The body to be invoked for each iteration.</param>
        /// <param name="numberOfThreads">Number of threads to use for executing action.</param>
        public static void For(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
        {
            if (fromInclusive >= toExclusive || body == null || numberOfThreads < 1)
                return;

            parallelForWorker(fromInclusive, toExclusive, body, numberOfThreads);
        }

        /// <summary>
        /// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
        /// </summary>
        /// <typeparam name="T">The type of the data in the enumerable.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The action to invoke for each element in the source.</param>
        public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
        {
            ForEach(source, body, idealNumberOfThreads);
        }

        /// <summary>
        /// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
        /// </summary>
        /// <typeparam name="T">The type of the data in the enumerable.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The action to invoke for each element in the source.</param>
        /// <param name="numberOfThreads">Number of threads to use for executing action.</param>
        public static void ForEach<T>(IEnumerable<T> source, Action<T> body, int numberOfThreads)
        {
            if (source == null || body == null || numberOfThreads < 1)
                return;

            T[] array = source as T[];
            if (array != null)
            {
                parallelForEachWorker(array, body, numberOfThreads);
            }
            else
            {
                IList<T> list = source as IList<T>;
                if (list != null)
                {
                    parallelForEachWorker(list, body, numberOfThreads);
                }
                else
                {
                    using (IEnumerator<T> enumerator = source.GetEnumerator())
                    {
                        parallelForEachWorker(enumerator, body, numberOfThreads);
                    }
                }
            }
        }

        #region Helpers

        private static void parallelForWorker(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
        {
            if (toExclusive > fromInclusive)
            {
                int elementsCount = toExclusive - fromInclusive;

                // reduce workers count if possible
                int workItems = Math.Max(Math.Min(elementsCount, numberOfThreads), 1);

                // Divide the list up into chunks
                int chunkSize = Math.Max(elementsCount/workItems, 1);
                int count = workItems;

                // Use an event to wait for all work items
                using (ManualResetEvent mre = new ManualResetEvent(false))
                {
                    List<Exception> innerExceptions = null;
                    object innerExceptionsLocker = new object();

                    // Each work item processes appx 1/Nth of the data items
                    WaitCallback callback = delegate(object state)
                                                {
                                                    int iteration = (int) state;
                                                    int from = chunkSize*iteration + fromInclusive;
                                                    int to = iteration == workItems - 1
                                                                 ? toExclusive
                                                                 : chunkSize*(iteration + 1) + fromInclusive;

                                                    while (from < to)
                                                    {
                                                        try
                                                        {
                                                            body(from++);
                                                        }
                                                        catch (Exception exception)
                                                        {
                                                            lock (innerExceptionsLocker)
                                                            {
                                                                if (innerExceptions == null)
                                                                {
                                                                    innerExceptions = new List<Exception>();
                                                                }
                                                                innerExceptions.Add(exception);
                                                            }
                                                            break;
                                                        }
                                                    }

                                                    if (Interlocked.Decrement(ref count) == 0)
                                                        mre.Set();
                                                };

                    // The ThreadPool is used to process all but one of the 
                    // chunks; the current thread is used for that chunk, 
                    // rather than just blocking.
                    for (int i = 0; i < workItems - 1; i++)
                    {
                        ThreadPool.QueueUserWorkItem(callback, i);
                    }

                    try
                    {
                        callback(workItems - 1);
                    }
                    catch (Exception exception)
                    {
                        lock (innerExceptionsLocker)
                        {
                            if (innerExceptions == null)
                            {
                                innerExceptions = new List<Exception>();
                            }
                            innerExceptions.Add(exception);
                        }
                    }

                    // Wait for all work to complete
                    mre.WaitOne();

                    if (innerExceptions != null)
                    {
                        throw new AggregateException(innerExceptions);
                    }
                }
            }
        }

        private static void parallelForEachWorker<T>(T[] array, Action<T> body, int numberOfThreads)
        {
            int lowerBound = array.GetLowerBound(0);
            int toExclusive = array.GetUpperBound(0) + 1;

            parallelForWorker(lowerBound, toExclusive,
                              delegate(int i) { body(array[i]); }, numberOfThreads);
        }

        private static void parallelForEachWorker<T>(IList<T> list, Action<T> body, int numberOfThreads)
        {
            parallelForWorker(0, list.Count, delegate(int i) { body(list[i]); }, numberOfThreads);
        }

        private static void parallelForEachWorker<T>(IEnumerator<T> enumerator, Action<T> action, int numberOfThreads)
        {
            int count = numberOfThreads;

            // Use an event to wait for all work items to complete
            using (ManualResetEvent mre = new ManualResetEvent(false))
            {
                List<Exception> innerExceptions = null;
                object innerExceptionsLocker = new object();

                // Each work item will continually pull data from the 
                // enumerator and process it until there is no more data
                // to process
                WaitCallback callback = delegate
                                            {
                                                while (true)
                                                {
                                                    T data;
                                                    lock (enumerator)
                                                    {
                                                        if (!enumerator.MoveNext()) break;
                                                        data = enumerator.Current;
                                                    }

                                                    try
                                                    {
                                                        action(data);
                                                    }
                                                    catch (Exception exception)
                                                    {
                                                        lock (innerExceptionsLocker)
                                                        {
                                                            if (innerExceptions == null)
                                                            {
                                                                innerExceptions = new List<Exception>();
                                                            }
                                                            innerExceptions.Add(exception);
                                                        }
                                                        break;
                                                    }
                                                }

                                                if (Interlocked.Decrement(ref count) == 0)
                                                    mre.Set();
                                            };

                // The ThreadPool is used to process all but one of the 
                // chunks; the current  thread is used for that chunk, 
                // rather than just blocking.
                for (int i = 0; i < numberOfThreads - 1; i++)
                {
                    ThreadPool.QueueUserWorkItem(callback, i);
                }

                try
                {
                    callback(numberOfThreads - 1);
                }
                catch (Exception exception)
                {
                    lock (innerExceptionsLocker)
                    {
                        if (innerExceptions == null)
                        {
                            innerExceptions = new List<Exception>();
                        }
                        innerExceptions.Add(exception);
                    }
                }

                // Wait for all work to complete
                mre.WaitOne();

                if (innerExceptions != null)
                {
                    throw new AggregateException(innerExceptions);
                }
            }
        }

        #endregion
    }
}


AggregateException.cs — беспощадно выдрал из Parallel FX
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Security.Permissions;

namespace Dimchansky.Framework.Threading
{
    public delegate TResult Func<T, TResult>(T arg);

    [Serializable, DebuggerDisplay("Count = {InnerExceptions.Count}")]
    public class AggregateException : Exception
    {
        // Fields
        private Exception[] m_innerExceptions;

        // Methods
        public AggregateException()
        {
        }

        public AggregateException(IEnumerable<Exception> innerExceptions)
            : this(null, innerExceptions)
        {
        }

        public AggregateException(string message)
            : base(message)
        {
        }

        [SecurityPermission(SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.SerializationFormatter)]
        protected AggregateException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }

        public AggregateException(string message, IEnumerable<Exception> innerExceptions)
            : this(message, (innerExceptions == null) ? null : new List<Exception>(innerExceptions))
        {
        }

        private AggregateException(string message, List<Exception> innerExceptions)
            : base(message, ((innerExceptions != null) && (innerExceptions.Count > 0)) ? innerExceptions[0] : null)
        {
            if (innerExceptions == null)
            {
                throw new ArgumentNullException("innerExceptions");
            }
            this.m_innerExceptions = innerExceptions.ToArray();
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                if (this.m_innerExceptions[i] == null)
                {
                    throw new ArgumentException("An element of innerExceptions was null.");
                }
            }
        }

        public AggregateException(string message, Exception inner)
            : this(message, new Exception[] { inner })
        {
        }

        public AggregateException Flatten(params AggregateException[] exceptions)
        {
            if (exceptions == null)
            {
                throw new ArgumentNullException("exceptions");
            }
            List<Exception> list = new List<Exception>(this.m_innerExceptions);
            AggregateException[] exceptionArray = (AggregateException[])exceptions.Clone();
            for (int i = 0; i < exceptionArray.Length; i++)
            {
                if (exceptionArray[i] == null)
                {
                    throw new ArgumentException("An element in exceptions was null.");
                }
                list.AddRange(exceptionArray[i].InnerExceptions);
            }
            return new AggregateException(this.Message, list.ToArray());
        }

        public void Handle(Func<Exception, bool> handler)
        {
            if (handler == null)
            {
                throw new ArgumentNullException("handler");
            }
            List<Exception> list = null;
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                if (!handler(this.m_innerExceptions[i]))
                {
                    if (list == null)
                    {
                        list = new List<Exception>();
                    }
                    list.Add(this.m_innerExceptions[i]);
                }
            }
            if (list != null)
            {
                throw new AggregateException(this.Message, list.ToArray());
            }
        }

        public override string ToString()
        {
            string str = base.ToString();
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                str = string.Format("{0}{1}---> (Inner Exception #{2}) {3}{4}{5}", new object[] { str, Environment.NewLine, i, this.m_innerExceptions[i].ToString(), "<---", Environment.NewLine });
            }
            return str;
        }

        // Properties
        public ReadOnlyCollection<Exception> InnerExceptions
        {
            get
            {
                return new ReadOnlyCollection<Exception>(this.m_innerExceptions);
            }
        }
    }
}


Пример обработки исключений приводил уже здесь
Автор: Димчанский
Дата: 28.02.08
.

Пример распареллеливания кода.
Было:
for (int i = 0; i < 100; i++) { 
  a[i] = a[i]*a[i]; 
}


Стало:
Parallel.For(0, 100, delegate(int i) { 
  a[i] = a[i]*a[i]; 
});
Re[4]: Несколько потоков - быстрее вычичсление?
От: AndrewVK Россия http://blogs.rsdn.org/avk
Дата: 23.05.08 12:34
Оценка:
Здравствуйте, stump, Вы писали:

S>плюс каждое переключение контекста потоков занимает несколько тысяч процессорных тактов.


Переключение контекстов происходит вне зависимости от количества потоков, просто по таймеру. Но время между переключениями контекста для современного процессора это просто вечность.

S>Есть еще один фактор — это кэширование данных. Для оптимизации часть данных с которыми работает поток подгужается в процессорный кэш. Если к этим данным (или даже не именно к этим, а к рядом лежащим) обращаются из другого потока, они выгружаются из кэша и обращение идет непосредственно к памяти.


А вот этот факт имеет место быть.
... <<RSDN@Home 1.2.0 alpha 4 rev. 1082 on Windows Vista 6.0.6001.65536>>
AVK Blog
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.