RWLock2 ;)
От: Atilla Россия  
Дата: 01.10.02 00:16
Оценка:
Тема старая, реализация новая. Конечно, не особо изящная или особо неизящная
Зато поддерживает следующие фичи:
1. Ограниченное кол-во reader'ов
2. Учет приоритетов потоков (поток с меньшим приоритетом пропускает вперед потоки с большим приоритетом).
3. Пишущий поток может временно перейти в режим чтения (и позволить другим потокам тоже читать), а потом вернуться в начальное состояние (DowngradeToReader/UpgradeToWriter)
4. Не блокируется если один и тот же поток несколько раз вызывает AcquireRead/Write, считает вызовы.
5. Теперь вроде бы работает правильно


#include <windows.h>
#include <map>
#include <cassert>

using namespace std;
class RWLock2
{
    enum PriorityLevel
    {
        asap, // используется в методе UpgradeToWriter
        time_critical,
        highest,
        above_normal,
        normal,
        below_normal,
        lowest,
        idle
    };
    struct TInfo // информация о потоке, который работает с объектом или
    {            // ждет его освобождения
        HANDLE unlockEvent; // событие, которое запускает поток
        PriorityLevel baseLevel, currentLevel; // приоритет потока: родной и временный
        bool baseMode, currentMode; // режим работы: родной и временный,
                    // true - read, false - write
        bool pending; // true - поток ждет, false - работает с объектом
        int locks; // кол-во вызовов Acquire минус кол-во вызовов Release
        TInfo(bool mode)
        {
            switch(GetThreadPriority(GetCurrentThread()))
            { // чтоб не привязываться к значениям констант...
            case THREAD_PRIORITY_IDLE: baseLevel=idle; break;
            case THREAD_PRIORITY_LOWEST: baseLevel=lowest; break;
            case THREAD_PRIORITY_BELOW_NORMAL: baseLevel=below_normal; break;
            case THREAD_PRIORITY_NORMAL: baseLevel=normal; break;
            case THREAD_PRIORITY_ABOVE_NORMAL: baseLevel=above_normal; break;
            case THREAD_PRIORITY_HIGHEST: baseLevel=highest; break;
            case THREAD_PRIORITY_TIME_CRITICAL: baseLevel=time_critical; break;
            }
            currentLevel=baseLevel;
            baseMode=currentMode=mode;
            unlockEvent=NULL;
            locks=0;
        }
        ~TInfo()
        {
            if(unlockEvent!=NULL) CloseHandle(unlockEvent);
        }
        HANDLE getLock() // отложенное создание события unlockEvent
        {
            if(unlockEvent==NULL)
                unlockEvent=CreateEvent(NULL, FALSE, FALSE, NULL);
            ResetEvent(unlockEvent);
            return unlockEvent;
        }
    };
    typedef pair<DWORD, TInfo*> thread_pair;
    typedef map<DWORD, TInfo*> thread_map;

    thread_map threads;
    int maxReads; // максимальное кол-во потоков, которые одновременно могут иметь
                // доступ (на чтение) к объекту.
    HANDLE mutex; // защищает объект класса RWLock2

    TInfo* getTInfo(bool mode) // ищем запись о текущем потоке, если ее нет - создаем
    {
        DWORD hThread=GetCurrentThreadId();
        thread_map::iterator fi=threads.find(hThread);
        TInfo* res;
        if(fi==threads.end())
        { // новый поток
            res=new TInfo(mode);
            thread_pair tp;
            tp.first=hThread;
            tp.second=res;
            threads.insert(tp);
        }else 
        {
            res=fi->second;
            assert(!res->currentMode || mode); // запретить смену 
                        // режима с чтения на запись
        }
        return res;
    }
    void leaveObject() // поток выполнил последний Release
    {
        DWORD hThread=GetCurrentThreadId();
        thread_map::iterator fi=threads.find(hThread);
        assert(fi!=threads.end());
        delete fi->second;
        threads.erase(fi);
    }
    TInfo* findPending() // ищем первый попавшийся ждущий поток наибольшего приоритета
    {
        thread_map::iterator it=threads.begin(), end=threads.end();
        if(it==end) return NULL;
        TInfo* best=it->second;
        for(it++;it!=end;it++)
        {
            TInfo* c=it->second;
            if(c->pending && c->currentLevel<best->currentLevel)
                best=c;
        }
        if(best->pending)
            return best;
        else
            return NULL;
    }

    bool writing; // в объект сейчас пишут
    int Reads; // сколько с объектом сейчас работает читателей

