Тема старая, реализация новая. Конечно, не особо изящная
или особо неизящная
Зато поддерживает следующие фичи:
1. Ограниченное кол-во reader'ов
2. Учет приоритетов потоков (поток с меньшим приоритетом пропускает вперед потоки с большим приоритетом).
3. Пишущий поток может временно перейти в режим чтения (и позволить другим потокам тоже читать), а потом вернуться в начальное состояние (DowngradeToReader/UpgradeToWriter)
4. Не блокируется если один и тот же поток несколько раз вызывает AcquireRead/Write, считает вызовы.
5. Теперь вроде бы работает правильно
#include <windows.h>
#include <map>
#include <cassert>
using namespace std;
class RWLock2
{
enum PriorityLevel
{
asap, // используется в методе UpgradeToWriter
time_critical,
highest,
above_normal,
normal,
below_normal,
lowest,
idle
};
struct TInfo // информация о потоке, который работает с объектом или
{ // ждет его освобождения
HANDLE unlockEvent; // событие, которое запускает поток
PriorityLevel baseLevel, currentLevel; // приоритет потока: родной и временный
bool baseMode, currentMode; // режим работы: родной и временный,
// true - read, false - write
bool pending; // true - поток ждет, false - работает с объектом
int locks; // кол-во вызовов Acquire минус кол-во вызовов Release
TInfo(bool mode)
{
switch(GetThreadPriority(GetCurrentThread()))
{ // чтоб не привязываться к значениям констант...
case THREAD_PRIORITY_IDLE: baseLevel=idle; break;
case THREAD_PRIORITY_LOWEST: baseLevel=lowest; break;
case THREAD_PRIORITY_BELOW_NORMAL: baseLevel=below_normal; break;
case THREAD_PRIORITY_NORMAL: baseLevel=normal; break;
case THREAD_PRIORITY_ABOVE_NORMAL: baseLevel=above_normal; break;
case THREAD_PRIORITY_HIGHEST: baseLevel=highest; break;
case THREAD_PRIORITY_TIME_CRITICAL: baseLevel=time_critical; break;
}
currentLevel=baseLevel;
baseMode=currentMode=mode;
unlockEvent=NULL;
locks=0;
}
~TInfo()
{
if(unlockEvent!=NULL) CloseHandle(unlockEvent);
}
HANDLE getLock() // отложенное создание события unlockEvent
{
if(unlockEvent==NULL)
unlockEvent=CreateEvent(NULL, FALSE, FALSE, NULL);
ResetEvent(unlockEvent);
return unlockEvent;
}
};
typedef pair<DWORD, TInfo*> thread_pair;
typedef map<DWORD, TInfo*> thread_map;
thread_map threads;
int maxReads; // максимальное кол-во потоков, которые одновременно могут иметь
// доступ (на чтение) к объекту.
HANDLE mutex; // защищает объект класса RWLock2
TInfo* getTInfo(bool mode) // ищем запись о текущем потоке, если ее нет - создаем
{
DWORD hThread=GetCurrentThreadId();
thread_map::iterator fi=threads.find(hThread);
TInfo* res;
if(fi==threads.end())
{ // новый поток
res=new TInfo(mode);
thread_pair tp;
tp.first=hThread;
tp.second=res;
threads.insert(tp);
}else
{
res=fi->second;
assert(!res->currentMode || mode); // запретить смену
// режима с чтения на запись
}
return res;
}
void leaveObject() // поток выполнил последний Release
{
DWORD hThread=GetCurrentThreadId();
thread_map::iterator fi=threads.find(hThread);
assert(fi!=threads.end());
delete fi->second;
threads.erase(fi);
}
TInfo* findPending() // ищем первый попавшийся ждущий поток наибольшего приоритета
{
thread_map::iterator it=threads.begin(), end=threads.end();
if(it==end) return NULL;
TInfo* best=it->second;
for(it++;it!=end;it++)
{
TInfo* c=it->second;
if(c->pending && c->currentLevel<best->currentLevel)
best=c;
}
if(best->pending)
return best;
else
return NULL;
}
bool writing; // в объект сейчас пишут
int Reads; // сколько с объектом сейчас работает читателей
void releaseThreads() // запустить все ждущие потоки, которые могут быть запущены
// в данный момент
{
while(Reads<maxReads && !writing) // пока из объекта можно читать
{
TInfo* ti=findPending();
if(ti==NULL) // никто не ждет
break;
if(!ti->currentMode) // writer
{
if(Reads>0) // уже читают: писать нельзя, добавлять читателей тоже
break; // нежелательно
ti->pending=false; // ура! можно писать, чем и займемся
writing=true;
ti->currentLevel=ti->baseLevel;
SetEvent(ti->getLock()); // resume thread
}else
{
Reads++; // добавляем читателя
ti->pending=false;
ti->currentLevel=ti->baseLevel;
SetEvent(ti->getLock()); // resume thread
}
}
}
public:
RWLock2(int max_reads)
{
mutex=CreateMutex(NULL, FALSE, NULL);
maxReads=max_reads;
writing=false;
Reads=0;
}
~RWLock2()
{
assert(threads.size()==0 && Reads==0 && !writing);
CloseHandle(mutex);
}
void AcquireReader()
{
WaitForSingleObject(mutex, INFINITE); // сам RWLock2 тоже должен быть thread-safe
TInfo* cti=getTInfo(true); // регистрируем текущий поток
cti->locks++;
if(cti->locks>1) // повторный вызов Acquire ничего не дает
{
ReleaseMutex(mutex);
return;
}
if(writing || Reads==maxReads) // сразу приступить к чтению невозможно
{
cti->pending=true;
HANDLE unlock=cti->getLock();
ReleaseMutex(mutex);
WaitForSingleObject(unlock, INFINITE);
}else
{
TInfo* best=findPending();
bool lowpr=(best!=NULL);
if(lowpr) lowpr=(best->currentLevel<cti->currentLevel);
if(lowpr) // объект ждут более серьезные клиенты
{
cti->pending=true;
HANDLE unlock=cti->getLock();
ReleaseMutex(mutex);
WaitForSingleObject(unlock, INFINITE);
}else
{ // нечего ждать
Reads++;
cti->pending=false;
ReleaseMutex(mutex);
}
}
}
void AcquireWriter()
{
WaitForSingleObject(mutex, INFINITE);
TInfo* cti=getTInfo(false); // регистрируем поток
assert(!cti->currentMode); // менять режим с чтения на запись нельзя!
cti->locks++;
if(cti->locks>1) // повторный вызов Acquire ничего не дает
{
ReleaseMutex(mutex);
return;
}
if(writing || Reads>0) // объект занят, ждем
{
cti->pending=true;
HANDLE unlock=cti->getLock();
ReleaseMutex(mutex);
WaitForSingleObject(unlock, INFINITE);
}else // объект свободен, пишем
{
assert(threads.size()==1); // а иначе как?
writing=true;
cti->pending=false;
ReleaseMutex(mutex);
}
}
void ReleaseReader()
{
WaitForSingleObject(mutex, INFINITE);
TInfo* cti=getTInfo(true);
assert(cti->locks>0); // поток должен быть зарегистрирован
cti->locks--;
if(cti->locks==0)
{
Reads--;
leaveObject(); // поток удаляем из коллекции
}
releaseThreads(); // запускаем потоки, которые задерживал текущий
ReleaseMutex(mutex);
}
void ReleaseWriter()
{
WaitForSingleObject(mutex, INFINITE);
TInfo* cti=getTInfo(false);
assert(cti->locks>0 && writing);// поток должен быть зарегистрирован для записи
cti->locks--;
if(cti->locks==0)
{
writing=false; // в объект больше не пишут
leaveObject(); // последний Release, выкидываем информацию о потоке
}
releaseThreads();
ReleaseMutex(mutex);
}
void DowngradeToReader()
{ // был поток пишущим, станет читающим
WaitForSingleObject(mutex, INFINITE);
TInfo* cti=getTInfo(false);
assert(cti->locks>0 && !cti->currentMode); // поток 1) был 2) пишущим
cti->currentMode=true; // меняем текущий режим
writing=false; // теперь в объект не пишут
Reads++;
releaseThreads(); // у потоков появился шанс прочитать из объекта
ReleaseMutex(mutex);
}
void UpgradeToWriter()
{ // вернем читающему потоку его пишущий статус
WaitForSingleObject(mutex, INFINITE);
TInfo* cti=getTInfo(true);
assert(cti->locks>0 && cti->currentMode && !cti->baseMode);
// поток 1) был 2) читающим 3) а до этого - пишущим
cti->currentMode=false;
Reads--;
if(Reads==0) // из потока никто не читает, сразу приступем к записи
{
writing=true;
ReleaseMutex(mutex);
}
else // будем ждать, пока все читатели дочитают, больше никому читать не дадим
{
cti->currentLevel=asap; // для этого выставим максимальный приоритет
cti->pending=true;
HANDLE unlock=cti->getLock();
ReleaseMutex(mutex);
WaitForSingleObject(unlock, INFINITE);
}
}
};