Допустим есть ситуация, когда X рабочих потоков, а в очереди Y поставленных запросов, выполнение запроса требует времени Z.
Т.е. скажем очередь не пуста и в данное время необходимо срочно провести деинициализацию. Самое на первый взгляд простое — поставить в очередь
с помощью PostQueuedCompletionStatus некие данные после анализа которых, рабочий поток завершиться. Но вся суть, что
PostQueuedCompletionStatus использует KeInsertQueue, т.е. запрос ставиться в конец очереди. Т.е. необходимо ставить запрос в начало очереди, что-бы
рабочий поток в первую очередь получал уведомление о том, что происходит деинициализация. А так выходит, что пока все предыдущие задания из очереди не будут выбранны, задание о завершении не получить. А деинциализироваться надо, например в случае выгрузки драйвера или при уведомлении от SCM для службы. Ситуация не частая, но бывает. Как выходить из такого положения?
Re: Completion Port + KeInsertQueue + управление очередью
Здравствуйте, abdul.zycor,
AZ>Допустим есть ситуация, когда X рабочих потоков, а в очереди Y поставленных запросов, выполнение запроса требует времени Z. AZ>Т.е. скажем очередь не пуста и в данное время необходимо срочно провести деинициализацию. Самое на первый взгляд простое — поставить в очередь AZ>с помощью PostQueuedCompletionStatus некие данные после анализа которых, рабочий поток завершиться. Но вся суть, что AZ>PostQueuedCompletionStatus использует KeInsertQueue, т.е. запрос ставиться в конец очереди. Т.е. необходимо ставить запрос в начало очереди, что-бы AZ>рабочий поток в первую очередь получал уведомление о том, что происходит деинициализация. А так выходит, что пока все предыдущие задания из очереди не будут выбранны, задание о завершении не получить. А деинциализироваться надо, например в случае выгрузки драйвера или при уведомлении от SCM для службы. Ситуация не частая, но бывает. Как выходить из такого положения?
--
Попробуйте использовать CompletionKey при вызове CreateIoCompletionPort как указатель на структуру, внутри которой передавайте глобальные данные всем рабочим потокам. Каждый из рабочих потоков, получив управление после вызова функции GetQueuedCompletionStatus, первым делом должен проверить эти данные и решить, что ему делать.
C уважением,
Геннадий Майко.
Re[2]: Completion Port + KeInsertQueue + управление очередью
Здравствуйте, Геннадий Майко, Вы писали:
ГМ>Попробуйте использовать CompletionKey при вызове CreateIoCompletionPort как указатель на структуру, внутри которой передавайте глобальные данные всем рабочим потокам. Каждый из рабочих потоков, получив управление после вызова функции GetQueuedCompletionStatus, первым делом должен проверить эти данные и решить, что ему делать.
Да тут других вариантов нет. Просто не очень красиво выходит.
Re[3]: Completion Port + KeInsertQueue + управление очередью
У меня снова вопрос по этой же теме.
Допустим такая ситуация, как я описал выше.
Да рабочие потоки после возврата управления из GetQueuedCompletionStatus делают что-то типа
if(GetQueuedCompletionStatus(ThreadPool->CompletionPort, &NumberOfBytes, (PULONG_PTR)&WorkItem, (LPOVERLAPPED *)&Overlapped, INFINITE)) {
//проверка флага окончания работы рабочего потокаif ( InterlockedCompareExchange(&ThreadPool->fDestroyCalled, 1, 1) ) {
status = ERROR_SUCCESS;
break;
}
т.е. на данные WorkItem они забивают и прекрастно завершаются досрочно, прекращая выбирать данные из очереди, но очередь не пуста ведь остается.
Но ставим в очередь мы данные, выделяется ведь память нами именно для переменной object, в которой скажем описается запрос какие данные для него там есть.
if ( !PostQueuedCompletionStatus(ThreadPool->CompletionPort, 0, (ULONG_PTR)object, NULL) ) {
status = CALL(GetLastError)();
break;
}
Т.е. вопрос звучит так — кто будет освобождать эту память, если скажем процесс не будет завершаться, это была так сказать реинициализация?
Самое простое это вести список где будет содержаться указатели на участки выделенной нами памяти, а по досрочному завершению освобождать это дело, но это ведь велосипед какой-то.
Я вижу два варианта:
1. Делать свою реализацию очереди, в качестве которой и используется Completion Port
2. Применять пулы памяти, например как в библиотеке APR (http://apr.apache.org/docs/apr/1.3/group__apr__pools.html)
т.е. память для object выделяется из пула, при реинициализации модуля/класса пул дестроиться, при дестрое пула, освобождается память пула
Короче прошу помощи.
Re[4]: Completion Port + KeInsertQueue + управление очередью
Здравствуйте, abdul.zycor,
AZ>Допустим такая ситуация, как я описал выше. AZ>Да рабочие потоки после возврата управления из GetQueuedCompletionStatus делают что-то типа AZ>
AZ> if(GetQueuedCompletionStatus(ThreadPool->CompletionPort, &NumberOfBytes, (PULONG_PTR)&WorkItem, (LPOVERLAPPED *)&Overlapped, INFINITE)) {
AZ> //проверка флага окончания работы рабочего потока
AZ> if ( InterlockedCompareExchange(&ThreadPool->fDestroyCalled, 1, 1) ) {
AZ> status = ERROR_SUCCESS;
AZ> break;
AZ> }
AZ>
AZ>т.е. на данные WorkItem они забивают и прекрастно завершаются досрочно, прекращая выбирать данные из очереди, но очередь не пуста ведь остается.
--
Я тоже думал над этим и предлагаю попробовать следующий подход для корректного окончания потоков и одновременного очищения очереди. Идея такая:
1. Для окончания работы основной поток устанваливает команду в общей структуре (на которую указывает CompletionKey) и помещает в очередь N "остановочных" completion packet с помощью функции PostQueuedCompletionStatus, где N — число потоков. После этого никакие запросы на передачу данных с помощью этого completion port'a не делаются. Это нужно для того, чтобы в очереди последние N completion packet'ы были "остановочными".
2. Все потоки, кроме последнего, заканчивают свою работу, получив соответствующую команду через эту общую структуру. Для этого в этой же структуре необходимо иметь счетчик заканчивающихся потоков. Каждый поток, определив, что ему пора заканчиваться, аккуратно декрементирует этот счетчик, и, проанализировав его, "понимает", последний ли он или нет.
3. Все потоки должны также каким-то образом идентифицировать "остановочный" пакет и аккуратно уменьшать еще один счетчик "остановочных" пакетов в той же самой структуре. Сделать это можно многими способами (например, передавая NULL в качестве lpOverlapped в функции PostQueuedCompletionStatus).
4. Последний поток должен определять, сколько еще "остановочных" пакетов находится в очереди (анализируя счетчик "остановочных" пакетов) и, когда ясно, что очередь пуста, заканчивать свою работу.