сортировка файла
От: letsurf  
Дата: 12.06.10 09:02
Оценка:
Доброго времени суток!

Недавно столкнулся с задачей:

(привожу задание как оно есть)

Пусть у нас на диске есть файл размером 4 гигабайта. Его можно представить, как 2^30 32-битных беззнаковых чисел. Нужно отсортировать этот файл. То есть программа должна сгенерировать еще один файл размером в 4 гигабайта, в котором содержатся те же числа, что и в исходном, но упорядочены по возрастанию. Стандартные алгоритмы сортировки (qsort, std::sort) напрямую применить невозможно, так как для их выполнения нужно как минимум 4 гигабайта оперативной памяти. Но отсортировать числа на диске возможно, если использовать дополнительное пространство на жестком диске.

Нужно написать консольную программу, которая в argv[1] получает имя файла, который нужно отсортировать (размер файла до 16Gb, размер кратен четырем),
в argv[2] — имя файла, в который нужно записать отсортированные данные.

Ограничения:
1. Программа не может рассчитывать, что возможно выделить более 256Mb памяти.
2. Программа должна эффективно использовать несколько ядер.

Решение получилось следующее:
#include <iostream>
#include <sstream>
#include <stdio.h>
#include <vector>
#include <time.h>
#include <algorithm>
#include <process.h>
#include "windows.h"


#define MEMORY_LIMIT 268435456 // 256Mb

unsigned int nMemForThread = MEMORY_LIMIT / 4;

CRITICAL_SECTION csMergePool;
CRITICAL_SECTION csFileAccess;
CRITICAL_SECTION csChunkName;
CRITICAL_SECTION csMergeError;
CRITICAL_SECTION csSortError;

using namespace std;

// for qsort
int compare(const void *a, const void *b)
{
    return ( *(int*)a - *(int*)b );
}

//this function(generateNextChunkFileName) is only to be called within critical section
string generateNextChunkFileName()
{
    static int nChunkNum = 0;
    string strChunkFileName("chunk_");

    ostringstream oss;
    oss << nChunkNum;
    strChunkFileName += oss.str();
    ++nChunkNum;

    return strChunkFileName;
}

enum errType
{
    et_NoError = 0,
    et_InputData,
    et_Access,
    et_Write,
    et_Read,
    et_Merge,
    et_Unexpected
};

/**
* class SorterException
*/
class SorterException
{
    errType m_error;
    string m_strErrorMessage;
    string m_strParam;

public:
    static const char *pchExcFileAccessError;
    static const char *pchExcInputDataError;
    static const char *pchExcReadError;
    static const char *pchExcWriteError;
    static const char *pchExcMergeError;
    static const char *pchExcUnexpectedError;

    SorterException(const string &strErrorMessage, errType error) : 
    m_error(error), m_strErrorMessage(strErrorMessage), m_strParam("") {}

    void setParam(const string & strParam)
    {
        m_strParam = strParam;
    }

    string getErrorMessage() const
    {
        return m_strErrorMessage;
    }

    errType getErrorType() const
    {
        return m_error;
    }

    static void generateException(errType error)
    {
        switch (error)
        {
        case et_InputData:
            throw SorterException(pchExcInputDataError, et_InputData);

        case et_Access:
            throw SorterException(pchExcFileAccessError, et_Access);

        case et_Write:
            throw SorterException(pchExcWriteError, et_Write);

        case et_Read:
            throw SorterException(pchExcReadError, et_Read);

        case et_Merge:
            throw SorterException(pchExcMergeError, et_Merge);

        default:
            throw SorterException(pchExcUnexpectedError, et_Unexpected);
        }
    }

    friend ostream & operator<<(ostream &stream, SorterException &exc)
    {
        stream << exc.m_strErrorMessage;

        if (!exc.m_strParam.empty())
        {
            stream << " " << exc.m_strParam;
        }

        stream << endl;

        return stream;
    }
};
//static members
const char* SorterException::pchExcFileAccessError = "Error occured while accessing file";
const char* SorterException::pchExcInputDataError  = "Input data is wrong";
const char* SorterException::pchExcReadError       = "Error occured while reading from file";
const char* SorterException::pchExcWriteError      = "Error occured while writing into file";
const char* SorterException::pchExcMergeError      = "Error occured while merging";
const char* SorterException::pchExcUnexpectedError = "Unexpected error";
//---------------------------------------------------------------------------------------------------

struct FileWrapper
{
    FILE *file;
    const char *m_pchFileName;
    long long m_nFileSize;

    void open(const char *pchInputFileName, const char *mode)
    {
        if (!file)
        {
            m_pchFileName = pchInputFileName;
            file = fopen(m_pchFileName, mode);

            if (!file)
            {
                SorterException::generateException(et_Access);
            }
        }
    }

    void close()
    {
        if (file)
        {
            if (fclose(file))
            {
                SorterException::generateException(et_Access);
            }

            file = NULL;
        }
    }

    void write(void *dest, size_t nSize, size_t nCount)
    {
        fwrite(dest, nSize, nCount, file);

        if (ferror(file))
        {
            SorterException ex(SorterException::pchExcWriteError, et_Write);
            ex.setParam(m_pchFileName);
            throw ex;
        }
    }

    size_t read(void *dest, size_t nSize, size_t nCount)
    {
        size_t nReadCount = fread(dest, nSize, nCount, file);

        if (ferror(file))
        {
            SorterException ex(SorterException::pchExcReadError, et_Read);
            ex.setParam(m_pchFileName);
            throw ex;
        }

        return nReadCount;
    }

    long long getSize()
    {
        if (file && m_nFileSize == 0)
        {
            int nSeekEnd = _fseeki64(file, 0, SEEK_END);
            m_nFileSize = static_cast<long long>(_ftelli64(file));            
            int nSeekStart = _fseeki64(file, 0, SEEK_SET);

            if (nSeekEnd || nSeekStart)
            {
                SorterException::generateException(et_Access);
            }
        }

        return m_nFileSize;
    }

    FileWrapper() : file(NULL), m_pchFileName(NULL), m_nFileSize(0) {}

    ~FileWrapper()
    {
        try
        {
            close();
        }
        catch (...)
        {
            // write to log
        }
    }
};

/**
* class Merger
* Merge files
*/

class Merger
{
    static string m_strResultOfMergeFileName;
    static bool m_bStopMerge;
    static bool m_bImmediateExit;
    static vector<string> m_vPool2Put;
    static size_t m_nMergeIndex;
    static errType m_error;

    string m_strCurrMergeFileName;
    string m_strLeftSource;
    string m_strRightSource;

    Merger(const Merger&);
    Merger& operator = (const Merger&);

    void swapPools()
    {
        m_vPool2Take.clear();
        m_vPool2Take.swap(m_vPool2Put);
    }

public:

    static vector<string> m_vPool2Take;

    Merger() {}

    void setLeftSourceFileName(const string &strLeft)
    {
        m_strLeftSource = strLeft;
    }

    void setRightSourceFileName(const string &strRight)
    {
        m_strRightSource = strRight;
    }

    void setCurrMergeFileName(const string &strMergeFileName)
    {
        m_strCurrMergeFileName = strMergeFileName;
    }

    const string& getCurrMergeFileName() const
    {
        return m_strCurrMergeFileName;
    }

    static void setError(errType error)
    {
        m_error = error;
    }

    static errType getError()
    {
        return m_error;
    }

    static void setImmediateExit(bool bExit)
    {
        m_bImmediateExit = bExit;
    }

    static unsigned __stdcall mergeInThread(void *arg)
    {
        if (!m_bImmediateExit)
        {
            Merger *merger = static_cast<Merger*>(arg);

            string strFirstChunkFileName;
            string strSecondChunkFileName;

            while (1)
            {
                if (m_bStopMerge)
                {
                    break;
                }

                bool bMerge = false;

                // critical section
                EnterCriticalSection(&csMergePool);

                size_t nShiftIndex = 0;

                if (m_vPool2Take.size() > 2)
                {
                    nShiftIndex = m_nMergeIndex + 2;
                }
                else
                {
                    // left to merge two files
                    nShiftIndex = m_nMergeIndex + 1;
                    m_bStopMerge = true;
                }

                if (nShiftIndex < m_vPool2Take.size())
                {
                    merger->setLeftSourceFileName(m_vPool2Take[m_nMergeIndex]);
                    merger->setRightSourceFileName(m_vPool2Take[nShiftIndex]);

                    ++m_nMergeIndex;

                    if (!(m_nMergeIndex % 2))
                    {
                        m_nMergeIndex += 2;
                    }

                    bMerge = true;
                }
                else if (m_vPool2Put.size() == m_vPool2Take.size() / 2)
                {
                    merger->swapPools();
                    m_nMergeIndex = 0;
                }

                LeaveCriticalSection(&csMergePool);

                try
                {
                    if (bMerge)
                    {
                        EnterCriticalSection(&csChunkName);
                        merger->setCurrMergeFileName(generateNextChunkFileName());
                        LeaveCriticalSection(&csChunkName);

                        merger->mergeTwoFilesIntoOne();

                        EnterCriticalSection(&csMergePool);
                        m_vPool2Put.push_back(merger->getCurrMergeFileName());
                        LeaveCriticalSection(&csMergePool);
                    }
                }
                catch(SorterException &ex)
                {
                    EnterCriticalSection(&csMergeError);

                    if (ex.getErrorType() != et_NoError)
                    {
                        Merger::setError(ex.getErrorType());
                        m_bStopMerge = true;
                    }

                    LeaveCriticalSection(&csMergeError);
                }
                catch (...)
                {
                    EnterCriticalSection(&csMergeError);
                    Merger::setError(et_Unexpected);
                    m_bStopMerge = true;
                    LeaveCriticalSection(&csMergeError);
                }
            }
        }

        _endthreadex(0);
        return 0;
    }

    void mergeTwoFilesIntoOne()
    {
        FileWrapper left;
        left.open(m_strLeftSource.c_str(), "rb");

        FileWrapper right;
        right.open(m_strRightSource.c_str(), "rb");

        long long nLeftSize = left.getSize();
        long long nRightSize = right.getSize();

        if (m_bStopMerge)
        {
            m_strCurrMergeFileName = m_strResultOfMergeFileName;
        }

        FileWrapper mergeFile;
        mergeFile.open(m_strCurrMergeFileName.c_str(), "wb");

        vector<int> vLeft((nMemForThread / sizeof(int)) / 2);
        vector<int> vRight((nMemForThread / sizeof(int)) / 2);
        vector<int> vResultOfMerge(nMemForThread / sizeof(int));

        long long nCurrElemNumberFromLeft = 0;
        long long nCurrElemNumberFromRight = 0;

        long long nDefaultElemsToRead = (nMemForThread / sizeof(int)) / 2;

        while (nLeftSize && nRightSize)
        {
            //---------------------read left source--------------------------
            long long nElemsToRead = nDefaultElemsToRead;

            if (nElemsToRead > nLeftSize / sizeof(int))
            {
                nElemsToRead = nLeftSize / sizeof(int);
                vLeft.resize(static_cast<size_t>(nElemsToRead));
            }

            nCurrElemNumberFromLeft = left.read(&vLeft[0], sizeof(int), static_cast<size_t>(nElemsToRead));
            nLeftSize -= nCurrElemNumberFromLeft * sizeof(int);

            //---------------------read right source--------------------------
            nElemsToRead = nDefaultElemsToRead;

            if (nElemsToRead > nRightSize / sizeof(int))
            {
                nElemsToRead = nRightSize / sizeof(int);
                vRight.resize(static_cast<size_t>(nElemsToRead));
            }

            nCurrElemNumberFromRight = right.read(&vRight[0], sizeof(int), static_cast<size_t>(nElemsToRead));
            nRightSize -= nCurrElemNumberFromRight * sizeof(int);

            //---------------------merge in memory and then flush---------------
            merge(vLeft.begin(), vLeft.end(), vRight.begin(), vRight.end(), vResultOfMerge.begin());
            // flush just merged data
            mergeFile.write(&vResultOfMerge[0], sizeof(int), vResultOfMerge.size());
        }

        left.close();
        right.close();
        mergeFile.close();

        remove(m_strLeftSource.c_str());
        remove(m_strRightSource.c_str());

        if (nLeftSize || nRightSize)
        {
            // somthing wrong happend
            SorterException::generateException(et_Merge);
        }
    }

    static void setMergeFileName(const string &strFName)
    {
        m_strResultOfMergeFileName = strFName;
    }
};
// static members
string Merger::m_strResultOfMergeFileName;
bool Merger::m_bStopMerge = false;
bool Merger::m_bImmediateExit = false;
vector<string> Merger::m_vPool2Put(0);
vector<string> Merger::m_vPool2Take(0);
size_t Merger::m_nMergeIndex = 0;
errType Merger::m_error = et_NoError;
//---------------------------------------------------------------------------------------------------

/**
* class ChunkSorter
* Read and sort input data from file by small chunks
* After chunk is sorted it is flushed on disk for merge
*/

class ChunkSorter
{
    static bool m_bNoMoreData;
    static bool m_bAbortExit;
    static vector<string> *m_pChunkNamePool; //!< is to be set before using ChunkSorter
    static errType m_error;
    vector<int> m_vChunkToSort;

    ChunkSorter(const ChunkSorter&);
    ChunkSorter& operator = (const ChunkSorter&);

    void sortAndFlush()
    {
        while (1)
        {
            EnterCriticalSection(&csFileAccess);

            size_t nReadElems = m_fileToSort.read(&m_vChunkToSort[0], sizeof(int), nMemForThread / sizeof(int));

            if (feof(m_fileToSort.file))
            {
                m_bNoMoreData = true;
            }

            LeaveCriticalSection(&csFileAccess);

            if (nReadElems)
            {
                // sort data
                qsort(&m_vChunkToSort[0], m_vChunkToSort.size(), sizeof(int), compare);

                EnterCriticalSection(&csChunkName);
                string strChunkFileName = generateNextChunkFileName();
                LeaveCriticalSection(&csChunkName);

                // flush sorted data
                FileWrapper f;
                f.open(strChunkFileName.c_str(), "wb");
                f.write(&m_vChunkToSort[0], sizeof(int), m_vChunkToSort.size());
                f.close();

                // critical section
                EnterCriticalSection(&csMergePool);
                m_pChunkNamePool->push_back(strChunkFileName);
                LeaveCriticalSection(&csMergePool);
            }

            if (m_bNoMoreData || m_bAbortExit)
            {
                // end up with sorting
                m_vChunkToSort.clear();
                break;
            }
        }
    }

public:

    static FileWrapper m_fileToSort;
    static long long m_nFileSize;

    ChunkSorter() :
    m_vChunkToSort(nMemForThread / sizeof(int))
    {
    }

    static void setError(errType error)
    {
        m_error = error;
    }

    static errType getError()
    {
        return m_error;
    }

    static unsigned __stdcall thread_func(void* arg)
    {
        ChunkSorter *sorter = static_cast<ChunkSorter*>(arg);

        try
        {
            if (!m_pChunkNamePool)
            {
                SorterException::generateException(et_InputData);
            }

            sorter->sortAndFlush();
        }
        catch(SorterException &ex)
        {
            EnterCriticalSection(&csSortError);

            if (ex.getErrorType() != et_NoError)
            {
                ChunkSorter::setError(ex.getErrorType());
            }

            m_bAbortExit = true;
            LeaveCriticalSection(&csSortError);
        }
        catch (...)
        {
            EnterCriticalSection(&csSortError);
            ChunkSorter::setError(et_Unexpected);
            m_bAbortExit = true;
            LeaveCriticalSection(&csSortError);
        }

        _endthreadex(0);
        return 0;
    }

    static void setFileToSort(const char *pchFileName)
    {
        m_fileToSort.open(pchFileName, "rb");
        m_nFileSize = m_fileToSort.getSize();
    }

    // method is to be called to check input data size after file is set
    static void checkInputDataSize()
    {
        if (m_nFileSize == 0 || m_nFileSize % sizeof(int))
        {
            SorterException::generateException(et_InputData);
        }
    }

    static void setNamePool(vector<string> *vNamePool)
    {
        m_pChunkNamePool = vNamePool;
    }
};

// static members
FileWrapper ChunkSorter::m_fileToSort;
long long ChunkSorter::m_nFileSize = 0;
bool ChunkSorter::m_bNoMoreData = false;
bool ChunkSorter::m_bAbortExit = false;
errType ChunkSorter::m_error = et_NoError;
vector<string>* ChunkSorter::m_pChunkNamePool = NULL;
//---------------------------------------------------------------------------------------------------


int main (int argc, char* argv[])
{
    try
    {
        if (argc < 2)
        {
            SorterException::generateException(et_InputData);
        }

        string strInputFileName = argv[1];
        string strOutputFileName = argv[2];

        ChunkSorter::setFileToSort(strInputFileName.c_str());
        ChunkSorter::checkInputDataSize();

        if (ChunkSorter::m_nFileSize <= MEMORY_LIMIT)
        {
            // data size is small enough to be allocated and sorted in memory
            vector<int> vDataToSort(static_cast<size_t>(ChunkSorter::m_nFileSize / sizeof(int)));

            ChunkSorter::m_fileToSort.read(&vDataToSort[0], sizeof(int), vDataToSort.capacity());

            qsort(&vDataToSort[0], vDataToSort.size(), sizeof(int), compare);

            FileWrapper merge;
            merge.open(strOutputFileName.c_str(), "wb");
            merge.write(&vDataToSort[0], sizeof(int), vDataToSort.size());
        }
        else
        {
            ChunkSorter::setNamePool(&Merger::m_vPool2Take);
            vector<char> vTime(10);
            _strtime_s(&vTime[0], vTime.size());
            cout << "start time: " << &vTime[0] << endl;

            InitializeCriticalSection(&csMergePool);
            InitializeCriticalSection(&csFileAccess);
            InitializeCriticalSection(&csChunkName);
            InitializeCriticalSection(&csMergeError);
            InitializeCriticalSection(&csSortError);

            //--------------------------------chunk_sorter initialization------------------------------------
            ChunkSorter SortAgent1;
            ChunkSorter SortAgent2;
            ChunkSorter SortAgent3;
            ChunkSorter SortAgent4;

            unsigned Agent1ThreadID;
            unsigned Agent2ThreadID;
            unsigned Agent3ThreadID;
            unsigned Agent4ThreadID;

            HANDLE hSortThreads[4];
            hSortThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent1, 0, &Agent1ThreadID);
            hSortThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent2, 0, &Agent2ThreadID);
            hSortThreads[2] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent3, 0, &Agent3ThreadID);
            hSortThreads[3] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent4, 0, &Agent4ThreadID);

            //------------------------------------merger initialization---------------------------------------
            Merger::setMergeFileName(strOutputFileName);
            Merger mergerAgent1;
            Merger mergerAgent2;

            unsigned mergerThredID1;
            unsigned mergerThredID2;

            HANDLE hMergeThreads[2];
            hMergeThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent1, CREATE_SUSPENDED, &mergerThredID1);
            hMergeThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent2, CREATE_SUSPENDED, &mergerThredID2);
            //-----------------------------------------------------------------------------------------------

            // wait until sorters finish their job and then resume threads to merge
            WaitForMultipleObjectsEx(4, hSortThreads, true, INFINITE, false);

            CloseHandle(hSortThreads[3]);
            CloseHandle(hSortThreads[2]);
            CloseHandle(hSortThreads[1]);
            CloseHandle(hSortThreads[0]);

            // check sort error here
            bool bSortError = false;

            if (ChunkSorter::getError() != et_NoError)
            {
                Merger::setImmediateExit(true);
                bSortError = true;
            }

            _strtime_s(&vTime[0], vTime.size());
            cout << "resume threads time: " << &vTime[0] << endl;

            ResumeThread(hMergeThreads[0]);
            ResumeThread(hMergeThreads[1]);

            // wait untill all threads finish
            WaitForMultipleObjectsEx(2, hMergeThreads, true, INFINITE, false);

            CloseHandle(hMergeThreads[1]);
            CloseHandle(hMergeThreads[0]);

            DeleteCriticalSection(&csChunkName);
            DeleteCriticalSection(&csFileAccess);
            DeleteCriticalSection(&csMergePool);

            DeleteCriticalSection(&csMergeError);
            DeleteCriticalSection(&csSortError);

            if (bSortError)
            {
                SorterException::generateException(ChunkSorter::getError());
            }

            // check merge error here
            if (Merger::getError() != et_NoError)
            {
                SorterException::generateException(Merger::getError());
            }

            _strtime_s(&vTime[0], vTime.size());
            cout << "end time: " << &vTime[0] << endl;
        }
    }

    catch (SorterException &ex)
    {
        cout << ex;
    }

    catch (...)
    {
        cerr << "Unknown error" << endl;
    }

    return 0;
}


