Как стать автором
Обновить

Python streaming (spark+kafka)

Уровень сложностиПростой
Время на прочтение4 мин
Количество просмотров5.2K

Привет, меня зовут Роман Вороновский и в этой статье, посвященной Python streaming с использованием Spark и Kafka мы рассмотрим основные шаги, представленные в видео:

Чтобы предоставить вам более подробное описание процесса и помочь при развертывании локального окружения.

Видео,как и статья нацелена на тех, кто только начинает свой путь в Big Data и еще не совсем умеет настраивать различные окружения для легкого старта в профессии.

Итак начнем.

необходимые компоненты и их проверка

Для установки необходимого окружения и проверки компонентов, перед началом работы с Python streaming в связке Spark и Kafka, вам потребуются следующие инструменты и плагины.

0. Docker Desktop (скачать можно по ссылке: https://www.docker.com/products/docker-desktop/)

1. Spark: Apache Spark - это быстрая и мощная платформа для обработки больших объемов данных. Для установки Spark достаточно воспользоваться интерпретатором conda (скачать можно по ссылке: https://www.anaconda.com/download) а не стандартным интерпретатором python, для этого:

  • в правом нижнем углу жмем по интерпретаторам

  • выбираем опцию "Add new interpreter"

  • выбираем опцию "Add local interpreter"

  • выбираем Conda Environment

установка Conda интерпретатора
установка Conda интерпретатора

С данным интерпретатором Spark работает из коробки и никаких допов в виде Hadoop и тд. разворачивать не прийдется.

2. Kafka: Apache Kafka - это распределенная платформа, предназначенная для обработки потоковых данных. Для установки Kafka достаточно использовать docker-compose.yaml файл, закрепленный в репозитории. Для его запуска можно воспользоваться командой, находясь в корне проекта:

docker-compose up -d

3. Python: Python - популярный язык программирования, используемый для разработки приложений и анализа данных. Убедитесь, что на вашем компьютере установлена актуальная версия Python. Вы можете скачать и установить Python с официального сайта (https://www.python.org/downloads). В видео я использую IDE- PyCharm (скачать можно по ссылке: https://www.jetbrains.com/pycharm/?var=1)

4. Для работы с Kafka в Python, вам понадобится установить библиотеку pykafka. Откройте командную строку и выполните команду:

pip install pykafka

для установки этой библиотеки.

5.PySpark - это Python API для Apache Spark. Чтобы установить PySpark, выполните команду:

pip install pyspark

в командной строке.

После установки всех необходимых компонентов и плагинов, проверяем их работоспособность.

Проверка работоспособности

Kafka

После выполнения команды по поднятию контейнера мы должны наблюдать следующую картину:

отображение Kafka в docker desktop
отображение Kafka в docker desktop

Запускаем Python скрипт который бы писал сообщения в Kafka(producer.py):

from pykafka import KafkaClient

if __name__ == "__main__":
    #хост подключения к кафка
    client = KafkaClient(hosts='127.0.0.1:9092')
    #имя топика в который мы собираемся отправлять сообщения
    topic = client.topics[b'stream_topic']
    #создание продюсера, который и будет отправлять сообщения
    producer = topic.get_producer()
    #отправка самих сообщений
    producer.produce(b'Hello, Streaming!')
    producer.produce(b'Kafka first Message!')
    #остановка продюсера
    producer.stop()

Следом запускаем скрипт который бы распечатывал сообщения из топика kafka(consumer.py):

from pykafka import KafkaClient

if __name__ == "__main__":
    #хост подключения к кафка
    client = KafkaClient(hosts='127.0.0.1:9092')
    #имя топика из которого мы собираемся получать сообщения
    topic = client.topics[b'stream_topic']
    #создание простого консьюмера
    consumer = topic.get_simple_consumer()
    #обработка сообщений если они еще есть в топике
    for message in consumer:
        if message is not None:
            print(message.value.decode('utf-8')) #берется значение сообщений и декодируется по utf-8 для валидного отображения

В результате все сообщения которые были отправлены должны были распечататься в консоль при помощи consumer.py программы.

Spark+kafka

Для проверки уже в связке необходимо использовать программу - sparkConsumeStreamKafkaWriteToConsole.py :

import os

from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("ProduceConsoleApp") \
        .getOrCreate()

    source = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "127.0.0.1:9092")
              .option("subscribe", "stream_topic")
              .option("startingOffsets", "earliest")
              .load()
              )
    #печать схемы - необязательна
    source.printSchema()
    #работа над получаемыми DataFrame
    df = (source
          .selectExpr('CAST(value AS STRING)', 'offset'))
    # блок записи полученных результатов после действий над DataFrame
    console = (df
               .writeStream
               .format('console'))

    console.start().awaitTermination()

Работа над получаемыми DataFrame может максимально отличаться от моего примера, тк данный пример был представлен исключительно в обучающих целях, для реальных проектов данные запросы могут быть гораздо сложнее : )

В блоке записи мы можем указывать различные форматы, так же для примера был выбран самый простой - в консоль. Если хотите усложнить и прописать обратную запись в какой-нибудь результирующий топик, то попробуйте изменить код начиная с .format следующим образом:

 .format("kafka") \
        .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
        .option("topic", "result_topic") \
        .option('checkpointLocation', './.local/checkpoint') \
        .start().awaitTermination()

установка Hadoop_Home

Если возникли ошибки как на видео , связанные с Hadoop, то данный параграф необходимо прочесть.

Первое с чего стоит начать - установить себе папку winutils которую я показывал на видео(скачать можно по ссылке: https://github.com/kontext-tech/winutils). Я использовал другую версию, которая также закреплена линком в репозитории, привожу её и тут: https://github.com/cdarlint/winutils

После успешной установки(в моем случае это версия 3.2.2) необходимо задать системную переменную HADOOP_HOME. Делается это следующим образом:

  • заходим в параметры системы

  • прописываем: "изменение системных переменных среды"

  • жмем на кнопку: "переменные среды"

  • жмем на кнопку: "создать"

  • в поле имя переменной прописываем: "HADOOP_HOME"

  • в поле значение переменной прописываем путь до нашего скаченного файла нужной версии ( в моем случае: "C:\Users\Roman\Documents\hadoop-3.2.2"

  • Находим среди системных переменных- переменную с наименованием: "Path"

  • изменяем её дописывая следующий текст: "%HADOOP_HOME%\bin"

  • сохраняем и перезагружаем ПК

Листинг изображениями:

Итог

После проделанных манипуляций ошибка уйдет и проект должен работать корректно, в данном случае мои версии для которых все запускалось:

  • Python 3.11 - conda interpreter

  • pyspark 3.4.1

  • pykafka 2.8.0

Всем спасибо за внимание! Оставляйте комментарии и подписывайтесь!

Репозиторий закреплен под видео.

Теги:
Хабы:
Всего голосов 3: ↑3 и ↓0+3
Комментарии7

Публикации

Истории

Работа

Data Scientist
93 вакансии
Python разработчик
196 вакансий

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн
10 – 11 октября
HR IT & Team Lead конференция «Битва за IT-таланты»
МоскваОнлайн
25 октября
Конференция по росту продуктов EGC’24
МоскваОнлайн
7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн