SQL 2008 медленная загрузка через ISequentialStream.
От: blonduser  
Дата: 22.06.16 20:52
Оценка:
Всем доброго времени суток!

http://rsdn.ru/forum/db/6478637
Автор: blonduser
Дата: 22.06.16

Что бы избавится от ошибки. Ссылка на вопрос выше.

Переделал загрузку больших блобов через ISequentialStream.
Ошибки не возникает, но загрузка занимает продолжительное время.

Посмотрел. Чтение данных происходит по блоками 1 КБ.
Как можно увеличить размер блока и ускорить загрузку данных?


Спасибо.
Re: SQL 2008 медленная загрузка через ISequentialStream.
От: LuciferNovoros Россия  
Дата: 23.06.16 16:44
Оценка:
Здравствуйте, blonduser, Вы писали:

B>Переделал загрузку больших блобов через ISequentialStream.

B>Ошибки не возникает, но загрузка занимает продолжительное время.

Кода, как обычно, не будет... Но вот что-то мой telepate.sys сегодня сбоит и не работает. Подозреваю, что это не только у меня так.
Re[2]: SQL 2008 медленная загрузка через ISequentialStream.
От: blonduser  
Дата: 24.06.16 10:47
Оценка:
Здравствуйте, LuciferNovoros, Вы писали:

LN>Здравствуйте, blonduser, Вы писали:


B>>Переделал загрузку больших блобов через ISequentialStream.

B>>Ошибки не возникает, но загрузка занимает продолжительное время.

LN>Кода, как обычно, не будет... Но вот что-то мой telepate.sys сегодня сбоит и не работает. Подозреваю, что это не только у меня так.


Собственно сам код реализующий ISequentialStream. Взят из MSDN.

  Код реализующий ISequentalStream
class WorkStream : public ISequentialStream
{
private:  
   ULONG m_ulRefCount;  
   ULONG m_ulBufSize;  
   ULONG m_ulReadSize;  
   ULONG m_ulBytesLeft;  
   ULONG m_ulReadPos;  
   BYTE * m_pSrcData;  
   BYTE * m_pReadPtr;  
   BOOL m_fWasRead;  

public:

   WorkStream() {  
      m_ulRefCount = 1;  
      m_ulBufSize = 0;  
      m_ulReadSize = 0;  
      m_ulBytesLeft = 0;  
      m_ulReadPos = 0;  
      m_pSrcData = NULL;  
      m_pReadPtr = NULL;  
      m_fWasRead = FALSE;  
   }  
  
   ~WorkStream() {}  
  
   virtual ULONG STDMETHODCALLTYPE AddRef() {  
      return ++m_ulRefCount;  
   }  
  
   virtual ULONG STDMETHODCALLTYPE Release() {  
      --m_ulRefCount;  
      if (m_ulRefCount == 0) {  
         delete this;  
         return 0;  
      }  
      return m_ulRefCount;  
   }  
  
   virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {  
      if (!ppvObj)  
         return E_INVALIDARG;  
      else  
         *ppvObj = NULL;  
  
      if (riid != IID_ISequentialStream && riid != IID_IUnknown)  
         return E_NOINTERFACE;  
  
      AddRef();  
      *ppvObj = this;  
      return S_OK;  
   }  
  
   HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {  
      if (NULL == pSrcData)  
         return E_INVALIDARG;  
  
      // Data length must be non-zero  
      if (0 == ulBufSize)  
         return E_INVALIDARG;  
  
      m_ulBufSize = ulBufSize;  
      m_ulReadSize = ulReadSize;  
      m_pSrcData = (BYTE *)pSrcData;  
      m_pReadPtr = m_pSrcData;  
      m_ulBytesLeft = m_ulReadSize;  
      m_ulReadPos = 0;  
      m_fWasRead = FALSE;  
  
      return S_OK;  
   }  
  
   // Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI).  Instead, they read from our object.  
   virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {  
      return E_NOTIMPL;  
   }  
  
   // This implementation simply copies data from the source buffer in whatever size requested.  
   // But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.  
   virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {  // cb всегда 1024 байта.
      ULONG ulBytesWritten = 0;  
      ULONG ulCBToWrite = cb;  
      ULONG ulCBToCopy;  
      BYTE * pvb = (BYTE *)pv;  
  
      m_fWasRead = TRUE;  
  
      if (NULL == m_pSrcData)  
         return E_FAIL;  
  
      if (NULL == pv)  
         return STG_E_INVALIDPOINTER;  
  
      while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {  
         // Make sure we don't write more than our max read size or the size they asked for  
         ulCBToCopy = min(m_ulBytesLeft, cb);  
  
         // Make sure we don't read past the end of the internal buffer  
         ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);  
  
         memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);  
         pvb += ulCBToCopy;  
         ulBytesWritten += ulCBToCopy;  
         m_ulBytesLeft -= ulCBToCopy;  
         cb -= ulCBToCopy;  
  
         // Wrap reads around the src buffer  
         m_ulReadPos += ulCBToCopy;  
         if (m_ulReadPos >= m_ulBufSize)  
            m_ulReadPos = 0;  
      }  
  
      if (pcbRead)  
         *pcbRead = ulBytesWritten;  
  
      return S_OK;  
   }  
};

  Инициализация хранимой процедуры
struct SP_Insert_Blobs_PARAMS
{
    VARIANT        params[6];

    ISequentialStream*  pISeqStreamData;
    DBLENGTH        blobLengthData;

    SP_Insert_Blobs_PARAMS() { for(int i = 0; i < 6; i++) VariantInit(&params[i]);}
    ~SP_Insert_Blobs_PARAMS() { for(int i = 0; i < 6; i++) VariantClear(&params[i]);}
};

DBBINDING*    acDBBinding = new DBBINDING[paramsCount];

for(int i = 0; i < paramsCount; i++)
{
    acDBBinding[i].obLength = 0;
    acDBBinding[i].obStatus = 0;
    acDBBinding[i].pTypeInfo = NULL;
    acDBBinding[i].pObject = NULL;
    acDBBinding[i].pBindExt = NULL;
    acDBBinding[i].dwPart = DBPART_VALUE;
    acDBBinding[i].dwMemOwner = DBMEMOWNER_CLIENTOWNED;
    acDBBinding[i].dwFlags = 0;
    acDBBinding[i].bScale = 0;
}

acDBBinding[0].iOrdinal = 1;
acDBBinding[0].obValue = offsetof(SP_Insert_Blobs_PARAMS, params[0]);
acDBBinding[0].eParamIO = DBPARAMIO_INPUT;
acDBBinding[0].cbMaxLen = sizeof(VARIANT);
acDBBinding[0].wType = DBTYPE_VARIANT;
acDBBinding[0].bPrecision = 11;

acDBBinding[1].iOrdinal = 2;
acDBBinding[1].obValue = offsetof(SP_Insert_Blobs_PARAMS, params[1]);
acDBBinding[1].eParamIO = DBPARAMIO_INPUT;
acDBBinding[1].cbMaxLen = sizeof(VARIANT);
acDBBinding[1].wType = DBTYPE_VARIANT;
acDBBinding[1].bPrecision = 11;

acDBBinding[2].iOrdinal = 3;
acDBBinding[2].obValue = offsetof(SP_Insert_Blobs_PARAMS, pISeqStreamData);
acDBBinding[2].obLength = offsetof(SP_Insert_Blobs_PARAMS, blobLengthData);
acDBBinding[2].dwPart = DBPART_VALUE | DBPART_LENGTH;
acDBBinding[2].pObject = new DBOBJECT();
acDBBinding[2].pObject->dwFlags = STGM_READ;
acDBBinding[2].pObject->iid = IID_ISequentialStream;
acDBBinding[2].eParamIO = DBPARAMIO_INPUT;
acDBBinding[2].cbMaxLen = 0;
acDBBinding[2].wType = DBTYPE_IUNKNOWN;
acDBBinding[2].bPrecision = 11;

...

CreateStoredProcedure(Name, paramsCount, sizeof(VARIANT) * paramsCount, acDBBinding);

delete[] acDBBinding;

  Сохранение данных
WorkStream* pWorkStream = new WorkStream();
pWorkStream->AddRef();
pWorkStream->Init(pByte, ullSize, ullSize);
    
SP_Insert_Blobs_PARAMS    tmpBlobs;


tmpBlobs.params[0] = variant_t((long)1009);        
tmpBlobs.params[1] = _variant_t(TEXT("SimpleFileName"));

tmpBlobs.pISeqStreamData = (ISequentialStream*)pWorkStream;
tmpBlobs.blobLengthData = ullSize;

ExecuteStoredProcedure((LPVOID)&tmpBlobs);
Re[3]: SQL 2008 медленная загрузка через ISequentialStream.
От: LuciferNovoros Россия  
Дата: 24.06.16 14:46
Оценка:
Здравствуйте, blonduser, Вы писали:

B>Собственно сам код реализующий ISequentialStream. Взят из MSDN.


А я нашел другой код:

#include <oledb.h>

IRowset *     pIRowset;
IAccessor *   pIAccessor;
IMalloc *     pIMalloc;

int main() {
   // Assume the consumer has a pointer (pIRowset) that
   // points to the rowset. Create an accessor for the
   // BLOB column. Assume it is column 1.

   HACCESSOR      hAccessor;
   DBBINDSTATUS   rgStatus[1];
   DBOBJECT       ObjectStruct;
   DBBINDING      rgBinding[1] = {
      1,                            // Column 1
      0,                            // Offset to data
      0,                            // Ignore length field
      sizeof(IUnknown *),           // Offset to status field
      NULL,                         // No type info
      &ObjectStruct,                // Object structure
      NULL,                         // Ignore binding extensions
      DBPART_VALUE|DBPART_STATUS,   // Bind value and status
      DBMEMOWNER_CLIENTOWNED,       // Consumer owned memory
      DBPARAMIO_NOTPARAM,           // Not a parameter
      0,                            // Ignore size of data
      0,                            // Reserved
      DBTYPE_IUNKNOWN,              // Type DBTYPE_IUNKNOWN
      0,                            // Precision not applicable
      0                             // Scale not applicable
   } ;
   ISequentialStream *   pISequentialStream;

   // Set the elements in the object structure so that the
   // provider creates a readable ISequentialStream object over
   // the column. The consumer will read data from this object.
   ObjectStruct.dwFlags = STGM_READ;
   ObjectStruct.iid = IID_ISequentialStream;

   pIRowset->QueryInterface(IID_IAccessor, (void**) &pIAccessor);
   pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 1, rgBinding,
               sizeof(IUnknown *) + sizeof(ULONG), &hAccessor, rgStatus);
   pIAccessor->Release();

   // Allocate memory for the returned pointer and the
   // status field. The first sizeof(IUnknown*) bytes are
   // for the pointer to the object, and the next sizeof(ULONG)
   // bytes are for the status.

   void * pData = pIMalloc->Alloc(sizeof(IUnknown *) + sizeof(ULONG));

   // Get the next row, and get the pointer to ISequentialStream.
   HROW *   rghRows = NULL;
   DBROWCOUNT    cRows;

   pIRowset->GetNextRows(NULL, 0, 1, &cRows, &rghRows);
   pIRowset->GetData(rghRows[0], hAccessor, pData);

   // Read and process 5000 bytes at a time.
   BYTE    rgBuffer[5000];
   ULONG   cb;

   if ((ULONG)((BYTE*)pData)[rgBinding[0].obStatus] == DBSTATUS_S_ISNULL) {
   // Process NULL data.
   } else if ((DBSTATUS)((BYTE *)pData)[rgBinding[0].obStatus] ==
               DBSTATUS_S_OK) {
      //Obtain pointer to ISequentialStream.
      pISequentialStream = *(ISequentialStream **)pData;
      do {
         pISequentialStream->Read(rgBuffer,    // Buffer to write the data
                           sizeof(rgBuffer),   // Number of bytes to get
                           &cb);   // Number of bytes written to buffer
         if (cb > 0)               // Check to see if we got data
         {
            // Process data here.
         }
      } while (cb >= sizeof(rgBuffer));
   };

   pIMalloc->Free(rghRows);

   return 0;
};


Ключевое я выделил. Вообще непонятно, с чего ты взял, что cb всегда равно 1024? Нигде я не нашел такого ограничения.
Re[4]: SQL 2008 медленная загрузка через ISequentialStream.
От: blonduser  
Дата: 24.06.16 15:36
Оценка:
Здравствуйте, LuciferNovoros, Вы писали:

LN>Здравствуйте, blonduser, Вы писали:


B>>Собственно сам код реализующий ISequentialStream. Взят из MSDN.


LN>А я нашел другой код:


LN>Ключевое я выделил. Вообще непонятно, с чего ты взял, что cb всегда равно 1024? Нигде я не нашел такого ограничения.


Поставил точку остановки В функции Read и посмотрел значение.
Вы пишете напрямую в колонку, а я сохраняю через хранимую процедуру.
Re[5]: SQL 2008 медленная загрузка через ISequentialStream.
От: LuciferNovoros Россия  
Дата: 25.06.16 07:03
Оценка:
Здравствуйте, blonduser, Вы писали:

B>Поставил точку остановки В функции Read и посмотрел значение.


А кто мешает передать в функцию нужное тебе значение? Ты ж ее откуда-то вызываешь, вот и передай туда то, что тебя устроит.

B>Вы пишете напрямую в колонку, а я сохраняю через хранимую процедуру.


А в чем разница? Я просто показал, что функция может читать и большее число байт. Ну нет ограничений.
Re[6]: SQL 2008 медленная загрузка через ISequentialStream.
От: blonduser  
Дата: 25.06.16 07:45
Оценка:
Здравствуйте, LuciferNovoros, Вы писали:

LN>Здравствуйте, blonduser, Вы писали:


B>>Поставил точку остановки В функции Read и посмотрел значение.


LN>А кто мешает передать в функцию нужное тебе значение? Ты ж ее откуда-то вызываешь, вот и передай туда то, что тебя устроит.


B>>Вы пишете напрямую в колонку, а я сохраняю через хранимую процедуру.


LN>А в чем разница? Я просто показал, что функция может читать и большее число байт. Ну нет ограничений.


Если бы вы обратили внимание, то заметили бы что функцию "read" вызываю не я, а SQL-сервер.
SQL-сервер и передает её всегда 1КБ.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.