Как стать автором
Обновить

Как настроить интеграцию между kafka и rabbitmq (путешествие туда и обратно)

Время на прочтение5 мин
Количество просмотров6.2K

Данная статья будет полезна тем, кто столкнулся с проблемой интеграции kafka и rabbitmq. Материал не претендует на подробный туториал, но поможет помочь настроить рабочий процесс. Я расскажу, как отправить сообщение в rabbitmq и получить его на стороне kafka, а также обратный процесс, с которым, спойлер, всё оказалось сложнее.

Путешествие «туда и обратно».

Зачем?

Ситуация максимально банальна и проста. Когда разные команды, делают свои проекты с использованием разных message broker’ов, рано или поздно наступает момент, когда появляется необходимость интеграции (и да, я в курсе, что kafka - нечто большее, чем просто mb). Первое, что приходит на ум – это сделать интеграцию через REST, но мы получаем сразу несколько значительных минусов. Например, long pulling. Как поставить на расчет долгий процесс? Как дождаться ответа? Итог: реализации на REST очень далеки от идеала. Внедрение REST так же повлечет танцы с бубном, так как текущая архитектура уже заточена под message broker.

Следующей технологией для интеграции можно рассмотреть grpc. Хороший вариант, но тоже имеет изъян, так как не очень хорошо работает с асинхронными запросами, а нас интересовали именно длительные запросы.

Итак, мы пришли к решению использовать брокер сообщений, но тут появилась главная проблема: одна команда использовала rabbitmq, а другая apache kafka. Первой мыслью было выбрать одну технологию и использовать ее, но оценив трудозатраты по переписыванию pipleline взаимодействия в любой из систем решено было искать альтернативные варианты. Да, конечно, apache kafka – это нечто большее, чем просто брокер сообщений, но в данной ситуации нам требовалась именно эта его функция.

Просматривая интернеты, я наткнулся на интересную статью, в которой описывалась именно та ситуация, в которую я попал. Воодушевленный находкой, я подумал, вот она, «моя прелесть», начал изучать туториал и пытаться развернуть сервис локально.

Был собран и использован контейнер

from kafka-connect:6.2.0

Далее во всех туториалах была обозначена schema-registry

schema-registry:

image: confluentinc/cp-schema-registry:6.2.0
container_name: schema-registry
ports:
- 8081:8081
hostname: schema-registry
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://server-name.local:9092
SCHEMA_REGISTRY_CUB_KAFKA_TIMEOUT: 300

Ее я тоже поднял, но в дальнейшем она оказалась для меня бесполезной, ее предназначение – это использование дополнительных сериализаторов, меня полностью устроили дефолтные.

Из kafka в rabbitmq

Следующим этапом была настройка коннекторов. Для успешного создания коннектора нужно сначала подготовить kafka и rabbitmq.

На стороне kafka создаем топик kafka_result. Со стороны rabbitmq нужно создать очередь kafka_to_rabbit, еще понадобиться exchange kafka_to_rabbit_exch и, конечно, выполнить binding (операцию привязки очереди к exchange) с ключом kafka-connectors.

Kafka, в отличие от rabbitmq, не удаляет сообщение из топиков, и для контроля прочитанных сообщений нам необходимы конфигурационные топики, которые наш сервис создаст самостоятельно, нам остается только при старте в окружение добавить названия этих топиков. В некоторых ситуациях создание топиков запрещено извне, поэтому придется обратиться к администраторам kafka.

В нашем случае это

CONNECT_OFFSET_STORAGE_TOPIC _kafka-connect-group-01-offsets

CONNECT_STATUS_STORAGE_TOPIC _kafka-connect-group-01-status

CONNECT_CONFIG_STORAGE_TOPIC _kafka-connect-group-01-configs

После чего, запускаем наш сервис и выполняем POST запрос.

https://service_addres/connectors

{
    "name": "kafka-to-rabbit",
    "config" : {
        "connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
        "tasks.max" : "1",
        "topics" : "kafka_result",
        "rabbitmq.queue" : "kafka_to_rabbit",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.host": "host-rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.exchange": "kafka-to-rabbit-exch",
        "rabbitmq.routing.key": "kafka-connectors",
        "rabbitmq.delivery.mode": "PERSISTENT",
        "confluent.topic.bootstrap.servers":"addres_kafka_server:9092",
        "rabbitmq.virtual.host": "/",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

 В случае удачного выполнения запроса, вернется 201 ответ. Проверить создавшийся коннектор можно в браузере по тому же адресу (или через get запрос)

https://service_addres/connectors

Можно увидеть массив коннекторов.

Отлично, если все получилось. Отправляем сообщение в топик kafka и получаем это сообщение на стороне rabbitmq, при чем конфигурационные топики будут контролировать, чтобы нам не приходило дубликатов, и сообщения не потерялись.

Прекрасно, но есть один нюанс. Почти все англоязычные туториалы на этом заканчиваются, как будто всех интересует только перепушивание сообщений из kafka rabbitmq.

А обратно то как? Как перепушить сообщение из rabbitmq в kafka?

Из rabbitmq в kafka

Вернемся к созданию нашего коннектора. Там есть строчка.

"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector"

RabbitMQSinkConnector – отвечает за перепушивание из rabbitmq в kafka.

Теперь нам нужен RabbitMQSourceConnector. Пытаемся создать новый коннектор и получаем ошибку, что такой коннектор не найден в нашем контейнере. Неприятно.

Решение проблемы заключается в том, что при сборке нашего контейнера нужно дополнительно поставить плагин. Исправляем наш докер файл и добавляем туда строчкуconfluent-hub install --no-prompt --verbose confluentinc/kafka-connect-rabbitmq:1.5.2

Пересобираем контейнер и пробуем создать обратный коннектор

{
"name": " rabbit-to-kafka",
"config": {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"kafka.topic" : " kafka_task",
"rabbitmq.queue" : "rabbitmq_to_kafka",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.host": "hostname",
"rabbitmq.port": "5672",
"rabbitmq.virtual.host": "/",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"kafka_host:9092",
"confluent.topic.replication.factor":1,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}

Выполняем POST запрос и дожидаемся 201 ответа, проверяем коннекторы и видим успешно создавшийся

Отлично, теперь визуализируем нашу схему взаимодействия rabbitmq с kafka.

1. Отправляем сообщение в очередь rabbit_to_kafka и получаем его в топике kafka_task

2. Отправляем сообщение в топик kafka_result и получаем сообщение в очереди kafka_to_rabbit

В данной статье описано двусторонне взаимодействие, из rabbitMQ в kafka и обратно, в большинстве примеров найденных в сети описаны ситуации одностороннего взаимодействия, я потратил значительное время чтобы соединить все в одно.

Теги:
Хабы:
+8
Комментарии6

Публикации