Сразу скажу, что сделал изначально с ошибкой — сортирует файлы размером <= 256Mb либо размером кратным этому числу, т.е. файл размером 260Mb не будет отсортирован корректно, потеряется последний int(если sizeof(int) == 4). Но вопрос не в этом, программа компилируется и даже сортирует файл: 4G она обрабатывает за ~4 часа. Просьба покритиковать код, указать на явные ошибки(если есть) работы с потоками(до этой задачи никогда не работал с потоками вообще) ну и проблемы кода в целом.
добавил разметку — Кодт
12.06.10 19:15: Перенесено модератором из 'C/C++' — Кодт
18.06.10 15:10: Перенесено модератором из 'C/C++. Прикладные вопросы'. Пожалуй, собственно к языку реализации тема отношения не имеет. — Кодт
Re: сортировка файла
От: kaa.python Ниоткуда РСДН профессионально мёртв и завален ватой.
Дата: 12.06.10 09:17
Оценка:
Здравствуйте, letsurf, Вы писали:

L>Но вопрос не в этом, программа компилируется и даже сортирует файл: 4G она обрабатывает за ~4 часа. Просьба покритиковать код


Критикую. 4 часа на сортировку файла в 4 Гб это писец, мягко говоря. В Яндекс тебя не возьмут. Выкини и перепиши по новой
Re[2]: сортировка файла
От: letsurf  
Дата: 12.06.10 09:23
Оценка:
Здравствуйте, kaa.python, Вы писали:

KP>Здравствуйте, letsurf, Вы писали:


L>>Но вопрос не в этом, программа компилируется и даже сортирует файл: 4G она обрабатывает за ~4 часа. Просьба покритиковать код


KP>Критикую. 4 часа на сортировку файла в 4 Гб это писец, мягко говоря. В Яндекс тебя не возьмут. Выкини и перепиши по новой


Ну вот и не взяли . Спасибо
Re: сортировка файла
От: letsurf  
Дата: 12.06.10 09:24
Оценка:
#include <iostream>
#include <sstream>
#include <stdio.h>
#include <vector>
#include <time.h>
#include <algorithm>
#include <process.h>
#include "windows.h"


#define MEMORY_LIMIT 268435456 // 256Mb

unsigned int nMemForThread = MEMORY_LIMIT / 4;

CRITICAL_SECTION csMergePool;
CRITICAL_SECTION csFileAccess;
CRITICAL_SECTION csChunkName;
CRITICAL_SECTION csMergeError;
CRITICAL_SECTION csSortError;

using namespace std;

// for qsort
int compare(const void *a, const void *b)
{
    return ( *(int*)a - *(int*)b );
}

//this function(generateNextChunkFileName) is only to be called within critical section
string generateNextChunkFileName()
{
    static int nChunkNum = 0;
    string strChunkFileName("chunk_");

    ostringstream oss;
    oss << nChunkNum;
    strChunkFileName += oss.str();
    ++nChunkNum;

    return strChunkFileName;
}

enum errType
{
    et_NoError = 0,
    et_InputData,
    et_Access,
    et_Write,
    et_Read,
    et_Merge,
    et_Unexpected
};

/**
* class SorterException
*/
class SorterException
{
    errType m_error;
    string m_strErrorMessage;
    string m_strParam;

public:
    static const char *pchExcFileAccessError;
    static const char *pchExcInputDataError;
    static const char *pchExcReadError;
    static const char *pchExcWriteError;
    static const char *pchExcMergeError;
    static const char *pchExcUnexpectedError;

    SorterException(const string &strErrorMessage, errType error) : 
    m_error(error), m_strErrorMessage(strErrorMessage), m_strParam("") {}

    void setParam(const string & strParam)
    {
        m_strParam = strParam;
    }

    string getErrorMessage() const
    {
        return m_strErrorMessage;
    }

    errType getErrorType() const
    {
        return m_error;
    }

    static void generateException(errType error)
    {
        switch (error)
        {
        case et_InputData:
            throw SorterException(pchExcInputDataError, et_InputData);

        case et_Access:
            throw SorterException(pchExcFileAccessError, et_Access);

        case et_Write:
            throw SorterException(pchExcWriteError, et_Write);

        case et_Read:
            throw SorterException(pchExcReadError, et_Read);

        case et_Merge:
            throw SorterException(pchExcMergeError, et_Merge);

        default:
            throw SorterException(pchExcUnexpectedError, et_Unexpected);
        }
    }

    friend ostream & operator<<(ostream &stream, SorterException &exc)
    {
        stream << exc.m_strErrorMessage;

        if (!exc.m_strParam.empty())
        {
            stream << " " << exc.m_strParam;
        }

        stream << endl;

        return stream;
    }
};
//static members
const char* SorterException::pchExcFileAccessError = "Error occured while accessing file";
const char* SorterException::pchExcInputDataError  = "Input data is wrong";
const char* SorterException::pchExcReadError       = "Error occured while reading from file";
const char* SorterException::pchExcWriteError      = "Error occured while writing into file";
const char* SorterException::pchExcMergeError      = "Error occured while merging";
const char* SorterException::pchExcUnexpectedError = "Unexpected error";
//---------------------------------------------------------------------------------------------------

struct FileWrapper
{
    FILE *file;
    const char *m_pchFileName;
    long long m_nFileSize;

    void open(const char *pchInputFileName, const char *mode)
    {
        if (!file)
        {
            m_pchFileName = pchInputFileName;
            file = fopen(m_pchFileName, mode);

            if (!file)
            {
                SorterException::generateException(et_Access);
            }
        }
    }

    void close()
    {
        if (file)
        {
            if (fclose(file))
            {
                SorterException::generateException(et_Access);
            }

            file = NULL;
        }
    }

    void write(void *dest, size_t nSize, size_t nCount)
    {
        fwrite(dest, nSize, nCount, file);

        if (ferror(file))
        {
            SorterException ex(SorterException::pchExcWriteError, et_Write);
            ex.setParam(m_pchFileName);
            throw ex;
        }
    }

    size_t read(void *dest, size_t nSize, size_t nCount)
    {
        size_t nReadCount = fread(dest, nSize, nCount, file);

        if (ferror(file))
        {
            SorterException ex(SorterException::pchExcReadError, et_Read);
            ex.setParam(m_pchFileName);
            throw ex;
        }

        return nReadCount;
    }

    long long getSize()
    {
        if (file && m_nFileSize == 0)
        {
            int nSeekEnd = _fseeki64(file, 0, SEEK_END);
            m_nFileSize = static_cast<long long>(_ftelli64(file));            
            int nSeekStart = _fseeki64(file, 0, SEEK_SET);

            if (nSeekEnd || nSeekStart)
            {
                SorterException::generateException(et_Access);
            }
        }

        return m_nFileSize;
    }

    FileWrapper() : file(NULL), m_pchFileName(NULL), m_nFileSize(0) {}

    ~FileWrapper()
    {
        try
        {
            close();
        }
        catch (...)
        {
            // write to log
        }
    }
};

/**
* class Merger
* Merge files
*/

class Merger
{
    static string m_strResultOfMergeFileName;
    static bool m_bStopMerge;
    static bool m_bImmediateExit;
    static vector<string> m_vPool2Put;
    static size_t m_nMergeIndex;
    static errType m_error;

    string m_strCurrMergeFileName;
    string m_strLeftSource;
    string m_strRightSource;

    Merger(const Merger&);
    Merger& operator = (const Merger&);

    void swapPools()
    {
        m_vPool2Take.clear();
        m_vPool2Take.swap(m_vPool2Put);
    }

public:

    static vector<string> m_vPool2Take;

    Merger() {}

    void setLeftSourceFileName(const string &strLeft)
    {
        m_strLeftSource = strLeft;
    }

    void setRightSourceFileName(const string &strRight)
    {
        m_strRightSource = strRight;
    }

    void setCurrMergeFileName(const string &strMergeFileName)
    {
        m_strCurrMergeFileName = strMergeFileName;
    }

    const string& getCurrMergeFileName() const
    {
        return m_strCurrMergeFileName;
    }

    static void setError(errType error)
    {
        m_error = error;
    }

    static errType getError()
    {
        return m_error;
    }

    static void setImmediateExit(bool bExit)
    {
        m_bImmediateExit = bExit;
    }

    static unsigned __stdcall mergeInThread(void *arg)
    {
        if (!m_bImmediateExit)
        {
            Merger *merger = static_cast<Merger*>(arg);

            string strFirstChunkFileName;
            string strSecondChunkFileName;

            while (1)
            {
                if (m_bStopMerge)
                {
                    break;
                }

                bool bMerge = false;

                // critical section
                EnterCriticalSection(&csMergePool);

                size_t nShiftIndex = 0;

                if (m_vPool2Take.size() > 2)
                {
                    nShiftIndex = m_nMergeIndex + 2;
                }
                else
                {
                    // left to merge two files
                    nShiftIndex = m_nMergeIndex + 1;
                    m_bStopMerge = true;
                }

                if (nShiftIndex < m_vPool2Take.size())
                {
                    merger->setLeftSourceFileName(m_vPool2Take[m_nMergeIndex]);
                    merger->setRightSourceFileName(m_vPool2Take[nShiftIndex]);

                    ++m_nMergeIndex;

                    if (!(m_nMergeIndex % 2))
                    {
                        m_nMergeIndex += 2;
                    }

                    bMerge = true;
                }
                else if (m_vPool2Put.size() == m_vPool2Take.size() / 2)
                {
                    merger->swapPools();
                    m_nMergeIndex = 0;
                }

                LeaveCriticalSection(&csMergePool);

                try
                {
                    if (bMerge)
                    {
                        EnterCriticalSection(&csChunkName);
                        merger->setCurrMergeFileName(generateNextChunkFileName());
                        LeaveCriticalSection(&csChunkName);

                        merger->mergeTwoFilesIntoOne();

                        EnterCriticalSection(&csMergePool);
                        m_vPool2Put.push_back(merger->getCurrMergeFileName());
                        LeaveCriticalSection(&csMergePool);
                    }
                }
                catch(SorterException &ex)
                {
                    EnterCriticalSection(&csMergeError);

                    if (ex.getErrorType() != et_NoError)
                    {
                        Merger::setError(ex.getErrorType());
                        m_bStopMerge = true;
                    }

                    LeaveCriticalSection(&csMergeError);
                }
                catch (...)
                {
                    EnterCriticalSection(&csMergeError);
                    Merger::setError(et_Unexpected);
                    m_bStopMerge = true;
                    LeaveCriticalSection(&csMergeError);
                }
            }
        }

        _endthreadex(0);
        return 0;
    }

    void mergeTwoFilesIntoOne()
    {
        FileWrapper left;
        left.open(m_strLeftSource.c_str(), "rb");

        FileWrapper right;
        right.open(m_strRightSource.c_str(), "rb");

        long long nLeftSize = left.getSize();
        long long nRightSize = right.getSize();

        if (m_bStopMerge)
        {
            m_strCurrMergeFileName = m_strResultOfMergeFileName;
        }

        FileWrapper mergeFile;
        mergeFile.open(m_strCurrMergeFileName.c_str(), "wb");

        vector<int> vLeft((nMemForThread / sizeof(int)) / 2);
        vector<int> vRight((nMemForThread / sizeof(int)) / 2);
        vector<int> vResultOfMerge(nMemForThread / sizeof(int));

        long long nCurrElemNumberFromLeft = 0;
        long long nCurrElemNumberFromRight = 0;

        long long nDefaultElemsToRead = (nMemForThread / sizeof(int)) / 2;

        while (nLeftSize && nRightSize)
        {
            //---------------------read left source--------------------------
            long long nElemsToRead = nDefaultElemsToRead;

            if (nElemsToRead > nLeftSize / sizeof(int))
            {
                nElemsToRead = nLeftSize / sizeof(int);
                vLeft.resize(static_cast<size_t>(nElemsToRead));
            }

            nCurrElemNumberFromLeft = left.read(&vLeft[0], sizeof(int), static_cast<size_t>(nElemsToRead));
            nLeftSize -= nCurrElemNumberFromLeft * sizeof(int);

            //---------------------read right source--------------------------
            nElemsToRead = nDefaultElemsToRead;

            if (nElemsToRead > nRightSize / sizeof(int))
            {
                nElemsToRead = nRightSize / sizeof(int);
                vRight.resize(static_cast<size_t>(nElemsToRead));
            }

            nCurrElemNumberFromRight = right.read(&vRight[0], sizeof(int), static_cast<size_t>(nElemsToRead));
            nRightSize -= nCurrElemNumberFromRight * sizeof(int);

            //---------------------merge in memory and then flush---------------
            merge(vLeft.begin(), vLeft.end(), vRight.begin(), vRight.end(), vResultOfMerge.begin());
            // flush just merged data
            mergeFile.write(&vResultOfMerge[0], sizeof(int), vResultOfMerge.size());
        }

        left.close();
        right.close();
        mergeFile.close();

        remove(m_strLeftSource.c_str());
        remove(m_strRightSource.c_str());

        if (nLeftSize || nRightSize)
        {
            // somthing wrong happend
            SorterException::generateException(et_Merge);
        }
    }

    static void setMergeFileName(const string &strFName)
    {
        m_strResultOfMergeFileName = strFName;
    }
};
// static members
string Merger::m_strResultOfMergeFileName;
bool Merger::m_bStopMerge = false;
bool Merger::m_bImmediateExit = false;
vector<string> Merger::m_vPool2Put(0);
vector<string> Merger::m_vPool2Take(0);
size_t Merger::m_nMergeIndex = 0;
errType Merger::m_error = et_NoError;
//---------------------------------------------------------------------------------------------------

/**
* class ChunkSorter
* Read and sort input data from file by small chunks
* After chunk is sorted it is flushed on disk for merge
*/

class ChunkSorter
{
    static bool m_bNoMoreData;
    static bool m_bAbortExit;
    static vector<string> *m_pChunkNamePool; //!< is to be set before using ChunkSorter
    static errType m_error;
    vector<int> m_vChunkToSort;

    ChunkSorter(const ChunkSorter&);
    ChunkSorter& operator = (const ChunkSorter&);

    void sortAndFlush()
    {
        while (1)
        {
            EnterCriticalSection(&csFileAccess);

            size_t nReadElems = m_fileToSort.read(&m_vChunkToSort[0], sizeof(int), nMemForThread / sizeof(int));

            if (feof(m_fileToSort.file))
            {
                m_bNoMoreData = true;
            }

            LeaveCriticalSection(&csFileAccess);

            if (nReadElems)
            {
                // sort data
                qsort(&m_vChunkToSort[0], m_vChunkToSort.size(), sizeof(int), compare);

                EnterCriticalSection(&csChunkName);
                string strChunkFileName = generateNextChunkFileName();
                LeaveCriticalSection(&csChunkName);

                // flush sorted data
                FileWrapper f;
                f.open(strChunkFileName.c_str(), "wb");
                f.write(&m_vChunkToSort[0], sizeof(int), m_vChunkToSort.size());
                f.close();

                // critical section
                EnterCriticalSection(&csMergePool);
                m_pChunkNamePool->push_back(strChunkFileName);
                LeaveCriticalSection(&csMergePool);
            }

            if (m_bNoMoreData || m_bAbortExit)
            {
                // end up with sorting
                m_vChunkToSort.clear();
                break;
            }
        }
    }

public:

    static FileWrapper m_fileToSort;
    static long long m_nFileSize;

    ChunkSorter() :
    m_vChunkToSort(nMemForThread / sizeof(int))
    {
    }

    static void setError(errType error)
    {
        m_error = error;
    }

    static errType getError()
    {
        return m_error;
    }

    static unsigned __stdcall thread_func(void* arg)
    {
        ChunkSorter *sorter = static_cast<ChunkSorter*>(arg);

        try
        {
            if (!m_pChunkNamePool)
            {
                SorterException::generateException(et_InputData);
            }

            sorter->sortAndFlush();
        }
        catch(SorterException &ex)
        {
            EnterCriticalSection(&csSortError);

            if (ex.getErrorType() != et_NoError)
            {
                ChunkSorter::setError(ex.getErrorType());
            }

            m_bAbortExit = true;
            LeaveCriticalSection(&csSortError);
        }
        catch (...)
        {
            EnterCriticalSection(&csSortError);
            ChunkSorter::setError(et_Unexpected);
            m_bAbortExit = true;
            LeaveCriticalSection(&csSortError);
        }

        _endthreadex(0);
        return 0;
    }

    static void setFileToSort(const char *pchFileName)
    {
        m_fileToSort.open(pchFileName, "rb");
        m_nFileSize = m_fileToSort.getSize();
    }

    // method is to be called to check input data size after file is set
    static void checkInputDataSize()
    {
        if (m_nFileSize == 0 || m_nFileSize % sizeof(int))
        {
            SorterException::generateException(et_InputData);
        }
    }

    static void setNamePool(vector<string> *vNamePool)
    {
        m_pChunkNamePool = vNamePool;
    }
};

// static members
FileWrapper ChunkSorter::m_fileToSort;
long long ChunkSorter::m_nFileSize = 0;
bool ChunkSorter::m_bNoMoreData = false;
bool ChunkSorter::m_bAbortExit = false;
errType ChunkSorter::m_error = et_NoError;
vector<string>* ChunkSorter::m_pChunkNamePool = NULL;
//---------------------------------------------------------------------------------------------------


int main (int argc, char* argv[])
{
    try
    {
        if (argc < 2)
        {
            SorterException::generateException(et_InputData);
        }

        string strInputFileName = argv[1];
        string strOutputFileName = argv[2];

        ChunkSorter::setFileToSort(strInputFileName.c_str());
        ChunkSorter::checkInputDataSize();

        if (ChunkSorter::m_nFileSize <= MEMORY_LIMIT)
        {
            // data size is small enough to be allocated and sorted in memory
            vector<int> vDataToSort(static_cast<size_t>(ChunkSorter::m_nFileSize / sizeof(int)));

            ChunkSorter::m_fileToSort.read(&vDataToSort[0], sizeof(int), vDataToSort.capacity());

            qsort(&vDataToSort[0], vDataToSort.size(), sizeof(int), compare);

            FileWrapper merge;
            merge.open(strOutputFileName.c_str(), "wb");
            merge.write(&vDataToSort[0], sizeof(int), vDataToSort.size());
        }
        else
        {
            ChunkSorter::setNamePool(&Merger::m_vPool2Take);
            vector<char> vTime(10);
            _strtime_s(&vTime[0], vTime.size());
            cout << "start time: " << &vTime[0] << endl;

            InitializeCriticalSection(&csMergePool);
            InitializeCriticalSection(&csFileAccess);
            InitializeCriticalSection(&csChunkName);
            InitializeCriticalSection(&csMergeError);
            InitializeCriticalSection(&csSortError);

            //--------------------------------chunk_sorter initialization------------------------------------
            ChunkSorter SortAgent1;
            ChunkSorter SortAgent2;
            ChunkSorter SortAgent3;
            ChunkSorter SortAgent4;

            unsigned Agent1ThreadID;
            unsigned Agent2ThreadID;
            unsigned Agent3ThreadID;
            unsigned Agent4ThreadID;

            HANDLE hSortThreads[4];
            hSortThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent1, 0, &Agent1ThreadID);
            hSortThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent2, 0, &Agent2ThreadID);
            hSortThreads[2] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent3, 0, &Agent3ThreadID);
            hSortThreads[3] = (HANDLE)_beginthreadex(NULL, 0, &ChunkSorter::thread_func, &SortAgent4, 0, &Agent4ThreadID);

            //------------------------------------merger initialization---------------------------------------
            Merger::setMergeFileName(strOutputFileName);
            Merger mergerAgent1;
            Merger mergerAgent2;

            unsigned mergerThredID1;
            unsigned mergerThredID2;

            HANDLE hMergeThreads[2];
            hMergeThreads[0] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent1, CREATE_SUSPENDED, &mergerThredID1);
            hMergeThreads[1] = (HANDLE)_beginthreadex(NULL, 0, &Merger::mergeInThread, &mergerAgent2, CREATE_SUSPENDED, &mergerThredID2);
            //-----------------------------------------------------------------------------------------------

            // wait until sorters finish their job and then resume threads to merge
            WaitForMultipleObjectsEx(4, hSortThreads, true, INFINITE, false);

            CloseHandle(hSortThreads[3]);
            CloseHandle(hSortThreads[2]);
            CloseHandle(hSortThreads[1]);
            CloseHandle(hSortThreads[0]);

            // check sort error here
            bool bSortError = false;

            if (ChunkSorter::getError() != et_NoError)
            {
                Merger::setImmediateExit(true);
                bSortError = true;
            }

            _strtime_s(&vTime[0], vTime.size());
            cout << "resume threads time: " << &vTime[0] << endl;

            ResumeThread(hMergeThreads[0]);
            ResumeThread(hMergeThreads[1]);

            // wait untill all threads finish
            WaitForMultipleObjectsEx(2, hMergeThreads, true, INFINITE, false);

            CloseHandle(hMergeThreads[1]);
            CloseHandle(hMergeThreads[0]);

            DeleteCriticalSection(&csChunkName);
            DeleteCriticalSection(&csFileAccess);
            DeleteCriticalSection(&csMergePool);

            DeleteCriticalSection(&csMergeError);
            DeleteCriticalSection(&csSortError);

            if (bSortError)
            {
                SorterException::generateException(ChunkSorter::getError());
            }

            // check merge error here
            if (Merger::getError() != et_NoError)
            {
                SorterException::generateException(Merger::getError());
            }

            _strtime_s(&vTime[0], vTime.size());
            cout << "end time: " << &vTime[0] << endl;
        }
    }

    catch (SorterException &ex)
    {
        cout << ex;
    }

    catch (...)
    {
        cerr << "Unknown error" << endl;
    }

    return 0;
}
Re: сортировка файла
От: rus blood Россия  
Дата: 12.06.10 10:08
Оценка:
Здравствуйте, letsurf, Вы писали:

L>Пусть у нас на диске есть файл размером 4 гигабайта. Его можно представить, как 2^30 32-битных беззнаковых чисел. Нужно отсортировать этот файл. То есть программа должна сгенерировать еще один файл размером в 4 гигабайта, в котором содержатся те же числа, что и в исходном, но упорядочены по возрастанию. Стандартные алгоритмы сортировки (qsort, std::sort) напрямую применить невозможно, так как для их выполнения нужно как минимум 4 гигабайта оперативной памяти. Но отсортировать числа на диске возможно, если использовать дополнительное пространство на жестком диске.


L>Нужно написать консольную программу, которая в argv[1] получает имя файла, который нужно отсортировать (размер файла до 16Gb, размер кратен четырем),


У тебя есть файл с 32-битными числами.
Таких чисел может быть up to 4 млрд.
Нужно вычислить индексы вхождения каждого числа в исходный файл.
Индексы вхождения можно записать в промежуточный файл.
Промежуточный файл размером 16Gb, где в каждые 4b записывается индекс вхождения соответствующего числа в исходный файл.
Потом на основе индексов вхождения собираем файл результата.
Имею скафандр — готов путешествовать!
Re: сортировка файла
От: alsemm Россия  
Дата: 12.06.10 10:57
Оценка:
Здравствуйте, letsurf, Вы писали:

L>Доброго времени суток!


L>Недавно столкнулся с задачей:


L>(привожу задание как оно есть)


L>Пусть у нас на диске есть файл размером 4 гигабайта. Его можно представить, как 2^30 32-битных беззнаковых чисел. Нужно отсортировать этот файл. То есть программа должна сгенерировать еще один файл размером в 4 гигабайта, в котором содержатся те же числа, что и в исходном, но упорядочены по возрастанию. Стандартные алгоритмы сортировки (qsort, std::sort) напрямую применить невозможно, так как для их выполнения нужно как минимум 4 гигабайта оперативной памяти. Но отсортировать числа на диске возможно, если использовать дополнительное пространство на жестком диске.


L>Нужно написать консольную программу, которая в argv[1] получает имя файла, который нужно отсортировать (размер файла до 16Gb, размер кратен четырем),

L>в argv[2] — имя файла, в который нужно записать отсортированные данные.

L>Ограничения:

L>1. Программа не может рассчитывать, что возможно выделить более 256Mb памяти.
L>2. Программа должна эффективно использовать несколько ядер.
Запустить несколько процессов, которые будут читать из исходного файла отдельные непересекающиеся фрагменты. Размер фрагмента — 256Mb, так что получается 16 процессов. Прочитав фрагмент целиком в память напускаем на него qsort. Получаем отсортированный фрагмент исходного файла, сохраняем его на диск. Получаем 16 отсортированных фрагментов исходного файла.
Теперь остается "слить" промежуточные файлы в один — сортировка слиянием, тут проблем нет. Можно запустить один процесс, который будет сливать все 16 фрагментов, а можно в несколько этапов слияние делать. Например, запустить 8 процессов слияния, которые из 16 фрагментов сдалют 8 по 512Mb, потом из 8 — 4 и т.д. На этом этапе все упрется в производительность файловой системы — из фрагментов надо будет читать по 32бита.
Re[2]: сортировка файла
От: letsurf  
Дата: 12.06.10 11:43
Оценка:
Здравствуйте, alsemm, Вы писали:

A>Здравствуйте, letsurf, Вы писали:


A>Запустить несколько процессов, которые будут читать из исходного файла отдельные непересекающиеся фрагменты. Размер фрагмента — 256Mb, так что получается 16 процессов. Прочитав фрагмент целиком в память напускаем на него qsort. Получаем отсортированный фрагмент исходного файла, сохраняем его на диск. Получаем 16 отсортированных фрагментов исходного файла.

A>Теперь остается "слить" промежуточные файлы в один — сортировка слиянием, тут проблем нет. Можно запустить один процесс, который будет сливать все 16 фрагментов, а можно в несколько этапов слияние делать. Например, запустить 8 процессов слияния, которые из 16 фрагментов сдалют 8 по 512Mb, потом из 8 — 4 и т.д. На этом этапе все упрется в производительность файловой системы — из фрагментов надо будет читать по 32бита.

Примерно так я и сделал, но, видимо, это не самый быстрый способ.
Re: сортировка файла
От: minorlogic Украина  
Дата: 12.06.10 12:23
Оценка:
Скорее всего сортировку подсчетом было бы быстрее, но не факт.
... << RSDN@Home 1.2.0 alpha 4 rev. 1237>>
Ищу работу, 3D, SLAM, computer graphics/vision.
Re[3]: сортировка файла
От: alsemm Россия  
Дата: 12.06.10 13:10
Оценка:
Здравствуйте, letsurf, Вы писали:

L>Здравствуйте, alsemm, Вы писали:


A>>Здравствуйте, letsurf, Вы писали:


A>>Запустить несколько процессов, которые будут читать из исходного файла отдельные непересекающиеся фрагменты. Размер фрагмента — 256Mb, так что получается 16 процессов. Прочитав фрагмент целиком в память напускаем на него qsort. Получаем отсортированный фрагмент исходного файла, сохраняем его на диск. Получаем 16 отсортированных фрагментов исходного файла.

A>>Теперь остается "слить" промежуточные файлы в один — сортировка слиянием, тут проблем нет. Можно запустить один процесс, который будет сливать все 16 фрагментов, а можно в несколько этапов слияние делать. Например, запустить 8 процессов слияния, которые из 16 фрагментов сдалют 8 по 512Mb, потом из 8 — 4 и т.д. На этом этапе все упрется в производительность файловой системы — из фрагментов надо будет читать по 32бита.

L>Примерно так я и сделал, но, видимо, это не самый быстрый способ.

Это не способ небыстрый, а реализация кривая. Потому что если открыть исходный файл только один раз (static FileWrapper m_fileToSort), т.е. создать единственный дескриптор FILE*, то читать через него асинхронно из нескольких потоков очень тяжело.
Мне вообще непонятно, что у тебя в коде происходит. Т.е. может оно конечно и выдает корректный результат, но асинхронностью тут и не пахнет.
Re[4]: сортировка файла
От: letsurf  
Дата: 12.06.10 13:21
Оценка:
Здравствуйте, alsemm, Вы писали:

L>>Примерно так я и сделал, но, видимо, это не самый быстрый способ.

A>Это не способ небыстрый, а реализация кривая. Потому что если открыть исходный файл только один раз (static FileWrapper m_fileToSort), т.е. создать единственный дескриптор FILE*, то читать через него асинхронно из нескольких потоков очень тяжело.
A>Мне вообще непонятно, что у тебя в коде происходит. Т.е. может оно конечно и выдает корректный результат, но асинхронностью тут и не пахнет.

Исходный файл читается цепочками по 64Mb, которые сортируются qsort'ом и складываются на диск. Этот процесс не занимает много времени, основное же время у меня уходит на слияние этих цепочек.
Re[2]: сортировка файла
От: Кодт Россия  
Дата: 12.06.10 15:08
Оценка:
Здравствуйте, rus blood, Вы писали:

RB>Нужно вычислить индексы вхождения каждого числа в исходный файл.


Индексы-то зачем? Нужны счётчики, раз это сортировка подсчётом.
Перекуём баги на фичи!
Re[3]: сортировка файла
От: WolfHound  
Дата: 12.06.10 15:59
Оценка:
Здравствуйте, Кодт, Вы писали:

К>Индексы-то зачем? Нужны счётчики, раз это сортировка подсчётом.

Гы. Произвольный доступ убъет всю производительность.
... << RSDN@Home 1.2.0 alpha 4 rev. 1472>>
Пусть это будет просто:
просто, как только можно,
но не проще.
(C) А. Эйнштейн
Re: сортировка файла
От: Angler Россия  
Дата: 12.06.10 16:26
Оценка:
Здравствуйте, letsurf, Вы писали:

L>Ограничения:

L>1. Программа не может рассчитывать, что возможно выделить более 256Mb памяти.
L>2. Программа должна эффективно использовать несколько ядер.

Просто как идея, не знаю на сколько будет (не)эффективно: cчитывать числа из исходного файла и записывать в n файлов, каждый их которых может уместить m 32-битных беззнаковых чисел. Таким образом в первый файл попадут числа из диапозона (0;m — 1), во второй (m;2*m — 1) и т.д. Теперь считываем в цикле содержимое мелких файлов, сортируем и пишем в выходной файл.
Re: сортировка файла
От: rm822 Россия  
Дата: 12.06.10 16:34
Оценка: -1
из очевидных недостатков это конечно крайне низний технологический уровень
-есть такая штука как stxxl — это как раз и есть stl для объемов данных которые в память не влезают, но вы ей не воспользовались
-RAII презрет, зачем эти голые CRITICAL_SECTION Enter\Leave, что нельзя было обертками воспользоваться из ATL или буста?
-файлы зачем-то читаются в память — нафиг это нужно? с файл мэппингом было бы гораздо проще
-зачем-то вручную создаются потоки — опять дикость. Гораздо лучше было бы воспользоваться ОМP

ошибок достаточно много, даже если вы и решили задачу правильно слив засчитан
... << RSDN@Home 1.1.4 stable SR1 rev. 568>>
Re[2]: сортировка файла
От: letsurf  
Дата: 12.06.10 16:42
Оценка:
Здравствуйте, rm822, Вы писали:

R>из очевидных недостатков это конечно крайне низний технологический уровень

R>-есть такая штука как stxxl — это как раз и есть stl для объемов данных которые в память не влезают, но вы ей не воспользовались
R>-RAII презрет, зачем эти голые CRITICAL_SECTION Enter\Leave, что нельзя было обертками воспользоваться из ATL или буста?
R>-файлы зачем-то читаются в память — нафиг это нужно? с файл мэппингом было бы гораздо проще
R>-зачем-то вручную создаются потоки — опять дикость. Гораздо лучше было бы воспользоваться ОМP

R>ошибок достаточно много, даже если вы и решили задачу правильно слив засчитан


все по существу, спасибо.
Re: сортировка файла
От: Andruh Россия  
Дата: 12.06.10 16:56
Оценка:
Привет!
Преждевременная оптимизация и здесь тоже зло. Первым делом избавься от идеи многопоточности для первого варианта решения задачи. Очень легко наступить на грабли, получив два параллельных потока, работающих одновременно с диском.
К чему четыре потока чанксортеров, если они читают из файла под критической секцией? Только чтобы параллельно запустить сортировку в другом, который уже свой чанк прочитал? Это можно было бы сделать более прямым способом, да и не нужно, по крайне мере в первой версии — сортировка чанка вряд ли занимает существенное время, чтобы её сразу делать параллельно.
Чанксортер читает под критической секцией, потом сортирует и пишет в файл без оной — писать может параллельно с чтением другим чанксортером — io деградирует?
Не до конца понял, как происходит merge, но по-моему так: 2 куска читаются в память, затем сливаются std::merge-ом в память, затем сбрасываются на диск. Во-первых, читать в память такими кусками ни к чему, можно сливать по мере чтения. Тут можно сделать что-то вроде istream_iterator или переписать руками без использования merge. Можно, конечно, и читать в память кусками и сливать std::merge-ом по-старому, но не выделять память под массивы левой, правой и результирующей части в mergeTwoFilesIntoOne при каждом слиянии двух файлов! Ну и во-вторых, не стоит сливать файлы всего по два, тем более в двух merger-ах, которые параллельно насилуют диск и перегонять бедные int-ы по несколько раз из файлов в файл. Нужно слить сразу все чанки — смотри в сторону std::priority_queue.
Re: сортировка файла
От: vitali_y Беларусь www.stopka.us
Дата: 12.06.10 17:10
Оценка: :)
должно быть написано не "как 2^30 32-битных беззнаковых чисел" а "как 2^32 32-битных беззнаковых чисел"
или где неточность?
Re[2]: сортировка файла
От: letsurf  
Дата: 12.06.10 17:13
Оценка:
Здравствуйте, Andruh, Вы писали:

A>К чему четыре потока чанксортеров, если они читают из файла под критической секцией? Только чтобы параллельно запустить сортировку в другом, который уже свой чанк прочитал?


Да, для этого.

A>Не до конца понял, как происходит merge, но по-моему так: 2 куска читаются в память, затем сливаются std::merge-ом в память, затем сбрасываются на диск.


все верно, именно это он и делает.

A>Можно, конечно, и читать в память кусками и сливать std::merge-ом по-старому, но не выделять память под массивы левой, правой и результирующей части в mergeTwoFilesIntoOne при каждом слиянии двух файлов!


ага, на это уже сам обратил внимание.

A>Ну и во-вторых, не стоит сливать файлы всего по два, тем более в двух merger-ах, которые параллельно насилуют диск и перегонять бедные int-ы по несколько раз из файлов в файл. Нужно слить сразу все чанки — смотри в сторону std::priority_queue.


спасибо.
Re[2]: сортировка файла
От: vitali_y Беларусь www.stopka.us
Дата: 12.06.10 17:13
Оценка:
еще неточность "Пусть у нас на диске есть файл размером 4 гигабайта" должно быть "Пусть у нас на диске есть файл размером от 4 гигабайт"?
Re[2]: сортировка файла
От: letsurf  
Дата: 12.06.10 17:18
Оценка:
Здравствуйте, vitali_y, Вы писали:

_>должно быть написано не "как 2^30 32-битных беззнаковых чисел" а "как 2^32 32-битных беззнаковых чисел"

_>или где неточность?

нет, все верное написано, т.е. 2^30 32-битных числа занимают как раз 4Г
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.