Изучая многопоточность в Хаскелле, сделал простенькую программку, которая создаёт некоторое количество потоков, каждый из которых всего лишь увеличивает на единицу определённое количесвто раз мутабельную переменную (var). При создании каждого потока также на единицу увеличивается другая переменная (cnt); при завершении работы каждый поток уменьшает на единицу эту переменную, а основная программа после создания кучи потоков просто ждёт, пока значение переменной cnt не станет равным нулю...
import IO
import Time
import System
import Control.Concurrent
import Control.Concurrent.MVar
n_threads', n_incs' :: Int
n_threads' = 1000
n_incs' = 1000
-- Командная строка: имя.exe +RTS -K100M -RTS кол-во_потоков кол-во_инкрементов
main = do
arg <- getArgs
let (n_threads, n_incs) = case ((map read arg)::[Int]) of
nt:ni:_ -> (nt, ni)
_ -> (n_threads', n_incs')
t1 <- Time.getClockTime
hSetBuffering stdout NoBuffering
putStr $ "Config: " ++ show n_threads ++ " by " ++ show n_incs ++ "\nStarting... "
var <- newMVar 0 -- Общий счётчик
cnt <- newMVar 0 -- Число активных потоковlet foo 0 = cnt -:= 1
foo n = var +:= 1 >> foo (n-1)
makeThreads 0 = return ()
makeThreads n = cnt +:= 1 >> forkIO (foo n_incs) >> makeThreads (n-1)
waitFor0 = do cnt' <- readMVar cnt
if cnt' == 0 then return ()
else threadDelay 40000 >> waitFor0
makeThreads n_threads
putStr "Finishing... "
waitFor0
val <- takeMVar var
putStrLn $ "Done: " ++ show val
t2 <- Time.getClockTime; printTimeDif t2 t1
where
(+:=), (-:=) :: MVar Int -> Int -> IO ()
var -:= n = var +:= (-n)
var +:= n = do prev <- takeMVar var
var `putMVar` (prev+n)
printTimeDif ft st = putStrLn $ "Time = " ++ show(Time.tdHour td) ++ ":" ++ show(Time.tdMin td) ++ ":"
++ show nsecs ++ "." ++ snms
where
td = ft `Time.diffClockTimes` st
secs = Time.tdSec td
ms = Time.tdPicosec td `div` 1000000000
(nsecs, nms) = if ms < 0 then (secs - 1, ms + 1000) else (secs, ms)
snms = reverse $ take 3 $ (reverse $ show nms) ++ "000"
Интересный тест получился (Celeron 1.8, RAM DualDDR266 512MB, Win2k3, GHC 6.6).
Столбцы под forkIO — время работы лёгких потоков, создаваемых по forkIO, а forkOS — тяжёлых потоков OS (по forkOS).
Иногда (очень редко) шедулер распределяет потоки так, что они вообще не конфликтуют, и время выполнения минимально (0.7 сек), а иногда (тоже редко) начинается своппинг и задачу приходится снимать...
Когда количество потоков больше ста тысяч, то начинаются проблемы на моём железе — большой разброс времени работы. Ни о каком soft real time явно речи идти не может.
С процессами forkOS картинка немного другая — с одной стороны, очень повторяемые результаты, а с другой стороны — большие накладные расходы при количестве процессов от нескольких тысяч и более. Больше 170 тыс. процессов вообще создать не удалось...
Также лёгкие потоки создают очень большую нагрузку на стек и вообще памяти жрут море, с процессами памяти расходуется вроде бы мало...
Вот интересно, как такая программа выглядела бы на Эрланге и Немерле (на макросах), и какие характеристики у этих вариантов...
Re: Playing with Concurrent Haskell
От:
Аноним
Дата:
13.05.07 07:51
Оценка:
для erlang на celeron 1.5, 256mb, erlang-1:10.b.7-1, в своп не уходил.
но сравнение с хаскелем для примера ниже не вполне корректно — без глобальных переменных передача тика идет через сообщение, что при 1млн потоков и мгновенном завершении каждого из них создает огромную очередь в ожидании приема сообщений. 41> rsdn:test(1000000,1).
{ok,30.2363} 42> rsdn:test(1000000,1).
{ok,28.1009} 49> rsdn:test(1,1000000).
{ok,4.58340e-2} 50> rsdn:test(10,100000).
{ok,4.74110e-2} 51> rsdn:test(100,10000).
{ok,4.81870e-2} 53> rsdn:test(1000,1000).
{ok,0.515728} 54> rsdn:test(1000,1000).
{ok,5.49930e-2} 56> rsdn:test(5000,200).
{ok,9.18940e-2} 57> rsdn:test(10000,100).
{ok,0.164270} 58> rsdn:test(10000,100).
{ok,0.140982} 59> rsdn:test(10000,100).
{ok,0.154148} 60> rsdn:test(100000,10).
{ok,1.17681} 61> rsdn:test(100000,10).
кстати, если запустить процедуру ожидания в отдельный поток — можно добиться ускорения работы в 3-4 раза на большом количестве потоков. но все равно разница между 1000000, 1 и 1,1000000 — 2-3 порядка. 68> rsdn:test(1000000,1).
{ok,8.79680} 69> rsdn:test(1000000,1).
{ok,8.77959} 70> rsdn:test(1,1000000).
{ok,4.59500e-2} 71> rsdn:test(1,1000000).
{ok,4.96880e-2}
исправленный код:
Да, разброс с параметрами 333333 3 — 1..40 сек.
Но есть еще интересная опция линковки GHC -threaded. После компиляции с ее применением в RTS появляются дополнительные опции:
-N<n> Use <n> OS threads (default: 1)
-qw Migrate a thread to the current CPU when it is woken up
Если запускать так: +RTS -N2 -qw -RTS, то стабильно получается 2.5 сек. Это всё для версии с forkIO.
Здравствуйте, Аноним, Вы писали:
А>кстати, если запустить процедуру ожидания в отдельный поток — можно добиться ускорения работы в 3-4 раза на большом количестве потоков. но все равно разница между 1000000, 1 и 1,1000000 — 2-3 порядка.
Да, у Эрланга, похоже, действительно очень лёгкие процессы (по сравнению с Хаскеллем)...
Здравствуйте, Аноним, Вы писали:
A> сравнение с хаскелем для примера ниже не вполне корректно — без глобальных переменных передача тика идет через сообщение, что при 1млн потоков и мгновенном завершении каждого из них создает огромную очередь в ожидании приема сообщений
Окей, я переписал программу так, что вместо мутабельных переменных (локальных, кстати) используется обмен сообщениями через три канала Chan: ticker, finish и result. Если я правильно понял программу на Эрланге — принцип примерно такой же.
Программа создаёт три канала и соответственно процессы для работы с этими каналами:
1) Процесс tick считывает с канала ticker сообщения типа Ticker. В случае сообщения Tick происходит увеличение на единицу значения параметра k функции tick; сообщение Done сигнализирует об окончании работы — пора выдавать результат через канал result.
2) Процесс wait считывает с канала finish сигнал о том, что очередная копия процесса foo завершила свою работу. Тип сигнала неважен — принимаем тип () — что-то типа unit из F#/OCaml...
Как только количество полученных сигналов finish станет равно количеству запущенных копий процесса foo (что означает завершение всех копий foo) — в канал ticker будет передано сообщение Done.
3) Функция makeThreads запускает нужное количество (n_threads) копий процесса foo.
4) Процессы foo просто посылают в канал ticker сообщения Tick в количестве n_incs штук, а затем — сигнал () в канал finish.
5) Ну и наконец, после создания всех нужных процессов, основной процесс (функция main просто ждёт, когда ему кто-нибудь пришлёт результат в канал result, который затем и распечатает... :о)
import IO
import Time
import System
import Control.Concurrent
import Control.Concurrent.Chan
data Ticker = Tick | Done
n_threads', n_incs' :: Int
n_threads' = 1000; n_incs' = 1000
-- Командная строка: имя.exe +RTS -K100M -RTS кол-во_потоков кол-во_инкрементов
main = do
arg <- getArgs
let (n_threads, n_incs) = case (map read arg) of
nt:ni:_ -> (nt, ni)
_ -> (n_threads', n_incs')
t1 <- Time.getClockTime
hSetBuffering stdout NoBuffering
putStr $ "Config: " ++ show n_threads ++ " by " ++ show n_incs ++ " Starting... "
ticker <- newChan
finish <- newChan
result <- newChan
let foo 0 = finish `writeChan` ()
foo n = ticker `writeChan` Tick >> foo (n-1)
makeThreads 0 = return ()
makeThreads n = forkIO (foo n_incs) >> makeThreads (n-1)
tick :: Int -> IO ()
tick k = do msg <- readChan ticker
case msg of
Tick -> tick (k+1)
Done -> result `writeChan` k
wait 0 = ticker `writeChan` Done
wait n = readChan finish >> wait (n-1)
forkIO (tick 0)
forkIO (wait n_threads)
makeThreads n_threads
putStr "Finishing... "
val <- readChan result
putStr $ "Done: " ++ show val
t2 <- Time.getClockTime; printTimeDif t2 t1
printTimeDif ft st = putStrLn $ " Time = " ++ show(Time.tdHour td) ++ ":" ++ show(Time.tdMin td) ++ ":"
++ show nsecs ++ "." ++ snms
where
td = ft `Time.diffClockTimes` st
secs = Time.tdSec td
ms = Time.tdPicosec td `div` 1000000000
(nsecs, nms) = if ms < 0 then (secs - 1, ms + 1000) else (secs, ms)
snms = reverse $ take 3 $ (reverse $ show nms) ++ "000"
Результаты, честно говоря, удручающие. Результаты привожу только для версии с forkIO, потому что с forkOS результаты вроде не лучше. Запускал с параметрами +RTS -K100M -RTS кол-во_потоков кол-во_инкрементов
следовательно, вывод — на большом количестве потоков erlang рвет своих конкурентов. что и требовалось доказать . а haskell, похоже, скоро станет чем-то вроде common lisp — есть всё, что можно себе представить, пусть и не в лучшей реализации
Здравствуйте, Аноним, Вы писали:
А>следовательно, вывод — на большом количестве потоков erlang рвет своих конкурентов. что и требовалось доказать . а haskell, похоже, скоро станет чем-то вроде common lisp — есть всё, что можно себе представить, пусть и не в лучшей реализации
Да в принципе, конкуренты Эрланга представлены-то и не были. Хаскелл — не конкурент в этих делах, да и не пытается вроде им стать (пока) :о)
Вот любопытно, что даст Хаскеллу Nested Data Parallelism, когда будет реализован... Но опять же, это параллельная обработка данных, а не конкурентная...
Всё таки я не смог успокоиться и сделал третий вариант программы, заменив первоначальные MVar Int на IORef Int. А что бы конкурентные потоки не внесли хаос одновременным изменением IORef-переменной, операции над ними атомарными.
import IO
import Time
import System
import Data.IORef
import Control.Concurrent
n_threads', n_incs' :: Int
n_threads' = 1000; n_incs' = 1000
-- Командная строка: имя.exe +RTS -K100M -RTS кол-во_потоков кол-во_инкрементов
main = do
arg <- getArgs
let (n_threads, n_incs) = case (map read arg) of
nt:ni:_ -> (nt, ni)
_ -> (n_threads', n_incs')
t1 <- Time.getClockTime
hSetBuffering stdout NoBuffering
putStr $ "Config: " ++ show n_threads ++ " by " ++ show n_incs ++ " Starting... "
var <- newIORef 0 -- Общий счётчик
cnt <- newIORef 0 -- Число активных потоковlet foo 0 = cnt -:= 1
foo n = var +:= 1 >> foo (n-1)
makeThreads 0 = return ()
makeThreads n = cnt +:= 1 >> forkIO (foo n_incs) >> makeThreads (n-1)
waitFor0 = do cnt' <- readIORef cnt
if cnt' == 0 then return ()
else threadDelay 40000 >> waitFor0
makeThreads n_threads
putStr "Finishing... "
waitFor0
val <- readIORef var
putStr $ " Done: " ++ show val
t2 <- Time.getClockTime; printTimeDif t2 t1
where
(+:=), (-:=) :: IORef Int -> Int -> IO ()
var -:= n = var +:= (-n)
var +:= n = var `atomicModifyIORef` (\x -> (x+n, ()))
printTimeDif ft st = putStrLn $ " Time = " ++ show(Time.tdHour td) ++ ":" ++ show(Time.tdMin td) ++ ":"
++ show nsecs ++ "." ++ snms
where
td = ft `Time.diffClockTimes` st
secs = Time.tdSec td
ms = Time.tdPicosec td `div` 1000000000
(nsecs, nms) = if ms < 0 then (secs - 1, ms + 1000) else (secs, ms)
snms = reverse $ take 3 $ (reverse $ show nms) ++ "000"
Что порадовало — результаты стали очень стабильными и в целом — неплохими... :о)
Похоже, в многопоточных программах не стоит игнорировать IORef'ы, если это позволяет задача...
Здравствуйте, geniepro, Вы писали:
G>Всё таки я не смог успокоиться и сделал третий вариант программы, заменив первоначальные MVar Int на IORef Int. А что бы конкурентные потоки не внесли хаос одновременным изменением IORef-переменной, операции над ними атомарными.
G>Что порадовало — результаты стали очень стабильными и в целом — неплохими... :о) G>Похоже, в многопоточных программах не стоит игнорировать IORef'ы, если это позволяет задача...
Мне не совсем понятно, что именно измеряет твой бенчмарк. Все-таки тысячи потоков, непрерывно модифицирующие одну глобальную переменную — довольно странный юз-кейс, понятно, что все время будет тратиться на синхронизацию. Надо бы придумать какой-то бенчмарк пореалистичнее. А пока, с вот такой модификацией:
foo n = do when (n `mod` 20==0) $ var +:= 1
foo (n-1)
получается такая картинка:
Config: 1 by 1000000 Starting... Finishing... Done: 50000 Time = 0:0:0.046
Config: 10 by 100000 Starting... Finishing... Done: 50000 Time = 0:0:0.047
Config: 100 by 10000 Starting... Finishing... Done: 50000 Time = 0:0:0.078
Config: 1000 by 1000 Starting... Finishing... Done: 50000 Time = 0:0:0.078
Config: 5000 by 200 Starting... Finishing... Done: 50000 Time = 0:0:0.062
Config: 10000 by 100 Starting... Finishing... Done: 50000 Time = 0:0:0.109
Config: 25000 by 40 Starting... Finishing... Done: 50000 Time = 0:0:0.110
Config: 50000 by 20 Starting... Finishing... Done: 50000 Time = 0:0:0.141
Config: 100000 by 10 Starting... Finishing... Done: 0 Time = 0:0:0.218
Config: 250000 by 4 Starting... Finishing... Done: 0 Time = 0:0:0.594
Config: 333333 by 3 Starting... Finishing... Done: 0 Time = 0:0:0.875
Config: 500000 by 2 Starting... Finishing... Done: 0 Time = 0:0:1.501
Config: 1000000 by 1 Starting... Finishing... Done: 0 Time = 0:0:4.250
Здравствуйте, palm mute, Вы писали:
PM> Мне не совсем понятно, что именно измеряет твой бенчмарк. Все-таки тысячи потоков, непрерывно модифицирующие одну глобальную переменную — довольно странный юз-кейс, понятно, что все время будет тратиться на синхронизацию.
Да меня, вобщем-то, именно накладные расходы на синхронизацию и интересовали, поэтому никакой сложной логики я и не закладывал — просто граничный случай...
А расходы на изменение самих IORef'ов и MVar'ов (в однопоточном режиме) вроде ненамного больше, чем на изменение аргументов функций: IORef — примерно в два раза медленнее, MVar — в 2.7 раза дольше...
PM> А пока, с вот такой модификацией: PM> foo n = do when (n `mod` 20==0) $ var +:= 1 PM> foo (n-1) PM> получается такая картинка:
Так тут уже гораздо (в 20 раз) меньше операций над переменной var получается — понятно, что меньше конфликтов, и программа быстрее работает...
Кстати, что за when такой? Я порылся по библиотеке — не нашёл... Симитировал так: