Как стать автором
Обновить

Интеграция Kafka с Manticore Search: пошаговое руководство по обработке данных в реальном времени

Уровень сложностиСредний
Время на прочтение10 мин
Количество просмотров1.2K

Kafka — это популярный брокер сообщений, который используется в самых разных проектах: от обработки логов и управления очередями задач до персонализации контента и аналитики в реальном времени. Например, с его помощью можно индексировать изменения в Wikipedia или искать товары в интернет-магазинах. Manticore Search, в свою очередь, поддерживает интеграцию с Kafka, что позволяет автоматически импортировать данные и применять их для полнотекстового поиска, аналитики, векторного поиска и других задач.

При импорте данных в Manticore можно гибко их обрабатывать:

  • Удалять ненужные поля, добавлять новые или изменять существующие;

  • Рассчитывать расстояние между геопозициями;

  • Фильтровать данные перед сохранением;

  • Создавать сниппеты с помощью полнотекстового поиска.

В этой статье мы разберём, как построить небольшое приложение, которое получает данные из Kafka и обрабатывает их в Manticore Search. Для настройки окружения будем использовать Docker Compose. Руководство подойдёт как новичкам, так и опытным разработчикам. Готовый код и демо доступны на GitHub .

Подготовка окружения

Давайте начнем с настройки нашего окружения разработки. Мы будем использовать Docker Compose для создания всей инфраструктуры, включающей Kafka, Manticore Search и сервис Kafkacat для потоковой передачи данных. Сначала рассмотрим конфигурацию каждого сервиса отдельно, а затем предоставим полный файл docker-compose.yml.

Чтобы сэкономить время, вы можете скачать готовый файл docker-compose.yml из нашего репозитория на GitHub и сразу перейти к разделу Запуск окружения , если хотите начать быстрее.

Настройка Kafka

Начнём с развёртывания Kafka. Мы используем упрощённую конфигурацию с протоколом KRaft (Kafka Raft), который заменяет ZooKeeper для упрощения архитектуры. Вот часть файла docker-compose.yml, относящаяся к сервису Kafka:

kafka:
  image: docker.io/bitnami/kafka:3.7
  container_name: kafka
  networks:
    - app-network
  environment:
    # Настройки KRaft
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    # Слушатели
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

Настройка Manticore

Теперь, когда Kafka настроен для обработки потока сообщений, нам нужен движок поиска и аналитики для обработки данных. Настроим Manticore Search с минимальной, но функциональной конфигурацией:

manticore:
  image: manticoresearch/manticore:7.4.6
  container_name: manticore
  networks:
    - app-network

Запуск окружения

Запускаем базовые контейнеры (Kafka и Manticore) с помощью команды:

docker compose up -d

Это запустит сервисы Kafka и Manticore, но пока не запустит Kafkacat (так как он использует профиль manual). После запуска сервисов создаём топик в Kafka. Количество партиций (в нашем случае 4) задаём для параллельного чтения данных несколькими потребителями, что повышает производительность:

docker compose exec kafka kafka-topics.sh \
  --create \
  --topic wikimedia \
  --partitions 4 \
  --bootstrap-server localhost:9092

Подготовка источников данных

Теперь, когда инфраструктура запущена и топик готов принимать сообщения, настроим поток данных, который будет отправлять контент в Kafka в реальном времени. Для отправки данных в Kafka используем поток Wikimedia Stream. Настроим Kafkacat с профилем manual, чтобы запустить его вручную после настройки топика и Manticore:

kafkacat:
  profiles:
    - manual
  image: edenhill/kcat:1.7.1
  container_name: kcat
  tty: true
  entrypoint:
    - '/bin/sh'
    - '-c'
    - "apk update && apk add -f curl && curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, \"\"); print}' | kcat -P -b kafka:9092 -t wikimedia"
  networks:
    - app-network

После настройки сервиса Kafkacat с профилем manual, вы можете запустить его для начала передачи данных в Kafka:

docker compose --profile manual up -d

Пример получаемых данных

Как только поток Wikimedia начнёт поступать в Kafka, вы начнёте получать сообщения в формате JSON. Давайте рассмотрим типичное сообщение, чтобы понять структуру данных, с которой мы будем работать:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
    "request_id": "66d1686b-500e-438c-8985-8c7a28295ae8",
    "id": "345ce42e-3cac-46b7-901e-2c3161f53436",
    "dt": "2024-12-10T16:30:32Z",
    "domain": "es.wikipedia.org",
    "stream": "mediawiki.recentchange",
    "topic": "codfw.mediawiki.recentchange",
    "partition": 0,
    "offset": 1313008266
  },
  "id": 323817817,
  "type": "edit",
  "namespace": 2,
  "title": "Usuario:Davicilio/Taller",
  "title_url": "https://es.wikipedia.org/wiki/Usuario:Davicilio/Taller",
  "comment": "/* Uniforme titular */",
  "timestamp": 1733848232,
  "user": "Davicilio",
  "bot": false,
  "notify_url": "https://es.wikipedia.org/w/index.php?diff=164049521&oldid=164010074",
  "minor": false,
  "length": {
    "old": 29666,
    "new": 29691
  },
  "revision": {
    "old
::contentReference[oaicite:0]{index=0}

Работа с данными в Manticore

Теперь, когда Kafka получает данные из потока Wikimedia, настроим Manticore Search для обработки и индексации этих данных для поиска и анализа.

Создание источника данных

Создаём источник (SOURCE), который будет читать данные из Kafka. Указываем только те поля, которые нас интересуют — остальные будут проигнорированы. Если поле есть в схеме, но отсутствует в сообщении, оно получит значение NULL или останется пустым в зависимости от типа данных:

docker compose exec manticore mysql -e "
CREATE SOURCE wiki_source (
  id bigint,
  schema '\$schema' text,
  meta json,
  type text,
  namespace int,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

Пояснения:

  • CREATE SOURCE - команда создания источника данных (source)

  • (id bigint, schema '$schema' text, …) - перечисление полей входящего сообщения в соответствии с типами данных, поддерживаемыми Manticore ( список типов данных ).

    • Поле $schema — в Manticore нельзя использовать специальные символы в именах полей, поэтому применяется первичный маппинг:

      new_name 'original_name' type
      
      • new_name — совместимое с Manticore имя поля.

      • original_name — исходный ключ входящего JSON, который может включать специальные символы. Если требуется использовать апостроф в имени, его следует экранировать \'.

  • type = kafka - указывает, что источником данных является Kafka.

  • broker_list='kafka:9092' — список брокеров сообщений, разделённых запятой.

  • topic_list='wikimedia' — список топиков для чтения, также разделённых запятыми.

  • consumer_group='ms_wikimedia' — группа потребителей (consumer group).

  • num_consumers='1' — количество процессов, обрабатывающих сообщения. Обычно равно количеству партиций топика или кратно ему.

  • batch=200 — размер пакета сообщений для обработки, который влияет на скорость обработки и настраивается индивидуально.

Создание таблицы результатов

Мы создали источник данных для чтения из Kafka, но нам всё ещё нужно место назначения для этих данных. Создадим таблицу, в которую будут попадать обработанные сообщения:

Одним из ключевых полей является ID сообщения. В процессе передачи данных может произойти что угодно: сбой сети, падение брокера Kafka или самого Manticore. В таких случаях возможно получение дублирующихся сообщений. Чтобы избежать дубликатов, используется уникальный ID: если запись уже существует в таблице, она просто пропускается.

Помимо ID, в таблице будут присутствовать следующие поля: typetitletitle_urlcommenttimestampuserbotminorlengthserver_urlserver_namewiki и meta.

docker compose exec manticore mysql -e "create table wiki_results (
  id bigint, 
  \`schema\` text, 
  metadata json, 
  type text, 
  namespace int, 
  title text, 
  title_url text, 
  comment text, 
  \`timestamp\` timestamp, 
  user string, 
  bot bool, 
  minor bool, 
  length_old int, 
  length_new int, 
  length_diff int, 
  server_url text, 
  server_name text, 
  wiki text, 
  received_at timestamp
)"

Поле length мы разделили на length_old и length_new для демонстрации возможностей маппинга.

Создание материализованного представления

Теперь у нас есть источник (Kafka) и место назначения (таблица). Свяжем их и определим, как данные будут перетекать из одного в другое. Материализованное представление связывает источник и таблицу, выполняя преобразования данных в реальном времени — это аналог ETL-процесса, встроенный в Manticore. Вот как это выглядит:

JSON-ключ

Источник / Функция

Назначение

id

id

id

$schema

schema

schema

meta

meta

metadata

type

type

type

namespace

namespace

namespace

title

title

title

title_url

title_url

title_url

comment

comment

comment

timestamp

timestamp

timestamp

user

user

user

bot

bot

bot

minor

minor

minor

length.old

length.old

length_old

length.new

length.new

length_new

-

integer(length.old)-integer(new)

length_diff

server_url

server_url

server_url

server_name

server_name

server_name

wiki

wiki

wiki

-

UTC_TIMESTAMP()

received_at

Команда для создания:

docker compose exec manticore mysql -e "
CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"

По сути, это обычный запрос SELECT, знакомый тем, кто работает с MySQL или подобными базами данных:

  • Поля, названия которых совпадают в источнике (SOURCE) и итоговой таблице, оставляем как есть (idschematype и т.д.).

  • Поля, которые нужно преобразовать (например, meta в metadata), указываем через оператор AS в формате исходное_имя AS новое_имя.

  • Для зарезервированных слов вроде schema и timestamp используем обратные кавычки (`).

  • Дочерние значения JSON выбираем с помощью точки и AS (например, length.new as length_new).

  • Manticore позволяет применять широкий набор функций для обработки данных — от вычислений до форматирования.

  • При необходимости можно добавить фильтрацию и группировку. Мы этого не сделали, чтобы не усложнять пример, но, например, после FROM wiki_source можно вставить WHERE MATCH(@title, 'pizza').

Полная конфигурация Docker Compose

Теперь, когда мы разобрались со всеми компонентами и их взаимодействием, давайте подведём итог, рассмотрев полный файл docker-compose.yml. Этот единый файл определяет всё наше окружение с тремя сервисами (Kafka, Manticore и Kafkacat) и конфигурацией сети.

Вы можете скопировать следующее содержимое или скачать готовый файл docker-compose.yml напрямую из нашего репозитория на GitHub:

Запуск

Теперь, когда окружение настроено, давайте проверим, как данные проходят через систему. После сохранения или скачивания файла docker-compose.yml в вашу рабочую директорию и запуска сервисов, как описано выше, вы можете отслеживать поступление данных, выполняя SQL-запросы к Manticore:

docker compose exec manticore mysql -e "SELECT count(*) FROM wiki_results"
+----------+
| count(*) |
+----------+
|     1200 |
+----------+

Подождите несколько секунд и выполните запрос снова — вы увидите обновлённое значение:

+----------+
| count(*) |
+----------+
|     1400 |
+----------+

Пример простого запроса для просмотра данных:

docker compose exec manticore mysql -e "SELECT title, user, timestamp FROM wiki_results LIMIT 5"

Сложный запрос с группировкой:

+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
| namespace | count | avg_length_change   | latest_edit | sample_title                                                        |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+
|        14 |   998 |  116196508.99599199 |  1740684056 | Category:Wiki For Minorities in the Middle East 2025                |
|         0 |   634 | 3075575718.85488939 |  1740684057 | Oklahoma Sooners men's basketball                                   |
|         6 |   313 |   2758109067.434505 |  1740684056 | File:Kluse - Phoenix dactylifera 03 ies.jpg                         |
|         2 |    40 |   1825360728.625000 |  1740684053 | User:SD2125!                                                        |
|         4 |    21 | 3272355882.52380943 |  1740684051 | Commons:Wiki For Minorities in the Middle East                      |
|         3 |    16 |   3489659770.625000 |  1740684054 | Brugerdiskussion:Silas Nicolaisen                                   |
|         1 |    13 |   3634202801.230769 |  1740684045 | Diskussion:Nordische Skiweltmeisterschaften 2025                    |
|      1198 |    10 |   1288490146.500000 |  1740684053 | Translations:Commons:Wiki Loves Folklore 2025/Page display title/id |
|        10 |     8 |   3221223681.500000 |  1740684055 | Predefinição:Mana (série)                                           |
+-----------+-------+---------------------+-------------+---------------------------------------------------------------------+

Изменение схемы источника данных

Если вам нужно изменить схему источника данных (например, добавить новые поля, удалить ненужные или изменить типы данных), это можно сделать следующим образом:

1) Приостановка материализованного представленияСначала нужно приостановить материализованное представление, чтобы остановить поток данных из Kafka в таблицу wiki_results:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=1"

2) Удаление существующего источника

Удалите текущий источник данных:

docker compose exec manticore mysql -e "DROP SOURCE wiki_source"

3) Создание нового источника с обновленной схемойСоздаём новый источник с изменённой схемой. Например, если вы хотите добавить поле domain из JSON-объекта meta, поле parsedcomment и изменить тип данных для namespace на bigint, команда будет выглядеть так:

docker compose exec manticore mysql -e "CREATE SOURCE wiki_source (
  id bigint,
  schema '$schema' text,
  meta json,
  parsedcomment text,
  type text,
  namespace bigint,
  title text,
  title_url text,
  comment text,
  \`timestamp\` timestamp,
  user text,
  bot bool,
  minor bool,
  length json,
  server_url text,
  server_name text,
  wiki text
)
type='kafka'
broker_list='kafka:9092'
topic_list='wikimedia'
consumer_group='ms_wikimedia'
num_consumers='1'
batch=200"

4) Обновление таблицы (добавляем domain и parsedcomment):

docker compose exec manticore mysql -e "ALTER TABLE wiki_results ADD COLUMN domain text;
ALTER TABLE wiki_results ADD COLUMN parsedcomment text"

5) Удаление материализованного представления:

6) Обновление представления:

docker compose exec manticore mysql -e "CREATE MATERIALIZED VIEW wiki_mva
TO wiki_results AS
SELECT
  id,
  \`schema\`,
  meta AS metadata,
  meta.domain as domain,
  parsedcomment,
  type,
  namespace,
  title,
  title_url,
  comment,
  \`timestamp\`,
  user,
  bot,
  minor,
  length.old as length_old,
  length.new as length_new,
  integer(length.old) - integer(length.new) as length_diff,
  server_url,
  server_name,
  wiki,
  UTC_TIMESTAMP() as received_at
FROM wiki_source"

Если вы только пересоздали SOURCE и не меняли MV, вам требуется запустить чтение данных заново:

docker compose exec manticore mysql -e "ALTER MATERIALIZED VIEW wiki_mva suspended=0"

В противном случае материализованное представление уже будет возобновлено.

Таким образом, процесс изменения схемы полностью контролируем и позволяет гибко адаптировать систему под новые требования.

Заключение

Интеграция Kafka с Manticore Search предлагает мощное решение для обработки и анализа данных в реальном времени. Следуя этому руководству, вы настроили надёжное окружение с помощью Docker Compose, настроили Kafka для обработки потока сообщений и использовали Manticore Search для индексации и выполнения запросов к данным. Эта интеграция не только расширяет функциональность ваших приложений, но и упрощает управление данными и их анализ.

Независимо от того, работаете ли вы над анализом логов, индексацией контента или любым другим приложением, зависящим от данных, эта настройка предоставляет масштабируемую и эффективную основу. Гибкость Manticore Search позволяет адаптировать конвейер обработки данных под ваши конкретные потребности, гарантируя быструю адаптацию к изменяющимся требованиям.

Мы рекомендуем вам поэкспериментировать с этой настройкой, изучить дополнительные возможности Manticore Search и адаптировать пример под ваши уникальные задачи. Полный код доступен на GitHub , а сообщество Manticore всегда готово помочь с любыми вопросами или трудностями, с которыми вы можете столкнуться. Погрузитесь в работу и раскройте весь потенциал обработки данных в реальном времени с Kafka и Manticore Search!

Теги:
Хабы:
+3
Комментарии0

Публикации

Работа

QT разработчик
5 вакансий
Программист C++
96 вакансий

Ближайшие события