Как сделать синхронизацию действий потоков?
Есть несколько потоков, один из которых отслеживает изменения в каталоге и его подкаталогах, а остальные обрабатывают файлы из подкаталогов. Необходимо сделать так, чтобы при изменении структуры каталогов отслеживающий поток оповещал об этом все остальные.
Пока идея такая:
в отслеживающем потоке (ОП) создать и захватить мьютекс; в остальных потоках проверять мьютекс на занятость (WaitForSingleObject с нулевым временем ожидания); при изменении структуры ОП освобождает мьютекс, другие потоки синхронизируют стуктуру каталогов с ОП, после чего ОП вновь захватывает мьютекс.
Есть другие варианты?
... <<#1 — 03 The Maxwell L'homme en peluche>>
Здравствуйте, Softwarer, Вы писали:
S>Здравствуйте, _spin_, Вы писали:
S>Наибольший интерес вызывает последний этап — "после чего ОП вновь захватывает мьютекс". Интересно, как он узнает что уже наступило "после чего".
Ещё не знаю.
S>Я бы сказал, это задача на событие + оповещение нескольких подписчиков. Вариантов решения можно назвать много, и api-шными средствами и дополнительными. Например, через TEvent. Я бы, возможно, сделал так:
Возможно и через TEvent, надо подумать.
S>- используется TMultiReadExclusiveWriteSynchronizer, в который обернуты нужные данные и некоторое поле "версия данных".
S>- при изменении структуры ОП захватывает данные в монопольном режиме, модифицирует и увеличивает номер версии.
S>- другие потоки хватают данные в shared режиме, проверяют номер версии, и если нужно, модифицируют свои внутренние структуры.
TMultiReadExclusiveWriteSynchronizer — можно поподробнее что это и с чем его есть положено

? А лучше на промере.
S>Вообще, пункт "другие потоки синхронизируют структуру" выглядит чуть подозрительным — может быть, им правильнее читать одну копию?
У каждого потока — своя копия, которя является частью общей структуры каталогов.
S>И в любом случае — продумайте, что должно происходить, если во время "другой поток синхронизирует" структура меняется повторно.
Всё повторяется, т.к. изменения в каталоге будут считаны после синхронизации (ReadDirectoryChanges) и если изменения есть — новая синхронизация.
... <<#1 — 09 Pimple Jackson Hey you>>
делал я как раз то о чем ты пишешь, все отлично работает. На те пример, это кусок моей проги, так что извиняй что есть кое что лишнее, но принцип поймешь ))
основная идея в том что нет вообще никаких мьютексов, потоки отправляют сообщения с пом PostThreadMessage, a основной поток их обрабатывает
основной поток:
void __fastcall TMainThread::Execute()
{
try{
try{
FDatabase=new TDatabase();
FMonitors=new TObjectList(true);
FDatabase->Log("Start: " +Now().DateTimeString());
PrepareToSend();
FSender=new TSender(GetCurrentThreadId());
FSender->StartTransmit();
DirInfo di;
for(int i=0;i<FDatabase->DirCount;i++ )
{
try{
FDatabase->GetDir(i,&di);
TFolderMonitor *fm=new TFolderMonitor(di.Name,di.IncMask,di.ExcMask,GetCurrentThreadId());
FMonitors->Add(fm);
}
catch(...)
{
}
}
for(int mtr=0;mtr < FMonitors->Count;mtr++)
{
try{
((TFolderMonitor *)FMonitors->Items[mtr])->StartMonitor();
}
catch(...)
{
}
}
TMsg m;
while(GetMessage(&m,NULL,0,0))
{
switch (m.message)
{
case FM_NEWFILE:
{
NewFile(*(AnsiString *)m.lParam);
delete (AnsiString *)m.lParam;
}
break;
case FM_FILECHANGE:
{
FileChange(*(AnsiString *)m.lParam);
delete (AnsiString *)m.lParam;
}
break;
case FM_FILEDELETE:
{
FileDelete(*(AnsiString *)m.lParam);
delete (AnsiString *)m.lParam;
}
break;
case FM_ERROR:
{
Err(*(AnsiString *)m.lParam);
delete (AnsiString *)m.lParam;
}
break;
case FM_SEND:
{
if(PrepareToSend())
FSender->RestartTransmit();
}
break;
}
}
FDatabase->Log("Shutdown: " +Now().DateTimeString());
}
__finally
{
if(FMonitors) delete FMonitors;
if(FDatabase) delete FDatabase;
if(FSender) delete FSender;
}
}
catch(Exception &ex)
{
}
}
класс потока монитора:
#ifndef TFolderMonitorThreadH
#define TFolderMonitorThreadH
//---------------------------------------------------------------------------
#include <Classes.hpp>
//---------------------------------------------------------------------------
typedef void __fastcall (__closure *TMsgEvent)(AnsiString msg);
class TFolderMonitorThread : public TThread
{
private:
HANDLE FEvents[2];
HANDLE FDirHandle;
BYTE *FBuffer;
OVERLAPPED FOverlapped;
TMsgEvent FOnError;
TMsgEvent FOnNewFile;
TMsgEvent FOnFileChange;
TMsgEvent FOnFileDelete;
void __fastcall ParseNotifyBuffer();
protected:
void __fastcall Execute();
public:
__fastcall ~TFolderMonitorThread();
__fastcall TFolderMonitorThread(HANDLE StopEvent,HANDLE DirHandle);
__property TMsgEvent OnError = { read=FOnError, write=FOnError };
__property TMsgEvent OnNewFile = { read=FOnNewFile, write=FOnNewFile };
__property TMsgEvent OnFileChange = { read=FOnFileChange, write=FOnFileChange };
__property TMsgEvent OnFileDelete = { read=FOnFileDelete, write=FOnFileDelete };
};
//---------------------------------------------------------------------------
#endif
его реализация:
#include <vcl.h>
#pragma hdrstop
#include "TFolderMonitorThread.h"
#pragma package(smart_init)
#define BUFFSIZE 65530
__fastcall TFolderMonitorThread::TFolderMonitorThread(HANDLE StopEvent,HANDLE DirHandle)
: TThread(true)
{
FDirHandle=DirHandle;
FEvents[0]=StopEvent;
FEvents[1]=NULL;
FBuffer=NULL;
FreeOnTerminate=false;
try
{
FEvents[1]= CreateEvent(NULL, true, false,NULL);
Win32Check((int)FEvents[1]);
FOverlapped.hEvent=FEvents[1];
FBuffer=new BYTE[BUFFSIZE];
}
catch(...)
{
if(FEvents[1]) CloseHandle(FEvents[1]);
if(FBuffer) delete FBuffer;
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitorThread::Execute()
{
DWORD BytesRead;
DWORD WaitResult;
if(WaitForSingleObject(FEvents[0],1)!= WAIT_TIMEOUT) return;
while(!Terminated)
{
if(!ReadDirectoryChangesW (FDirHandle, FBuffer,BUFFSIZE, true, FILE_NOTIFY_CHANGE_FILE_NAME |FILE_NOTIFY_CHANGE_SIZE,
&BytesRead, &FOverlapped, NULL))
{
if(FOnError) FOnError(SysErrorMessage(GetLastError()));
return;
}
WaitResult=WaitForMultipleObjects(2,FEvents,false,INFINITE);
if(WaitResult==WAIT_OBJECT_0) return;
if(WaitResult!=WAIT_OBJECT_0+1)
{
if(FOnError) FOnError(SysErrorMessage(GetLastError()));
return;
}
if(!GetOverlappedResult(FDirHandle,&FOverlapped,&BytesRead,false))
{
if(FOnError) FOnError(SysErrorMessage(GetLastError()));
return;
}
ParseNotifyBuffer();
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitorThread::ParseNotifyBuffer()
{
FILE_NOTIFY_INFORMATION *f;
f=(FILE_NOTIFY_INFORMATION*)FBuffer;
bool NextEntryPresent=true;
while(NextEntryPresent)
{
if(!f->NextEntryOffset) NextEntryPresent=false;
switch (f->Action)
{
case FILE_ACTION_ADDED:
if(FOnNewFile) FOnNewFile(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
break;
case FILE_ACTION_REMOVED:
if(FOnFileDelete) FOnFileDelete(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
break;
case FILE_ACTION_MODIFIED:
if(FOnFileChange) FOnFileChange(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
break;
}
}
}
__fastcall TFolderMonitorThread::~TFolderMonitorThread()
{
if(FEvents[1]) CloseHandle(FEvents[1]);
if(FBuffer) delete FBuffer;
}
класс самого монитора:
//---------------------------------------------------------------------------
#ifndef TFolderMonitorH
#define TFolderMonitorH
#include <Classes.hpp>
#include "TFolderMonitorThread.h"
#include "TScanDir.h"
//---------------------------------------------------------------------------
#define FM_NEWFILE WM_USER+1
#define FM_FILECHANGE WM_USER+2
#define FM_FILEDELETE WM_USER+3
#define FM_ERROR WM_USER+4
class TFolderMonitor : public TObject
{
public:
__fastcall TFolderMonitor(AnsiString DirName,AnsiString incmask,AnsiString excmask, unsigned long TId);
__fastcall ~TFolderMonitor();
void __fastcall StartMonitor();
void __fastcall StopMonitor();
private:
TStringList *Finc;
TStringList *Fexc;
ULONG FTId;
AnsiString FDirName;
HANDLE FDirHandle;
HANDLE FStopEvent;
TFolderMonitorThread *FFMThread;
void __fastcall NewFile(AnsiString Name);
void __fastcall FileChange(AnsiString Name);
void __fastcall FileDelete(AnsiString Name);
void __fastcall Err(AnsiString Msg);
void __fastcall ReadContent();
void __fastcall ContentNewFile(_FileInfo *fi);
};
//---------------------------------------------------------------------------
#endif
его реализация:
#include <vcl.h>
#pragma hdrstop
#include "TScanDir.h"
#include "TFolderMonitorThread.h"
#include "TFolderMonitor.h"
#include "MatchesMaskEx.h"
//---------------------------------------------------------------------------
#pragma package(smart_init)
//---------------------------------------------------------------------------
__fastcall TFolderMonitor::TFolderMonitor(AnsiString DirName,AnsiString incmask,AnsiString excmask, unsigned long TId)
{
Finc=NULL;
Fexc=NULL;
FTId=TId;
FFMThread=NULL;
FStopEvent=NULL;
FDirHandle=INVALID_HANDLE_VALUE;
FDirName=DirName;
try
{
Finc=new TStringList;
Fexc=new TStringList;
Finc->Text=incmask;
Fexc->Text=excmask;
FStopEvent=CreateEvent(NULL,true,false,NULL);
Win32Check((int)FStopEvent);
FDirHandle=CreateFile(FDirName.c_str(), FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 0);
Win32Check(FDirHandle != INVALID_HANDLE_VALUE);
}
catch(Exception &ex)
{
if(Finc) delete Finc;
if(Fexc) delete Fexc;
if(FStopEvent) CloseHandle(FStopEvent);
if(FDirHandle !=INVALID_HANDLE_VALUE) CloseHandle(FDirHandle);
throw Exception(ex);
}
}
//---------------------------------------------------------------------------
__fastcall TFolderMonitor::~TFolderMonitor()
{
StopMonitor();
if(Finc) delete Finc;
if(Fexc) delete Fexc;
if(FStopEvent) CloseHandle(FStopEvent);
if(FDirHandle !=INVALID_HANDLE_VALUE) CloseHandle(FDirHandle);
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::StartMonitor()
{
if(!FFMThread)
{
ReadContent();
ResetEvent(FStopEvent);
FFMThread=new TFolderMonitorThread(FStopEvent,FDirHandle);
FFMThread->OnNewFile=NewFile;
FFMThread->OnFileChange=FileChange;
FFMThread->OnFileDelete=FileDelete;
FFMThread->OnError=Err;
FFMThread->Resume();
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::StopMonitor()
{
if(FFMThread)
{
SetEvent(FStopEvent);
FFMThread->WaitFor();
delete FFMThread;
FFMThread=NULL;
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::NewFile(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
{
AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
PostThreadMessage(FTId,FM_NEWFILE,0,(LPARAM)new AnsiString(fullname));
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::FileChange(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
{
AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
PostThreadMessage(FTId,FM_FILECHANGE,0,(LPARAM)new AnsiString(fullname));
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::FileDelete(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
{
AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
PostThreadMessage(FTId,FM_FILEDELETE,0,(LPARAM)new AnsiString(fullname));
}
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::Err(AnsiString Msg)
{
PostThreadMessage(FTId,FM_ERROR,0,(LPARAM)new AnsiString(Msg));
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::ReadContent()
{
TScanDir s;
s.Path=FDirName;
s.IncMask=Finc->Text;
s.ExcMask=Fexc->Text;
s.OnNewFile=ContentNewFile;
s.Scan();
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::ContentNewFile(_FileInfo *fi)
{
AnsiString fullname=fi->Path+fi->Name;
PostThreadMessage(FTId,FM_NEWFILE,0,(LPARAM)new AnsiString(fullname));
}
ну и на последок ф-ция MatchesMaskEx — не критично, но удобно...
bool __fastcall MatchesMaskEx(AnsiString String, TStringList *incmask, TStringList *excmask)
{
bool match=false;
for(int mc=0; (mc < incmask->Count)&& !match; mc++)
{
match=MatchesMask(String,incmask->Strings[mc]);
}
for(int nmc=0;(nmc < excmask->Count) && match; nmc++)
{
match=!MatchesMask(String,excmask->Strings[nmc]);
}
return match;
}
Здравствуйте, iskatel, Вы писали:
I>делал я как раз то о чем ты пишешь, все отлично работает. На те пример, это кусок моей проги, так что извиняй что есть кое что лишнее, но принцип поймешь ))
I>основная идея в том что нет вообще никаких мьютексов, потоки отправляют сообщения с пом PostThreadMessage, a основной поток их обрабатывает
Хоть я и предпочитаю Delphi, но идея понятна. Как-то я забыл про PostThreadMessage

. Попробую сделать по этому принципу.
Спасибо.
... <<#4 — 05 Mambana No reason>>
кстати это полный код модуля, он скомпилится и будет работать, в этих двух классах ничего лишнего, разве что на делфи придется перевести ))