
Создание простого проекта потоковой обработки событий – не самая простая задача. В сегодняшней статье мы расскажем, как можно достаточно просто сделать это при помощи Kafka, BigQuery & Looker Studio.
Приложения для потоковой обработки в реальном времени иногда могут быть сложными. Пытаясь узнать о них, важно выбрать практический вариант использования, чтобы обеспечить увлекательный и эффективный процесс обучения. Надеемся, что с помощью приведенного ниже примера вы сможете легко понять основы создания приложения в реальном времени.
Сценарий
Давайте представим, что мы работаем в data engineering отделе компании, предоставляющей услуги потоковой передачи музыки. Нам нужно создать панель мониторинга в реальном времени, которая показывает самые популярные песни конкретного исполнителя (скажем, Тони Аллена) в каждый конкретный момент времени. Для этого мы будем использовать популярную платформу распределенной потоковой передачи Kafka для создания, потребления и трансляции необходимых событий песен в BigQuery, чтобы мы могли визуализировать популярные песни на панели в Looker Studio.
В итоге наша архитектура будет выглядеть примерно так:
Термины и концепции
Давайте быстро определим некоторые термины, которые мы рассмотрим в этой статье.
Kafka: Apache Kafka — это распределенная потоковая платформа с открытым исходным кодом, которая позволяет (среди прочего) разрабатывать приложения реального времени, управляемые событиями, что идеально подходит для нашего варианта использования.
Кластер Kafka: набор серверов (называемых брокерами), работающих вместе для обеспечения высокой доступности, отказоустойчивости и хранилища для приложений реального времени.
Брокер: Как указано выше, Брокер — это машина, которая выполняет фактическую работу в кластере Kafka. Он хранит некоторый набор партиций, обрабатывает входящие запросы на запись новых событий в эти партиции и позволяет потребителям извлекать сообщения по топику, партиции и смещению.
Топик: Топик — это просто журнал событий. Каждое новое событие от producer’а добавляется в конец топика. И топики разделены на партиции.
Producer: : приложение, которое вы пишете, которое публикует (производит) данные для темы в кластере Kafka.
Consumer: приложение или конечный пользователь, который извлекает данные из кластеров Kafka в режиме реального времени. Для эффективной выборки сообщений в режиме реального времени пользователи Kafka должны подписаться на соответствующие топики, присутствующие в кластере.
Zookeeper: Отслеживает состояние узлов кластера Kafka, а также топики, партиции Kafka и многое другое. (Примечание: Обновление под названием KIP-500 устранило необходимость в Zookeeper, но мы не будем использовать эту версию Kafka в этой статье).
Poll: Метод poll() — это функция, которую пользователь Kafka вызывает для извлечения записей из заданного топика.
Мы настроим описанную выше архитектуру в 4 шага. Но прежде чем мы начнем, убедитесь, что у вас есть эти предварительные условия:
Прежде, чем начать:
Убедитесь, что у вас установлен Docker.
Установите библиотеку
confluent-kafkaPython на свой компьютер.Включите BigQuery API.
Создайте ключ учетной записи службы в Google Cloud с необходимыми разрешениями, важными для работы потокового API. Сохраните ключ где-нибудь на вашем компьютере, поскольку мы будем ссылаться на него позже.
#1. Разверните Kafka с помощью Docker
Kafka может быть развернута многими способами, но мы развернем ее с помощью Docker, поскольку это довольно просто.
Наш кластер Kafka будет состоять из двух основных объектов;
1 Экземпляр Брокер и
1 Экземпляр Zookeeper.
Мы будем использовать один файл Docker compose для настройки и запуска этих контейнеров. Вы заметите 2 службы и требуемые порты, представленные в docker-compose.yaml ниже:
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.1 container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.0.1 container_name: broker depends_on: - zookeeper ports: - "29092:29092" - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost
Убедитесь, что файл docker находится в том же каталоге, что и файлы Kafka producer и consumer, которые мы вскоре напишем.
Чтобы создать оба контейнера Docker, запустите эту команду, и оба контейнера будут доступ через несколького минут.
docker-compose up -d
#2. Соберите Producer
Мы напишем приложение / Producer, которое имитирует активность пользователя на платформе потоковой передачи музыки. Это приложение отправит событие под названием song-completed, которое запускается, когда пользователь завершает песню. Это событие будет отправлено в топик Кафки, который мы назовем tony-allen-plays.
Мы будем использовать пакет Faker для создания поддельных потоковых данных для нашего приложения. Наша поддельная полезная нагрузка события будет выглядеть примерно так:
{'user_id':001, 'artist': 'tony-allen', 'song_id': 03, 'song_name': 'lady', 'event_type':'song_completed', 'timestamp': '2022-11-03 07:22:13'}
Чтобы установить пакет Faker, запустите это в окне терминала:
pip install Faker
Создайте поддельный список песен
Теперь в нашем коде мы инициируем объект Faker и создадим заранее закодированный список песен из 10 случайных песен Тони Аллена, которые будут частью полезной нагрузки события.
from confluent_kafka import Producer from faker import Faker import json import time import logging #Create Faker object to generate fake data for Producer fake=Faker() #Create Tony Allen song list songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"]
Настройка формата протоколирования
Каждый раз, когда новое событие становится доступным, логи будут добавляться в файл producer.log — который мы определяем ниже, и который находится в вашем главном каталоге. Здесь мы устанавливаем основные конфигурации для того, как мы хотим, чтобы этот файл журнала (лога) был отформатирован.
#Configure logger logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='producer.log', filemode='w') logger = logging.getLogger() logger.setLevel(logging.INFO)
Запустите producer
Инициализируйте объект Kafka producer, указав сетевой порт вашего кластера Kafka, как определено в файле Docker compose выше:
#Create Kafka Producer p=Producer({'bootstrap.servers':'localhost:9092'})
Настройка обратного вызова
Определите функцию обратного вызова, которая будет вызываться при подтверждении фиксации новых сообщений или в случае ошибок. Когда корректное сообщение становится доступным, оно декодируется в utf-8 и печатается в предпочтительном формате. Такое же сообщение также добавляется к файлу журналов.
#Callback function def receipt(err,msg): if err is not None: print('Failed to deliver message: {}'.format(err)) else: message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8')) logger.info(message) print(message)
Напишите Producer Loop
Это самая забавная часть! Здесь мы просто создаем цикл с 3-секундной задержкой, который имитирует активность реального пользователя на потоковой платформе. Мы создаем схему для нашего события JSON и используем Faker для генерации фактических наборов данных.
#Write Producer loop def main(): for i in range(20): random_song_id = fake.random_int(min=0, max=9) data={ 'user_id': fake.random_int(min=20000, max=100000), 'artist': 'tony-allen', 'song_id': random_song_id, 'song_name': songs[random_song_id], 'event_type':'song_completed', 'timestamp': str(fake.date_time_this_month()) } m=json.dumps(data) p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt) p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions. p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered. time.sleep(3)
Обратите внимание, что когда мы вызываем p.produce, мы указываем топик Kafka, в который мы хотели бы опубликовать сообщение. В данном случае он называется tony-allen-plays. Поскольку этот топик еще не существует в нашем кластере Kafka, он автоматически создается при первом запуске этого приложения.
p.poll важен, так как он производит проверку на наличие событий и вызывает соответствующую функцию обратного вызова, которую мы определили ранее.
Наш полный producer.py сценарий должен выглядеть следующим образом:
from confluent_kafka import Producer from faker import Faker import json import time import logging #Create Faker object to generate fake data for Producer fake=Faker() #Create Tony Allen song list songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"] #Configure logger logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='producer.log', filemode='w') logger = logging.getLogger() logger.setLevel(logging.INFO) #Create Kafka Producer p=Producer({'bootstrap.servers':'localhost:9092'}) #Callback function def receipt(err,msg): if err is not None: print('Failed to deliver message: {}'.format(err)) else: message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8')) logger.info(message) print(message) #Write Producer loop that acts like user activity def main(): for i in range(20): random_song_id = fake.random_int(min=0, max=9) data={ 'user_id': fake.random_int(min=20000, max=100000), 'artist': 'tony-allen', 'song_id': random_song_id, 'song_name': songs[random_song_id], 'event_type':'song_completed', 'timestamp': str(fake.date_time_this_month()) } m=json.dumps(data) p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt) p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions. p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered. time.sleep(3) if __name__ == '__main__': main()
Чтобы подтвердить, что producer работает должным образом, выполните следующую команду в окне терминала:
python producer.py
#3. Создайте консумера (consumer)
Consumer будет делать две основные вещи:
Извлекать события из темы tony-allen-plays
Отправлять эти события в виде потока в BigQuery, используя BigQuery Streaming API
Установите библиотеку BigQuery Python
Для начала установите библиотеку BigQuery Python с помощью следующей команды.
pip install google-cloud-bigquery
Затем мы можем импортировать его в скрипт consumer.py и настроить конфигурации BigQuery.
from confluent_kafka import Consumer from google.cloud import bigquery import ast from google.oauth2 import service_account #Create BQ credentials object credentials = service_account.Credentials.from_service_account_file('PATH-TO-BQ-SERVICE-ACCOUNT') # Construct a BigQuery client object. bq_client = bigquery.Client(credentials=credentials) #Speficy BigQuery table to stream to table_id = 'PROJECT-ID.DATASET.TABLE-NAME'
Инициируйте Consumer
Далее мы инициируем Kafka консюмер, указав порт, а затем подписываемся на топик tony-allen-plays. При инициализации потребителя мы указываем идентификатор группы консюмеров, поскольку все консюмеры Kafka должны принадлежать к какой-то определенной группе консюмеров.
c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'}) print('Kafka Consumer has been initiated...') #Subscribe to topic c.subscribe(['tony-allen-plays'])
Вы также заметите, что тут есть атрибут — auto.offset.reset — который имеет значение ‘самый ранний’. Он заставляет консюмер начинать вычитывать сообщения последовательно с самого начала партиции топика.
Типичное приложение Kafka консюмера сосредоточено вокруг Consumer Loop. Итак, последний шаг – написать цикл, который последовательно опрашивает топик на предмет новых сообщений и, если находит, то отправляет эти сообщения в BigQuery.
Тогда наш полный код должен выглядеть следующим образом:
from confluent_kafka import Consumer from google.cloud import bigquery import ast from google.oauth2 import service_account #Create BQ credentials object credentials = service_account.Credentials.from_service_account_file('credentials/bq-service-account.json') # Construct a BigQuery client object. bq_client = bigquery.Client(credentials=credentials) #Speficy BigQuery table to stream to table_id = 'PROJECT-ID.DATASET.TABLE-NAME' ################ Kafka Consumer ################# c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'}) print('Kafka Consumer has been initiated...') #Subscribe to topic c.subscribe(['tony-allen-plays']) def main(): try: while True: msg=c.poll(timeout=1.0) #Retrieve records one-by-one that have been efficiently pre-fetched by the consumer behind the scenes if msg is None: continue if msg.error(): print('Error: {}'.format(msg.error())) continue else: data=msg.value().decode('utf-8') res = ast.literal_eval(data) #Convert string response to dictionary print(res) ##### Stream data into BigQuery table ####### rows_to_insert = [res] print((rows_to_insert)) errors = bq_client.insert_rows_json(table_id,rows_to_insert) #Make API request if errors==[]: print("New rows added.") else: print("Encountered erros while inserting rows: {}".format(errors)) finally: c.close() # Close down consumer to commit final offsets. if __name__ == "__main__": main()
Запустите конвейер Kafka
Теперь, когда Consumer и Producer настроены, откройте два отдельных окна терминала и снова запустите producer:
python producer.py
Затем запустите consumer после этого, чтобы он считывал данные из топика в режиме реального времени:
python consumer.py
Если вы видите, что сообщения, сгенерированные производителем, начинают отображаться в окне терминала потребителя, значит, ваш потребитель работает должным образом, и данные также должны передаваться в BigQuery:
#4. Визуализируйте данные
Последним шагом будет подключение таблицы BigQuery к Looker Studio и создание простой гистограммы для визуализации популярных песен практически в режиме реального времени.
Перейдите в Looker Studio, войдите в систему и:
Выберите новый “Пустой отчет”
В разделе “подключение к данным” выберите “BigQuery” в качестве источника данных
Затем выберите свой проект BigQuery, набор данных и таблицу
Теперь вам должен быть представлен аналогичный вид. Убедитесь, что поля измерений и метрик соответствуют приведенному ниже скриншоту, и у вас должна получиться простая столбчатая диаграмма, как показано на рисунке.

