RetroEvent - позволяющий делать операции в процессе ожидания
От: Caracrist http://1pwd.org/
Дата: 26.09.12 23:02
Оценка:
Идея проста: позволить двойную проверку(double check).


  RetroEvent.h
#pragma once

#include <Windows.h>
#include <intrin.h>

class RetroEvent
{
public:
    RetroEvent();
    ~RetroEvent();

    __int64 RequestWait();
    DWORD PerformWait(__int64 index, DWORD timeout);
    void Set();
private:
    __int64 m_setIndex, m_waitIndex, m_waitingCount;
    HANDLE m_eventHandle;
};


  RetroEvent.cpp
#include "stdafx.h"
#include "RetroEvent.h"

RetroEvent::RetroEvent()
{
    m_eventHandle = CreateEvent(NULL, TRUE, FALSE, NULL);
    m_setIndex = 0;
    m_waitIndex = 0;
    m_waitingCount = 0;
}
RetroEvent::~RetroEvent()
{
    CloseHandle(m_eventHandle);
}

__int64 RetroEvent::RequestWait()
{
    return ::InterlockedIncrement64(&m_waitIndex);
}
DWORD RetroEvent::PerformWait(__int64 index, DWORD timeout)
{
    DWORD begin = GetTickCount();
    DWORD res = STATUS_WAIT_0;
    while (true)
    {
        ::InterlockedIncrement64(&m_waitingCount);
        if (::InterlockedCompareExchange64(&m_setIndex,  0, 0) > index)
        {
            ::InterlockedDecrement64(&m_waitingCount);
            break;
        }

        DWORD now = GetTickCount();
        if ((now - begin) > timeout)
        {
            ::InterlockedDecrement64(&m_waitingCount);
            SetLastError(WAIT_TIMEOUT);
            res = WAIT_TIMEOUT;
            break;
        }
        DWORD toWait = timeout - (now - begin);
        res = WaitForSingleObject(m_eventHandle, toWait);
        if (res != STATUS_WAIT_0)
        {
            ::InterlockedDecrement64(&m_waitingCount);
            break;
        }    
        
        if(::InterlockedDecrement64(&m_waitingCount) == 0)
            continue;


        while (::InterlockedCompareExchange64(&m_waitingCount, 0, 0)) SwitchToThread();
    } 
    return res;

}
void RetroEvent::Set()
{
    __int64 setIndex = RequestWait();
    __int64 prevIndex = m_setIndex;
    if (prevIndex > setIndex)
        return;
    if (::InterlockedCompareExchange64(&m_setIndex,  setIndex, prevIndex) != prevIndex)
        return;
    ::InterlockedIncrement64(&m_waitingCount);
    SetEvent(m_eventHandle);
    while (::InterlockedCompareExchange64(&m_waitingCount, 0, 0) > 1) 
    {
        if (::InterlockedCompareExchange64(&m_setIndex,  0, 0) != setIndex)
        {
            ::InterlockedDecrement64(&m_waitingCount);
            return;
        }        
        SwitchToThread();
    }
    ResetEvent(m_eventHandle);
    ::InterlockedDecrement64(&m_waitingCount);
}


Буду рад как критике, так и ссылкам на другие готовые реализации.
~~~~~
~lol~~
~~~ Cracked Minds (головоломки)
Re: PulsarEvent - позволяющий делать операции в процессе ожидания
От: Caracrist http://1pwd.org/
Дата: 28.09.12 22:03
Оценка:
В общем и целом, то что в первом сообщении — это нерабочий код. Там много проблем от голодания до дедлока.
Если кому интересно в медицинских целях можно всё это поискать.

А я предлагаю на рассмотрение исправленный вариант:
  PulsarEvent.h
#pragma once
#include <Windows.h>
#include <intrin.h>

class PulsarEvent
{
public:
    PulsarEvent();
    ~PulsarEvent();

    __int64 BeginWait();
    DWORD PerformWait(__int64 index, DWORD timeout);
    void Pulse();
private:
    __int64 m_setIndex, m_waitIndex, m_waitingCount, m_settingCount;
    long m_setLock;
    HANDLE m_eventHandle;
};

  PulsarEvent.cpp
#include "stdafx.h"
#include "PulsarEvent.h"

PulsarEvent::PulsarEvent()
{
    m_eventHandle = CreateEvent(NULL, TRUE, FALSE, NULL);
    m_setIndex = 0;
    m_waitIndex = 0;
    m_waitingCount = 0;
    m_settingCount = 0;
    m_setLock = 0;
}
PulsarEvent::~PulsarEvent()
{
    CloseHandle(m_eventHandle);
}

