Синхронизация нескольких потоков
От: _spin_ Россия  
Дата: 02.11.05 16:56
Оценка:
Как сделать синхронизацию действий потоков?

Есть несколько потоков, один из которых отслеживает изменения в каталоге и его подкаталогах, а остальные обрабатывают файлы из подкаталогов. Необходимо сделать так, чтобы при изменении структуры каталогов отслеживающий поток оповещал об этом все остальные.

Пока идея такая:
в отслеживающем потоке (ОП) создать и захватить мьютекс; в остальных потоках проверять мьютекс на занятость (WaitForSingleObject с нулевым временем ожидания); при изменении структуры ОП освобождает мьютекс, другие потоки синхронизируют стуктуру каталогов с ОП, после чего ОП вновь захватывает мьютекс.

Есть другие варианты?
... <<#1 — 03 The Maxwell L'homme en peluche>>
Не восхрапи на работе, ибо храпом своим разбудишь начальника своего.
Re: Синхронизация нескольких потоков
От: Softwarer http://softwarer.ru
Дата: 02.11.05 17:06
Оценка:
Здравствуйте, _spin_, Вы писали:

Наибольший интерес вызывает последний этап — "после чего ОП вновь захватывает мьютекс". Интересно, как он узнает что уже наступило "после чего".

Я бы сказал, это задача на событие + оповещение нескольких подписчиков. Вариантов решения можно назвать много, и api-шными средствами и дополнительными. Например, через TEvent. Я бы, возможно, сделал так:

— используется TMultiReadExclusiveWriteSynchronizer, в который обернуты нужные данные и некоторое поле "версия данных".

— при изменении структуры ОП захватывает данные в монопольном режиме, модифицирует и увеличивает номер версии.

— другие потоки хватают данные в shared режиме, проверяют номер версии, и если нужно, модифицируют свои внутренние структуры.

Вообще, пункт "другие потоки синхронизируют структуру" выглядит чуть подозрительным — может быть, им правильнее читать одну копию?

И в любом случае — продумайте, что должно происходить, если во время "другой поток синхронизирует" структура меняется повторно.
Re[2]: Синхронизация нескольких потоков
От: _spin_ Россия  
Дата: 02.11.05 17:29
Оценка:
Здравствуйте, Softwarer, Вы писали:

S>Здравствуйте, _spin_, Вы писали:


S>Наибольший интерес вызывает последний этап — "после чего ОП вновь захватывает мьютекс". Интересно, как он узнает что уже наступило "после чего".

Ещё не знаю.

S>Я бы сказал, это задача на событие + оповещение нескольких подписчиков. Вариантов решения можно назвать много, и api-шными средствами и дополнительными. Например, через TEvent. Я бы, возможно, сделал так:

Возможно и через TEvent, надо подумать.

S>- используется TMultiReadExclusiveWriteSynchronizer, в который обернуты нужные данные и некоторое поле "версия данных".

S>- при изменении структуры ОП захватывает данные в монопольном режиме, модифицирует и увеличивает номер версии.
S>- другие потоки хватают данные в shared режиме, проверяют номер версии, и если нужно, модифицируют свои внутренние структуры.
TMultiReadExclusiveWriteSynchronizer — можно поподробнее что это и с чем его есть положено ? А лучше на промере.

S>Вообще, пункт "другие потоки синхронизируют структуру" выглядит чуть подозрительным — может быть, им правильнее читать одну копию?

У каждого потока — своя копия, которя является частью общей структуры каталогов.

S>И в любом случае — продумайте, что должно происходить, если во время "другой поток синхронизирует" структура меняется повторно.

Всё повторяется, т.к. изменения в каталоге будут считаны после синхронизации (ReadDirectoryChanges) и если изменения есть — новая синхронизация.
... <<#1 — 09 Pimple Jackson Hey you>>
Не восхрапи на работе, ибо храпом своим разбудишь начальника своего.
Re[3]: Синхронизация нескольких потоков
От: iskatel  
Дата: 06.11.05 07:46
Оценка: +1
делал я как раз то о чем ты пишешь, все отлично работает. На те пример, это кусок моей проги, так что извиняй что есть кое что лишнее, но принцип поймешь ))
основная идея в том что нет вообще никаких мьютексов, потоки отправляют сообщения с пом PostThreadMessage, a основной поток их обрабатывает

основной поток:

void __fastcall TMainThread::Execute()
{
try{
try{
FDatabase=new TDatabase();
FMonitors=new TObjectList(true);
FDatabase->Log("Start: " +Now().DateTimeString());



PrepareToSend();

FSender=new TSender(GetCurrentThreadId());
FSender->StartTransmit();


DirInfo di;

for(int i=0;i<FDatabase->DirCount;i++ )
   {
   try{
   FDatabase->GetDir(i,&di);
   TFolderMonitor *fm=new TFolderMonitor(di.Name,di.IncMask,di.ExcMask,GetCurrentThreadId());
   FMonitors->Add(fm);
   }
      catch(...)
      {
      }
   }

for(int mtr=0;mtr < FMonitors->Count;mtr++)
   {
   try{
   ((TFolderMonitor *)FMonitors->Items[mtr])->StartMonitor();
   }
   catch(...)
      {
      }
   }

TMsg m;
while(GetMessage(&m,NULL,0,0))
   {
   switch (m.message)
      {
      case FM_NEWFILE:
         {
         NewFile(*(AnsiString *)m.lParam);
         delete (AnsiString *)m.lParam;
         }
      break;
      case FM_FILECHANGE:
         {
         FileChange(*(AnsiString *)m.lParam);
         delete (AnsiString *)m.lParam;
         }
      break;
      case FM_FILEDELETE:
         {
         FileDelete(*(AnsiString *)m.lParam);
         delete (AnsiString *)m.lParam;
         }
      break;
      case FM_ERROR:
         {
         Err(*(AnsiString *)m.lParam);
         delete (AnsiString *)m.lParam;
         }
      break;
      case FM_SEND:
         {
         if(PrepareToSend())
         FSender->RestartTransmit();
         }
      break;
      }
   }
FDatabase->Log("Shutdown: " +Now().DateTimeString());
}

__finally
   {
   if(FMonitors) delete FMonitors;
   if(FDatabase) delete FDatabase;
   if(FSender) delete  FSender;
   }
   }
catch(Exception &ex)
   {
   }
}


класс потока монитора:

#ifndef TFolderMonitorThreadH
#define TFolderMonitorThreadH
//---------------------------------------------------------------------------
#include <Classes.hpp>
//---------------------------------------------------------------------------
typedef void __fastcall (__closure *TMsgEvent)(AnsiString msg);

class TFolderMonitorThread : public TThread
{            
private:
   HANDLE FEvents[2];
   HANDLE FDirHandle;
   BYTE *FBuffer;
   OVERLAPPED FOverlapped;
   TMsgEvent FOnError;
   TMsgEvent FOnNewFile;
   TMsgEvent FOnFileChange;
   TMsgEvent FOnFileDelete;
   void __fastcall ParseNotifyBuffer();

protected:
   void __fastcall Execute();
public:
   __fastcall ~TFolderMonitorThread();
   __fastcall TFolderMonitorThread(HANDLE StopEvent,HANDLE DirHandle);
   __property TMsgEvent OnError  = { read=FOnError, write=FOnError };
   __property TMsgEvent OnNewFile  = { read=FOnNewFile, write=FOnNewFile };
   __property TMsgEvent OnFileChange  = { read=FOnFileChange, write=FOnFileChange };
   __property TMsgEvent OnFileDelete  = { read=FOnFileDelete, write=FOnFileDelete };
};
//---------------------------------------------------------------------------
#endif


его реализация:
#include <vcl.h>
#pragma hdrstop

#include "TFolderMonitorThread.h"
#pragma package(smart_init)
#define BUFFSIZE 65530

