Доброго времени суток!
Недавно столкнулся с задачей:
(привожу задание как оно есть)
Пусть у нас на диске есть файл размером 4 гигабайта. Его можно представить, как 2^30 32-битных беззнаковых чисел. Нужно отсортировать этот файл. То есть программа должна сгенерировать еще один файл размером в 4 гигабайта, в котором содержатся те же числа, что и в исходном, но упорядочены по возрастанию. Стандартные алгоритмы сортировки (qsort, std::sort) напрямую применить невозможно, так как для их выполнения нужно как минимум 4 гигабайта оперативной памяти. Но отсортировать числа на диске возможно, если использовать дополнительное пространство на жестком диске.
Нужно написать консольную программу, которая в argv[1] получает имя файла, который нужно отсортировать (размер файла до 16Gb, размер кратен четырем),
в argv[2] — имя файла, в который нужно записать отсортированные данные.
Ограничения:
1. Программа не может рассчитывать, что возможно выделить более 256Mb памяти.
2. Программа должна эффективно использовать несколько ядер.
Решение получилось следующее:
#include <iostream>
#include <sstream>
#include <stdio.h>
#include <vector>
#include <time.h>
#include <algorithm>
#include <process.h>
#include "windows.h"
#define MEMORY_LIMIT 268435456 // 256Mb
unsigned int nMemForThread = MEMORY_LIMIT / 4;
CRITICAL_SECTION csMergePool;
CRITICAL_SECTION csFileAccess;
CRITICAL_SECTION csChunkName;
CRITICAL_SECTION csMergeError;
CRITICAL_SECTION csSortError;
using namespace std;
// for qsort
int compare(const void *a, const void *b)
{
return ( *(int*)a - *(int*)b );
}
//this function(generateNextChunkFileName) is only to be called within critical section
string generateNextChunkFileName()
{
static int nChunkNum = 0;
string strChunkFileName("chunk_");
ostringstream oss;
oss << nChunkNum;
strChunkFileName += oss.str();
++nChunkNum;
return strChunkFileName;
}
enum errType
{
et_NoError = 0,
et_InputData,
et_Access,
et_Write,
et_Read,
et_Merge,
et_Unexpected
};
/**
* class SorterException
*/
class SorterException
{
errType m_error;
string m_strErrorMessage;
string m_strParam;
public:
static const char *pchExcFileAccessError;
static const char *pchExcInputDataError;
static const char *pchExcReadError;
static const char *pchExcWriteError;
static const char *pchExcMergeError;
static const char *pchExcUnexpectedError;
SorterException(const string &strErrorMessage, errType error) :
m_error(error), m_strErrorMessage(strErrorMessage), m_strParam("") {}
void setParam(const string & strParam)
{
m_strParam = strParam;
}
string getErrorMessage() const
{
return m_strErrorMessage;
}
errType getErrorType() const
{
return m_error;
}
static void generateException(errType error)
{
switch (error)
{
case et_InputData:
throw SorterException(pchExcInputDataError, et_InputData);
case et_Access:
throw SorterException(pchExcFileAccessError, et_Access);
case et_Write:
throw SorterException(pchExcWriteError, et_Write);
case et_Read:
throw SorterException(pchExcReadError, et_Read);
case et_Merge:
throw SorterException(pchExcMergeError, et_Merge);
default:
throw SorterException(pchExcUnexpectedError, et_Unexpected);
}
}
friend ostream & operator<<(ostream &stream, SorterException &exc)
{
stream << exc.m_strErrorMessage;
if (!exc.m_strParam.empty())
{
stream << " " << exc.m_strParam;
}
stream << endl;
return stream;
}
};
//static members
const char* SorterException::pchExcFileAccessError = "Error occured while accessing file";
const char* SorterException::pchExcInputDataError = "Input data is wrong";
const char* SorterException::pchExcReadError = "Error occured while reading from file";
const char* SorterException::pchExcWriteError = "Error occured while writing into file";
const char* SorterException::pchExcMergeError = "Error occured while merging";
const char* SorterException::pchExcUnexpectedError = "Unexpected error";
//---------------------------------------------------------------------------------------------------
struct FileWrapper
{
FILE *file;
const char *m_pchFileName;
long long m_nFileSize;
void open(const char *pchInputFileName, const char *mode)
{
if (!file)
{
m_pchFileName = pchInputFileName;
file = fopen(m_pchFileName, mode);
if (!file)
{
SorterException::generateException(et_Access);
}
}
}
void close()
{
if (file)
{
if (fclose(file))
{
SorterException::generateException(et_Access);
}
file = NULL;
}
}
void write(void *dest, size_t nSize, size_t nCount)
{
fwrite(dest, nSize, nCount, file);
if (ferror(file))
{
SorterException ex(SorterException::pchExcWriteError, et_Write);
ex.setParam(m_pchFileName);
throw ex;
}
}
size_t read(void *dest, size_t nSize, size_t nCount)
{
size_t nReadCount = fread(dest, nSize, nCount, file);
if (ferror(file))
{
SorterException ex(SorterException::pchExcReadError, et_Read);
ex.setParam(m_pchFileName);
throw ex;
}
return nReadCount;
}
long long getSize()
{
if (file && m_nFileSize == 0)
{
int nSeekEnd = _fseeki64(file, 0, SEEK_END);
m_nFileSize = static_cast<long long>(_ftelli64(file));
int nSeekStart = _fseeki64(file, 0, SEEK_SET);
if (nSeekEnd || nSeekStart)
{
SorterException::generateException(et_Access);
}
}
return m_nFileSize;
}
FileWrapper() : file(NULL), m_pchFileName(NULL), m_nFileSize(0) {}
~FileWrapper()
{
try
{
close();
}
catch (...)
{
// write to log
}
}
};
/**
* class Merger
* Merge files
*/
class Merger
{
static string m_strResultOfMergeFileName;
static bool m_bStopMerge;
static bool m_bImmediateExit;
static vector<string> m_vPool2Put;
static size_t m_nMergeIndex;
static errType m_error;
string m_strCurrMergeFileName;
string m_strLeftSource;
string m_strRightSource;
Merger(const Merger&);
Merger& operator = (const Merger&);
void swapPools()
{
m_vPool2Take.clear();
m_vPool2Take.swap(m_vPool2Put);
}
public:
static vector<string> m_vPool2Take;
Merger() {}
void setLeftSourceFileName(const string &strLeft)
{
m_strLeftSource = strLeft;
}
void setRightSourceFileName(const string &strRight)
{
m_strRightSource = strRight;
}
void setCurrMergeFileName(const string &strMergeFileName)
{
m_strCurrMergeFileName = strMergeFileName;
}
const string& getCurrMergeFileName() const
{
return m_strCurrMergeFileName;
}
static void setError(errType error)
{
m_error = error;
}
static errType getError()
{
return m_error;
}
static void setImmediateExit(bool bExit)
{
m_bImmediateExit = bExit;
}
static unsigned __stdcall mergeInThread(void *arg)
{
if (!m_bImmediateExit)
{
Merger *merger = static_cast<Merger*>(arg);
string strFirstChunkFileName;
string strSecondChunkFileName;
while (1)
{
if (m_bStopMerge)
{
break;
}
bool bMerge = false;
// critical section
EnterCriticalSection(&csMergePool);
size_t nShiftIndex = 0;
if (m_vPool2Take.size() > 2)
{
nShiftIndex = m_nMergeIndex + 2;
}
else
{
// left to merge two files
nShiftIndex = m_nMergeIndex + 1;
m_bStopMerge = true;
}
if (nShiftIndex < m_vPool2Take.size())
{
merger->setLeftSourceFileName(m_vPool2Take[m_nMergeIndex]);
merger->setRightSourceFileName(m_vPool2Take[nShiftIndex]);
++m_nMergeIndex;
if (!(m_nMergeIndex % 2))
{
m_nMergeIndex += 2;
}
bMerge = true;
}
else if (m_vPool2Put.size() == m_vPool2Take.size() / 2)
{
merger->swapPools();
m_nMergeIndex = 0;
}
LeaveCriticalSection(&csMergePool);
try
{
if (bMerge)
{
EnterCriticalSection(&csChunkName);
merger->setCurrMergeFileName(generateNextChunkFileName());
LeaveCriticalSection(&csChunkName);
merger->mergeTwoFilesIntoOne();
EnterCriticalSection(&csMergePool);
m_vPool2Put.push_back(merger->getCurrMergeFileName());
LeaveCriticalSection(&csMergePool);
}
}
catch(SorterException &ex)
{
EnterCriticalSection(&csMergeError);
if (ex.getErrorType() != et_NoError)
{
Merger::setError(ex.getErrorType());
m_bStopMerge = true;
}
LeaveCriticalSection(&csMergeError);
}
catch (...)
{
EnterCriticalSection(&csMergeError);
Merger::setError(et_Unexpected);
m_bStopMerge = true;
LeaveCriticalSection(&csMergeError);
}
}
}
_endthreadex(0);
return 0;
}
void mergeTwoFilesIntoOne()
{
FileWrapper left;
left.open(m_strLeftSource.c_str(), "rb");
FileWrapper right;
right.open(m_strRightSource.c_str(), "rb");
long long nLeftSize = left.getSize();
long long nRightSize = right.getSize();
if (m_bStopMerge)
{
m_strCurrMergeFileName = m_strResultOfMergeFileName;
}
FileWrapper mergeFile;
mergeFile.open(m_strCurrMergeFileName.c_str(), "wb");
vector<int> vLeft((nMemForThread / sizeof(int)) / 2);
vector<int> vRight((nMemForThread / sizeof(int)) / 2);
vector<int> vResultOfMerge(nMemForThread / sizeof(int));
long long nCurrElemNumberFromLeft = 0;
long long nCurrElemNumberFromRight = 0;
long long nDefaultElemsToRead = (nMemForThread / sizeof(int)) / 2;
while (nLeftSize && nRightSize)
{
//---------------------read left source--------------------------
long long nElemsToRead = nDefaultElemsToRead;
if (nElemsToRead > nLeftSize / sizeof(int))
{
nElemsToRead = nLeftSize / sizeof(int);
vLeft.resize(static_cast<size_t>(nElemsToRead));
}
nCurrElemNumberFromLeft = left.read(&vLeft[0], sizeof(int), static_cast<size_t>(nElemsToRead));
nLeftSize -= nCurrElemNumberFromLeft * sizeof(int);
//---------------------read right source--------------------------
nElemsToRead = nDefaultElemsToRead;
if (nElemsToRead > nRightSize / sizeof(int))
{
nElemsToRead = nRightSize / sizeof(int);
vRight.resize(static_cast<size_t>(nElemsToRead));
}
nCurrElemNumberFromRight = right.read(&vRight[0], sizeof(int), static_cast<size_t>(nElemsToRead));
nRightSize -= nCurrElemNumberFromRight * sizeof(int);
//---------------------merge in memory and then flush---------------
merge(vLeft.begin(), vLeft.end(), vRight.begin(), vRight.end(), vResultOfMerge.begin());
// flush just merged data
mergeFile.write(&vResultOfMerge[0], sizeof(int), vResultOfMerge.size());
}
left.close();
right.close();
mergeFile.close();
remove(m_strLeftSource.c_str());
remove(m_strRightSource.c_str());
if (nLeftSize || nRightSize)
{
// somthing wrong happend
SorterException::generateException(et_Merge);
}
}
static void setMergeFileName(const string &strFName)
{
m_strResultOfMergeFileName = strFName;
}
};
// static members
string Merger::m_strResultOfMergeFileName;
bool Merger::m_bStopMerge = false;
bool Merger::m_bImmediateExit = false;
vector<string> Merger::m_vPool2Put(0);
vector<string> Merger::m_vPool2Take(0);
size_t Merger::m_nMergeIndex = 0;
errType Merger::m_error = et_NoError;
//---------------------------------------------------------------------------------------------------
/**
* class ChunkSorter
* Read and sort input data from file by small chunks
* After chunk is sorted it is flushed on disk for merge
*/
class ChunkSorter
{
static bool m_bNoMoreData;
static bool m_bAbortExit;
static vector<string> *m_pChunkNamePool; //!< is to be set before using ChunkSorter
static errType m_error;
vector<int> m_vChunkToSort;
ChunkSorter(const ChunkSorter&);
ChunkSorter& operator = (const ChunkSorter&);
void sortAndFlush()
{
while (1)
{
EnterCriticalSection(&csFileAccess);
size_t nReadElems = m_fileToSort.read(&m_vChunkToSort[0], sizeof(int), nMemForThread / sizeof(int));
if (feof(m_fileToSort.file))
{
m_bNoMoreData = true;
}
LeaveCriticalSection(&csFileAccess);
if (nReadElems)
{
// sort data
qsort(&m_vChunkToSort[0], m_vChunkToSort.size(), sizeof(int), compare);
EnterCriticalSection(&csChunkName);
string strChunkFileName = generateNextChunkFileName();
LeaveCriticalSection(&csChunkName);
// flush sorted data
FileWrapper f;
f.open(strChunkFileName.c_str(), "wb");
f.write(&m_vChunkToSort[0], sizeof(int), m_vChunkToSort.size());
f.close();
// critical section
EnterCriticalSection(&csMergePool);
m_pChunkNamePool->push_back(strChunkFileName);
LeaveCriticalSection(&csMergePool);
}
if (m_bNoMoreData || m_bAbortExit)
{
// end up with sorting
m_vChunkToSort.clear();
break;
}
}
}
public:
static FileWrapper m_fileToSort;
static long long m_nFileSize;
ChunkSorter() :
m_vChunkToSort(nMemForThread / sizeof(int))
{
}
static void setError(errType error)
{
m_error = error;
}
static errType getError()
{
return m_error;
}
static unsigned __stdcall thread_func(void* arg)
{
ChunkSorter *sorter = static_cast<ChunkSorter*>(arg);
try
{
if (!m_pChunkNamePool)
{
SorterException::generateException(et_InputData);
}
sorter->sortAndFlush();
}
catch(SorterException &ex)
{
EnterCriticalSection(&csSortError);
if (ex.getErrorType() != et_NoError)
{
ChunkSorter::setError(ex.getErrorType());
}
m_bAbortExit = true;
LeaveCriticalSection(&csSortError);
}
catch (...)
{
EnterCriticalSection(&csSortError);
ChunkSorter::setError(et_Unexpected);
m_bAbortExit = true;
LeaveCriticalSection(&csSortError);
}
_endthreadex(0);
return 0;
}
static void setFileToSort(const char *pchFileName)
{
m_fileToSort.open(pchFileName, "rb");
m_nFileSize = m_fileToSort.getSize();
}
// method is to be called to check input data size after file is set
static void checkInputDataSize()
{
if (m_nFileSize == 0 || m_nFileSize % sizeof(int))
{
SorterException::generateException(et_InputData);
}
}
static void setNamePool(vector<string> *vNamePool)
{
m_pChunkNamePool = vNamePool;
}
};
// static members
FileWrapper ChunkSorter::m_fileToSort;
long long ChunkSorter::m_nFileSize = 0;
bool ChunkSorter::m_bNoMoreData = false;
bool ChunkSorter::m_bAbortExit = false;
errType ChunkSorter::m_error = et_NoError;
vector<string>* ChunkSorter::m_pChunkNamePool = NULL;
//---------------------------------------------------------------------------------------------------
int main (int argc, char* argv[])
{
try
{
if (argc < 2)
{
SorterException::generateException(et_InputData);
}
string strInputFileName = argv[1];
string strOutputFileName = argv[2];
ChunkSorter::setFileToSort(strInputFileName.c_str());
ChunkSorter::checkInputDataSize();
if (ChunkSorter::m_nFileSize <= MEMORY_LIMIT)
{
// data size is small enough to be allocated and sorted in memory
vector<int> vDataToSort(static_cast<size_t>(ChunkSorter::m_nFileSize / sizeof(int)));
ChunkSorter::m_fileToSort.read(&vDataToSort[0], sizeof(int), vDataToSort.capacity());
qsort(&vDataToSort[0], vDataToSort.size(), sizeof(int), compare);
FileWrapper merge;
merge.open(strOutputFileName.c_str(), "wb");
merge.write(&vDataToSort[0], sizeof(int), vDataToSort.size());
}
else
{
ChunkSorter::setNamePool(&Merger::m_vPool2Take);
vector<char> vTime(10);
_strtime_s(&vTime[0], vTime.size());
cout << "start time: " << &vTime[0] << endl;
InitializeCriticalSection(&csMergePool);
InitializeCriticalSection(&csFileAccess);
InitializeCriticalSection(&csChunkName);
InitializeCriticalSection(&csMergeError);
InitializeCriticalSection(&csSortError);
//--------------------------------chunk_sorter initialization------------------------------------
ChunkSorter SortAgent1;
ChunkSorter SortAgent2;
ChunkSorter SortAgent3;
ChunkSorter SortAgent4;
unsigned Agent1ThreadID;
unsigned Agent2ThreadID;
unsigned Agent3ThreadID;
unsigned Agent4ThreadID;
HANDLE hSortThreads[4];
hSortThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent1, 0, &Agent1ThreadID);
hSortThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent2, 0, &Agent2ThreadID);
hSortThreads[2] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent3, 0, &Agent3ThreadID);
hSortThreads[3] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent4, 0, &Agent4ThreadID);
//------------------------------------merger initialization---------------------------------------
Merger::setMergeFileName(strOutputFileName);
Merger mergerAgent1;
Merger mergerAgent2;
unsigned mergerThredID1;
unsigned mergerThredID2;
HANDLE hMergeThreads[2];
hMergeThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent1, CREATE_SUSPENDED, &mergerThredID1);
hMergeThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent2, CREATE_SUSPENDED, &mergerThredID2);
//-----------------------------------------------------------------------------------------------
// wait until sorters finish their job and then resume threads to merge
WaitForMultipleObjectsEx(4, hSortThreads, true, INFINITE, false);
CloseHandle(hSortThreads[3]);
CloseHandle(hSortThreads[2]);
CloseHandle(hSortThreads[1]);
CloseHandle(hSortThreads[0]);
// check sort error here
bool bSortError = false;
if (ChunkSorter::getError() != et_NoError)
{
Merger::setImmediateExit(true);
bSortError = true;
}
_strtime_s(&vTime[0], vTime.size());
cout << "resume threads time: " << &vTime[0] << endl;
ResumeThread(hMergeThreads[0]);
ResumeThread(hMergeThreads[1]);
// wait untill all threads finish
WaitForMultipleObjectsEx(2, hMergeThreads, true, INFINITE, false);
CloseHandle(hMergeThreads[1]);
CloseHandle(hMergeThreads[0]);
DeleteCriticalSection(&csChunkName);
DeleteCriticalSection(&csFileAccess);
DeleteCriticalSection(&csMergePool);
DeleteCriticalSection(&csMergeError);
DeleteCriticalSection(&csSortError);
if (bSortError)
{
SorterException::generateException(ChunkSorter::getError());
}
// check merge error here
if (Merger::getError() != et_NoError)
{
SorterException::generateException(Merger::getError());
}
_strtime_s(&vTime[0], vTime.size());
cout << "end time: " << &vTime[0] << endl;
}
}
catch (SorterException &ex)
{
cout << ex;
}
catch (...)
{
cerr << "Unknown error" << endl;
}
return 0;
}
Сразу скажу, что сделал изначально с ошибкой — сортирует файлы размером <= 256Mb либо размером кратным этому числу, т.е. файл размером 260Mb не будет отсортирован корректно, потеряется последний int(если sizeof(int) == 4). Но вопрос не в этом, программа компилируется и даже сортирует файл: 4G она обрабатывает за ~4 часа. Просьба покритиковать код, указать на явные ошибки(если есть) работы с потоками(до этой задачи никогда не работал с потоками вообще) ну и проблемы кода в целом.
добавил разметку — Кодт
12.06.10 19:15: Перенесено модератором из 'C/C++' — Кодт
18.06.10 15:10: Перенесено модератором из 'C/C++. Прикладные вопросы'. Пожалуй, собственно к языку реализации тема отношения не имеет. — Кодт