__int64 PulsarEvent::BeginWait()
{
    return ::InterlockedIncrement64(&m_waitIndex);
}
DWORD PulsarEvent::PerformWait(__int64 index, DWORD timeout)
{
    DWORD begin = GetTickCount();
    DWORD res = STATUS_WAIT_0;
    while (true)
    {        
        ::InterlockedIncrement64(&m_waitingCount);
        __int64 prevIndex = m_setIndex;
        if (prevIndex > index) {
            ::InterlockedDecrement64(&m_waitingCount);
            break;
        }
        DWORD toWait = INFINITE;
        if (timeout != INFINITE) {
            DWORD now = GetTickCount();
            if ((now - begin) > timeout)
            {
                ::InterlockedDecrement64(&m_waitingCount);
                SetLastError(WAIT_TIMEOUT);
                res = WAIT_TIMEOUT;
                break;
            }
            toWait = timeout - (now - begin);
        }
        res = WaitForSingleObject(m_eventHandle, toWait);
        if (res != STATUS_WAIT_0)
        {
            ::InterlockedDecrement64(&m_waitingCount);
            break;
        }    
        
        if(::InterlockedDecrement64(&m_waitingCount) == 0)
            continue;


        while (m_settingCount && m_waitingCount && (m_setIndex == prevIndex)) 
            SwitchToThread();
    } 
    return res;

}
void PulsarEvent::Pulse()
{
    __int64 setIndex = BeginWait();
    __int64 prevIndex = m_setIndex;
    if (prevIndex > setIndex)
        return;
    if (::InterlockedCompareExchange64(&m_setIndex,  setIndex, prevIndex) != prevIndex)
        return;

    while(::InterlockedCompareExchange(&m_setLock, 1, 0)) 
        SwitchToThread();
    SetEvent(m_eventHandle);
    ::InterlockedExchange(&m_setLock, 0);

    ::InterlockedIncrement64(&m_settingCount);    
    while (m_waitingCount && (m_setIndex == setIndex)) 
        SwitchToThread();

    while(::InterlockedCompareExchange(&m_setLock, 1, 0)) 
        SwitchToThread();
    if (m_setIndex == setIndex) {
        ResetEvent(m_eventHandle);
    }
    ::InterlockedExchange(&m_setLock, 0);
    
    ::InterlockedDecrement64(&m_settingCount);    
}

Я немного изменил названия, так мне кажется правильней...

А вот пример использования:
  SmartSpinMutex
class SmartSpinMutex
{
public:
    SmartSpinMutex() : pEvent(&g_Event), m_lock(0) {}
    void Lock() 
    { 
        for (__int64 index = pEvent->BeginWait(); 
            ::InterlockedCompareExchange(&m_lock, 1, 0); 
            index = pEvent->BeginWait())
        {
            pEvent->PerformWait(index, INFINITE);
        }
    }
    void Unlock()
    {
        ::InterlockedExchange(&m_lock, 0);
        pEvent->Pulse();
    }
private:
    static PulsarEvent g_Event;
    PulsarEvent * pEvent;
    long m_lock;
};
DECLSPEC_SELECTANY PulsarEvent SmartSpinMutex::g_Event;
~~~~~
~lol~~
~~~ Cracked Minds (головоломки)
Re[2]: PulsarEvent - позволяющий делать операции в процессе ожидания
От: Caracrist http://1pwd.org/
Дата: 28.09.12 23:06
Оценка:
Добавлю результаты тестов скорости (I7 HT — 4/8 x64 win7, MS2010):

  Код теста
SmartSpinMutex mtx;
SpinMutex sm;
CRITICAL_SECTION cs;
long cnt = 0;
DWORD WINAPI Entry(void *)
{
    bool first = _InterlockedIncrement(&cnt) == 1;
    while(1)
    {
        //mtx.Lock();
        //sm.Lock();
        EnterCriticalSection(&cs);

        if (!first)    Sleep(1);    
        _InterlockedIncrement(&cnt);

        LeaveCriticalSection(&cs);
        //mtx.Unlock();
        //sm.Unlock();
        if (!first)    Sleep(10);
    }
}


  SmartSpinMutex
