Pull to refresh

Интеграция Spark Streaming и Kafka

Reading time6 min
Views16K
Original author: Data Flair
Здравствуйте, коллеги! Напоминаем, что не так давно у нас вышла книга о Spark, а прямо сейчас проходит последнюю корректуру книга о Kafka.


Надеемся, эти книги окажутся достаточно успешными для продолжения темы — например, для перевода и издания литературы по Spark Streaming. Перевод об интеграции этой технологии с Kafka мы и хотели вам сегодня предложить

1. Обоснование

Apache Kafka + Spark Streaming – одна из наилучших комбинаций для создания приложений, работающих в реальном времени. В этой статье мы подробно обсудим детали такой интеграции. Кроме того, мы рассмотрим пример со Spark Streaming-Kafka. Затем обсудим «подход с получателем» и вариант непосредственной интеграции Kafka и Spark Streaming. Итак, приступим к интеграции Kafka и Spark Streaming.



2. Интеграция Kafka и Spark Streaming

При интеграции Apache Kafka и Spark Streaming возможны два подхода к конфигурации Spark Streaming для получения данных из Kafka – т.е. два подхода к интеграции Kafka и Spark Streaming. Во-первых, можно использовать Получатели и высокоуровневый API Kafka. Второй (более новый) подход – это работа без Получателей. Для обоих подходов существуют разные модели программирования, отличающиеся, например, по части производительности и семантических гарантий.



Рассмотрим эти подходы подробнее

a. Подход на основе получателей

В данном случае прием данных обеспечивает Получатель. Итак, воспользовавшись высокоуровневым API потребления, предусмотренным в Kafka, мы реализуем Получатель. Далее полученные данные хранятся в Исполнителях Spark. Затем в Kafka – Spark Streaming запускаются задания, в рамках которых обрабатываются данные.

Однако, при использовании такого подхода сохраняется риск потери данных в случае отказа (при конфигурации, задаваемой по умолчанию). Следовательно, потребуется дополнительно включить в Kafka – Spark Streaming журнал опережающей записи, чтобы исключить потери данных. Таким образом, все данные, полученные от Kafka, синхронно сохраняются в журнале опережающей записи в распределенной файловой системе. Именно поэтому даже после отказа системы все данные можно восстановить.

Далее рассмотрим, как использовать в приложении с Kafka – Spark Streaming такой подход с применением получателей.

i. Связывание

Теперь свяжем наше потоковое приложение со следующим артефактом для приложений Scala/Java, воспользуемся при этом определениями проекта для SBT/Maven.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

Однако, при развертывании нашего приложения нам придется добавить вышеупомянутую библиотеку и ее зависимости, это понадобится для Python-приложений.

ii. Программирование

Далее создадим входной поток DStream, импортировав KafkaUtils в код потокового приложения:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Кроме того, пользуясь вариантами createStream, можно задать классы-ключи и классы-значения, а также соответствующие классы для их декодирования.

iii. Развертывание

Как и в случае с любым приложением Spark, для запуска используется команда spark-submit. Однако, детали немного отличаются в приложениях на Scala/Java и в приложениях на Python.

Более того, при помощи –packages можно добавить spark-streaming-Kafka-0-8_2.11 и его зависимости непосредственно к spark-submit, это пригодится для приложений на Python, где невозможно управлять проектами при помощи SBT/Maven.

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

Также можно загрузить JAR-архив артефакта Maven spark-streaming-Kafka-0-8-assembly из репозитория Maven. Затем добавить его к spark-submit с –jars.

b. Прямой подход (без получателей)

После подхода с использованием получателей был выработан более новый подход – «прямой». Он дает надежные сквозные гарантии. В таком случае мы периодически запрашиваем Kafka о смещениях вычитанных данных (offsets) по каждому топику/секции, а не организуем доставку данных через получателей. Кроме того, определяется размер считываемого фрагмента, это нужно для правильной обработки каждого пакета. Наконец, для считывания диапазонов с данными из Kafka с заданными смещениями используется простой потребляющий API, особенно когда запускаются задания по обработке данных. Весь процесс напоминает считывание файлов из файловой системы.

Примечание: Данная возможность появилась в Spark 1.3 для Scala и Java API, а также в Spark 1.4 для Python API.

Теперь давайте обсудим, как применить этот подход в нашем потоковом приложении.
Об API потребления (Consumer API) подробнее рассказано по следующей ссылке:

Apache Kafka Consumer | Examples of Kafka Consumer

i. Связывание

Правда, такой подход поддерживается лишь в приложениях на Scala/Java. Имея следующий артефакт, скомпонуйте проект SBT/Maven.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

ii. Программирование

Далее импортируйте KafkaUtils и создайте входной DStream в коде потокового приложения:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])

В параметрах Kafka потребуется указать либо metadata.broker.list, либо bootstrap.servers. Следовательно, по умолчанию мы будем потреблять данные, начиная с последнего смещения в каждой секции Kafka. Однако, если вы хотите, чтобы считывание началось с самого маленького фрагмента, то в параметрах Kafka нужно задать конфигурационную опцию auto.offset.reset.

Более того, работая с вариантами KafkaUtils.createDirectStream, можно начать считывание с произвольного смещения. Затем сделаем следующее, что позволит нам получить доступ к фрагментам Kafka, потребленным в каждом пакете.

// Храним ссылку на актуальные диапазоны фрагментов, чтобы ее могли использовать и последующие потоки
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

Если мы хотим организовать мониторинг Kafka на основе Zookeeper при помощи специальных инструментов, то можем сами обновлять Zookeeper с их помощью.

iii. Развертывание

Процесс развертывания в данном случае напоминает процесс развертывания в варианте с получателем.

3. Преимущества прямого подхода

Второй подход к интеграции Spark Streaming с Kafka выигрывает у первого по следующим причинам:

a. Упрощенный параллелизм

В данном случае не требуется создавать множество входных потоков Kafka и объединять их. Однако, Kafka – Spark Streaming создаст столько RDD-сегментов, сколько будет сегментов Kafka на потребление. Все эти данные Kafka будут считываться параллельно. Следовательно, можно сказать, что у нас будет соответствие «один к одному» между сегментами Kafka и RDD, а такая модель получается понятнее и проще в настройке.

b. Эффективность

Чтобы полностью исключить потери данных при первом подходе, информацию требовалось хранить в журнале опережающей записи, а затем их реплицировать. На самом деле, это неэффективно, поскольку данные реплицируются дважды: в первый раз самим Kafka, а во второй – журналом опережающей записи. При втором подходе данная проблема устраняется, поскольку получателя нет, а, значит, не нужен и журнал опережающей записи. Если у нас предусмотрено достаточно долгое хранение данных в Kafka, то восстанавливать сообщения можно прямо из Kafka.

с. Семантика Exactly-Once

В принципе, мы использовали высокоуровневый API Kafka при первом подходе, чтобы хранить в Zookeeper потребленные считываемые фрагменты. Однако, именно так принято потреблять данные из Kafka. Пусть при этом и надежно исключаются потери данных, существует небольшая вероятность, что при некоторых отказах отдельные записи могут быть потреблены дважды. Все дело в несогласованности между механизмом надежной передачи данных в Kafka – Spark Streaming и считыванием фрагментов, происходящим в Zookeeper. Следовательно, при втором подходе мы применяем простой Kafka API, не требующий прибегать к Zookeeper. Здесь считываемые фрагменты отслеживаются в Kafka – Spark Streaming, для этого используются контрольные точки. В таком случае устраняется несогласованность между Spark Streaming и Zookeeper/Kafka.

Следовательно, даже в случае отказов, Spark Streaming получает каждую запись строго однократно. Здесь нужно гарантировать, чтобы наша операция вывода, при которой данные сохраняются во внешнем хранилище, была либо идемпотентной, либо атомарной транзакцией, в которой сохранялись бы и результаты, и смещения. Именно так достигается семантика exactly-once при выводе наших результатов.

Хотя, здесь есть один недостаток: смещения в Zookeeper не обновляются. Поэтому мониторинговые инструменты Kafka на основе Zookeeper не позволяют отслеживать прогресс.
Однако, мы все равно можем обращаться к смещениям, если обработка устроена таким методом – обращаемся к каждому пакету и обновляем Zookeeper сами.

Вот и все, что мы хотели рассказать об интеграции Apache Kafka и Spark Streaming. Надеемся, вам понравилось.
Tags:
Hubs:
Total votes 14: ↑14 and ↓0+14
Comments6

Articles

Information

Website
piter.com
Registered
Founded
Employees
201–500 employees
Location
Россия