Re[6]: Fixed: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 10.03.10 12:53
Оценка:
Здравствуйте, Jolly Roger, Вы писали:

R>>Кстати возможностью отслеживания смерти потоков под мьютексом пользуются очень редко, т.к. что бы это имело смысл алгоритм внутри критической секции должен быть полностью lock-free, т.е. теоретически lock-free.


JR>Так ведь и смерть потока внутри мьютекса — событие экстраординарное, я даже сначала не хотел его упоминать Зато захват низкоприоритетным потоком вполне реален, а в этом случае система помогает, она разбудит такого захватчика, если тот-же мьютекс потребуется более высокоприоритетному потоку.


Если речь о потоках, то экстраординарное. А если речь о процессах, общающихся через разделяемую память, то вполне нормальное.
Если имеются потоки разного приоритета, то необходимо использовать полностью lock-free или wait-free алгоритмы, они иммуны к инверсиям приоритетов, смерти потоков и т.д. Благо такие алгоритмы есть.


1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[5]: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 10.03.10 13:00
Оценка:
Здравствуйте, Caracrist, Вы писали:

S>>
S>>    long read = InterlockedIncrement(&m_reading) % m_size;
S>>    *target = m_buffer[read].data; //если тут произойдёт исключение, то колл. читателей будет неверно.
S>>    m_buffer[read].status = cell_Read;
S>>    finalize_read();
S>>


C>Ура!

C>Наконец кто-то обратил внимание на это
C>Это легко решаемая проблема, весь необходимый finalize переносится в деструктор спецыального класса, добавляется ещё один флажок: cell_Broken
C>и всё работает как часы. однако такой код становится неудобо читаемым, и тут его анализировать(в виртуальной машине класса мозг) будет сложнее. Поэтому перед выкладкой я убрал весь код обработки ошибок.

Согласен.
Кстати, такой алгоритм используется в библиотеке Intel TBB, класс tbb::concurrent_queue. Там можно поглядеть детали реализации.

Хотя с исключением во время потребления элемента не так просто справится, т.к. поток уже сдвинул позицию для потребления, и что потом делать с этим "подвисшим" элементом не понятно.
В целом, я бы просто сказал, что копирование не должно кидать, для надёжности можно добавить проверку на __has_nothrow_assign/__has_nothrow_copy.


1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[7]: Fixed: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 10.03.10 17:10
Оценка:
Здравствуйте, remark, Вы писали:

R>Скоро я тут покажу как это сделать для очереди на основе фиксированного буфера — один _InterlockedCompareExchange() на операцию. Спин-мьютекс не может быть быстрее, т.к. он подразумевает как минимум один _InterlockedCompareExchange().


Здесь:
http://www.rsdn.ru/forum/cpp/3730905.1.aspx
Автор: remark
Дата: 10.03.10



1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[6]: Fixed: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 10.03.10 17:13
Оценка:
Здравствуйте, Caracrist, Вы писали:

C>Довёл до ума (если кто хочет посмотреть на это чудо, пишите )

C>После тестов с конечной версией, Interlocked быстрее в целом, в начале не значительно, но чем больше потоков тем больше разница. При 150+ потоках уже на порядки быстрее. Однако, есть ещё одно неприятное свойство: Interlocked вешает всю систему(win7 i7х64 4 x hyperthreading). Процессоры занимаются инвалидацией вместо выполнений других команд. Окна почти по пикселям рисуются.

Попробуй протестировать на том же тесте вот эту — будет интересно услышать результаты:
http://www.rsdn.ru/forum/cpp/3730905.1.aspx
Автор: remark
Дата: 10.03.10



1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[7]: Fixed: Interlocked Circle Buffer
От: Caracrist https://1pwd.org/
Дата: 10.03.10 20:06
Оценка:
Здравствуйте, remark, Вы писали:

R>Здравствуйте, Caracrist, Вы писали:


C>>Довёл до ума (если кто хочет посмотреть на это чудо, пишите )