__fastcall TFolderMonitorThread::TFolderMonitorThread(HANDLE StopEvent,HANDLE DirHandle)
   : TThread(true)
{

FDirHandle=DirHandle;
FEvents[0]=StopEvent;

FEvents[1]=NULL;
FBuffer=NULL;
FreeOnTerminate=false;
try
   {
   FEvents[1]= CreateEvent(NULL, true, false,NULL);
   Win32Check((int)FEvents[1]);
   FOverlapped.hEvent=FEvents[1];
   FBuffer=new BYTE[BUFFSIZE];
   }
catch(...)
   {
   if(FEvents[1]) CloseHandle(FEvents[1]);
   if(FBuffer) delete FBuffer;
   }
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitorThread::Execute()
{
DWORD BytesRead;
DWORD WaitResult;
if(WaitForSingleObject(FEvents[0],1)!= WAIT_TIMEOUT) return;
while(!Terminated)
   {
   if(!ReadDirectoryChangesW (FDirHandle, FBuffer,BUFFSIZE, true, FILE_NOTIFY_CHANGE_FILE_NAME    |FILE_NOTIFY_CHANGE_SIZE,
                              &BytesRead, &FOverlapped, NULL))
      {
      if(FOnError) FOnError(SysErrorMessage(GetLastError()));
      return;
      }
   WaitResult=WaitForMultipleObjects(2,FEvents,false,INFINITE);
   if(WaitResult==WAIT_OBJECT_0) return;
   if(WaitResult!=WAIT_OBJECT_0+1)
      {
      if(FOnError) FOnError(SysErrorMessage(GetLastError()));
      return;
      }
   if(!GetOverlappedResult(FDirHandle,&FOverlapped,&BytesRead,false))
      {
      if(FOnError) FOnError(SysErrorMessage(GetLastError()));
      return;
      }
   ParseNotifyBuffer();
   }
}
//---------------------------------------------------------------------------


void __fastcall TFolderMonitorThread::ParseNotifyBuffer()
{
FILE_NOTIFY_INFORMATION *f;
f=(FILE_NOTIFY_INFORMATION*)FBuffer;
bool NextEntryPresent=true;
while(NextEntryPresent)
   {
   if(!f->NextEntryOffset) NextEntryPresent=false;
   switch (f->Action)
      {
      case FILE_ACTION_ADDED:
         if(FOnNewFile) FOnNewFile(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
      break;
      case FILE_ACTION_REMOVED:
         if(FOnFileDelete) FOnFileDelete(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
      break;
      case FILE_ACTION_MODIFIED:
         if(FOnFileChange) FOnFileChange(WideCharLenToString(f->FileName,f->FileNameLength / sizeof(wchar_t)));
      break;
      }
   }
}

__fastcall TFolderMonitorThread::~TFolderMonitorThread()
{
   if(FEvents[1]) CloseHandle(FEvents[1]);
   if(FBuffer) delete FBuffer;
}


класс самого монитора:
//---------------------------------------------------------------------------

#ifndef TFolderMonitorH
#define TFolderMonitorH
#include <Classes.hpp>
#include "TFolderMonitorThread.h"
#include "TScanDir.h"
//---------------------------------------------------------------------------
#define FM_NEWFILE WM_USER+1
#define FM_FILECHANGE WM_USER+2
#define FM_FILEDELETE WM_USER+3
#define FM_ERROR WM_USER+4

class TFolderMonitor : public TObject
{
public:
   __fastcall TFolderMonitor(AnsiString DirName,AnsiString incmask,AnsiString excmask, unsigned long TId);
   __fastcall ~TFolderMonitor();
   void __fastcall StartMonitor();
   void __fastcall StopMonitor();
private:
   TStringList *Finc;
   TStringList *Fexc;
   ULONG FTId;
   AnsiString FDirName;
   HANDLE FDirHandle;
   HANDLE FStopEvent;
   TFolderMonitorThread *FFMThread;
   void __fastcall NewFile(AnsiString Name);
   void __fastcall FileChange(AnsiString Name);
   void __fastcall FileDelete(AnsiString Name);
   void __fastcall Err(AnsiString Msg);
   void __fastcall ReadContent();
   void __fastcall ContentNewFile(_FileInfo *fi);
};

//---------------------------------------------------------------------------
#endif


его реализация:
#include <vcl.h>
#pragma hdrstop

#include "TScanDir.h"
#include "TFolderMonitorThread.h"
#include "TFolderMonitor.h"
#include "MatchesMaskEx.h"



//---------------------------------------------------------------------------

#pragma package(smart_init)

//---------------------------------------------------------------------------
__fastcall TFolderMonitor::TFolderMonitor(AnsiString DirName,AnsiString incmask,AnsiString excmask, unsigned long TId)
{
   Finc=NULL;
   Fexc=NULL;
   FTId=TId;
   FFMThread=NULL;
   FStopEvent=NULL;
   FDirHandle=INVALID_HANDLE_VALUE;
   FDirName=DirName;
try
  {
   Finc=new TStringList;
   Fexc=new TStringList;
   Finc->Text=incmask;
   Fexc->Text=excmask;

   FStopEvent=CreateEvent(NULL,true,false,NULL);
   Win32Check((int)FStopEvent);
   FDirHandle=CreateFile(FDirName.c_str(), FILE_LIST_DIRECTORY,
      FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING,
      FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 0);
   Win32Check(FDirHandle != INVALID_HANDLE_VALUE);

   }
catch(Exception &ex)
   {
   if(Finc) delete Finc;
   if(Fexc) delete Fexc;
   if(FStopEvent) CloseHandle(FStopEvent);
   if(FDirHandle !=INVALID_HANDLE_VALUE) CloseHandle(FDirHandle);
   throw Exception(ex);
   }
}
//---------------------------------------------------------------------------
__fastcall TFolderMonitor::~TFolderMonitor()
{
StopMonitor();
if(Finc) delete Finc;
if(Fexc) delete Fexc;
if(FStopEvent) CloseHandle(FStopEvent);
if(FDirHandle !=INVALID_HANDLE_VALUE) CloseHandle(FDirHandle);
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::StartMonitor()
{
if(!FFMThread)
   {
   ReadContent();
   ResetEvent(FStopEvent);
   FFMThread=new TFolderMonitorThread(FStopEvent,FDirHandle);
   FFMThread->OnNewFile=NewFile;
   FFMThread->OnFileChange=FileChange;
   FFMThread->OnFileDelete=FileDelete;
   FFMThread->OnError=Err;
   FFMThread->Resume();
   }
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::StopMonitor()
{
if(FFMThread)
   {
   SetEvent(FStopEvent);
   FFMThread->WaitFor();
   delete FFMThread;
   FFMThread=NULL;
   }
}
//---------------------------------------------------------------------------


void __fastcall TFolderMonitor::NewFile(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
   {
   AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
   PostThreadMessage(FTId,FM_NEWFILE,0,(LPARAM)new AnsiString(fullname));
   }
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::FileChange(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
   {
   AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
   PostThreadMessage(FTId,FM_FILECHANGE,0,(LPARAM)new AnsiString(fullname));
   }
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::FileDelete(AnsiString Name)
{
AnsiString tmp=ExtractFileName(Name);
if(MatchesMaskEx(tmp,Finc,Fexc))
   {
   AnsiString fullname=IncludeTrailingPathDelimiter(FDirName)+Name;
   PostThreadMessage(FTId,FM_FILEDELETE,0,(LPARAM)new AnsiString(fullname));
   }
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::Err(AnsiString Msg)
{
PostThreadMessage(FTId,FM_ERROR,0,(LPARAM)new AnsiString(Msg));
}
//---------------------------------------------------------------------------

void __fastcall TFolderMonitor::ReadContent()
{
TScanDir s;
s.Path=FDirName;
s.IncMask=Finc->Text;
s.ExcMask=Fexc->Text;
s.OnNewFile=ContentNewFile;
s.Scan();
}
//---------------------------------------------------------------------------
void __fastcall TFolderMonitor::ContentNewFile(_FileInfo *fi)
{
AnsiString fullname=fi->Path+fi->Name;
PostThreadMessage(FTId,FM_NEWFILE,0,(LPARAM)new AnsiString(fullname));
}


ну и на последок ф-ция MatchesMaskEx — не критично, но удобно...
bool __fastcall MatchesMaskEx(AnsiString  String, TStringList *incmask, TStringList *excmask)
{
bool match=false;
for(int mc=0; (mc < incmask->Count)&& !match; mc++)
   {
   match=MatchesMask(String,incmask->Strings[mc]);
   }
for(int nmc=0;(nmc < excmask->Count) && match; nmc++)
   {
   match=!MatchesMask(String,excmask->Strings[nmc]);
   }
return match;
}
Re[4]: Синхронизация нескольких потоков
От: _spin_ Россия  
Дата: 06.11.05 08:02
Оценка:
Здравствуйте, iskatel, Вы писали:

I>делал я как раз то о чем ты пишешь, все отлично работает. На те пример, это кусок моей проги, так что извиняй что есть кое что лишнее, но принцип поймешь ))

I>основная идея в том что нет вообще никаких мьютексов, потоки отправляют сообщения с пом PostThreadMessage, a основной поток их обрабатывает

Хоть я и предпочитаю Delphi, но идея понятна. Как-то я забыл про PostThreadMessage . Попробую сделать по этому принципу.

Спасибо.
... <<#4 — 05 Mambana No reason>>
Не восхрапи на работе, ибо храпом своим разбудишь начальника своего.
Re[5]: Синхронизация нескольких потоков
От: iskatel  
Дата: 06.11.05 08:43
Оценка:
кстати это полный код модуля, он скомпилится и будет работать, в этих двух классах ничего лишнего, разве что на делфи придется перевести ))
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.