Как стать автором
Обновить

Создаем микросервисную архитектуру вместе с Apache Kafka и .NET Core 2.0

Время на прочтение10 мин
Количество просмотров87K

Доброго времени суток! Apache Kafka – очень быстрый распределенный брокер сообщений, и сегодня я расскажу как его “готовить” и реализовать с его помощью простую микросервисную архитектуру из консольных приложений. Итак, всем, кто хочет познакомиться с Apache Kafka и опробовать ее в деле, добро пожаловать под кат.

Обзорная часть


Введение


Данный материал ни в коем случае не претендует ни на доскональное описание Apache Kafka, ни на тонкие вопросы построения микросервисной архитектуры. Единственное, что надо знать — это как строить приложения на платформе .NET. Мы будем использовать .Net Core 2.0

Итак, что мы в итоге создадим? Приложение, которое подскажет, как назвать своего ребенка. Для простоты, оно будет выдавать случайные мужские и женские имена из заранее составленного списка. Система будет состоять из двух консольных приложений и одной библиотеки.

Идея в том, чтобы построить не “монолитное”, а распределенное приложение. Тем самым, мы обеспечим себе задел для будущего масштабирования и множество других преимуществ, описанных, например, здесь.

Вот какая структура будет у нашей системы:

3 синих “прямоугольника” по сторонам – это консольные приложения. По сути, те два, что внизу, это микросервисы, а MainApp – пользовательское приложение, через него мы будем запрашивать имена. NameService у нас будет универсальным сервисом, способным генерировать либо мужские, либо женские имена.
Оранжевый “прямоугольник” посередине – брокер сообщений Apache Kafka. Брокер сообщений это то, что связывает все части нашей системы воедино. В нашем случае мы будем использовать Apache Kafka, но с таким же успехом могли бы воспользоваться RabbitMQ, ActiveMQ, или каким-нибудь еще.

А вот так происходит взаимодействие MainApp c Apache Kafka:


Работает это по следующей схеме:

  1. Пользователь запрашивает какие-то данные (в нашем случае, мужское или женское имя).
  2. MainApp посылает сообщение (на схеме это «Команды») в Apache Kafka, которое автоматически получают все необходимые нам сервисы.
  3. Эти сервисы отвечают тем, что также посылают другое сообщение (на схеме – данные) в Apache Kafka. MainApp принимает это сообщение из Apache Kafka (на схеме это «Данные»), заключающее в себе нужную нам информацию, и предоставляет ее пользователю.

Взаимодействие каждого сервиса с Apache Kafka происходит по аналогичной “двухсторонней” схеме.

Обратите внимание, MainApp ничего не знает о NameService, и наоборот. Все взаимодействие происходит через Apache Kafka. Но и MainApp, и NameService должны использовать одни и те же «каналы связи». На практике это означает, что, например, название топика, куда посылает сообщения MainApp, должно полностью совпадать с названием топика, из которого «слушает» сообщения NameService.

Как видите, работа Apache Kafka в нашем примере заключается в передаче сообщений между разными элементами системы. Именно этим, исключительно быстро и надежно, она и занимается. Конечно, у нее есть другие возможности, почитать о них можно на официальном сайте здесь

Что такое Apache Kafka


Apache Kafka – распределенный брокер сообщений. По сути, это система, которая может очень быстро и эффективно передавать ваши сообщения. В качестве сообщений могут выступать любые типы данных, поскольку для Kafka это всего лишь последовательность байтов. Apache Kafka может работать как на одной машине, так и на нескольких, которые вместе образуют кластер и повышают эффективность всей системы. В нашем случае мы запустим Apache Kafka локально, и для взаимодействия с ней будем использовать библиотеку от Confluent.

Важно понять то, как работает Apache Kafka. Мы можем писать в нее сообщения, и можем читать из нее. Все сообщения в Kafka принадлежат к тому или иному топику (topic). Топик – это как заглавие, и он должен быть определен для каждого сообщения, которое мы хотим передать в Apache Kafka. Точно также, если мы собираемся читать из Kafka сообщения, мы должны указать, с каким топиком будут эти сообщения.

Топик поделен на разделы, и их количество мы указываем, как правило, самостоятельно. Количество разделов в топике имеет большое значение для производительности, почитать об этом можно тут

Практическая часть


Скачиваем и запускаем Apache Kafka 0.11


На данный момент последней версией является версия 0.11. Скачайте архив с официального сайта (https://kafka.apache.org/downloads) и распакуйте в любую папку. Дальше из консоли надо запустить 2 файла (zookeeper-server.start и kafka-server-start) следующим образом.

Открываем первую консоль (если распаковали на диск С, открываем от имени Администратора, на всякий случай), переходим туда, где мы распаковали наш архив с Kafka, и вводим команду:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

После этого, если все хорошо и этот процесс не прекратился вскоре после старта, открываем так же вторую консоль, и запускаем уже саму Apache Kafka
bin\windows\kafka-server-start.bat config\server.properties

Только что мы запустили Zookeeper и Apache Kafka со стандартными настройками, указанными в zookeeper.properties и server.properties соответственно. Zookeeper – необходимый элемент, без него Apache Kafka не работает.

Полную информацию о запуске и конфигурировании Kafka можно посмотреть на официальном сайте

Начинаем кодить


Итак, Kafka запущена, теперь создадим наше “распределенное“ приложение. Оно будет состоять из 2 консольных приложений и одной библиотеки. В результате получим решение из 3 проектов, которое будет выглядеть примерно вот так:


Наша библиотека — это “обертка” вокруг библиотеки Confluent.Kafka, она нам нужна для взаимодействия с Apache Kafka. Кроме этого, она будет использоваться каждым из наших консольных приложений.

Библиотека предназначена для целевой платформы .NET Core 2.0 (Хотя, с таким же успехом могла бы быть создана для платформы .NET Standard) Ее код представлен ниже. Обратите внимание, для нее необходимо скачать nuget пакет Confluent.Kafka.

MessageBus.cs
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace MessageBroker.Kafka.Lib
{
    public sealed class MessageBus : IDisposable
    {
        private readonly Producer<Null, string> _producer;
        private Consumer<Null, string> _consumer;

        private readonly IDictionary<string, object> _producerConfig;
        private readonly IDictionary<string, object> _consumerConfig;

        public MessageBus() : this("localhost") { }

        public MessageBus(string host)
        {
            _producerConfig = new Dictionary<string, object> { { "bootstrap.servers", host } };
            _consumerConfig = new Dictionary<string, object>
            {
                { "group.id", "custom-group"},
                { "bootstrap.servers", host }
            };

            _producer = new Producer<Null, string>(_producerConfig, null, new StringSerializer(Encoding.UTF8));
        }

        public void SendMessage(string topic, string message)
        {
             _producer.ProduceAsync(topic, null, message);
        }

        public void SubscribeOnTopic<T>(string topic, Action<T> action, CancellationToken cancellationToken) where T: class
        {
            var msgBus = new MessageBus();
            using (msgBus._consumer = new Consumer<Null, string>(_consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
            {
                msgBus._consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, -1) });

                while (true)
                {
                    if (cancellationToken.IsCancellationRequested)
                        break;

                    Message<Null, string> msg;
                    if (msgBus._consumer.Consume(out msg, TimeSpan.FromMilliseconds(10)))
                    {
                        action(msg.Value as T);
                    }
                }
            }
        }

        public void Dispose()
        {
            _producer?.Dispose();
            _consumer?.Dispose();
        }
    }
}

Немного пояснений к коду библиотеки
Обертка для того и создана, чтобы все взаимодействие с Apache Kafka упростить донельзя и сосредоточиться на моментах во взаимодействии элементов системы друг с другом. В библиотеке есть два метода: SendMessage() и SubscribeOnTopic, в рамках туториала больше и не надо. Еще в SubscribeOnTopic мы подписываемся на топик и непрерывно «слушаем» сообщения, поэтому чтобы подписаться на несколько топиков, лучше запускать их в отдельных потоках, что мы и будем делать далее при использовании этой библиотеки с помощью конструкций Task.Run().

Далее мы построим “главное” приложение MainApp, а потом наш “микросервис” NameService, который запустим после в двух экземплярах. Каждый из них будет отвечать за генерацию либо мужских, либо женских имен.

