Доброго времени суток! Apache Kafka – очень быстрый распределенный брокер сообщений, и сегодня я расскажу как его “готовить” и реализовать с его помощью простую микросервисную архитектуру из консольных приложений. Итак, всем, кто хочет познакомиться с Apache Kafka и опробовать ее в деле, добро пожаловать под кат.
Обзорная часть
Введение
Данный материал ни в коем случае не претендует ни на доскональное описание Apache Kafka, ни на тонкие вопросы построения микросервисной архитектуры. Единственное, что надо знать — это как строить приложения на платформе .NET. Мы будем использовать .Net Core 2.0
Итак, что мы в итоге создадим? Приложение, которое подскажет, как назвать своего ребенка. Для простоты, оно будет выдавать случайные мужские и женские имена из заранее составленного списка. Система будет состоять из двух консольных приложений и одной библиотеки.
Идея в том, чтобы построить не “монолитное”, а распределенное приложение. Тем самым, мы обеспечим себе задел для будущего масштабирования и множество других преимуществ, описанных, например, здесь.
Вот какая структура будет у нашей системы:
3 синих “прямоугольника” по сторонам – это консольные приложения. По сути, те два, что внизу, это микросервисы, а MainApp – пользовательское приложение, через него мы будем запрашивать имена. NameService у нас будет универсальным сервисом, способным генерировать либо мужские, либо женские имена.
Оранжевый “прямоугольник” посередине – брокер сообщений Apache Kafka. Брокер сообщений это то, что связывает все части нашей системы воедино. В нашем случае мы будем использовать Apache Kafka, но с таким же успехом могли бы воспользоваться RabbitMQ, ActiveMQ, или каким-нибудь еще.
А вот так происходит взаимодействие MainApp c Apache Kafka:
Работает это по следующей схеме:
- Пользователь запрашивает какие-то данные (в нашем случае, мужское или женское имя).
- MainApp посылает сообщение (на схеме это «Команды») в Apache Kafka, которое автоматически получают все необходимые нам сервисы.
- Эти сервисы отвечают тем, что также посылают другое сообщение (на схеме – данные) в Apache Kafka. MainApp принимает это сообщение из Apache Kafka (на схеме это «Данные»), заключающее в себе нужную нам информацию, и предоставляет ее пользователю.
Взаимодействие каждого сервиса с Apache Kafka происходит по аналогичной “двухсторонней” схеме.
Обратите внимание, MainApp ничего не знает о NameService, и наоборот. Все взаимодействие происходит через Apache Kafka. Но и MainApp, и NameService должны использовать одни и те же «каналы связи». На практике это означает, что, например, название топика, куда посылает сообщения MainApp, должно полностью совпадать с названием топика, из которого «слушает» сообщения NameService.
Как видите, работа Apache Kafka в нашем примере заключается в передаче сообщений между разными элементами системы. Именно этим, исключительно быстро и надежно, она и занимается. Конечно, у нее есть другие возможности, почитать о них можно на официальном сайте здесь
Что такое Apache Kafka
Apache Kafka – распределенный брокер сообщений. По сути, это система, которая может очень быстро и эффективно передавать ваши сообщения. В качестве сообщений могут выступать любые типы данных, поскольку для Kafka это всего лишь последовательность байтов. Apache Kafka может работать как на одной машине, так и на нескольких, которые вместе образуют кластер и повышают эффективность всей системы. В нашем случае мы запустим Apache Kafka локально, и для взаимодействия с ней будем использовать библиотеку от Confluent.
Важно понять то, как работает Apache Kafka. Мы можем писать в нее сообщения, и можем читать из нее. Все сообщения в Kafka принадлежат к тому или иному топику (topic). Топик – это как заглавие, и он должен быть определен для каждого сообщения, которое мы хотим передать в Apache Kafka. Точно также, если мы собираемся читать из Kafka сообщения, мы должны указать, с каким топиком будут эти сообщения.
Топик поделен на разделы, и их количество мы указываем, как правило, самостоятельно. Количество разделов в топике имеет большое значение для производительности, почитать об этом можно тут
Практическая часть
Скачиваем и запускаем Apache Kafka 0.11
На данный момент последней версией является версия 0.11. Скачайте архив с официального сайта (https://kafka.apache.org/downloads) и распакуйте в любую папку. Дальше из консоли надо запустить 2 файла (zookeeper-server.start и kafka-server-start) следующим образом.
Открываем первую консоль (если распаковали на диск С, открываем от имени Администратора, на всякий случай), переходим туда, где мы распаковали наш архив с Kafka, и вводим команду:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
После этого, если все хорошо и этот процесс не прекратился вскоре после старта, открываем так же вторую консоль, и запускаем уже саму Apache Kafka
bin\windows\kafka-server-start.bat config\server.properties
Только что мы запустили Zookeeper и Apache Kafka со стандартными настройками, указанными в zookeeper.properties и server.properties соответственно. Zookeeper – необходимый элемент, без него Apache Kafka не работает.
Полную информацию о запуске и конфигурировании Kafka можно посмотреть на официальном сайте
Начинаем кодить
Итак, Kafka запущена, теперь создадим наше “распределенное“ приложение. Оно будет состоять из 2 консольных приложений и одной библиотеки. В результате получим решение из 3 проектов, которое будет выглядеть примерно вот так:
Наша библиотека — это “обертка” вокруг библиотеки Confluent.Kafka, она нам нужна для взаимодействия с Apache Kafka. Кроме этого, она будет использоваться каждым из наших консольных приложений.
Библиотека предназначена для целевой платформы .NET Core 2.0 (Хотя, с таким же успехом могла бы быть создана для платформы .NET Standard) Ее код представлен ниже. Обратите внимание, для нее необходимо скачать nuget пакет Confluent.Kafka.
MessageBus.cs
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace MessageBroker.Kafka.Lib
{
public sealed class MessageBus : IDisposable
{
private readonly Producer<Null, string> _producer;
private Consumer<Null, string> _consumer;
private readonly IDictionary<string, object> _producerConfig;
private readonly IDictionary<string, object> _consumerConfig;
public MessageBus() : this("localhost") { }
public MessageBus(string host)
{
_producerConfig = new Dictionary<string, object> { { "bootstrap.servers", host } };
_consumerConfig = new Dictionary<string, object>
{
{ "group.id", "custom-group"},
{ "bootstrap.servers", host }
};
_producer = new Producer<Null, string>(_producerConfig, null, new StringSerializer(Encoding.UTF8));
}
public void SendMessage(string topic, string message)
{
_producer.ProduceAsync(topic, null, message);
}
public void SubscribeOnTopic<T>(string topic, Action<T> action, CancellationToken cancellationToken) where T: class
{
var msgBus = new MessageBus();
using (msgBus._consumer = new Consumer<Null, string>(_consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
msgBus._consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, -1) });
while (true)
{
if (cancellationToken.IsCancellationRequested)
break;
Message<Null, string> msg;
if (msgBus._consumer.Consume(out msg, TimeSpan.FromMilliseconds(10)))
{
action(msg.Value as T);
}
}
}
}
public void Dispose()
{
_producer?.Dispose();
_consumer?.Dispose();
}
}
}
Немного пояснений к коду библиотеки
Обертка для того и создана, чтобы все взаимодействие с Apache Kafka упростить донельзя и сосредоточиться на моментах во взаимодействии элементов системы друг с другом. В библиотеке есть два метода: SendMessage() и SubscribeOnTopic, в рамках туториала больше и не надо. Еще в SubscribeOnTopic мы подписываемся на топик и непрерывно «слушаем» сообщения, поэтому чтобы подписаться на несколько топиков, лучше запускать их в отдельных потоках, что мы и будем делать далее при использовании этой библиотеки с помощью конструкций Task.Run().
Далее мы построим “главное” приложение MainApp, а потом наш “микросервис” NameService, который запустим после в двух экземплярах. Каждый из них будет отвечать за генерацию либо мужских, либо женских имен.
Код простого консольного приложения MainApp для целевой платформы .NET Core 2.0 приведен ниже. Обратите внимание, в нем необходимо добавить ссылку на библиотеку, которую мы только что построили и которая находится в пространстве имен MessageBroker.Kafka.Lib.
MainApp.cs
using System;
using System.Threading;
using MessageBroker.Kafka.Lib;
using System.Threading.Tasks;
namespace MainApp
{
class Program
{
private static readonly string bTopicNameCmd= "b_name_command";
private static readonly string gTopicNameCmd = "g_name_command";
private static readonly string bMessageReq = "get_boy_name";
private static readonly string gMessageReq= "get_girl_name";
private static readonly string bTopicNameResp = "b_name_response";
private static readonly string gTopicNameResp= "g_name_response";
private static readonly string userHelpMsg = "MainApp: Enter 'b' for a boy or 'g' for a girl, 'q' to exit";
static void Main(string[] args)
{
using (var msgBus = new MessageBus())
{
Task.Run(() => msgBus.SubscribeOnTopic<string>(bTopicNameResp, msg => GetBoyNameHandler(msg), CancellationToken.None));
Task.Run(() => msgBus.SubscribeOnTopic<string>(gTopicNameResp, msg => GetGirlNameHandler(msg), CancellationToken.None));
string userInput;
do
{
Console.WriteLine(userHelpMsg);
userInput = Console.ReadLine();
switch (userInput)
{
case "b":
msgBus.SendMessage(topic: bTopicNameCmd, message: bMessageReq);
break;
case "g":
msgBus.SendMessage(topic: gTopicNameCmd, message: gMessageReq);
break;
case "q":
break;
default:
Console.WriteLine($"Unknown command. {userHelpMsg}");
break;
}
} while (userInput != "q");
}
}
public static void GetBoyNameHandler(string msg)
{
Console.WriteLine($"Boy name {msg} is recommended");
}
public static void GetGirlNameHandler(string msg)
{
Console.WriteLine($"Girl name {msg} is recommended");
}
}
}
Немного пояснений к коду MainApp
Видели много строковых readonly переменных вначале? Это названия всех топиков и сообщения, которые мы будем в них посылать. Иначе говоря, заглавия и текст сообщений. О них должны знать все сервисы, с которыми будет взаимодействовать наш MainApp, потому как названия топиков должны совпадать. Например, bTopicNameCmd — название топика для команды сервису о том, что нам надо получить мужское имя (gTopicNameCmd — аналогично). Сервис должен быть подписан на одноименный топик, чтобы получать из него сообщения и потом что-то делать.
Точно также, наш MainApp подписан на топики, в которые передают полезную информацию наши сервисы NameService. Например, переменная bTopicNameResp — это название топика, который предусмотрен для готовых мужских имен, которые сгенерировал NameService. Сервис посылает имя в этот топик, а MainApp их оттуда получает.
Точно также, наш MainApp подписан на топики, в которые передают полезную информацию наши сервисы NameService. Например, переменная bTopicNameResp — это название топика, который предусмотрен для готовых мужских имен, которые сгенерировал NameService. Сервис посылает имя в этот топик, а MainApp их оттуда получает.
Далее представлен код “микросервиса” NameService. Обратите, внимание, здесь тоже надо добавить ссылку на уже созданную нами библиотеку в пространстве имен MessageBroker.Kafka.Lib
NameService.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using MessageBroker.Kafka.Lib;
namespace NameService
{
class Program
{
private static MessageBus msgBus;
private static readonly string userHelpMsg = "NameService.\nEnter 'b' or 'g' to process boy or girl names respectively";
private static readonly string bTopicNameCmd = "b_name_command";
private static readonly string gTopicNameCmd = "g_name_command";
private static readonly string bTopicNameResp = "b_name_response";
private static readonly string gTopicNameResp = "g_name_response";
private static readonly string[] _boyNames =
{
"Arsenii",
"Igor",
"Kostya",
"Ivan",
"Dmitrii",
};
private static readonly string[] _girlNames =
{
"Nastya",
"Lena",
"Ksusha",
"Katya",
"Olga"
};
static void Main(string[] args)
{
bool canceled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
canceled = true;
};
using (msgBus = new MessageBus())
{
Console.WriteLine(userHelpMsg);
HandleUserInput(Console.ReadLine());
while (!canceled) { }
}
}
private static void HandleUserInput(string userInput)
{
switch (userInput)
{
case "b":
Task.Run(() => msgBus.SubscribeOnTopic<string>(bTopicNameCmd, (msg) => BoyNameCommandListener(msg), CancellationToken.None));
Console.WriteLine("Processing boy names");
break;
case "g":
Task gTask = Task.Run(() => msgBus.SubscribeOnTopic<string>(gTopicNameCmd, (msg) => GirlNameCommandListener(msg), CancellationToken.None));
Console.WriteLine("Processing girl names");
break;
default:
Console.WriteLine($"Unknown command. {userHelpMsg}");
HandleUserInput(Console.ReadLine());
break;
}
}
private static void BoyNameCommandListener(string msg)
{
var r = new Random().Next(0, 5);
var randName = _boyNames[r];
msgBus.SendMessage(bTopicNameResp, randName);
Console.WriteLine($"Sending {randName}");
}
private static void GirlNameCommandListener(string msg)
{
var r = new Random().Next(0, 5);
var randName = _girlNames[r];
msgBus.SendMessage(gTopicNameResp, randName);
Console.WriteLine($"Sending {randName}");
}
}
}
Немного пояснений к коду NameService
Сервис работает по следующей схеме:
В теле метода обработчика события мы посылаем сообщение с готовым именем в топик, на который MainApp уже подписан. А это событие наступает каждый раз, как MainApp посылает сообщение о том, что нужно получить какое-то имя.
- Сначала определяемся, мужские или женские имена данные сервис будет генерировать (т.е. просто выбирать случайное имя из подготовленного списка, в нашем случае)
- Подписываемся на соответствующий топик
В теле метода обработчика события мы посылаем сообщение с готовым именем в топик, на который MainApp уже подписан. А это событие наступает каждый раз, как MainApp посылает сообщение о том, что нужно получить какое-то имя.
Запускаем
На этом этапе у вас, по идее, уже должно быть готовое решение со всем необходимым кодом. Далее можно поступить следующим образом: настроить решение так, чтобы запускались сразу 2 приложения (MainApp и NameService), и запустить их (Только проверьте, что у вас уже запущена Apache Kafka). В NameService вводим 'b', или 'g', чтобы настроить сервис для генерирования мужских или женских имен, после чего, точно также, вводим в MainApp 'b' или 'g', но уже для получения этих самых имен. После чего в MainApp вы должны получить какое-то имя.
На данном этапе мы получаем имена только одного пола. Допустим, только мужского. Теперь мы захотели получать имена женского пола. Идем в папку, куда собрался наш проект NameService, и запускаем в консоли еще один сервис с помощью команды "dotnet NameService.dll".
Вводим в нем команду 'g', и теперь, при запросе женского имени в MainApp, мы его получаем.
Кстати, таким образом можно запустить сколько угодно сущностей NameService, и в этом заключается одно из достоинств микросервисной архитектуры. Например, если один из сервисов «упадет», вся система не рухнет, т.к. у нас есть другие сервисы, которые делают точно такую же работу.
Одно но: cейчас если мы, например, запустим 5 штук NameService, то в MainApp придет 5 имен, а не одно. Это из-за настроек Apache Kafka, прописанных в файле server.properties. В рамках туториала я этого намеренно не касаюсь, чтобы не усложнять материал.
Заключение
В данной статье я хотел как можно проще и доступнее описать принцип построения микросервисной архитектуры и познакомить читателя с распределенным брокером сообщений Apache Kafka на живом примере. Надеюсь, получилось, и спасибо за внимание:)