Привет! Меня зовут Илья Макаров, я работаю архитектором решений и в статье расскажу про архитектуру цифровой платформы НЛМК, из каких компонент, помимо Apache Kafka, она состоит, к каким соглашениям по именованию топиков и договоренностям по передаче данных мы пришли, как всем этим управляем.

А это сразу ссылка на часть 2.

Архитектура и подходы, про которые я рассказываю – это эволюция, которая до сих пор продолжается. Мы начали с решения относительно простой задачи - организации взаимодействия между разными системами, для сбора логов и трейсов, но, за счет погружения в особенности производства, пришли к понимаю применимости нашей платформы для решения и других задач:

  • Телеметрия – сбор и организация доступа к данным с различных датчиков.

  • Обмен данными между системами в изолированных, с точки зрения требований ИБ, сегментах.

  • Загрузки данных в централизованные хранилища.

На основании данных групп задач мы решили сделать следующие кластеры:

  1. централизованный кластер в бизнес-сегменте под данные и интеграции ИС.

  2. выделенный кластер в бизнес-сегменте под логи и трейсинг.

  3. выделенный кластер в бизнес-сегменте под тренды.

  4. локальные к производству кластеры (под данные, логи и тренды сегмента производства).

Целевая, она же получившаяся, схема интеграций через Apache Kafka

Kafka можно назвать одним из основных, базовых, компонентов инфраструктуры. Подходы и принципы, заложенные в начале, живут очень долго и их очень сложно поменять. Поэтому очень важно на начальном этапе сразу продумать, как пользователи будут авторизоваться в разных кластерах, как будут именоваться топики, в каком формате передавать сообщения, как обеспечить контроль качества схем и данных.

Можно выделить несколько уровней зрелости систем в зависимости от степени автоматизации:

  • заявки, ручник и длительное ожидание.

  • автоматизированные инструменты администраторов, написанные тесты к входной информации от пользователей.

  • все запросы пользователей автоматизированы.

  • приложения-потребители могут жить без поддержки (будь то перевыпуск сертификата или CA, или смена пароля, всё для пользователей ИС происходит автоматически).

Мы сейчас находимся где-то между уровнями 2 и 3, и кажется, что уже можем поделиться тем, к чему пришли.

Кластеры и аутентификация

Каждому кластеру присваивается имя вида XXX-Y, состоящее из номера группы (ХXX) и суффикса (Y) - идентификатора среды.

Номера группы выдаются последовательно. В группе может быть только один кластер с продуктивной средой. Условлено, что среда prod всегда имеет суффикс 0test или dev- любые другие числа.

Например, кластерная группа 000- это наш центральный кластер под данные. Его продуктивный кластер Kafka имеет имя 000-0, его дев кластер 000-1, а тестовый кластер - 000-2. Имена кластеров уникальны в рамках компании. Это позволяет гарантировать уникальность имен топиков в рамках компании, определять кластер по имени топика, реплицировать топики без конфликта имен.

Аутентификация

Самый универсальный способ аутентификации - mTLS, позволяет аутентифицировать клиентов даже там, где нет, например, контроллера домена, и поддерживается всеми библиотеками и ПО, которое мы используем. Подсмотрев как это было сделано в Booking (спасибо Александру Миронову) мы разработали следующий подход к CA и сертификатам.

На каждый кластер выпускается свой CA(Cluster XXX-X CA). Этот CA выпускает сертификаты для брокеров и сервисов администрирования (KSM).

Также два CA для сервисов: Service PROD и Service NONPROD.

На продуктивные кластеры Kafka устанавливается два CA:

  • Cluster XXX-X CA

  • Service PROD CA

На тестовые устанавливается три CA:

  • Cluster XXX-X CA

  • Service PROD CA

  • Service NonPROD CA

Таким образом, приложения, у которых сертификат выпущен Service PROD CA, могут подключаться и к продуктивным, и к непродуктивным кластерам Kafka. Как пример, последнее сделано, чтобы можно было реализовать сервис по наполнению тестовых сред на основе данных из продуктивной среды. Приложения с сертификатами, выданными Service NonPROD CA, могут подключаться только к непродуктивным кластерам. Отдельный CA для сервисов позволяет приложениям с одним сертификатом подключаться к разным кластерам.

Для создания CA и выпуска сертификатов мы используем Lemur, (опять же, спасибо Александру Миронову), а также Vault. Lemur предоставляет авторизацию, выпуск сертификатов через Rest, хранение сертификатов и выполняет их доставку в Hashicorp Vault.

Vault используем не только для хранения секретов, но и для выпуска сертификатов для сбора логов с машин. Каждая ВМ получает токен с которым может выпустить себе сертификат. Сертификат выпускается на 7 дней и автоматически ротируется с помощью Consul Template. Скорее всего, в будущем мы откажемся от Lemur и полностью перейдем на Vault.

Для управления ACL мы используем подход GitOps: Git -> CI -> KSM.

В качестве инструментов для администрирования используем:

Мониторинг

Хосты и сервисы​

Наш основной стек мониторинга - Prometheus и VictoriaMetrics. Для сбора метрик с Java сервисов Kafka и Kafka REST мы используем Prometheus JMX Exporter. Он запускается как Java agent и предоставляет http интерфейс на localhost с метриками JVM. Zookeeper, начиная с версии 3.6, нативно умеет предоставлять метрики в Prometheus-формате.

На виртуальных машинах в качестве агентов сбора метрик используется Telegraf.
Он собирает:

Про мониторинг Zookeeper отлично все собрано у Altinity: Zookeeper monitoring

Kafka REST

Для Kafka-REST мы собираем статистику по http кодам ответов с помощью модуля Nginx VTS в разрезе dataset name(см. соглашение об именовании топиков).

Пример nginx/conf.d/kafka-rest.conf:
map $uri $namespace {
    ~*/topics/[0-9-]+\.(?<datasetname>[a-z0-9-]+)\..+$  $datasetname;
    default      'unmatched';
}

server {
   ...
   host_traffic_status_filter_by_set_key $host $namespace;
   ...
}

А также, мы используем Blackbox Exporter, с помощью которого периодически отправляем тестовое сообщение в kafka-REST и проверяем код ответа. Данный метод имеет свои слабые стороны (например, AVRO схема кэшируется на стороне Kafka-REST и проблемы со Schema Registry можно не заметить). Улучшение этого мониторинга уже запланировано.

Внешний мониторинг Kafka

В качестве агента для мониторинга кластера снаружи мы используем: KMinion. Его функционал очень богат, он позволяет видеть статистику по кластеру в целом, по топикам, по консьюмер группам, а также размер топиков на диске, и даже есть end-to-end мониторинг.

Для деплоя и управления всеми инструментами администрирования мы используем подход GitOps и деплоим инструменты в OpenShift (очень удобно, что все инструменты можно закрыть с помощью OpenShift OAuth).

Публичные дашборды

Возможность самостоятельной диагностики и открытость - основной наш подход. В нашем централизованном сервисе мониторинга всем авторизованным пользователям доступны подготовленные нами дашборды для Kafka и Kafka REST.

Топ-3 вопроса от пользователей по Kafka и Kafka REST:

  • есть ли данные в топике?

  • пишутся ли данные?

  • все ли нормально с Kafka REST?

На первые два вопроса помогают ответить стандартные дашборды KMinion. Мы их слегка изменили, добавив возможность выбора кластера и среды.

Для ответа на третий вопрос, мы сделали дашборд из метрик Nginx VTS.

Пример дашборда Nginx VTS

Соглашение об именовании топиков​

Следующим важным шагом была разработка концепции именования топиков.

Мы пришли к следующему формату:

<cluster name>.<dataset name>.<message type>.<data name>.<version>

Где:

  • cluster name - уникальное имя кластера Kafka вида XXX-X. Для реплицированных из кластера YYY-Y: XXX-X-YYY-Y.

  • dataset name - условно - имя базы данных, уникальное в рамках всех кластеров. Используется как категоризация для группировки топиков вместе.

  • message type - тип сообщений, говорит о том как данные должны быть интерпретированы или использованы. Выбирается из справочника MessageType.

  • data name - сущность аналогичная "таблица" в БД, название потока данных.

  • version - версия данных, для обратно несовместимых изменений в схеме данных, начиная с 0.