Код простого консольного приложения MainApp для целевой платформы .NET Core 2.0 приведен ниже. Обратите внимание, в нем необходимо добавить ссылку на библиотеку, которую мы только что построили и которая находится в пространстве имен MessageBroker.Kafka.Lib.

MainApp.cs
using System;
using System.Threading;
using MessageBroker.Kafka.Lib;
using System.Threading.Tasks;

namespace MainApp
{
    class Program
    {
        private static readonly string bTopicNameCmd= "b_name_command";
        private static readonly string gTopicNameCmd = "g_name_command";
        private static readonly string bMessageReq = "get_boy_name";
        private static readonly string gMessageReq= "get_girl_name";

        private static readonly string bTopicNameResp = "b_name_response";
        private static readonly string gTopicNameResp= "g_name_response";

        private static readonly string userHelpMsg = "MainApp: Enter 'b' for a boy or 'g' for a girl, 'q' to exit";

        static void Main(string[] args)
        {
            using (var msgBus = new MessageBus())
            {
                Task.Run(() => msgBus.SubscribeOnTopic<string>(bTopicNameResp, msg => GetBoyNameHandler(msg), CancellationToken.None));
                Task.Run(() => msgBus.SubscribeOnTopic<string>(gTopicNameResp, msg => GetGirlNameHandler(msg), CancellationToken.None));

                string userInput;

                do
                {
                    Console.WriteLine(userHelpMsg);
                    userInput = Console.ReadLine();
                    switch (userInput)
                    {
                        case "b":
                            msgBus.SendMessage(topic: bTopicNameCmd, message: bMessageReq);
                            break;
                        case "g":
                            msgBus.SendMessage(topic: gTopicNameCmd, message: gMessageReq);
                            break;
                        case "q":
                            break;
                        default:
                            Console.WriteLine($"Unknown command. {userHelpMsg}");
                            break;
                    }

                } while (userInput != "q");
            }
        }

        public static void GetBoyNameHandler(string msg)
        {
            Console.WriteLine($"Boy name {msg} is recommended");
        }

        public static void GetGirlNameHandler(string msg)
        {
            Console.WriteLine($"Girl name {msg} is recommended");
        }
    }
}


Немного пояснений к коду MainApp
Видели много строковых readonly переменных вначале? Это названия всех топиков и сообщения, которые мы будем в них посылать. Иначе говоря, заглавия и текст сообщений. О них должны знать все сервисы, с которыми будет взаимодействовать наш MainApp, потому как названия топиков должны совпадать. Например, bTopicNameCmd — название топика для команды сервису о том, что нам надо получить мужское имя (gTopicNameCmd — аналогично). Сервис должен быть подписан на одноименный топик, чтобы получать из него сообщения и потом что-то делать.

Точно также, наш MainApp подписан на топики, в которые передают полезную информацию наши сервисы NameService. Например, переменная bTopicNameResp — это название топика, который предусмотрен для готовых мужских имен, которые сгенерировал NameService. Сервис посылает имя в этот топик, а MainApp их оттуда получает.

Далее представлен код “микросервиса” NameService. Обратите, внимание, здесь тоже надо добавить ссылку на уже созданную нами библиотеку в пространстве имен MessageBroker.Kafka.Lib

NameService.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using MessageBroker.Kafka.Lib;

namespace NameService
{
    class Program
    {
        private static MessageBus msgBus;
        private static readonly string userHelpMsg = "NameService.\nEnter 'b' or 'g' to process boy or girl names respectively";
        private static readonly string bTopicNameCmd = "b_name_command";
        private static readonly string gTopicNameCmd = "g_name_command";

        private static readonly string bTopicNameResp = "b_name_response";
        private static readonly string gTopicNameResp = "g_name_response";
        private static readonly string[] _boyNames =
        {
            "Arsenii",
            "Igor",
            "Kostya",
            "Ivan",
            "Dmitrii",
        };
        private static readonly string[] _girlNames =
        {
            "Nastya",
            "Lena",
            "Ksusha",
            "Katya",
            "Olga"
        };

        static void Main(string[] args)
        {
            bool canceled = false;

            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                canceled = true;
            };