Курс по Apache Kafka для продвинутых разработчиков – старт 10 марта: slurm.club/3YkiRI7
Рядом с панелью мониторинга в реальном времени
В Looker Studio есть функция обновления данных, которая определяет, как часто следует обновлять панель мониторинга. Вы можете установить это значение равным 1 минуте, что является наименьшим циклом обновления, доступным в настоящее время в Looker Studio, и ваша панель мониторинга должна обновляться каждую минуту.
Вывод
Мы рассмотрели основы того, как настроить минимальный кластер Kafka с помощью Docker, загрузить данные в топик, а затем использовать и передавать эти данные в BigQuery. Наконец, мы создали панель мониторинга почти в режиме реального времени, чтобы представить окончательные результаты в Looker Studio.
Apache Kafka для разработчиков
Новый поток Apache Kafka для разработчиков с практикой на Java или Golang и платформой Spring+Docker+Postgres стартует 10 марта 2023 года. Он будет полезен разработчикам, архитекторам и системным администраторам, которые желающих расширить свои знания и отработать их на практике.
На курсе разберем:
Неправильное использование Кафка;
Отсутствие коммитов в Кафка;
Исчезновение сообщений;
Ваши кейсы о проблемах при работе с Apache Kafka;
Интересные кейсы от профессионалов.
Узнать подробности и записаться: slurm.club/3YkiRI7
