Как стать автором
Обновить

Настройка Kafka для работы в режиме подтверждения о принятии сообщения (ack/nack)

Время на прочтение3 мин
Количество просмотров14K

На новом проекте, на котором я работаю в качестве PHP Tech Lead. Команда столкнулась с вопросом наведения порядков, один из которых - унификация:

  • единый тип реляционной базы данных

  • единый брокер сообщений

  • единый фреймворк в разрезе языка программирования

  • etc.

В этой статье пойдет речь о выборе брокера сообщений.

Kafka на данный момент уже используется в компании и эта технология является более масштабируемой чем тот же RabbitMQ, поэтому мы и смотрим в ее сторону. Перед тем как начать внедрять Kafka в наши проекты, мы протестируем закрывает ли эта технология наши потребности.

Предъявляемые требования к брокеру сообщений:

  • Совместимость с PHP

  • Стабильная работа, отказоустойчивость, высокий SLA(>99,95%)

  • Масштабируемость

  • Быстрая скорость отправки сообщения в очередь

  • Уверенность в доставке сообщения

  • Уверенность в получении и обработке сообщения(ack/nack)

По сути это все все и так есть и отлично работает(на малых и средних объемах) у RabbitMQ.

Но как я уже писал выше - мы не хотим плодить зоопарк технологий.

1. Выбор клиента для PHP

На официальном сайте Kafka есть несколько ссылок на репозиториев интеграции.

Только 1н репозиторий "живой" на текущий момент и имеет большое количество звезд на GitHub - arnaud-lb/php-rdkafka.

2. Сетап окружения

В 21м году Docker для разработчика это как кнут для Индианы Джонс.

  1. Я зашел на hub.docker.com и нашел самый популярный официальный образ kafka.

  2. Дальше поиск в гугл готового примера docker-compose файла с PHP, возможно даже с kafka. И о сюрприз - первая строка выдачи поискового результата - Phillaf/php-kafka-demo. Тут уже присутствует образ из первого пункта.

git clone <https://github.com/Phillaf/php-kafka-demo>
  1. После клонирования репозитория, запускаем:

docker-compose up -d

С первого раза конечно же проект не собрался(как минимум у меня на MacOs BigSur). Для того чтобы испраить ошибку было достаточно просто убрать чётко заданую версию Kafka.

  1. Все готово для работы приожения, но а как же без доп. бонуса? Для того чтобы лучше понимать что происходит с неизвестным мне инструментом, я решил сразу же поискать визуальный менеджер. Было несколько платных версий которые нужно устанавливать на ПК. Но это же не трушно. Поэтому я установил opensource admin panel

    Для этого добавляем несколько строк в наш docker-compose.yml

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
      - KAFKA_CLUSTERS_0_READONLY=false

  • На старт, внимание, пуск....

    docker-compose up -d

  • Go to http://localhost - Добавляет сообщения в нашу очередь(топик).

  • Go to http://localhost:8080 - откроет нашу admin panel - где мы увидим наш топик и первые сообщения в нем.

  • заходим в контейнер (лично я предпочитаю пользоватьтся docker в phpStorm), но можно и черз GUI docker так и с помощью консоли(не буду тут на этом останавливаться, думаю что тем кому уже интересна Kafka - точно умеют смогут зайти в контейнер Docker) и запускаем consumer

php ./public/consumer_low.php

В консоли мы увидим наши сообщения.

Поздравляю вас теперь вы адепты Kafka.

3. Настраиваем конфигурацию под наши требования

Как я писал выше, нас интересует полный контроль над сообщениями, наше приложение хочет получать сообщение ровно 1н раз, не больше и не меньше. При этом мы хотим быть уверенными что если в процессе обработки сообщения консюмер внезапно завершит свою работу(в следствии деплоя, возникновения ошибки или падения сервера, ...) то после перезапуска - это сообщение будет обработано, а не канет в небытие.

  1. Для этого нам потребуется отключить автоматическую синхронизацию кафки о сохраненном оффсете.

$conf->set('enable.auto.commit', 'false');
  1. Для того чтобы севрвер мог сохранять offset мы должны назначить consumer'у уникальный идентификатор чтобы при переподключении кафка знала на каком месте мы остановлись.

$conf->set('group.id', 'group_1');
  1. Начинаем прослушивать очередь:

$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

1й параметр - номер партиции для чтения(по дефолту топик имеет 1у партицию и значение переменной будет = 0). 2й параметр - число(int) - offset c которого стоит начать читать сообщения. Существует 3 предустановленных режима:

  • RD_KAFKA_OFFSET_BEGINNING - начать чтение с первого сообщения в партиции

  • RD_KAFKA_OFFSET_END - читать только новые сообщения, которые появятся в очереди только после подключения консюмера

  • RD_KAFKA_OFFSET_STORED - читать с того места где остановились в прошлый раз(этот флаг возможно использовать только в случае указания - 'group.id')

  1. Получение сообщения из очереди

$msg = $topic->consume($partition, 1000);
  1. После успешной обработки сообщения(сохранения в БД или тп) помечаем сообщение как прочитанное(инкрементируем оффсет)

$topic->offsetStore($partition, $msg->offset);

Вуаля, мы реализовали консистентную работу c Apache Kafka.

Ссылка на репозиторий.

Буду рад вашим комментариям.

Теги:
Хабы:
Всего голосов 8: ↑7 и ↓1+6
Комментарии2

Публикации

Истории

Работа

PHP программист
146 вакансий

Ближайшие события