Создайте конвейер потоковой обработки событий в реальном времени с помощью Kafka, BigQuery & Looker Studio
Создание простого проекта потоковой обработки событий – не самая простая задача. В сегодняшней статье мы расскажем, как можно достаточно просто сделать это при помощи Kafka, BigQuery & Looker Studio.
Приложения для потоковой обработки в реальном времени иногда могут быть сложными. Пытаясь узнать о них, важно выбрать практический вариант использования, чтобы обеспечить увлекательный и эффективный процесс обучения. Надеемся, что с помощью приведенного ниже примера вы сможете легко понять основы создания приложения в реальном времени.
Сценарий
Давайте представим, что мы работаем в data engineering отделе компании, предоставляющей услуги потоковой передачи музыки. Нам нужно создать панель мониторинга в реальном времени, которая показывает самые популярные песни конкретного исполнителя (скажем, Тони Аллена) в каждый конкретный момент времени. Для этого мы будем использовать популярную платформу распределенной потоковой передачи Kafka для создания, потребления и трансляции необходимых событий песен в BigQuery, чтобы мы могли визуализировать популярные песни на панели в Looker Studio.
В итоге наша архитектура будет выглядеть примерно так:
Термины и концепции
Давайте быстро определим некоторые термины, которые мы рассмотрим в этой статье.
Kafka: Apache Kafka — это распределенная потоковая платформа с открытым исходным кодом, которая позволяет (среди прочего) разрабатывать приложения реального времени, управляемые событиями, что идеально подходит для нашего варианта использования.
Кластер Kafka: набор серверов (называемых брокерами), работающих вместе для обеспечения высокой доступности, отказоустойчивости и хранилища для приложений реального времени.
Брокер: Как указано выше, Брокер — это машина, которая выполняет фактическую работу в кластере Kafka. Он хранит некоторый набор партиций, обрабатывает входящие запросы на запись новых событий в эти партиции и позволяет потребителям извлекать сообщения по топику, партиции и смещению.
Топик: Топик — это просто журнал событий. Каждое новое событие от producer’а добавляется в конец топика. И топики разделены на партиции.
Producer: : приложение, которое вы пишете, которое публикует (производит) данные для темы в кластере Kafka.
Consumer: приложение или конечный пользователь, который извлекает данные из кластеров Kafka в режиме реального времени. Для эффективной выборки сообщений в режиме реального времени пользователи Kafka должны подписаться на соответствующие топики, присутствующие в кластере.
Zookeeper: Отслеживает состояние узлов кластера Kafka, а также топики, партиции Kafka и многое другое. (Примечание: Обновление под названием KIP-500 устранило необходимость в Zookeeper, но мы не будем использовать эту версию Kafka в этой статье).
Poll: Метод poll()
— это функция, которую пользователь Kafka вызывает для извлечения записей из заданного топика.
Мы настроим описанную выше архитектуру в 4 шага. Но прежде чем мы начнем, убедитесь, что у вас есть эти предварительные условия:
Прежде, чем начать:
Убедитесь, что у вас установлен Docker.
Установите библиотеку
confluent-kafka
Python на свой компьютер.Включите BigQuery API.
Создайте ключ учетной записи службы в Google Cloud с необходимыми разрешениями, важными для работы потокового API. Сохраните ключ где-нибудь на вашем компьютере, поскольку мы будем ссылаться на него позже.
#1. Разверните Kafka с помощью Docker
Kafka может быть развернута многими способами, но мы развернем ее с помощью Docker, поскольку это довольно просто.
Наш кластер Kafka будет состоять из двух основных объектов;
1 Экземпляр Брокер и
1 Экземпляр Zookeeper.
Мы будем использовать один файл Docker compose для настройки и запуска этих контейнеров. Вы заметите 2 службы и требуемые порты, представленные в docker-compose.yaml
ниже:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.1
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
Убедитесь, что файл docker находится в том же каталоге, что и файлы Kafka producer и consumer, которые мы вскоре напишем.
Чтобы создать оба контейнера Docker, запустите эту команду, и оба контейнера будут доступ через несколького минут.
docker-compose up -d
#2. Соберите Producer
Мы напишем приложение / Producer, которое имитирует активность пользователя на платформе потоковой передачи музыки. Это приложение отправит событие под названием song-completed
, которое запускается, когда пользователь завершает песню. Это событие будет отправлено в топик Кафки, который мы назовем tony-allen-plays
.
Мы будем использовать пакет Faker для создания поддельных потоковых данных для нашего приложения. Наша поддельная полезная нагрузка события будет выглядеть примерно так:
{'user_id':001,
'artist': 'tony-allen',
'song_id': 03,
'song_name': 'lady',
'event_type':'song_completed',
'timestamp': '2022-11-03 07:22:13'}
Чтобы установить пакет Faker, запустите это в окне терминала:
pip install Faker
Создайте поддельный список песен
Теперь в нашем коде мы инициируем объект Faker и создадим заранее закодированный список песен из 10 случайных песен Тони Аллена, которые будут частью полезной нагрузки события.
from confluent_kafka import Producer
from faker import Faker
import json
import time
import logging
#Create Faker object to generate fake data for Producer
fake=Faker()
#Create Tony Allen song list
songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"]
Настройка формата протоколирования
Каждый раз, когда новое событие становится доступным, логи будут добавляться в файл producer.log
— который мы определяем ниже, и который находится в вашем главном каталоге. Здесь мы устанавливаем основные конфигурации для того, как мы хотим, чтобы этот файл журнала (лога) был отформатирован.
#Configure logger
logging.basicConfig(format='%(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='producer.log',
filemode='w')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
Запустите producer
Инициализируйте объект Kafka producer, указав сетевой порт вашего кластера Kafka, как определено в файле Docker compose выше:
#Create Kafka Producer
p=Producer({'bootstrap.servers':'localhost:9092'})
Настройка обратного вызова
Определите функцию обратного вызова, которая будет вызываться при подтверждении фиксации новых сообщений или в случае ошибок. Когда корректное сообщение становится доступным, оно декодируется в utf-8 и печатается в предпочтительном формате. Такое же сообщение также добавляется к файлу журналов.
#Callback function
def receipt(err,msg):
if err is not None:
print('Failed to deliver message: {}'.format(err))
else:
message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8'))
logger.info(message)
print(message)
Напишите Producer Loop
Это самая забавная часть! Здесь мы просто создаем цикл с 3-секундной задержкой, который имитирует активность реального пользователя на потоковой платформе. Мы создаем схему для нашего события JSON и используем Faker для генерации фактических наборов данных.
#Write Producer loop
def main():
for i in range(20):
random_song_id = fake.random_int(min=0, max=9)
data={
'user_id': fake.random_int(min=20000, max=100000),
'artist': 'tony-allen',
'song_id': random_song_id,
'song_name': songs[random_song_id],
'event_type':'song_completed',
'timestamp': str(fake.date_time_this_month())
}
m=json.dumps(data)
p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt)
p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions.
p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
time.sleep(3)
Обратите внимание, что когда мы вызываем p.produce
, мы указываем топик Kafka, в который мы хотели бы опубликовать сообщение. В данном случае он называется tony-allen-plays.
Поскольку этот топик еще не существует в нашем кластере Kafka, он автоматически создается при первом запуске этого приложения.
p.poll
важен, так как он производит проверку на наличие событий и вызывает соответствующую функцию обратного вызова, которую мы определили ранее.
Наш полный producer.py
сценарий должен выглядеть следующим образом:
from confluent_kafka import Producer
from faker import Faker
import json
import time
import logging
#Create Faker object to generate fake data for Producer
fake=Faker()
#Create Tony Allen song list
songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"]
#Configure logger
logging.basicConfig(format='%(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='producer.log',
filemode='w')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
#Create Kafka Producer
p=Producer({'bootstrap.servers':'localhost:9092'})
#Callback function
def receipt(err,msg):
if err is not None:
print('Failed to deliver message: {}'.format(err))
else:
message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8'))
logger.info(message)
print(message)
#Write Producer loop that acts like user activity
def main():
for i in range(20):
random_song_id = fake.random_int(min=0, max=9)
data={
'user_id': fake.random_int(min=20000, max=100000),
'artist': 'tony-allen',
'song_id': random_song_id,
'song_name': songs[random_song_id],
'event_type':'song_completed',
'timestamp': str(fake.date_time_this_month())
}
m=json.dumps(data)
p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt)
p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions.
p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
time.sleep(3)
if __name__ == '__main__':
main()
Чтобы подтвердить, что producer работает должным образом, выполните следующую команду в окне терминала:
python producer.py
#3. Создайте консумера (consumer)
Consumer будет делать две основные вещи:
Извлекать события из темы tony-allen-plays
Отправлять эти события в виде потока в BigQuery, используя BigQuery Streaming API
Установите библиотеку BigQuery Python
Для начала установите библиотеку BigQuery Python с помощью следующей команды.
pip install google-cloud-bigquery
Затем мы можем импортировать его в скрипт consumer.py
и настроить конфигурации BigQuery.
from confluent_kafka import Consumer
from google.cloud import bigquery
import ast
from google.oauth2 import service_account
#Create BQ credentials object
credentials = service_account.Credentials.from_service_account_file('PATH-TO-BQ-SERVICE-ACCOUNT')
# Construct a BigQuery client object.
bq_client = bigquery.Client(credentials=credentials)
#Speficy BigQuery table to stream to
table_id = 'PROJECT-ID.DATASET.TABLE-NAME'
Инициируйте Consumer
Далее мы инициируем Kafka консюмер, указав порт, а затем подписываемся на топик tony-allen-plays.
При инициализации потребителя мы указываем идентификатор группы консюмеров, поскольку все консюмеры Kafka должны принадлежать к какой-то определенной группе консюмеров.
c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'})
print('Kafka Consumer has been initiated...')
#Subscribe to topic
c.subscribe(['tony-allen-plays'])
Вы также заметите, что тут есть атрибут — auto.offset.reset
— который имеет значение ‘самый ранний’. Он заставляет консюмер начинать вычитывать сообщения последовательно с самого начала партиции топика.
Типичное приложение Kafka консюмера сосредоточено вокруг Consumer Loop. Итак, последний шаг – написать цикл, который последовательно опрашивает топик на предмет новых сообщений и, если находит, то отправляет эти сообщения в BigQuery.
Тогда наш полный код должен выглядеть следующим образом:
from confluent_kafka import Consumer
from google.cloud import bigquery
import ast
from google.oauth2 import service_account
#Create BQ credentials object
credentials = service_account.Credentials.from_service_account_file('credentials/bq-service-account.json')
# Construct a BigQuery client object.
bq_client = bigquery.Client(credentials=credentials)
#Speficy BigQuery table to stream to
table_id = 'PROJECT-ID.DATASET.TABLE-NAME'
################ Kafka Consumer #################
c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'})
print('Kafka Consumer has been initiated...')
#Subscribe to topic
c.subscribe(['tony-allen-plays'])
def main():
try:
while True:
msg=c.poll(timeout=1.0) #Retrieve records one-by-one that have been efficiently pre-fetched by the consumer behind the scenes
if msg is None:
continue
if msg.error():
print('Error: {}'.format(msg.error()))
continue
else:
data=msg.value().decode('utf-8')
res = ast.literal_eval(data) #Convert string response to dictionary
print(res)
##### Stream data into BigQuery table #######
rows_to_insert = [res]
print((rows_to_insert))
errors = bq_client.insert_rows_json(table_id,rows_to_insert) #Make API request
if errors==[]:
print("New rows added.")
else:
print("Encountered erros while inserting rows: {}".format(errors))
finally:
c.close() # Close down consumer to commit final offsets.
if __name__ == "__main__":
main()
Запустите конвейер Kafka
Теперь, когда Consumer и Producer настроены, откройте два отдельных окна терминала и снова запустите producer:
python producer.py
Затем запустите consumer после этого, чтобы он считывал данные из топика в режиме реального времени:
python consumer.py
Если вы видите, что сообщения, сгенерированные производителем, начинают отображаться в окне терминала потребителя, значит, ваш потребитель работает должным образом, и данные также должны передаваться в BigQuery:
#4. Визуализируйте данные
Последним шагом будет подключение таблицы BigQuery к Looker Studio и создание простой гистограммы для визуализации популярных песен практически в режиме реального времени.
Перейдите в Looker Studio, войдите в систему и:
Выберите новый “Пустой отчет”
В разделе “подключение к данным” выберите “BigQuery” в качестве источника данных
Затем выберите свой проект BigQuery, набор данных и таблицу
Теперь вам должен быть представлен аналогичный вид. Убедитесь, что поля измерений и метрик соответствуют приведенному ниже скриншоту, и у вас должна получиться простая столбчатая диаграмма, как показано на рисунке.
Курс по Apache Kafka для продвинутых разработчиков – старт 10 марта: slurm.club/3YkiRI7
Рядом с панелью мониторинга в реальном времени
В Looker Studio есть функция обновления данных, которая определяет, как часто следует обновлять панель мониторинга. Вы можете установить это значение равным 1 минуте, что является наименьшим циклом обновления, доступным в настоящее время в Looker Studio, и ваша панель мониторинга должна обновляться каждую минуту.
Вывод
Мы рассмотрели основы того, как настроить минимальный кластер Kafka с помощью Docker, загрузить данные в топик, а затем использовать и передавать эти данные в BigQuery. Наконец, мы создали панель мониторинга почти в режиме реального времени, чтобы представить окончательные результаты в Looker Studio.
Apache Kafka для разработчиков
Новый поток Apache Kafka для разработчиков с практикой на Java или Golang и платформой Spring+Docker+Postgres стартует 10 марта 2023 года. Он будет полезен разработчикам, архитекторам и системным администраторам, которые желающих расширить свои знания и отработать их на практике.
На курсе разберем:
Неправильное использование Кафка;
Отсутствие коммитов в Кафка;
Исчезновение сообщений;
Ваши кейсы о проблемах при работе с Apache Kafka;
Интересные кейсы от профессионалов.
Узнать подробности и записаться: slurm.club/3YkiRI7