Всем привет.
Пытаюсь работать с азио и пайпами в режиме сообщений. Использую для этого stream_handle::async_read_some.
Когда буфер больше сообщения, оно принимается нормально, вызывается обработчик. Иначе — access violation (Unhandled exception at 0xfeeefeee).
В общем, портится overlapped.
Стек вызова (размер буфера 16 байт):
* feeefeee()
* boost::asio::detail::win_iocp_io_service::operation::do_completion(unsigned long last_error=0x000000ea, unsigned int bytes_transferred=0x00000010) Line 77 + 0x16 bytes C++
* boost::asio::detail::win_iocp_io_service::do_one(bool block=true, boost::system::error_code & ec={...}) Line 509 C++
* boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec={...}) Line 186 + 0xe bytes C++
* boost::asio::io_service::run() Line 58 + 0xf bytes C++
В общем, пришло ровно 16 байт, код ошибки MORE_DATA_AVAIL — что вполне нормально для режима сообщений. В этом случае надо просто повторять операции чтения, пока не придет success. К сожалению, до нас это богатство так и не добралось ввиду мусора в overlapped. Ощущение такое, что проблемы с синхронизацией, но тут весь IOCP в одном треде. Иногда "проходит" пара-тройка вызовов, но потом все опять заканчиывается ошибкой доступа.
В общем, буду благодарен за мысли по поводу. Код, воспроизводящий пример, приведен ниже (boost 1.38).
Спасибо!
--
Andrew Solodovnikov
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
struct PipeServer
{
PipeServer():m_hPipe(INVALID_HANDLE_VALUE), m_sName(NULL)
{
}
BOOL Create(LPCTSTR szName, DWORD dwTimeout = 2000, DWORD dwBufSize = 4096)
{
m_bStop = FALSE;
m_sName = _tcsdup(szName);
if (m_sName)
{
m_dwTimeout = dwTimeout;
m_dwBufSize = dwBufSize;
InternalCreatePipe();
if (m_hPipe != INVALID_HANDLE_VALUE)
return TRUE;
free(m_sName);
m_sName = NULL;
}
return FALSE;
}
BOOL Accept(HANDLE &theStream)
{
if (m_hPipe != INVALID_HANDLE_VALUE)
{
if (ConnectNamedPipe(m_hPipe, NULL) || (GetLastError() == ERROR_PIPE_CONNECTED))
{
if (!m_bStop)
{
theStream = m_hPipe;
InternalCreatePipe();
return TRUE;
}
VERIFY(::DisconnectNamedPipe(m_hPipe));
}
VERIFY(::CloseHandle(m_hPipe));
m_hPipe = INVALID_HANDLE_VALUE;
}
return FALSE;
}
void Stop()
{
::InterlockedExchange((long *)&m_bStop, TRUE);
// possible hang on ::CallNamedPipe - if another pipe connected between this calls...
DWORD dwTemp;
::CallNamedPipe(m_sName, NULL, 0, NULL, 0, &dwTemp, NMPWAIT_NOWAIT);
}
void Close()
{
if (m_hPipe != INVALID_HANDLE_VALUE)
{
::DisconnectNamedPipe(m_hPipe);
::CloseHandle(m_hPipe);
m_hPipe = INVALID_HANDLE_VALUE;
}
free(m_sName);
m_sName = NULL;
}
private:
void InternalCreatePipe()
{
m_hPipe = ::CreateNamedPipe(
m_sName, // pipe name
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, // read/write access
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE | // message-read mode
PIPE_WAIT, // blocking mode
PIPE_UNLIMITED_INSTANCES, // max. instances
m_dwBufSize, // output buffer size
m_dwBufSize, // input buffer size
m_dwTimeout, // client time-out
NULL // no security attribute
);
}
private:
BOOL m_bStop;
LPTSTR m_sName;
HANDLE m_hPipe;
DWORD m_dwTimeout;
DWORD m_dwBufSize;
};
#define BLOCK_SIZE 64
struct pipe_session
{
pipe_session(boost::asio::io_service& io_service, HANDLE hClientHandle):
m_hClientHandle(io_service, hClientHandle)
{
io_service.post(boost::bind(&pipe_session::client_read, this));
}
private:
void client_read()
{
//boost::asio::async_read(m_hClientHandle,
m_hClientHandle.async_read_some(
boost::asio::buffer(&szBuffer[0], BLOCK_SIZE),
boost::bind(&pipe_session::handle_client_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
void handle_client_read(const boost::system::error_code& error, size_t nReaded)
{
if (!error)
{
// end iteration
int i = 0;
}
else
{
if (error.value() == ERROR_MORE_DATA)
{
//m_sbClientBuffer.commit(BLOCK_SIZE);
client_read();
}
}
}
private:
boost::asio::windows::stream_handle m_hClientHandle;
char szBuffer[4096];
};
void server_thread(PipeServer *server)
{
boost::asio::io_service io_service;
boost::asio::add_service(io_service, new boost::asio::windows::stream_handle_service(io_service));
std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
HANDLE hStream;
while (server->Accept(hStream))
{
new pipe_session(io_service, hStream);
}
work.reset(); // Allow run() to exit.
t.join();
}
int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
{
PipeServer server;
if (server.Create(_T("\\\\.\\pipe\\test_channel")))
{
try
{
{
boost::thread t(boost::bind(&server_thread, &server));
getch();
HANDLE hService = CreateFile(_T("\\\\.\\pipe\\test_channel"), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
if (hService != INVALID_HANDLE_VALUE)
{
DWORD mode = PIPE_READMODE_MESSAGE;
::SetNamedPipeHandleState(hService, &mode, NULL, NULL);
std::vector<char> data(0xa5);
DWORD dw;
::WriteFile(hService, &data.front(), data.size(), &dw, NULL);
}
getch();
server.Stop();
t.join();
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
server.Close();
}
return 0;
}