class SmartSpinMutex
{
public:
    SmartSpinMutex() : pEvent(&g_Event), m_lock(0) {}
    void Lock() 
    { 
        for (__int64 index = pEvent->BeginWait(); 
            ::_InterlockedIncrement(&m_lock) > 1; 
            index = pEvent->BeginWait())
        {
            pEvent->PerformWait(index, INFINITE);
        }
    }
    void Unlock()
    {
        if (::_InterlockedExchange(&m_lock, 0) != 1)
            pEvent->Pulse();
    }
private:
    static PulsarEvent g_Event;
    PulsarEvent * pEvent;
    long m_lock;
};
DECLSPEC_SELECTANY PulsarEvent SmartSpinMutex::g_Event;


  SpinMutex
class SpinMutex
{
public:
    SpinMutex() : m_lock(0) {}
    void Lock() 
    { 
        while (_InterlockedCompareExchange(&m_lock, 1, 0))
            SwitchToThread();
    }
    void Unlock()
    {
        _InterlockedExchange(&m_lock, 0);
    }
private:
    long m_lock;
};


1 поток
CRITICAL_SECTION: 2,158,300,174
SpinMutex: 2,671,681,914
SmartSpinMutex: 2,052,295,164

2 потока
CRITICAL_SECTION: 1,952,741,620
SpinMutex: 2,422,393,231
SmartSpinMutex: 1,857,165,297

20 потоков
CRITICAL_SECTION: 416,256
SpinMutex: 88,314
SmartSpinMutex: 1,069,619

*cnt после минуты

Резюмирую: пустые или малоконкурентные локи работают так-же, как у критической секции, с конкуренцией требующей ожиданий более эффективен, при этом не требует системных объектов(на указатель больше спинлока, и можно раздавать его хоть каждому объекту в программе)
~~~~~
~lol~~
~~~ Cracked Minds (головоломки)
Re[3]: PulsarEvent - позволяющий делать операции в процессе ожидания
От: Caracrist http://1pwd.org/
Дата: 01.10.12 19:07
Оценка:
Здравствуйте, Caracrist, Вы писали:

После замены:
while (m_settingCount && m_waitingCount && (m_setIndex == prevIndex))

на
while (m_settingCount && (m_setIndex == prevIndex))

замерил ещё один параметр: cpu time
для 20 потоков за минуту работы(на 8 логических):
CRITICAL_SECTION: 0
SpinMutex: 8 минут
SmartSpinMutex: 5 минут

Однако поскольку это время тратится с минимальным приоритетом, то я запустил параллельно в программе ещё 20 потоков вот такого содержания:
DWORD WINAPI Work(void *)
{    
    while(1)
    {
        _InterlockedIncrement64(&wrk);
    }
}

и замерил сколько они успевают сделать работы
после done, каждые десять секунд:

printf("%d set %I64d done %I64d\n", GetCurrentThreadId(), cnt, wrk);


  результаты
CRITICAL_SECTION:
6100 set 3635791 done 865967975
6100 set 6694219 done 1730704524
6100 set 10623651 done 2591867287
6100 set 13566810 done 3459369466
6100 set 16742211 done 4325375554
6100 set 19955911 done 5191390141

SpinMutex:
4384 set 215330 done 872795371
4384 set 792636 done 1742500158
4384 set 1513333 done 2612406125
4384 set 2106909 done 3479599717
4384 set 2760201 done 4349449513
4384 set 3339203 done 5219177745

SmartSpinMutex:
3336 set 99395 done 784914987
3336 set 207087 done 1653246893
3336 set 339514 done 2521517183
3336 set 384181 done 3389750134
3336 set 473368 done 4257453598
3336 set 534001 done 5125593331

Вывод к моему сожалению прост:
игра не стоит свечь...
~~~~~
~lol~~
~~~ Cracked Minds (головоломки)
Re: Final: RetroEvent - позволяющий делать операции в процессе ожидания
От: Caracrist http://1pwd.org/
Дата: 04.10.12 23:32
Оценка:
После долгих мучений был найден удовлетворительный результат.


  PulsarEvent
