[.NET] Фреймворк для написания собственных канальных синков
От: Нахлобуч Великобритания https://hglabhq.com
Дата: 02.12.05 16:44
Оценка: 83 (12)
Для тех, кто знает, что такое синк

Порыскал тут по форумам и понял, что на написание своего синка уходит от нескольких часов до бесконечности и, в результате, ненаписание оного. Но мне написать его надо было, так что я управился за 1.5 дня, т.е. около 14 часов

Все это хозяйство основано на Custom Sinks. Классы, как водится (я ж русский программист, в конце то концов!) основательно переписано под себя из-за слишком сильной "общности" упомянутой реализации. Ну и комментариев еще добавил.

Ну, поехали. Забрать исходники можно тут (8 Кб).
Итак. Для того, чтобы написать свой синк, вам потребуется написать 4 класса. Два собственно синка (клиентский и серверный) и два провайдера (аналогично).

Для написания класса клиентского синка унаследуйтесь от ClientSinkBase и переопределите оба виртуальных метода. Вот пример синка, архивирующего трафик с помошью ICSharpCode.SharpZipLib. Данный синк устанавливается после Formatter Sink'а, поскольку требуется работа с потоком, в который уже сериализовано сообщение, а не с самим сообщением.

    /// <summary>
    /// Клиентский приемник с поддержкой архивирования данных. Приемник требует установки
    /// после Форматирующего Приемника (Formatter Sink)
    /// </summary>
    public class CompressionClientSink : ClientSinkBase
    {
        #region Закрытые переменные-члены
        private int compressionLevel = 6;
        #endregion

        #region Защищенные свойства
        /// <summary>
        /// Возвращает или устанавливает уровень сжатия
        /// </summary>
        protected int CompressionLevel
        {
            get { return compressionLevel; }
            set { compressionLevel = value; }
        }
        #endregion

        /// <summary>
        /// Создает объект
        /// </summary>
        public CompressionClientSink()
        {
        }

        /// <summary>
        /// Создает объект
        /// </summary>
        /// <param name="clientSinkConfiguration">Конфигурационные данные</param>
        public CompressionClientSink(ClientSinkConfiguration clientSinkConfiguration)
        {
            IDictionary properties = clientSinkConfiguration.ConfigurationData.Properties;
            if(properties["compressionLevel"] != null)
            {
                CompressionLevel = int.Parse(properties["compressionLevel"].ToString());
            } // if
        }

        #region Переопределенные методы ClientSinkBase
        /// <summary>
        /// Переопределите этот метод если требуется обработка запросов.
        /// Поведение и значение параметров зависят от того, где в цепочке Приемников
        /// находится текуший Приемник (до или после Форматирующего Приемника,
        /// Formatter Sink) и от того, клиентский это Приемник или серверный
        /// </summary> 
        /// <param name="message">
        ///    Клиент, после Форматирующего Приемника - Сообщение запроса
        /// </param>
        /// <param name="headers">
        ///    Клиент, после Форматирующего Приемника - Транспортные заголовки сообщения запроса
        ///    </param>
        /// <param name="stream">
        ///    Клиент, после Форматирующего Приемника - Поток сообщения запроса
        /// </param>
        /// <param name="state">Состояние</param>
        public override void ProcessRequest(IMessage message, ITransportHeaders headers, 
            ref Stream stream, ref object state)
        {
            //
            // Архивируем, идем в начало и добавляем элемент в заголовок
            Stream compressedStream = CompressionManager.Compress(stream, CompressionLevel);
            if(compressedStream.CanSeek)
                compressedStream.Seek(0, SeekOrigin.Begin);

            //
            // Закрываем старый поток
            stream.Close();
            stream = compressedStream;

            if(headers != null)
                headers["Compressed"] = "true";
        }

        /// <summary>
        /// Переопределите этот метод если требуется обработка ответов.
        /// Поведение и значение параметров зависят от того, где в цепочке Приемников
        /// находится текуший Приемник (до или после Форматирующего Приемника,
        /// Formatter Sink) и от того, клиентский это Приемник или серверный
        /// </summary>
        /// <param name="message">
        ///    Клиент, после Форматирующего Приемника - null
        ///    </param>
        /// <param name="headers">
        ///    Клиент, после Форматирующего Приемника - Транспортные заголовки ответного сообщения
        /// </param>
        /// <param name="stream">
        ///    Клиент, после Форматирующего Приемника - Поток ответного сообщения
        /// </param>
        /// <param name="state">
        ///    Клиент, после Форматирующего Приемника - 
        /// </param>
        public override void ProcessResponse(IMessage message, ITransportHeaders headers, 
            ref Stream stream, object state)
        {
            //
            // Если в заголовках есть элемент Compressed, то разархивируем
            if(headers != null && headers["Compressed"] != null)
            {
                //
                // Разархивируем и идем в начало
                if(stream.CanSeek) 
                    stream.Seek(0, SeekOrigin.Begin);

                Stream decompressedStream = CompressionManager.Decompress(stream);

                if(decompressedStream.CanSeek) 
                    decompressedStream.Seek(0, SeekOrigin.Begin);

                //
                // Закрываем старый поток
                stream.Close();
                stream = decompressedStream;
            } // if
        }
        #endregion

Основная работа, как видно, происходит в ProcessRequest и ProcessResponse — в первой функции трафик архивируется и отправляется на Transport Sink (т.е. не совсем туда, а ниже по стеку), а в ProcessResponse — разархивируется и направляется выше по стеку.

На сервере ситуация обратная — в ProcessRequest разархивируем, а в ProcessResponse архивируем. Почему это происходит поясняет следующая схема:

      Клиент                          Сервер

    Прокси                          Объект
        |                               |
    Пользовательские синки          Пользовательские синки <-- Синки, которым важно содержание сообщение, находятся тут
        |                               |
    Форматирующий синк              Форматирующий синк
        |                               |
    Пользовательские синки          Пользовательские синки <-- Синки, которым не важно содержание сообщения, находятся здесь
        |                               |
    Транспортный синк               Транспортный синк
        |                               |
        +-------------------------------+

У клиента направление движения для Request — сверху вниз, Response — снизу вверх. Для сервера наоборот: Request — снизу вверх, Response — сверху вниз.

Для пояснения — код серверного синка:

using System.Collections;
using System.IO;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;

using ESystem.CommonServer.Management.Sinks.Framework;
using ESystem.CommonServer.Management.Sinks.Framework.Configuration;

namespace ESystem.CommonServer.Management.Sinks.Compression.Server
{
    /// <summary>
    /// Summary description for CompressionServerSink.
    /// </summary>
    public class CompressionServerSink : ServerSinkBase
    {
        #region Закрытые переменные-члены
        private int compressionLevel = 6;
        #endregion

        #region Защищенные свойства
        protected int CompressionLevel
        {
            get { return compressionLevel; }
            set { compressionLevel = value; }
        }
        #endregion

        /// <summary>
        /// Создает объект
        /// </summary>
        public CompressionServerSink()
        {
        }

        /// <summary>
        /// Создает объект
        /// </summary>
        /// <param name="serverSinkConfiguration">Конфигурационные данные</param>
        public CompressionServerSink(ServerSinkConfiguration serverSinkConfiguration)
        {
            IDictionary properties = serverSinkConfiguration.ConfigurationData.Properties;
            if(properties["compressionLevel"] != null)
            {
                CompressionLevel = int.Parse(properties["compressionLevel"].ToString());
            } // if
        }

        #region Переопределенные методы ServerSinkBase
        /// <summary>
        /// Переопределите этот метод если требуется обработка запросов.
        /// Поведение и значение параметров зависят от того, где в цепочке Приемников
        /// находится текуший Приемник (до или после Форматирующего Приемника,
        /// Formatter Sink) и от того, клиентский это Приемник или серверный
        /// </summary> 
        /// <param name="message">
        ///    Сервер, после Форматирующего Приемника - Сообщение запроса
        /// </param>
        /// <param name="headers">
        ///    Сервер, после Форматирующего Приемника - Транспортные заголовки сообщения запроса
        ///    </param>
        /// <param name="stream">
        ///    Сервер, после Форматирующего Приемника - null
        /// </param>
        /// <param name="state">Состояние</param>
        public override void ProcessRequest(IMessage message, ITransportHeaders headers,
            ref Stream stream, ref object state)
        {
            if(headers != null && headers["Compressed"] != null)
            {
                //
                // Разархивируем
                stream = CompressionManager.Decompress(stream);
                state = true;

                //
                // Идем в начало
                if(stream.CanSeek)
                    stream.Seek(0, SeekOrigin.Begin);
            } // if
        }

        /// <summary>
        /// Переопределите этот метод если требуется обработка ответов.
        /// Поведение и значение параметров зависят от того, где в цепочке Приемников
        /// находится текуший Приемник (до или после Форматирующего Приемника,
        /// Formatter Sink) и от того, клиентский это Приемник или серверный
        /// </summary>
        /// <param name="message">
        ///    Сервер, после Форматирующего Приемника - Ответное сообщение
        ///    </param>
        /// <param name="headers">
        ///    Сервер, после Форматирующего Приемника - null
        /// </param>
        /// <param name="stream">
        ///    Сервер, после Форматирующего Приемника - null
        /// </param>
        /// <param name="state">Состояние</param>
        public override void ProcessResponse(IMessage message, ITransportHeaders headers, 
            ref Stream stream, object state)
        {
            if(state != null)
            {
                //
                // Архивируем
                stream = CompressionManager.Compress(stream, CompressionLevel);
                if(headers != null)
                    headers["Compressed"] = "true";

                //
                // Идем в начало
                if(stream.CanSeek)
                    stream.Seek(0, SeekOrigin.Begin);
            } // if
        }
        #endregion
    }
}

С синками покончили, надо переходить к провайдерам. Провайдеры — это классы, которые по запросу .NET Framework создают одним им известным способом экземпляры классов, реализующих IClientChannelSink/IServerChannelSink. Вам потребуется лва провайдера — как водится, клиентский и серверный.

Клиентский провайдер:

using System.Collections;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;

using ESystem.CommonServer.Management.Sinks.Framework.Configuration;
using ESystem.CommonServer.Management.Sinks.Framework.Providers;

namespace ESystem.CommonServer.Management.Sinks.Compression.Client
{
    /// <summary>
    /// Провайдер Архивирующего Приемника
    /// </summary>
    public class CompressionClientSinkProvider : SinkProviderBase, 
        IClientChannelSinkProvider
    {
        #region Закрытые переменные-члены
        private IClientChannelSinkProvider nextClientChannelSinkProvider;
        //private object sinkCreator;
        #endregion

        /// <summary>
        /// Создает объект
        /// </summary>
        /// <param name="properties">Свойства</param>
        /// <param name="providerData">Данные Провайдера</param>
        public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
            : base(properties, providerData)
        {
        }

        #region Методы IClientChannelSinkProvider
        /// <summary>
        /// Создает цепь Приемников
        /// </summary>
        /// <param name="channel">Канал, для которого создается Приемник</param>
        /// <param name="url">URL объекта, с которым требуется установить соединение</param>
        /// <param name="remoteChannelData">Данные, описывающие канал на удаленном сервере</param>
        /// <returns>Цепь Приемников</returns>
        public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData)
        {
            //
            // Устанавливаем данные Приемника через CallContext а не
            // передаем в конструктор или установливаем через свойство
            CompressionClientSink compressionClientSink = null;
            CallContext.SetData("perProviderState", PerProviderState);

            //
            // Создаем следующий Приемник в цепочке, а потом - наш
            // Класс ClientSinkConfiguration используется для передачи конфигурационных
            // данных нашему синку.
            ClientSinkConfiguration clientSinkConfiguration = 
                new ClientSinkConfiguration(SinkProviderData, channel, url, remoteChannelData);
            IClientChannelSink nextSink = Next.CreateSink(channel, url, remoteChannelData);
            compressionClientSink = new CompressionClientSink(clientSinkConfiguration);

            //
            // Связываем все в цепочку
            if(compressionClientSink != null)
            {
                compressionClientSink.SetNextSink(nextSink);
                return compressionClientSink;
            } // if

            return nextSink;
        }

        /// <summary>
        /// Возвращает или устанавливает следующего Провайдера в цепочке
        /// </summary>
        public IClientChannelSinkProvider Next
        {
            get { return nextClientChannelSinkProvider; }
            set { nextClientChannelSinkProvider = value; }
        }
        #endregion
    }
}

Ну и серверный провайдер:

using System.Collections;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;

using ESystem.CommonServer.Management.Sinks.Framework.Configuration;
using ESystem.CommonServer.Management.Sinks.Framework.Providers;

namespace ESystem.CommonServer.Management.Sinks.Compression.Server
{
    /// <summary>
    /// Summary description for CompressionServerSinkProvider.
    /// </summary>
    public class CompressionServerSinkProvider : SinkProviderBase,
        IServerChannelSinkProvider
    {
        #region Закрытые переменные-члены
        private IServerChannelSinkProvider nextServerChannelSinkProvider;
        #endregion

        /// <summary>
        /// Создает объект
        /// </summary>
        /// <param name="properties"></param>
        /// <param name="providerData"></param>
        public CompressionServerSinkProvider(IDictionary properties, ICollection providerData)
            : base(properties, providerData)
        {
        }

        #region Методы IServerChannelSinkProvider
        /// <summary>
        /// Возвращает данные канала
        /// </summary>
        /// <param name="channelData">Канал</param>
        public void GetChannelData(IChannelDataStore channelData)
        {
        }

        /// <summary>
        /// Создает цепь Приемников
        /// </summary>
        /// <param name="channel">Канал, для которого создается цепь</param>
        /// <returns>Цепь Приемников</returns>
        public IServerChannelSink CreateSink(IChannelReceiver channel)
        {
            //
            // Устанавливаем данные Приемника через CallContext а не
            // передаем в конструктор или установливаем через свойство
            CompressionServerSink compressionServerSink = null; 
            CallContext.SetData("perProviderState", PerProviderState);

            //
            // Создаем Приемник, а потом - следующий за ним в цепочке
            ServerSinkConfiguration serverSinkConfiguration = 
                new ServerSinkConfiguration(SinkProviderData, channel);
            compressionServerSink = new CompressionServerSink(serverSinkConfiguration);
            IServerChannelSink nextSink = Next.CreateSink(channel);
            
            //
            // Связываем в цепочку
            if(compressionServerSink != null)
            {
                compressionServerSink.SetNextSink(nextSink);
                return compressionServerSink;
            } // if

            return nextSink;
        }

        /// <summary>
        /// Возвращает или устанавливает следующего Провайдера в цепи
        /// </summary>
        public IServerChannelSinkProvider Next
        {
            get { return nextServerChannelSinkProvider; }
            set { nextServerChannelSinkProvider = value; }
        }
        #endregion
    }
}

В обоих случаях мы создаем свои синки и передаем дальше по цепочке провайдеров инструкцию "Создать Синк". Ну и в конце концов самодобавляемся в цепочку.

А теперь гвоздь программы. Настройка. Как мне показалось, одна из самых неочевидных частей в процессе.

В целом, когда вы пишете свои канальные синки, у вас есть два варианта их размещения — до форматтера и после форматтера, но до транспортного синка. Все это прописывается в App.config. Но прописывается хитро.

К примеру, если мы пишем синк, размещающийся между форматтером и транспортным синком, то .config-файлы должны быть следующего содержания:

Клиентский:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
   <system.runtime.remoting>
      <application>
         <client>
            <wellknown type="ESystem.CommonServer.Management.Shared.ICommonServerManagementServer, ESystem.CommonServer.Management.Shared" 
                url="http://localhost:89/CommonServerManagementServer.rem" />
         </client>
         <channels>
            <channel ref="http">
                <clientProviders>
                    <formatter ref="soap" />
                    <provider type="ESystem.CommonServer.Management.Sinks.Compression.Client.CompressionClientSinkProvider, ESystem.CommonServer.Management.Sinks" 
                        customSinkType="ESystem.CommonServer.Management.Sinks.Compression.Client.CompressionClientSink, ESystem.CommonServer.Management.Sinks">
                        <customData compressionLevel="9" />
                    </provider>
                </clientProviders>
            </channel>
         </channels>         
      </application>
   </system.runtime.remoting>
</configuration>

Сразу смотрим на схему. Клиентский синк размещается после форматтера, так что в App.config тег formatter должен стоять над тегом provider, в котором определяется провайдер нашего клиентского синка - т.е. исходящий запрос сначала форматируется, а потом архивируется (вспоминаем направления движения Request/Response для клиента), а входящий ответ сначала разархивируется, а потом поступает на вход форматтера.


Всегда явно указывайте тег formatter! Всегда!

А вот серверный конфиг:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
   <system.runtime.remoting>
      <application>
         <service>
            <wellknown mode="Singleton" type="ESystem.CommonServer.Management.Server.CommonServerManagementServer, ESystem.CommonServer.Management.Server" 
               objectUri="CommonServerManagementServer.rem" />
         </service>
         <channels>
            <channel ref="http" port="89">
                <serverProviders>                    
                    <provider type="ESystem.CommonServer.Management.Sinks.Compression.Server.CompressionServerSinkProvider, ESystem.CommonServer.Management.Sinks" 
                        customSinkType="ESystem.CommonServer.Management.Sinks.Compression.Server.CompressionServerSink, ESystem.CommonServer.Management.Sinks">
                        <customData compressionLevel="9" />
                    </provider>
                    <formatter ref="soap" />
                </serverProviders>
            </channel>
         </channels>
      </application>
   </system.runtime.remoting>
</configuration>

Опять идем к схеме. Серверный синк стоит между транспортным и форматтером, так что тег formatter должен быть после нашего тега provider — т.е. входящий запрос сначала разархивируется, а потом идет к форматтеру. С исходящим ответом все логично — он форматируется, а потом уже архивируется.

Вот оно как. А если вы пишете синки, встающие перед форматтером, то в App.config клиента сначала будет записан тег provider, а потом formatter, а в App.config сервера — cначала formatter, а уж потом — provider.

Засим позвольте закруглиться. Надеюсь, кому-то этот пост поможет управиться часов за 5 Если что — обращайтесь, буду рад помочь
... << RSDN@Home 1.1.4 stable SR1 rev. 568>>
HgLab: Mercurial Server and Repository Management for Windows
Re: [.NET] Фреймворк для написания собственных канальных син
От: _FRED_ Черногория
Дата: 15.03.06 18:41
Оценка:
Здравствуйте, Нахлобуч, Вы писали:

Н>…

Н>Вот пример синка, архивирующего трафик с помошью ICSharpCode.SharpZipLib. Данный синк устанавливается после Formatter Sink'а, поскольку требуется работа с потоком, в который уже сериализовано сообщение, а не с самим сообщением.
Н>…

Относительно примера: А не получится-ли ерунда, если в настройках клиента и сервера указаны разные значения уровня компрессии? ИМХО, его-то и надо в параметрах передовать заместо флага "Compressed" (если параметра нет, значит нет и компрессии)
... << RSDN@Home 1.2.0 alpha rev. 648>>
Now playing: «Тихо в лесу…»
Help will always be given at Hogwarts to those who ask for it.
Re[2]: [.NET] Фреймворк для написания собственных канальных
От: Нахлобуч Великобритания https://hglabhq.com
Дата: 16.03.06 07:31
Оценка:
Здравствуйте, _FRED_, Вы писали:

_FR>Относительно примера: А не получится-ли ерунда, если в настройках клиента и сервера указаны разные значения уровня компрессии? ИМХО, его-то и надо в параметрах передовать заместо флага "Compressed" (если параметра нет, значит нет и компрессии)


Не, все пучком будет Если вы заметили, то CompressionLevel передается только в метод Compress класса CompressionManager и это же значение пишется куда-то во внутренние структуры, из которых оно достается при вызове Decompress на той стороне провода.

public sealed class CompressionManager
{
    /// <summary>
    /// Архивирует указанный поток с заданным уровнем сжатия.
    /// </summary>
    /// <param name="sourceStream">Исходный поток</param>
    /// <param name="compressionLevel">Уровень сжатия</param>
    /// <returns>Заархивированный поток</returns>
    public static Stream Compress(Stream sourceStream, int compressionLevel)
    {
        //
        // Создаем выходной поток
        Stream baseStream = new MemoryStream();

        ZipOutputStream zipOutputStream = new ZipOutputStream(baseStream);
        zipOutputStream.SetLevel(compressionLevel);
        
        //
        // Новая запись
        ZipEntry zipEntry = new ZipEntry("serverRequest");
        zipOutputStream.PutNextEntry(zipEntry);

        //
        // Пишем в поток
        CopyStreams(sourceStream, zipOutputStream);
        zipOutputStream.Finish();

        return baseStream;
    }

    /// <summary>
    /// Разархивирует указанный поток
    /// </summary>
    /// <param name="sourceStream">Исходный поток</param>
    /// <returns>Разархивированный поток</returns>
    public static Stream Decompress(Stream sourceStream)
    {
        //
        // Создаем выходной поток
        Stream baseStream = new MemoryStream();

        ZipInputStream zipInputStream = new ZipInputStream(sourceStream);
        
        //
        // В потоке должна быть одна запись
        zipInputStream.GetNextEntry();
        CopyStreams(zipInputStream, baseStream);

        //
        // Закрываем-возвращаем
        zipInputStream.Close();
        return baseStream;
    }
}
... << RSDN@Home 1.1.4 stable SR1 rev. 568>>
HgLab: Mercurial Server and Repository Management for Windows
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.