Создание простого проекта потоковой обработки событий – не самая простая задача. В сегодняшней статье мы расскажем, как можно достаточно просто сделать это при помощи Kafka, BigQuery & Looker Studio.

Приложения для потоковой обработки в реальном времени иногда могут быть сложными. Пытаясь узнать о них, важно выбрать практический вариант использования, чтобы обеспечить увлекательный и эффективный процесс обучения. Надеемся, что с помощью приведенного ниже примера вы сможете легко понять основы создания приложения в реальном времени.

Сценарий

Давайте представим, что мы работаем в data engineering отделе компании, предоставляющей услуги потоковой передачи музыки. Нам нужно создать панель мониторинга в реальном времени, которая показывает самые популярные песни конкретного исполнителя (скажем, Тони Аллена) в каждый конкретный момент времени. Для этого мы будем использовать популярную платформу распределенной потоковой передачи Kafka для создания, потребления и трансляции необходимых событий песен в BigQuery, чтобы мы могли визуализировать популярные песни на панели в Looker Studio.

В итоге наша архитектура будет выглядеть примерно так:

Термины и концепции

Давайте быстро определим некоторые термины, которые мы рассмотрим в этой статье.

Kafka: Apache Kafka — это распределенная потоковая платформа с открытым исходным кодом, которая позволяет (среди прочего) разрабатывать приложения реального времени, управляемые событиями, что идеально подходит для нашего варианта использования.

Кластер Kafka: набор серверов (называемых брокерами), работающих вместе для обеспечения высокой доступности, отказоустойчивости и хранилища для приложений реального времени.

Брокер: Как указано выше, Брокер — это машина, которая выполняет фактическую работу в кластере Kafka. Он хранит некоторый набор партиций, обрабатывает входящие запросы на запись новых событий в эти партиции и позволяет потребителям извлекать сообщения по топику, партиции и смещению.

Топик: Топик — это просто журнал событий. Каждое новое событие от producer’а добавляется в конец топика. И топики разделены на партиции.

Producer: : приложение, которое вы пишете, которое публикует (производит) данные для темы в кластере Kafka.

Consumer: приложение или конечный пользователь, который извлекает данные из кластеров Kafka в режиме реального времени. Для эффективной выборки сообщений в режиме реального времени пользователи Kafka должны подписаться на соответствующие топики, присутствующие в кластере.

Zookeeper: Отслеживает состояние узлов кластера Kafka, а также топики, партиции Kafka и многое другое. (Примечание: Обновление под названием KIP-500 устранило необходимость в Zookeeper, но мы не будем использовать эту версию Kafka в этой статье).

Poll: Метод poll() — это функция, которую пользователь Kafka вызывает для извлечения записей из заданного топика.

Мы настроим описанную выше архитектуру в 4 шага. Но прежде чем мы начнем, убедитесь, что у вас есть эти предварительные условия:

Прежде, чем начать:

  • Убедитесь, что у вас установлен Docker.

  • Установите библиотеку confluent-kafka Python на свой компьютер.

  • Включите BigQuery API.

  • Создайте ключ учетной записи службы в Google Cloud с необходимыми разрешениями, важными для работы потокового API. Сохраните ключ где-нибудь на вашем компьютере, поскольку мы будем ссылаться на него позже.

#1. Разверните Kafka с помощью Docker

Kafka может быть развернута многими способами, но мы развернем ее с помощью Docker, поскольку это довольно просто.

Наш кластер Kafka будет состоять из двух основных объектов;

1 Экземпляр Брокер и

1 Экземпляр Zookeeper.

Мы будем использовать один файл Docker compose для настройки и запуска этих контейнеров. Вы заметите 2 службы и требуемые порты, представленные в docker-compose.yaml ниже:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    ports:
        - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

Убедитесь, что файл docker находится в том же каталоге, что и файлы Kafka producer и consumer, которые мы вскоре напишем.

Чтобы создать оба контейнера Docker, запустите эту команду, и оба контейнера будут доступ через несколького минут.

docker-compose up -d

#2. Соберите Producer

Мы напишем приложение / Producer, которое имитирует активность пользователя на платформе потоковой передачи музыки. Это приложение отправит событие под названием song-completed, которое запускается, когда пользователь завершает песню. Это событие будет отправлено в топик Кафки, который мы назовем tony-allen-plays.

Мы будем использовать пакет Faker для создания поддельных потоковых данных для нашего приложения. Наша поддельная полезная нагрузка события будет выглядеть примерно так:

{'user_id':001,
'artist': 'tony-allen',
'song_id': 03, 
'song_name':  'lady',
'event_type':'song_completed',
'timestamp': '2022-11-03 07:22:13'}

Чтобы установить пакет Faker, запустите это в окне терминала:

pip install Faker

Создайте поддельный список песен

Теперь в нашем коде мы инициируем объект Faker и создадим заранее закодированный список песен из 10 случайных песен Тони Аллена, которые будут частью полезной нагрузки события.

from confluent_kafka import Producer
from faker import Faker
import json
import time
import logging

#Create Faker object to generate fake data for Producer
fake=Faker()

#Create Tony Allen song list
songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"]

Настройка формата протоколирования

Каждый раз, когда новое событие становится доступным, логи будут добавляться в  файл producer.log — который мы определяем ниже, и который находится в вашем главном каталоге. Здесь мы устанавливаем основные конфигурации для того, как мы хотим, чтобы этот файл журнала (лога) был отформатирован.

#Configure logger
logging.basicConfig(format='%(asctime)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='producer.log',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

Запустите producer

Инициализируйте объект Kafka producer, указав сетевой порт вашего кластера Kafka, как определено в файле Docker compose выше:

#Create Kafka Producer
p=Producer({'bootstrap.servers':'localhost:9092'})

Настройка обратного вызова

Определите функцию обратного вызова, которая будет вызываться при подтверждении фиксации новых сообщений или в случае ошибок. Когда корректное сообщение становится доступным, оно декодируется в utf-8 и печатается в предпочтительном формате. Такое же сообщение также добавляется к файлу журналов.

#Callback function
def receipt(err,msg):
    if err is not None:
        print('Failed to deliver message: {}'.format(err))
    else:
        message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8'))
        logger.info(message)
        print(message)

Напишите Producer Loop

Это самая забавная часть! Здесь мы просто создаем цикл с 3-секундной задержкой, который имитирует активность реального пользователя на потоковой платформе. Мы создаем схему для нашего события JSON и используем Faker для генерации фактических наборов данных.

#Write Producer loop 
def main():
    for i in range(20):
        random_song_id = fake.random_int(min=0, max=9)
        data={
           'user_id': fake.random_int(min=20000, max=100000),
           'artist': 'tony-allen',
           'song_id': random_song_id, 
           'song_name':  songs[random_song_id],
           'event_type':'song_completed',
           'timestamp': str(fake.date_time_this_month())    
           }
        m=json.dumps(data)
        p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt)
        p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions.
        p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
        time.sleep(3)

Обратите внимание, что когда мы вызываем p.produce, мы указываем топик Kafka, в который мы хотели бы опубликовать сообщение. В данном случае он называется  tony-allen-plays. Поскольку этот топик еще не существует в нашем кластере Kafka, он автоматически создается при первом запуске этого приложения.

p.poll важен, так как он производит проверку на наличие событий и вызывает соответствующую функцию обратного вызова, которую мы определили ранее.

Наш полный producer.py сценарий должен выглядеть следующим образом:

from confluent_kafka import Producer
from faker import Faker
import json
import time
import logging

#Create Faker object to generate fake data for Producer
fake=Faker()

#Create Tony Allen song list
songs = ["zombie", "lady", "secret-agent","kindness","soldiers","asiko","the-same-blood","upside-down","african-man","vip"]

#Configure logger
logging.basicConfig(format='%(asctime)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='producer.log',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

#Create Kafka Producer
p=Producer({'bootstrap.servers':'localhost:9092'})

#Callback function
def receipt(err,msg):
    if err is not None:
        print('Failed to deliver message: {}'.format(err))
    else:
        message = 'Produced message on topic {} with value of {}\n'.format(msg.topic(), msg.value().decode('utf-8'))
        logger.info(message)
        print(message)

#Write Producer loop that acts like user activity
def main():
    for i in range(20):
        random_song_id = fake.random_int(min=0, max=9)
        data={
           'user_id': fake.random_int(min=20000, max=100000),
           'artist': 'tony-allen',
           'song_id': random_song_id, 
           'song_name':  songs[random_song_id],
           'event_type':'song_completed',
           'timestamp': str(fake.date_time_this_month())    
           }
        m=json.dumps(data)
        p.produce('tony-allen-plays', m.encode('utf-8'),callback=receipt)
        p.poll(1) # Polls/checks the producer for events and calls the corresponding callback functions.
        p.flush() #Wait for all messages in the Producer queue to be delivered. Should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
        time.sleep(3)

if __name__ == '__main__':
    main()

Чтобы подтвердить, что producer работает должным образом, выполните следующую команду в окне терминала:

python producer.py

#3. Создайте консумера (consumer)

Consumer будет делать две основные вещи:

Извлекать события из темы tony-allen-plays

Отправлять эти события в виде потока в BigQuery, используя BigQuery Streaming API

Установите библиотеку BigQuery Python

Для начала установите библиотеку BigQuery Python с помощью следующей команды.

pip install google-cloud-bigquery

Затем мы можем импортировать его в скрипт consumer.py  и настроить конфигурации BigQuery.

from confluent_kafka import Consumer
from google.cloud import bigquery
import ast
from google.oauth2 import service_account

#Create BQ credentials object
credentials = service_account.Credentials.from_service_account_file('PATH-TO-BQ-SERVICE-ACCOUNT')

# Construct a BigQuery client object.
bq_client = bigquery.Client(credentials=credentials)

#Speficy BigQuery table to stream to
table_id = 'PROJECT-ID.DATASET.TABLE-NAME'

Инициируйте Consumer

Далее мы инициируем Kafka консюмер, указав порт, а затем подписываемся на топик tony-allen-plays. При инициализации потребителя мы указываем идентификатор группы консюмеров, поскольку все консюмеры Kafka должны принадлежать к какой-то определенной группе консюмеров.

c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'})
print('Kafka Consumer has been initiated...')

#Subscribe to topic
c.subscribe(['tony-allen-plays'])

Вы также заметите, что тут есть атрибут — auto.offset.reset — который имеет значение ‘самый ранний’. Он заставляет консюмер  начинать вычитывать сообщения последовательно с самого начала партиции топика.

Типичное приложение Kafka консюмера сосредоточено вокруг Consumer Loop. Итак, последний шаг – написать цикл, который последовательно опрашивает топик на предмет новых сообщений и, если находит, то отправляет эти сообщения в BigQuery.

Тогда наш полный код должен выглядеть следующим образом:

from confluent_kafka import Consumer
from google.cloud import bigquery
import ast

from google.oauth2 import service_account

#Create BQ credentials object
credentials = service_account.Credentials.from_service_account_file('credentials/bq-service-account.json')

# Construct a BigQuery client object.
bq_client = bigquery.Client(credentials=credentials)

#Speficy BigQuery table to stream to
table_id = 'PROJECT-ID.DATASET.TABLE-NAME'

################ Kafka Consumer #################
c=Consumer({'bootstrap.servers':'localhost:9092','group.id':'tony-allen-consumer','auto.offset.reset':'earliest'})
print('Kafka Consumer has been initiated...')

#Subscribe to topic
c.subscribe(['tony-allen-plays'])

def main():
    try:
        while True:
            msg=c.poll(timeout=1.0)  #Retrieve records one-by-one that have been efficiently pre-fetched by the consumer behind the scenes
            if msg is None:
                continue
            if msg.error():
                print('Error: {}'.format(msg.error()))
                continue
            else:
                data=msg.value().decode('utf-8')

                res = ast.literal_eval(data) #Convert string response to dictionary
                print(res)

                ##### Stream data into BigQuery table #######
                rows_to_insert = [res]
                print((rows_to_insert))
                errors = bq_client.insert_rows_json(table_id,rows_to_insert) #Make API request

                if errors==[]:
                    print("New rows added.")
                else:
                    print("Encountered erros while inserting rows: {}".format(errors))
    finally:
        c.close() # Close down consumer to commit final offsets.

if __name__ == "__main__":
    main()

Запустите конвейер Kafka

Теперь, когда Consumer и Producer настроены, откройте два отдельных окна терминала и снова запустите producer:

python producer.py

Затем запустите consumer после этого, чтобы он считывал данные из топика в режиме реального времени:

python consumer.py

Если вы видите, что сообщения, сгенерированные производителем, начинают отображаться в окне терминала потребителя, значит, ваш потребитель работает должным образом, и данные также должны передаваться в BigQuery:

#4. Визуализируйте данные

Последним шагом будет подключение таблицы BigQuery к Looker Studio и создание простой гистограммы для визуализации популярных песен практически в режиме реального времени.

Перейдите в Looker Studio, войдите в систему и:

Выберите новый “Пустой отчет”

В разделе “подключение к данным” выберите “BigQuery” в качестве источника данных

Затем выберите свой проект BigQuery, набор данных и таблицу

Теперь вам должен быть представлен аналогичный вид. Убедитесь, что поля измерений и метрик соответствуют приведенному ниже скриншоту, и у вас должна получиться простая столбчатая диаграмма, как показано на рисунке.

Курс по Apache Kafka для продвинутых разработчиков – старт 10 марта: slurm.club/3YkiRI7

Рядом с панелью мониторинга в реальном времени

В Looker Studio есть функция обновления данных, которая определяет, как часто следует обновлять панель мониторинга. Вы можете установить это значение равным 1 минуте, что является наименьшим циклом обновления, доступным в настоящее время в Looker Studio, и ваша панель мониторинга должна обновляться каждую минуту.

Вывод

Мы рассмотрели основы того, как настроить минимальный кластер Kafka с помощью Docker, загрузить данные в топик, а затем использовать и передавать эти данные в BigQuery. Наконец, мы создали панель мониторинга почти в режиме реального времени, чтобы представить окончательные результаты в Looker Studio.

Apache Kafka для разработчиков

Новый поток Apache Kafka для разработчиков с практикой на Java или Golang и платформой Spring+Docker+Postgres стартует 10 марта 2023 года. Он будет полезен разработчикам, архитекторам и системным администраторам, которые желающих расширить свои знания и отработать их на практике.

На курсе разберем:

  • Неправильное использование Кафка;

  • Отсутствие коммитов в Кафка;

  • Исчезновение сообщений;

  • Ваши кейсы о проблемах при работе с Apache Kafka;

  • Интересные кейсы от профессионалов.

Узнать подробности и записаться: slurm.club/3YkiRI7