C>>После тестов с конечной версией, Interlocked быстрее в целом, в начале не значительно, но чем больше потоков тем больше разница. При 150+ потоках уже на порядки быстрее. Однако, есть ещё одно неприятное свойство: Interlocked вешает всю систему(win7 i7х64 4 x hyperthreading). Процессоры занимаются инвалидацией вместо выполнений других команд. Окна почти по пикселям рисуются.

R>Попробуй протестировать на том же тесте вот эту — будет интересно услышать результаты:

R>http://www.rsdn.ru/forum/cpp/3730905.1.aspx
Автор: remark
Дата: 10.03.10


R>



Ведёт себя точно также как и мой, нереально ускоряется при очень большом количестве потоков, тоже вешает систему,
но работает в полтора/два раза быстрее моего. Хорошее решение если будет ответ на http://www.rsdn.ru/forum/src/3731081.1.aspx
Автор: Caracrist
Дата: 10.03.10
других проблем пока не вижу. Кстати, хотел сделать подобное, но не смог придумать, что делать в упомянутом случае.
~~~~~
~lol~~
~~~ Single Password Solution
Re[8]: Fixed: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 10.03.10 20:32
Оценка:
Здравствуйте, Caracrist, Вы писали:

C>

C>Ведёт себя точно также как и мой, нереально ускоряется при очень большом количестве потоков, тоже вешает систему,
C>но работает в полтора/два раза быстрее моего. Хорошее решение если будет ответ на http://www.rsdn.ru/forum/src/3731081.1.aspx
Автор: Caracrist
Дата: 10.03.10
других проблем пока не вижу. Кстати, хотел сделать подобное, но не смог придумать, что делать в упомянутом случае.


А что за тест и железо, если они работают почти одинаково?

У меня на двухядерном лаптопе получается:
Моя очередь:

cycles/op=48
cycles/op=52
cycles/op=46


Твоя:

cycles/op=338
cycles/op=338
cycles/op=338


Т.е. разница в 7 раз, что примерно соотв. кол-ву атомарных инструкций на операцию.
От кол-ва потоков практически не меняется, пробовал на 64 потоках.

Вот тест:


size_t const thread_count = 4;
size_t const batch_size = 16;
size_t const iter_count = 1000000;

bool volatile g_start = 0;

typedef mpmc_bounded_queue<int> queue_t;

unsigned __stdcall thread_func(void* ctx)
{
    queue_t& queue = *(queue_t*)ctx;
    int data;

    srand((unsigned)time(0) + GetCurrentThreadId());
    size_t pause = rand() % 1000;

    while (g_start == 0)
        SwitchToThread();

    for (size_t i = 0; i != pause; i += 1)
        _mm_pause();

    for (int iter = 0; iter != iter_count; ++iter)
    {
        for (size_t i = 0; i != batch_size; i += 1)
        {
            while (!queue.enqueue(i))
                SwitchToThread();
        }
        for (size_t i = 0; i != batch_size; i += 1)
        {
            while (!queue.dequeue(data))
            SwitchToThread();
        }
    }

    return 0;
}

int main()
{
    for (size_t x = 0; x != 3; x += 1)
    {
        queue_t queue (1024);

        HANDLE threads [thread_count];
        for (int i = 0; i != thread_count; ++i)
        {
            threads[i] = (HANDLE)_beginthreadex(0, 0, thread_func, &queue, 0, 0);
        }

        Sleep(1);

        unsigned __int64 start = __rdtsc();
        g_start = 1;

        WaitForMultipleObjects(thread_count, threads, 1, INFINITE);

        unsigned __int64 end = __rdtsc();
        unsigned __int64 time = end - start;
        std::cout << "cycles/op=" << time / (batch_size * iter_count * 2 * thread_count) << std::endl;
    }
}





1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[9]: Fixed: Interlocked Circle Buffer
От: Caracrist https://1pwd.org/
Дата: 10.03.10 21:11
Оценка:
Здравствуйте, remark, Вы писали:

R>Здравствуйте, Caracrist, Вы писали:


C>>

C>>Ведёт себя точно также как и мой, нереально ускоряется при очень большом количестве потоков, тоже вешает систему,
C>>но работает в полтора/два раза быстрее моего. Хорошее решение если будет ответ на http://www.rsdn.ru/forum/src/3731081.1.aspx
Автор: Caracrist
Дата: 10.03.10
других проблем пока не вижу. Кстати, хотел сделать подобное, но не смог придумать, что делать в упомянутом случае.


R>А что за тест и железо, если они работают почти одинаково?



#define bufsize __int64(1024)
#define retries __int64(10)
#define N __int64(100000)
#define writers_cnt __int64(8)
#define readers_cnt __int64(8)


#define VALUE_TYPE 1
#define BUFFER_TYPE 1

#if VALUE_TYPE == 0
typedef string value_type;
value_type toPush("What de facto");
#elif VALUE_TYPE == 1
typedef int value_type;
value_type toPush(1);
#elif VALUE_TYPE == 2
struct value_type
{
    value_type() {} 
    value_type(int) {} 
    value_type & operator=(const value_type& in)
    {
        Sleep(rand() & 255);
        if (++value > 1) throw;
        --value 
        return *this;
    }
    cctl::volatile_long value;
};
value_type toPush(0);
#endif


#if BUFFER_TYPE == 0
typedef cctl::interlocked_circle_buffer<value_type> circle_buffer;
#elif BUFFER_TYPE == 1
typedef cctl::mutexed_circle_buffer<value_type> circle_buffer;
#elif BUFFER_TYPE == 2
typedef mpmc_bounded_queue<value_type> circle_buffer;
#endif


/*
#define wait
cctl::waitable_circle_buffer<circle_buffer> the_buffer(bufsize);
/*/
circle_buffer the_buffer(bufsize);
//*/

HANDLE start_event= CreateEvent(NULL, true, 0,0);

void Writer(PVOID)
{
    WaitForSingleObject(start_event, 10000);
    for (__int64 i = 0; i <  readers_cnt * N; i++)
    {
#ifdef wait
        the_buffer.push(toPush);
#else
        if (!the_buffer.push(toPush))
        {

            //cout << " !+1 " << tmp << endl;
        }
        else
        {
            SwitchToThread();
            //cout << "+";
            i--;
        }
#endif
    }
}

void Reader(PVOID)
{
    value_type tmp;
    WaitForSingleObject(start_event, 10000);
    for (__int64 i = 0; i < writers_cnt * N; i++)
    {
        //Sleep(1);
#ifdef wait
        the_buffer.pop(&tmp);
#else
        if (!the_buffer.pop(&tmp))
        {
            //cout << "-";
            SwitchToThread();
            i--;
        }
        else
        {
            //std::cout << " !-1 " << tmp << endl;
        }
#endif
    }
}
double Test()
{
    DWORD tid;
    srand(GetTickCount());
    HANDLE writers[writers_cnt], readers[readers_cnt];
    for (int i = 0 ; i < writers_cnt; i++)
        writers[i] = CreateThread(0, 0,(LPTHREAD_START_ROUTINE)Writer, 0, 0, &tid);
    for (int i = 0 ; i < readers_cnt; i++)
        readers[i] = CreateThread(0, 0,(LPTHREAD_START_ROUTINE)Reader, 0, 0, &tid);
    __int64 begin = __rdtsc();
    SetEvent(start_event);
    WaitForMultipleObjects(writers_cnt, writers, true, INFINITE);
    WaitForMultipleObjects(readers_cnt, readers, true, INFINITE);
    __int64 end = __rdtsc();
    ResetEvent(start_event);
    for (int i = 0 ; i < writers_cnt; i++) CloseHandle(writers[i]);
    for (int i = 0 ; i < readers_cnt; i++) CloseHandle(readers[i]);
    if (end == begin) end++;
    double mps = (double(double(writers_cnt) * N * readers_cnt * 1000000000)/double(end - begin));
    cout << "Sent in " << end - begin << " milliseconds, " << mps << "mps" << endl;
    return mps;
}
int _tmain(int argc, _TCHAR* argv[])
{
    cout << "Starting with " << writers_cnt << " sender threads and " << readers_cnt << " reciever threads " << endl;
    cout << "Buffer size: " << the_buffer.size() << endl;
    cout << "Sending " << (writers_cnt * N * readers_cnt) <<" messages"  << endl;
    double total = 0;
    for (int i = 0; i < retries; i++) total+=Test();
    cout << "Average speed: " << total / retries << "mps" << endl;
    return 0;
}


i7 2.66GHz 4xCore HyperThreading
win7x64
32bit threads
Visual Studio 2008 (release)
~~~~~
~lol~~
~~~ Single Password Solution
Re[10]: Fixed: Interlocked Circle Buffer
От: Caracrist https://1pwd.org/
Дата: 10.03.10 21:26
Оценка:
Здравствуйте, Caracrist, Вы писали:

C>Здравствуйте, remark, Вы писали:


R>>Здравствуйте, Caracrist, Вы писали:


C>>>

C>>>Ведёт себя точно также как и мой, нереально ускоряется при очень большом количестве потоков, тоже вешает систему,
C>>>но работает в полтора/два раза быстрее моего. Хорошее решение если будет ответ на http://www.rsdn.ru/forum/src/3731081.1.aspx
Автор: Caracrist
Дата: 10.03.10
других проблем пока не вижу. Кстати, хотел сделать подобное, но не смог придумать, что делать в упомянутом случае.


R>>А что за тест и железо, если они работают почти одинаково?



C>#define bufsize __int64(1024)
C>#define retries __int64(10)
C>#define N __int64(100000)
C>#define writers_cnt __int64(8)
C>#define readers_cnt __int64(8)


C>#define VALUE_TYPE 1
C>#define BUFFER_TYPE 1


C>i7 2.66GHz 4xCore HyperThreading

C>win7x64
C>32bit threads
C>Visual Studio 2008 (release)
64bit threads Такой же результат:

BUFFER_TYPE 0:

Starting with 8 sender threads and 8 reciever threads
Buffer size: 1024
Sending 6400000 messages
Sent in 3855637416 milliseconds, 1.65991e+006mps
Sent in 3590368002 milliseconds, 1.78255e+006mps
Sent in 3631132264 milliseconds, 1.76254e+006mps
Sent in 3703565077 milliseconds, 1.72806e+006mps
Sent in 3966326413 milliseconds, 1.61358e+006mps
Sent in 3785466758 milliseconds, 1.69068e+006mps
Sent in 3723342145 milliseconds, 1.71889e+006mps
Sent in 4017820926 milliseconds, 1.5929e+006mps
Sent in 3811305657 milliseconds, 1.67921e+006mps
Sent in 3776062056 milliseconds, 1.69489e+006mps
Average speed: 1.69232e+006mps

BUFFER_TYPE 1:

Starting with 8 sender threads and 8 reciever threads
Buffer size: 1024
Sending 6400000 messages
Sent in 3206550233 milliseconds, 1.99591e+006mps
Sent in 3236804837 milliseconds, 1.97726e+006mps
Sent in 3235771104 milliseconds, 1.97789e+006mps
Sent in 3181427611 milliseconds, 2.01168e+006mps
Sent in 3203853852 milliseconds, 1.99759e+006mps
Sent in 3168177450 milliseconds, 2.02009e+006mps
Sent in 3238192411 milliseconds, 1.97641e+006mps
Sent in 3234528728 milliseconds, 1.97865e+006mps
Sent in 3225094310 milliseconds, 1.98444e+006mps
Sent in 3226326709 milliseconds, 1.98368e+006mps
Average speed: 1.99036e+006mps