// для компиляции кода необходимо реализовать используемые классы, либо заменить их
class PulsarEvent
    {
    public:
        PulsarEvent() : m_eventHandle(NULL, FALSE, FALSE, NULL)
        {
            m_setIndex.Set_relaxed(0);
        }

        int64 BeginWait() 
        {
            return m_setIndex.Get();
        }
        DWORD PerformWait(uint64 index)
        {
            while (true)
            { 
                size_t tmp = m_setState.Get_relaxed();
                while(1)
                {
                    if (index < m_setIndex.Get()){
                        return STATUS_WAIT_0;
                    }
                    if (tmp & (size_t)0x80000000)
                    {
                        SwitchToThread();
                        tmp = m_setState.Get();
                        continue;
                    }
                    size_t tmp2 = m_setState.Get_if_Eq_then_Set(tmp, tmp + (size_t)1);
                    if (tmp2 == tmp)
                        break;
                    tmp = tmp2;
                }
                if (index < m_setIndex.Get()){
                    m_setState.Dec_and_Get();
                    return STATUS_WAIT_0;
                }    
                DWORD res = m_eventHandle.WaitEvent();
                if (m_setState.Dec_and_Get() > (size_t)0x80000000)
                    m_eventHandle.SetEvent();
                if (res != STATUS_WAIT_0) {
                    return res;
                }
            }
            return STATUS_WAIT_0;
        }
        void Pulse()
        {
            m_setIndex.Inc_and_Get();
            size_t tmp = m_setState.Get_relaxed();
            while(1)
            {
                if (tmp == 0 || tmp & (size_t)0x80000000)
                    return;
                size_t tmp2 = m_setState.Get_if_Eq_then_Set(tmp, tmp | (size_t)0x80000000);
                if (tmp2 == tmp)
                    break;
                tmp = tmp2;
            }
            m_eventHandle.SetEvent();
            tmp = m_setState.Get_relaxed();
            while(1)
            {
                if(tmp != (size_t)0x80000000) {
                    SwitchToThread();
                    tmp = m_setState.Get();
                } else {
                    m_setState.Set(0);
                    return;
                }
            }
        }
    private:
        Atomic<uint64> m_setIndex;
        Atomic<size_t> m_setState;
        TAutoEventHandle m_eventHandle;
    };

  SmartSpinExclusiveMutex
// для компиляции кода необходимо реализовать используемые классы, либо заменить их
struct SmartSpinExclusiveMutex : SpinExclusiveMutex
    {
    public:
        SmartSpinExclusiveMutex() : pEvent(&g_Event) {}
        
        virtual void ExclusiveLock() const override
        { 
            for (int64 index = pEvent->BeginWait(); 
                !TryExclusiveLock(); 
                index = pEvent->BeginWait())
            {
                pEvent->PerformWait(index);
            }
        }
        virtual void ExclusiveRelease() const override
        {
            SpinExclusiveMutex::ExclusiveRelease();
            pEvent->Pulse();
        }
    private:
        static PulsarEvent g_Event;
        mutable Atomic<int> m_set;
        PulsarEvent * pEvent;
    };
    DECLSPEC_SELECTANY Vrns::PulsarEvent SmartSpinExclusiveMutex::g_Event;


  код теста
// для компиляции кода необходимо реализовать используемые классы, либо заменить их
CSExclusiveMutex csm; //CRITICAL_SECTION
SmartSpinExclusiveMutex ssm;
SpinExclusiveMutex sm;
Atomic<int64> cnt;
Atomic<int64> wrk;

DWORD WINAPI Entry(void*ptr)
{
    IExclusiveMutex * pMutex = (IExclusiveMutex *)ptr;
    bool first = (cnt.Get_and_Inc() == 0);
    while(1)
    {
        {
            ScopeExclusiveLock lock(pMutex);
            cnt.Inc_and_Get();
            if (!first)
                Sleep(cnt.Inc_and_Get() % 11);
        }
        if (!first)
            Sleep(50);
    }
}

DWORD WINAPI Work(void*)
{
    while(1)
    {
        wrk.Inc_and_Get();
    }
}
int _tmain(int argc, _TCHAR* argv[])
{

    DWORD tid;
    for (int i = 0; i < 20; i++)
    {
        CreateThread(0, 0, Entry, (IExclusiveMutex*)&ssm, 0, &tid);
        //CreateThread(0, 0, Entry, (IExclusiveMutex*)&csm, 0, &tid);
        //CreateThread(0, 0, Entry, (IExclusiveMutex*)&sm, 0, &tid);
        //CreateThread(0, 0, Work, 0, 0, &tid);
    }
    while(1)
    {
        Sleep(10000);
        FILETIME ct, et, kt, ut;
        GetProcessTimes(GetCurrentProcess(), &ct, &et, &kt, &ut);
        printf("%I64d\t-\t%I64d\t-\t%d\t-\t%d\n", cnt.Get(), wrk.Get(), kt.dwLowDateTime/156001, ut.dwLowDateTime/156001);
    }
}