Для валидации, что имя топика соответствует конвенции, мы используем следующий regexp:
^\d{3}-\d(-\d{3}-\d)?\.[a-z][a-z0-9-]+[a-z0-9]\.(db|cdc|bin|cmd|sys|log|tmp)\.([a-z0-9][a-z0-9-]*[a-z0-9]+)\.\d+$

Справочник MessageType

Название

Интерфейс записи

Формат сообщения

Публичный

Комментарий

db

Kafka REST

AVRO

+

Для фактов, неизменяемых событий (данные датчиков, действия пользователей и т.д.).

cdc

Kafka REST

AVRO

+

Топик типа Compaction. Содержит полный набор данных и получает изменения к ним. Может использоваться для перезаливки хранилищ и кэшей, справочников. Key обязателен.

cmd

Kafka REST

AVRO

+

Содержит команды, используется в реализации паттерна Запрос-Ответ.

bin

Kafka REST

Bin

+

Бинарные данные (Content-Type: application/vnd.kafka.binary.v2+json) - используются в исключительных ситуациях. Рекомендуется в концеdata name перед версией топика добавлять расширение данных (например: 000-0.dataset-name.bin.name.crt.0)

sys

Binary

Bin

-

Внутренние топики, используемые только этим сервисом и неинтересные другим.

log

Binary

Bin

-

Для передачи логов (Syslog, FluentBit).

tmp

Binary

Bin

-

Для временных или промежуточных данных.

Для упрощения понимания концепции, и пока не перешли полностью на Self-Service для Kafka на Портале, мы сделали следующую flow диаграмму:

Пара статей про подходы к именованию топиков:

Kafka REST Proxy​

Изначально, в компании интеграцию с Kafka на запись для систем сделали через NiFi: http интерфейс, формат данных JSON, схемы к сообщениям хранились так же в NiFi. В случае несоответствия сообщения схеме, данные все равно записывались, но помещались в очередь "битые". Данный подход имел ряд проблем: NiFi вызывал вопросы, были проблемы с битыми сообщениями и аргументами "с нашей стороны пуля вылетела". Хотелось перепроектировать интеграции так, чтобы пуля "не вылетала", если она не соответствует контракту.

И ещё, на мой взгляд, единственный, кто может обеспечить качество данных — это источник (и, как мне кажется, кто знает, как их лучше уложить/архивировать).

Итого, требования к интеграциям:

  • выбор бинарного формата, чтобы в стандартное сообщение в 1Мб помещалось больше полезных данных;

  • эволюция схем, предоставить отправителю возможность управлять схемами и версиями;

  • не принимать сообщения в Kafka, если не соответствуют контракту;

  • как можно большая поддержка и распространенность формата, наличие готовых инструментов, библиотек, поддержка формата в Hadoop.

По формату решили использовать AVRO. Для предоставления возможности записи в Kafka через HTTP-протокол, контроля формата сообщений и хранения схем, мы выбрали связку Confluent Kafka Rest и Confluent Schema Registry.

Для авторизации перед Kafka-REST, мы написали программу на Go - RA, которая выполняет следующие функции:

  • проверяет, что имя топика соответствует конвенции имен;

  • проверяет авторизацию и аутентификацию пользователя для топика (или схемы);

  • проверяет Content-Type для топика (обязывает отправлять AVRO).

Пример конфигурации (разрешаем пользователю l4-example создание и запись в топики его ИС):
auth:
  prefix: /topics/
  urlvalidreg: ^\d{3}-\d(-\d{3}-\d)?\.[a-z][a-z0-9-]+[a-z0-9]\.(db|cdc|bin|cmd|sys|log|tmp)\.([a-z0-9][a-z0-9-]*[a-z0-9]\.)+\d+$
  acl:
  - path: ^000-0\.l4-example\.db\.+?$
    users:
    - l4-example
    methods:
    - post
    contenttype:
    - application/vnd.kafka.avro.v2+json

Авторизацию интегрировали в Nginx для обоих сервисов (Rest и Schema Registry) через опцию auth_request.

Пример секции auth_request:
 auth_request /auth;
 location = /auth {
   internal;
   proxy_pass http://ra:8080;
   proxy_pass_request_body     off;
   proxy_set_header Content-Length "";
   proxy_set_header X-Original-URI $request_uri;
   proxy_set_header X-Original-Method $request_method;
   proxy_set_header Host $http_host;
   proxy_set_header X-Real-IP $remote_addr;
   proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
   proxy_set_header X-Forwarded-Proto $scheme;
   proxy_set_header X-Service "kafka-rest";
 }

Kafka REST и Schema Registry работают в HA-режиме и позволяют горизонтально масштабироваться.

Schema Registry на продуктивных и тестовых кластерах доступна только в режиме Read-Only, т.е. все могут получить любую схему, но зарегистрировать её можно только через Kafka REST. Schema Registry Dev кластера доступна без какой-либо авторизации и ограничений. Схемы продуктивных сред на регулярной основе выгружаются в Confluence для возможности быстро посмотреть схему топика или текстового поиска по ним.

Репликация данных в централизованный кластер

Кластеры Kafka разных производственных площадок НЛМК находятся за Firewall. Мы реплицируем данные из этих Kafka в централизованный кластер, чтобы уменьшить количество клиентов, которые обращаются в технологический сегмент, и собрать все данные в одном кластере.

Для репликации мы используем программу, написанную нами на python. Она умеет реплицировать топики в формате Confluent-Avro из одного кластера в другой с минимальными изменениями и поддерживает Exactly Once.

Логика ее проста:

  • в целевом кластере создается топик с таким же количеством партиций, как и в исходном кластере;

  • при записи в целевой кластер номер партиции сохраняется;

  • составляется соответствие: номер схемы в исходном кластере - номер схемы в целевом.  При необходимости, схема регистрируются в Schema Registry целевого кластера;

  • в исходном бинарном сообщении заменяется только номер схемы (без десериализация в AVRO);

  • для поддержки Exactly Once, при записи к сообщению в Headers добавляем offset в исходном топике и используем эту информацию для начального выставления смещений или при сбоях.

Пример сообщений в целевом кластере

Требования к AVRO схеме​

Для топиков, которые пишутся в AVRO формате, мы решили стандартизовать базовый формат сообщений (схему), правила именования полей и пространств, чтобы у подписчиков не было проблем с получением данных.

За основу взяли стиль для Java от Google:

И получилось:

  • наименования (name) всех атрибутов должны быть в формате camelCase (состоять только из букв и цифр и не начинаться с цифры) и с учетом следующих требований:

    • Если у атрибута "type" - "record", то наименование (name) должно начинаться с большой буквы.

    • Если у атрибута "type" не "record", то наименование (name) должно начинаться с маленькой буквы.

  • атрибут "namespace" состоит из префикса "ORG_NAME." и dataset name (точки и тире удаляются, после них буква заменяется на заглавную).

  • значение заглавного атрибута "name" должно начинаться с большой буквы.

Базовая (обязательная для всех) схема сообщений топика выглядит так
{
    "namespace": "__AVRO_ROOT_NAMESPACE__",
    "type": "record",
    "name": "__AVRO_ROOT_NAME__",
    "doc": "Example message",
    "fields": [
       {"name": "ts", "type": "string", "doc": "Время события в ISO 8601:2004 с указанием TZ"},
       {"name": "op", "type": {
                    "type": "enum", "name": "EnumOp", "namespace": "ORG_NAME",
                    "symbols": ["U", "D", "I"]
            }, "doc": "Вид операции [U]pdate, [D]elete, [I]nsert"
        },
        {"name": "pk",
         "type": {"type": "record", "name":"PkType", "namespace": "__AVRO_DATASET_NAMESPACE__",
            "fields":
            [
                { "name": "pkID", "type":"string"}
            ]
        },
         "doc": "Первичный ключ записи в виде структуры"
        },
        {
            "name": "sys",
            "type": ["null", {
                "name": "Sys", "namespace": "ORG_NAME",
                "type": "record",
                "fields": [
                    {"name": "seqID", "type": "long", "default": -1,"doc": "Монотонно возрастающий счетчик для проверки или восстановления оригинальной последовательности"},
                    {"name": "traceID", "type": "string", "default": "", "doc": "Сквозной Trace ID (обычно UUID)"}
                ]
            }],
            "default": null,
            "doc": "Системные поля, заполняемые источником"
        },
        {
            "name": "metadata",
            "type": ["null", {
                "type": "record",
                "name": "Metadata", "namespace": "ORG_NAME",
                "fields":[
                    {"name": "kafkaKeySchemaID","type": "int","default": -1, "doc":"Версия схемы ключа"},
                    {"name": "kafkaValueSchemaID","type": "int","default": -1, "doc": "Версия схемы значения"},
                    {"name": "kafkaKey","type": "string", "default": "", "doc": "Ключ в Kafka"},
                    {"name": "kafkaPartition","type": "int","default": -1, "doc": "Номер партиции"},
                    {"name": "kafkaOffset","type": "long","default": -1, "doc": "Offset в Kafka"},
                    {"name": "kafkaTimestamp","type": "string", "default": "", "doc":"Время сообщения в Kafka"},
                    {"name": "kafkaTopic","type": "string", "default": "", "doc":"Имя топика"},
                    {"name": "kafkaHeaders", "type": ["null", {"type": "map", "values": "string"}], "default": null}
                ]
            }],
            "doc": "Мета структура, заполняется Подписчиком после чтения",
            "default": null
        },
        {"name": "data",
         "type":["null",{
            "type": "record",
            "name": "RecordData", "namespace": "__AVRO_DATASET_NAMESPACE__",
            "fields":[
                {"name": "message", "type": "string", "doc": "Пример передачи строки message"}
                 
            ]
         }],
         "doc": "Полезная нагрузка"
        }
    ]
}

Где:

  • AVRO_ROOT_NAMESPACE Корневое пространство имен. 
    Генерируется из <dataset name>путем замены всех тире на точки и добавления префикса ORG_NAME.

    Пример: имя топика: 000-0.l3-hy.db.sales.order-confirmation.0;
    корневое пространство имен: ORG_NAME.l3.hy.

  • AVRO_ROOT_NAME Корневое имя. 
    Генерируется как <message type>.<data name>.ver<version>, где первая и все буквы следующие за точкой или тире заменяются на заглавные.

    Пример: имя топика: 000-0.l3-hy.db.sales.order-confirmation.0
    корневое имя: DbSalesOrderConfirmationVer0.

  • AVRO_DATASET_NAMESPACE Namespace топика.
    Генерируется как корневой namespace + <message type>.<data name>.ver<version>, все - заменяются на ..

    Пример: имя топика: 000-0.l3-hy.db.sales.order-confirmation.0;
    namespace топика: ORG_NAME.l3.hy.db.sales.order.confirmation.ver0.

Источник может расширять только структуры pk и data, сам корневой уровень изменять не разрешается.

Большинству подписчиков нужна мета информация к сообщению. С помощью неё можно проверить данные на предмет отсутствия пропусков, выстроить последовательность, точно указать на сообщение в случае проблем. Мы решили сразу заложить в базовую схему структуру под эту мета информацию. Ее заполняет подписчик, если ему это нужно.

Это решение позволило нам так же сохранять метаданные сообщений Kafka в Stage слой на HDFS и в HBase в AVRO формате.

Небольшие рекомендации по заполнению структуры data

Необходимо стараться сохранить исходный тип данных или следить, чтобы аналог не потерял в точности.

Тип float не подходит для передачи отчетных типов.

Возможные способы решения проблемы:

  • методом домножения на множитель и приведением к целочисленному типу.

    • Статический множитель:
      пример:

      "name": "priceX10000", "type": "long", "doc": " Цена домноженная на 10000"
    • Динамический множитель:

      {"name": "price",
          "type": {"type": "record", "name":"Price", "fields":
             [
                 { "name": "value", "type": "long", "doc":"Цена домноженная на multiplier"},
                 { "name": "multiplier", "type": "int", "doc":"Множитель"},
             ]
         },
      
  • приведение к меньшим единицами измерения (в зависимости от точности измерений, а лучше - с запасом):

    • целое число секунд вместо дробных часов

    • целое число метров вместо дробных километров

Продолжение следует...

В следующей части я расскажу о том, как мы централизованно доставляем данные потребителям и в корпоративное хранилище на базе Hadoop. И немного про наш Портал самообслуживания.