BUFFER_TYPE 2:

Starting with 8 sender threads and 8 reciever threads
Buffer size: 1024
Sending 6400000 messages
Sent in 2533942931 milliseconds, 2.52571e+006mps
Sent in 2472673173 milliseconds, 2.58829e+006mps
Sent in 2111953419 milliseconds, 3.03037e+006mps
Sent in 2482179117 milliseconds, 2.57838e+006mps
Sent in 2460225930 milliseconds, 2.60139e+006mps
Sent in 2360783846 milliseconds, 2.71096e+006mps
Sent in 2419441588 milliseconds, 2.64524e+006mps
Sent in 2361790763 milliseconds, 2.70981e+006mps
Sent in 2386956468 milliseconds, 2.68124e+006mps
Sent in 2346342869 milliseconds, 2.72765e+006mps
Average speed: 2.6799e+006mps


BUFFER_TYPE 0:

Starting with 80 sender threads and 80 reciever threads
Buffer size: 1024
Sending 640000000 messages
Sent in 120578 milliseconds, 5.30777e+012mps
Sent in 77520 milliseconds, 8.25593e+012mps
Sent in 82910 milliseconds, 7.71921e+012mps
Sent in 75049 milliseconds, 8.52776e+012mps
Sent in 90059 milliseconds, 7.10645e+012mps
Sent in 115923 milliseconds, 5.52091e+012mps
Sent in 171828 milliseconds, 3.72465e+012mps
Sent in 268397 milliseconds, 2.38453e+012mps
Sent in 171268 milliseconds, 3.73683e+012mps
Sent in 303200 milliseconds, 2.11082e+012mps
Average speed: 5.43949e+012mps


BUFFER_TYPE 1:

Starting with 80 sender threads and 80 reciever threads
Buffer size: 1024
Sending 640000000 messages
Sent in 143314 milliseconds, 4.46572e+012mps
Sent in 113635 milliseconds, 5.63207e+012mps
Sent in 118024 milliseconds, 5.42263e+012mps
Sent in 160056 milliseconds, 3.9986e+012mps
Sent in 142089 milliseconds, 4.50422e+012mps
Sent in 149221 milliseconds, 4.28894e+012mps
Sent in 5081233 milliseconds, 1.25954e+011mps
Sent in 164228 milliseconds, 3.89702e+012mps
Sent in 157656 milliseconds, 4.05947e+012mps
Sent in 127153 milliseconds, 5.03331e+012mps
Average speed: 4.14279e+012mps


BUFFER_TYPE 2:

Starting with 80 sender threads and 80 reciever threads
Buffer size: 1024
Sending 640000000 messages
Sent in 107283 milliseconds, 5.96553e+012mps
Sent in 65079 milliseconds, 9.8342e+012mps
Sent in 49538 milliseconds, 1.29194e+013mps
Sent in 68178 milliseconds, 9.38719e+012mps
Sent in 53686 milliseconds, 1.19212e+013mps
Sent in 96981 milliseconds, 6.59923e+012mps
Sent in 59298 milliseconds, 1.07929e+013mps
Sent in 135170 milliseconds, 4.73478e+012mps
Sent in 150741 milliseconds, 4.24569e+012mps
Sent in 123354 milliseconds, 5.18832e+012mps
Average speed: 8.15884e+012mps

