Сообщество .NET-разработчиков Райффайзенбанка продолжает краткий разбор содержимого ViennaNET. О том, как и зачем мы к этому пришли, можно почитать в первой части.
В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.

Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге «Microservices Patterns», Chris Richardson.
В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.
Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T – это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.
Пример объявления:
Пример вызова:
Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.
Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.
Интерфейс
Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация:
При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:
Пример ApplicationContext:
Если идентификатор подключения не указан, то будет использоваться подключение с именем «default».
Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека
Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.
Набор библиотек для работы с очередями.
Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно – максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека
В работе с очередями есть два процесса: получение сообщения и отправка.
Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от
Пример запуска прослушивания:
Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.
Для получения единичного сообщения в интерфейсе-фабрике
Для отправки сообщения необходимо воспользоваться всё той же
Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов
Мы постарались сохранить уникальные возможности каждого менеджера очередей, например,
Вот пример использования очередей с основными нюансами подключения.
Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку
Спасибо за внимание, ждём ваших комментариев и pull request-ов!
В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.

ViennaNET.Sagas
Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге «Microservices Patterns», Chris Richardson.
В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.
Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T – это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.
Пример объявления:
public class ExampleSaga : SagaBase<ExampleContext> { public ExampleSaga() { Step("Step 1") .WithAction(c => ...) .WithCompensation(c => ...); AsyncStep("Step 2") .WithAction(async c => ...); } }
Пример вызова:
var saga = new ExampleSaga(); var context = new ExampleContext(); await saga.Execute(context);
Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.
ViennaNET.Orm.*
Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.
ViennaNET.Orm.Seedwork и ViennaNET.Orm – главные сборки, содержащие базовые интерфейсы и их реализации соответственно. Остановимся на их содержимом подробнее.Интерфейс
IEntityFactoryService и его реализация EntityFactoryService являются главной отправной точкой для работы с БД, так как здесь создается Unit of Work, репозитории для работы с конкретными сущностями, а также исполнители команд и прямых SQL-запросов. Иногда удобно ограничить возможности класса по работе с БД, например, дать возможность только для чтения данных. Для таких случаев у IEntityFactoryService есть предок – интерфейс IEntityRepositoryFactory, в котором объявлен только метод для создания репозиториев.Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация:
ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql. При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:
"db": [ { "nick": "mssql_connection", "dbServerType": "MSSQL", "ConnectionString": "...", "useCallContext": true }, { "nick": "oracle_connection", "dbServerType": "Oracle", "ConnectionString": "..." } ],
Пример ApplicationContext:
internal sealed class DbContext : ApplicationContext { public DbContext() { AddEntity<SomeEntity>("mssql_connection"); AddEntity<MigratedSomeEntity>("oracle_connection"); AddEntity<AnotherEntity>("oracle_connection"); } }
Если идентификатор подключения не указан, то будет использоваться подключение с именем «default».
Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека
ViennaNET.TestUtils.Orm.Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.
ViennaNET.Messaging.*
Набор библиотек для работы с очередями.
Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно – максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека
ViennaNET.Messaging как раз отвечает за эту унификацию, а ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue и ViennaNET.Messaging.KafkaQueue содержат реализации адаптеров для IBM MQ, RabbitMQ и Kafka соответственно.В работе с очередями есть два процесса: получение сообщения и отправка.
Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от
IMessageProcessor, который будет отвечать за обработку входящего сообщения. Далее его необходмо «привязать» к определенной очереди, делается это через регистрацию в IQueueReactorFactory с указанием идентификатора очереди из конфигурации:"messaging": { "ApplicationName": "MyApplication" }, "rabbitmq": { "queues": [ { "id": "myQueue", "queuename": "lalala", ... } ] },
Пример запуска прослушивания:
_queueReactorFactory.Register<MyMessageProcessor>("myQueue"); var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue"); queueReactor.StartProcessing();
Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.
Для получения единичного сообщения в интерфейсе-фабрике
IMessagingComponentFactory есть метод CreateMessageReceiver, который создаст получателя, ожидающего сообщения из указанной ему очереди:using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue")) { var message = receiver.Receive(); }
Для отправки сообщения необходимо воспользоваться всё той же
IMessagingComponentFactory и создать отправителя сообщения:using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue")) { sender.SendMessage(new MyMessage { Value = ...}); }
Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов
IMessageSerializer и IMessageDeserializer.Мы постарались сохранить уникальные возможности каждого менеджера очередей, например,
ViennaNET.Messaging.MQSeriesQueue позволяет отправлять не только текстовые, но и байтовые сообщения, а ViennaNET.Messaging.RabbitMQQueue поддерживает роутинг и создание очередей “на лету”. В нашей обертке адаптера для RabbitMQ также реализовано некоторое подобие RPC: отправляем сообщение и ожидаем ответа из специальной временной очереди, которая создается только для одного ответного сообщения.Вот пример использования очередей с основными нюансами подключения.
ViennaNET.CallContext
Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку
ViennaNET.CallContext, которая позволяет хранить данные из входящего в сервис запроса. При этом то, каким образом был сделан запрос, через очередь или через Http, не играет роли. Затем, перед отправкой исходящего запроса или сообщения, достаются данные из контекста и помещаются в заголовки. Таким образом, следующий сервис получает вспомогательные данные и аналогично ими распоряжается.Спасибо за внимание, ждём ваших комментариев и pull request-ов!