            using (msgBus = new MessageBus())
            {
                Console.WriteLine(userHelpMsg);

                HandleUserInput(Console.ReadLine());

                while (!canceled) { }
            }
        }

        private static void HandleUserInput(string userInput)
        {
            switch (userInput)
            {
                case "b":
                    Task.Run(() => msgBus.SubscribeOnTopic<string>(bTopicNameCmd, (msg) => BoyNameCommandListener(msg), CancellationToken.None));
                    Console.WriteLine("Processing boy names");
                    break;
                case "g":
                    Task gTask = Task.Run(() => msgBus.SubscribeOnTopic<string>(gTopicNameCmd, (msg) => GirlNameCommandListener(msg), CancellationToken.None));
                    Console.WriteLine("Processing girl names");
                    break;
                default:
                    Console.WriteLine($"Unknown command. {userHelpMsg}");
                    HandleUserInput(Console.ReadLine());
                    break;
            }
        }

        private static void BoyNameCommandListener(string msg)
        {
            var r = new Random().Next(0, 5);
            var randName = _boyNames[r];

            msgBus.SendMessage(bTopicNameResp, randName);
            Console.WriteLine($"Sending {randName}");
        }

        private static void GirlNameCommandListener(string msg)
        {
            var r = new Random().Next(0, 5);
            var randName = _girlNames[r];

            msgBus.SendMessage(gTopicNameResp, randName);
            Console.WriteLine($"Sending {randName}");
        }
    }
}


Немного пояснений к коду NameService
Сервис работает по следующей схеме:

  1. Сначала определяемся, мужские или женские имена данные сервис будет генерировать (т.е. просто выбирать случайное имя из подготовленного списка, в нашем случае)
  2. Подписываемся на соответствующий топик

В теле метода обработчика события мы посылаем сообщение с готовым именем в топик, на который MainApp уже подписан. А это событие наступает каждый раз, как MainApp посылает сообщение о том, что нужно получить какое-то имя.

Запускаем


На этом этапе у вас, по идее, уже должно быть готовое решение со всем необходимым кодом. Далее можно поступить следующим образом: настроить решение так, чтобы запускались сразу 2 приложения (MainApp и NameService), и запустить их (Только проверьте, что у вас уже запущена Apache Kafka). В NameService вводим 'b', или 'g', чтобы настроить сервис для генерирования мужских или женских имен, после чего, точно также, вводим в MainApp 'b' или 'g', но уже для получения этих самых имен. После чего в MainApp вы должны получить какое-то имя.

На данном этапе мы получаем имена только одного пола. Допустим, только мужского. Теперь мы захотели получать имена женского пола. Идем в папку, куда собрался наш проект NameService, и запускаем в консоли еще один сервис с помощью команды "dotnet NameService.dll".
Вводим в нем команду 'g', и теперь, при запросе женского имени в MainApp, мы его получаем.

Кстати, таким образом можно запустить сколько угодно сущностей NameService, и в этом заключается одно из достоинств микросервисной архитектуры. Например, если один из сервисов «упадет», вся система не рухнет, т.к. у нас есть другие сервисы, которые делают точно такую же работу.

Одно но: cейчас если мы, например, запустим 5 штук NameService, то в MainApp придет 5 имен, а не одно. Это из-за настроек Apache Kafka, прописанных в файле server.properties. В рамках туториала я этого намеренно не касаюсь, чтобы не усложнять материал.

Заключение


В данной статье я хотел как можно проще и доступнее описать принцип построения микросервисной архитектуры и познакомить читателя с распределенным брокером сообщений Apache Kafka на живом примере. Надеюсь, получилось, и спасибо за внимание:)

Ссылки на использованные в статье материалы


  1. Официальный сайт Apache Kafka
  2. О разделах в Apache Kafka от Confluent
  3. Перевод статьи Мартина Фаулера о микросервисах
  4. Apache Kafka для начинающих
  5. Перевод статьи от Mail.ru о микросервисах
Теги:
Хабы:
Всего голосов 18: ↑16 и ↓2+14
Комментарии28

Публикации

Истории

Работа

Ближайшие события

15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань