Сообщество .NET-разработчиков Райффайзенбанка продолжает краткий разбор содержимого ViennaNET. О том, как и зачем мы к этому пришли, можно почитать в первой части.
В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.
Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге «Microservices Patterns», Chris Richardson.
В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.
Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T – это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.
Пример объявления:
Пример вызова:
Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.
Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.
Интерфейс
Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация:
При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:
Пример ApplicationContext:
Если идентификатор подключения не указан, то будет использоваться подключение с именем «default».
Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека
Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.
Набор библиотек для работы с очередями.
Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно – максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека
В работе с очередями есть два процесса: получение сообщения и отправка.
Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от
Пример запуска прослушивания:
Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.
Для получения единичного сообщения в интерфейсе-фабрике
Для отправки сообщения необходимо воспользоваться всё той же
Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов
Мы постарались сохранить уникальные возможности каждого менеджера очередей, например,
Вот пример использования очередей с основными нюансами подключения.
Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку
Спасибо за внимание, ждём ваших комментариев и pull request-ов!
В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.
ViennaNET.Sagas
Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге «Microservices Patterns», Chris Richardson.
В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.
Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T – это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.
Пример объявления:
public class ExampleSaga : SagaBase<ExampleContext>
{
public ExampleSaga()
{
Step("Step 1")
.WithAction(c => ...)
.WithCompensation(c => ...);
AsyncStep("Step 2")
.WithAction(async c => ...);
}
}
Пример вызова:
var saga = new ExampleSaga();
var context = new ExampleContext();
await saga.Execute(context);
Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.
ViennaNET.Orm.*
Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.
ViennaNET.Orm.Seedwork и ViennaNET.Orm
– главные сборки, содержащие базовые интерфейсы и их реализации соответственно. Остановимся на их содержимом подробнее.Интерфейс
IEntityFactoryService
и его реализация EntityFactoryService
являются главной отправной точкой для работы с БД, так как здесь создается Unit of Work, репозитории для работы с конкретными сущностями, а также исполнители команд и прямых SQL-запросов. Иногда удобно ограничить возможности класса по работе с БД, например, дать возможность только для чтения данных. Для таких случаев у IEntityFactoryService
есть предок – интерфейс IEntityRepositoryFactory
, в котором объявлен только метод для создания репозиториев.Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация:
ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql
. При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:
"db": [
{
"nick": "mssql_connection",
"dbServerType": "MSSQL",
"ConnectionString": "...",
"useCallContext": true
},
{
"nick": "oracle_connection",
"dbServerType": "Oracle",
"ConnectionString": "..."
}
],
Пример ApplicationContext:
internal sealed class DbContext : ApplicationContext
{
public DbContext()
{
AddEntity<SomeEntity>("mssql_connection");
AddEntity<MigratedSomeEntity>("oracle_connection");
AddEntity<AnotherEntity>("oracle_connection");
}
}
Если идентификатор подключения не указан, то будет использоваться подключение с именем «default».
Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека
ViennaNET.TestUtils.Orm
.Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.
ViennaNET.Messaging.*
Набор библиотек для работы с очередями.
Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно – максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека
ViennaNET.Messaging
как раз отвечает за эту унификацию, а ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue и ViennaNET.Messaging.KafkaQueue
содержат реализации адаптеров для IBM MQ, RabbitMQ и Kafka соответственно.В работе с очередями есть два процесса: получение сообщения и отправка.
Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от
IMessageProcessor
, который будет отвечать за обработку входящего сообщения. Далее его необходмо «привязать» к определенной очереди, делается это через регистрацию в IQueueReactorFactory
с указанием идентификатора очереди из конфигурации:"messaging": {
"ApplicationName": "MyApplication"
},
"rabbitmq": {
"queues": [
{
"id": "myQueue",
"queuename": "lalala",
...
}
]
},
Пример запуска прослушивания:
_queueReactorFactory.Register<MyMessageProcessor>("myQueue");
var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue");
queueReactor.StartProcessing();
Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.
Для получения единичного сообщения в интерфейсе-фабрике
IMessagingComponentFactory
есть метод CreateMessageReceiver
, который создаст получателя, ожидающего сообщения из указанной ему очереди:using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue"))
{
var message = receiver.Receive();
}
Для отправки сообщения необходимо воспользоваться всё той же
IMessagingComponentFactory
и создать отправителя сообщения:using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue"))
{
sender.SendMessage(new MyMessage { Value = ...});
}
Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов
IMessageSerializer и IMessageDeserializer
.Мы постарались сохранить уникальные возможности каждого менеджера очередей, например,
ViennaNET.Messaging.MQSeriesQueue
позволяет отправлять не только текстовые, но и байтовые сообщения, а ViennaNET.Messaging.RabbitMQQueue
поддерживает роутинг и создание очередей “на лету”. В нашей обертке адаптера для RabbitMQ также реализовано некоторое подобие RPC: отправляем сообщение и ожидаем ответа из специальной временной очереди, которая создается только для одного ответного сообщения.Вот пример использования очередей с основными нюансами подключения.
ViennaNET.CallContext
Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку
ViennaNET.CallContext
, которая позволяет хранить данные из входящего в сервис запроса. При этом то, каким образом был сделан запрос, через очередь или через Http, не играет роли. Затем, перед отправкой исходящего запроса или сообщения, достаются данные из контекста и помещаются в заголовки. Таким образом, следующий сервис получает вспомогательные данные и аналогично ими распоряжается.Спасибо за внимание, ждём ваших комментариев и pull request-ов!