    void releaseThreads() // запустить все ждущие потоки, которые могут быть запущены
               // в данный момент
    {
        while(Reads<maxReads && !writing) // пока из объекта можно читать
        {
            TInfo* ti=findPending();
            if(ti==NULL) // никто не ждет
                break;
            if(!ti->currentMode) // writer
            {
                if(Reads>0) // уже читают: писать нельзя, добавлять читателей тоже
                    break; // нежелательно
                ti->pending=false; // ура! можно писать, чем и займемся
                writing=true;
                ti->currentLevel=ti->baseLevel;
                SetEvent(ti->getLock()); // resume thread
            }else
            {
                Reads++; // добавляем читателя
                ti->pending=false;
                ti->currentLevel=ti->baseLevel;
                SetEvent(ti->getLock()); // resume thread
            }
        }
    }
public:
    RWLock2(int max_reads)
    {
        mutex=CreateMutex(NULL, FALSE, NULL);
        maxReads=max_reads;
        writing=false;
        Reads=0;
    }
    ~RWLock2()
    {
        assert(threads.size()==0 && Reads==0 && !writing);
        CloseHandle(mutex);
    }
    void AcquireReader()
    {
        WaitForSingleObject(mutex, INFINITE); // сам RWLock2 тоже должен быть thread-safe
        TInfo* cti=getTInfo(true); // регистрируем текущий поток
        cti->locks++;
        if(cti->locks>1) // повторный вызов Acquire ничего не дает
        {
            ReleaseMutex(mutex);
            return;
        }
        if(writing || Reads==maxReads) // сразу приступить к чтению невозможно
        {
            cti->pending=true;
            HANDLE unlock=cti->getLock();
            ReleaseMutex(mutex);
            WaitForSingleObject(unlock, INFINITE);
        }else
        {
            TInfo* best=findPending();
            bool lowpr=(best!=NULL);
            if(lowpr) lowpr=(best->currentLevel<cti->currentLevel);
            if(lowpr) // объект ждут более серьезные клиенты
            {
                cti->pending=true;
                HANDLE unlock=cti->getLock();
                ReleaseMutex(mutex);
                WaitForSingleObject(unlock, INFINITE);
            }else
            { // нечего ждать
                Reads++;
                cti->pending=false;
                ReleaseMutex(mutex);
            }
        }
    }
    void AcquireWriter()
    {
        WaitForSingleObject(mutex, INFINITE);
        TInfo* cti=getTInfo(false); // регистрируем поток
        assert(!cti->currentMode); // менять режим с чтения на запись нельзя!
        cti->locks++;
        if(cti->locks>1) // повторный вызов Acquire ничего не дает
        {
            ReleaseMutex(mutex);
            return;
        }
        if(writing || Reads>0) // объект занят, ждем
        {
            cti->pending=true;
            HANDLE unlock=cti->getLock();
            ReleaseMutex(mutex);
            WaitForSingleObject(unlock, INFINITE);
        }else // объект свободен, пишем
        {
            assert(threads.size()==1); // а иначе как?
            writing=true;
            cti->pending=false;
            ReleaseMutex(mutex);
        }
    }
    void ReleaseReader()
    {
        WaitForSingleObject(mutex, INFINITE);
        TInfo* cti=getTInfo(true);
        assert(cti->locks>0); // поток должен быть зарегистрирован
        cti->locks--;
        if(cti->locks==0)
        {
            Reads--;
            leaveObject(); // поток удаляем из коллекции
        }
        releaseThreads(); // запускаем потоки, которые задерживал текущий
        ReleaseMutex(mutex);
    }
    void ReleaseWriter()
    {
        WaitForSingleObject(mutex, INFINITE);
        TInfo* cti=getTInfo(false);
        assert(cti->locks>0 && writing);// поток должен быть зарегистрирован для записи
        cti->locks--;
        if(cti->locks==0)
        {
            writing=false; // в объект больше не пишут
            leaveObject(); // последний Release, выкидываем информацию о потоке
        }
        releaseThreads();
        ReleaseMutex(mutex);
    }
    void DowngradeToReader()
    { // был поток пишущим, станет читающим
        WaitForSingleObject(mutex, INFINITE);
        TInfo* cti=getTInfo(false);
        assert(cti->locks>0 && !cti->currentMode); // поток 1) был  2) пишущим
        cti->currentMode=true; // меняем текущий режим
        writing=false; // теперь в объект не пишут
        Reads++;
        releaseThreads(); // у потоков появился шанс прочитать из объекта
        ReleaseMutex(mutex);
    }
    void UpgradeToWriter()
    { // вернем читающему потоку его пишущий статус
        WaitForSingleObject(mutex, INFINITE);
        TInfo* cti=getTInfo(true);
        assert(cti->locks>0 && cti->currentMode && !cti->baseMode);
            // поток 1) был 2) читающим 3) а до этого - пишущим
        cti->currentMode=false;
        Reads--;
        if(Reads==0) // из потока никто не читает, сразу приступем к записи
        {
            writing=true;
            ReleaseMutex(mutex);
        }
        else // будем ждать, пока все читатели дочитают, больше никому читать не дадим
        {
            cti->currentLevel=asap; // для этого выставим максимальный приоритет
            cti->pending=true;
            HANDLE unlock=cti->getLock();
            ReleaseMutex(mutex);
            WaitForSingleObject(unlock, INFINITE);
        }
    }
};
Кр-ть — с.т.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.