~~~~~
~lol~~
~~~ Single Password Solution
Re[10]: Fixed: Interlocked Circle Buffer
От: Caracrist https://1pwd.org/
Дата: 10.03.10 22:00
Оценка:
ну и конечно я тестировал версию которая уже малость пофиксина...
я упоминал об этом.


    template<typename T, typename VolatileLongT = volatile_long>
    class circle_buffer_base
    {
    public:
        typedef T value_type;
        typedef VolatileLongT volatile_long_type;
    protected:
        enum cell_status_enum 
        {
            cell_Free = 0,
            cell_Written,
            cell_Full,
            cell_Read
        };
        struct cell_struct    {
            volatile_long_type status;
            T data;
            cell_struct(): status(cell_Free){}
        };

    public:
        circle_buffer_base(size_t size) :
            m_size(size),
            m_buffer(new cell_struct[size]),
            m_writing(-1),
            m_reading(-1){}
        ~circle_buffer_base(){delete [] m_buffer;}
        
        size_t size() { return m_size;}
    protected:
        cell_struct * m_buffer;
        long m_size;
        volatile_long_type m_writing, m_reading;
    };

    template<typename T>
    class interlocked_circle_buffer : public circle_buffer_base<T>
    {
    public:
        interlocked_circle_buffer(size_t size) : 
            circle_buffer_base(size),
            m_read(0),
            m_written(0),
            m_filled(0),
            m_filling(0)
            {}
        
        bool push(const T& value)
        {
            long diff = ++m_filling - m_size;
            if (diff > finalize_read())
            {
                --m_filling;
                return false;
            }
            long write = ++m_writing % m_size;
            m_buffer[write].data = value; 
            m_buffer[write].status = cell_Written;
            //finalize_write();
            return true;
        }
        bool pop(T * target)
        {
            long diff = - --m_filled;
            if (diff > finalize_write())
            {
                ++m_filled;
                return false;
            }
            long read = ++m_reading % m_size;
            *target = m_buffer[read].data;
            m_buffer[read].status = cell_Read;
            //finalize_read();
            return true;
        }
    private:
        long finalize_read()
        {
            long result = 0;
            for (long read = m_read.exchange(-1);
                read != -1;
                ++result, ++read)
            {
                if (m_buffer[read % m_size].status.compare_exchange(cell_Free, cell_Read) != cell_Read)
                {
                    if (result)
                    {
                        m_filling -= result;
                        if (read > m_size) 
                        {
                            m_reading -= m_size; 
                            read -= m_size;
                        }
                    }
                    m_read = read;
                    break;
                }
            }
            return result;
        }
        long finalize_write()
        {
            long result = 0;
            for (long write = m_written.exchange(-1);
                write != -1;
                ++result, ++write)
            {
                if (m_buffer[write % m_size].status.compare_exchange(cell_Full, cell_Written) != cell_Written)
                {
                    if (result)
                    {
                        m_filled += result;
                        if (write > m_size) 
                        {
                            m_writing -= m_size; 
                            write -= m_size;
                        }
                    }
                    m_written = write;
                    break;
                }
            }
            return result;
        }
    protected: // member fields
        volatile_long m_written, m_read
            , m_filling, m_filled;

    };
~~~~~
~lol~~
~~~ Single Password Solution
Re[11]: Fixed: Interlocked Circle Buffer
От: remark Россия http://www.1024cores.net/
Дата: 12.03.10 15:19
Оценка:
Здравствуйте, Caracrist, Вы писали:

C>>>>

C>>>>Ведёт себя точно также как и мой, нереально ускоряется при очень большом количестве потоков, тоже вешает систему,
C>>>>но работает в полтора/два раза быстрее моего. Хорошее решение если будет ответ на http://www.rsdn.ru/forum/src/3731081.1.aspx
Автор: Caracrist
Дата: 10.03.10
других проблем пока не вижу. Кстати, хотел сделать подобное, но не смог придумать, что делать в упомянутом случае.


R>>>А что за тест и железо, если они работают почти одинаково?


У тебя что-то не так в тесте. Не может моя очередь потреблять 1730 тактов на операцию. У меня она ~40-90. С поправкой на HT это будет ~50-180.
100000 операций на поток это маловато. Моя очередь может это обработать за 2мс, а это сравнимо со стоимостью запуска/завершения потока, т.е. получается, что ты стоимость удвоил.
Я затрудняюсь трактовать:
C>Average speed: 1.69232e+006mps
C>Average speed: 5.43949e+012mps
Почему скорость выросла в 1000000 раз?


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.