Далее прилагается полная распечатка результатов теста:
(количество локов)-(количество инкрементов рабочими потоками)-(кернел время процесса)-(юзер время процесса)
100% у меня это примерно 1300(кернел + юзер) за 10 секунд
ss = SmartSpinExclusiveMutex
cs = CRITICAL_SECTION
s = SpinExclusiveMutex
  8 потоков + работа
ss
37,100,377 — 805,973,938 — 1 — 1,284
77,028,340 — 1,608,787,637 — 1 — 2,572
116,752,553 — 2,393,629,552 — 1 — 3,874
155,335,719 — 3,182,591,378 — 1 — 5,166

cs
68,115,304 — 768,354,784 — 1 — 1,291
138,171,096 — 1,537,447,111 — 1 — 2,581
207,040,949 — 2,295,221,637 — 1 — 3,878
274,574,580 — 3,066,565,603 — 1 — 5,169

s
155,075,234 — 954,114,363 — 50 — 1,232
309,383,733 — 1,914,573,771 — 100 — 2,468
464,083,889 — 2,877,295,662 — 142 — 3,717
624,008,005 — 3,824,859,399 — 184 — 4,970

  8 потоков
ss
58,155,594 — 0 — 62 — 352
111,398,867 — 0 — 106 — 726
174,515,093 — 0 — 164 — 1094
232,094,120 — 0 — 212 — 1442

cs
82,126,069 — 0 — 1 — 288
142,670,151 — 0 — 1 — 538
201,661,782 — 0 — 1 — 773
258,330,973 — 0 — 1 — 1,008

s
101,257,771 — 0 — 264 — 549
199,697,451 — 0 — 556 — 1,080
301,518,533 — 0 — 831 — 1,604
402,098,078 — 0 — 1,089 — 2,253

  2 потока
ss
148,018,238 — 0 — 1 — 585
295,856,685 — 0 — 2 — 1,162
445,146,400 — 0 — 2 — 1,747
593,486,933 — 0 — 2 — 2,332

cs
222,607,335 — 0 — 1 — 579
444,428,823 — 0 — 1 — 1,158
666,785,898 — 0 — 1 — 1,745
890,312,048 — 0 — 1 — 2,331

s
229,022,844 — 0 — 27 — 612
459,918,189 — 0 — 55 — 1,226
688,961,624 — 0 — 90 — 1,832
919,740,920 — 0 — 112 — 2,455

  20 потоков
ss
2,262,081 — 0 — 18 — 35
4,190,718 — 0 — 28 — 60
7,499,464 — 0 — 34 — 93
9,374,988 — 0 — 52 — 128

cs
2,223,472 — 0 — 1 — 11
4,165,688 — 0 — 1 — 15
6,134,292 — 0 — 1 — 23
8,772,057 — 0 — 1 — 32

s
984,945 — 0 — 888 — 220
1,186,733 — 0 — 1797 — 416
1,347,222 — 0 — 2690 — 634
1,403,609 — 0 — 3595 — 843

  20 потоков + работа
ss
12,851,147 — 756,860,203 — 0 — 1,340
31,811,700 — 1,516,441,391 — 0 — 2,676
48,164,419 — 2,272,531,636 — 0 — 4,006
66,210,941 — 3,033,188,978 — 0 — 5,341

cs
40,235,628 — 751,769,795 — 2 — 1,332
81,633,564 — 1,504,524,604 — 2 — 2,673
121,854,728 — 2,247,953,371 — 2 — 4,017
162,036,862 — 2,995,430,821 — 2 — 5,358

s
124,267,420 — 1,104,594,843 — 141 — 1,178
251,639,363 — 2,209,356,302 — 283 — 2,366
388,659,974 — 3,276,558,982 — 406 — 3,581
511,358,318 — 4,395,718,232 — 562 — 4,762


В данном тесте потоки спят в локе от 0 до 10, а между локами 50, то есть критическая масса потоков около 10 даст в результате переодически длинные ожидания. Это объясняет разницу в результатах на 2, 8 и 20 потоков.
В целом: ss во всех случаях отстаёт примерно в 2 раза от cs, тратит не больше 5% кернела в самом сложном для него случае(8 потоков), относительно хорошо справляется с большой конкуренцей с ожиданиями(20 потоков), занимает мало места.
В общем имеет право на существование!
~~~~~
~lol~~
~~~ Cracked Minds (головоломки)
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.