Всем привет! Я Алексей Понаморевский, разработчик решений для платформ сбора и обработки больших данных.
Два года назад мы в ITSumma создали решение для потоковой обработки данных с помощью Apache Spark и базы данных Greenplum — spark-greenplum-connector. Это многофункциональный плагин для Spark, на его основе инженеры могут строить ETL-решения и анализировать данные in-memory.
Изначально мы разработали его, как часть клиентской платформы потоковой обработки данных. Но со временем он прирос одной интересной функциональностью, которая недоступна сейчас в других подобных решениях. В этой статья я хочу сделать краткое сравнение между двумя opensource-продуктами Apache Spark и Flink, а также рассказать об одной интересной особенности Spark, которую мы реализовали в коннекторе.
Анализируем Spark и Flink в разрезе потоковой обработки данных
Для анализа я выделил 4 важных, на мой взгляд, критерия. Если у вас получится дополнить этот список своими и рассказать, что, по вашему мнению, ещё важно для этой задачи, буду рад комментариям под этой статьей.
Время задержки при прохождении данных через потоковую систему обработки
Наш коннектор не может работать быстрее, чем сам Spark, так как он компонент платформы и ограничен характеристиками исходного решения. Spark в этом плане на данный момент обладает удовлетворительными характеристиками, но уступает решениям на основе Apache Flink. Причина в том, что Spark использует механизм микропакетов (micro-batch) для потоковой обработки, а Flink — так называемый "настоящий" потоковый режим.
Что такое микропакетная обработка
Микропакетная обработка — это вариант традиционной пакетной обработки, при котором обработка данных происходит чаще, потому что обрабатываются меньшие группы новых данных. Большие наборы данных разделяются на микропакеты с небольшим объемом данных и обрабатываются параллельно.
Микропакетная обработка полезна, когда нам нужны очень свежие данные, но не обязательно в реальном времени — мы не можем ждать час или день, пока выполнится пакетная обработка, но нам также не нужно знать, что произошло за последние несколько секунд. Как раз это и требовалось на проекте у заказчика.
При этом разница между микропакетной и потоковой обработкой весьма условна. Правда в том, что Spark вносит заметную паузу, порядка 1 секунд и больше, между каждыми двумя микропакетами данных, что в итоге и приводит к задержке в потоке обработки. Однако, команда разработчиков Spark активно работает над этим недостатком, и со временем можно надеяться, что разница между Spark и Flink в этом плане будет сведена на нет или станет гораздо менее существенной.
Примеры, как разработчики нивелируют эту проблему, смотрите здесь и здесь.
Наличие параллельной распределённой обработки с возможностью масштабирования общей пропускной способности
Грубо говоря, если нам нужно обрабатывать больше данных в единицу времени, добавляем узлы в кластер, и производительность увеличивается пропорционально.
Благодаря нашему коннектору, мы можем в полной мере реализовать масштабируемость и параллельную обработку, заложенные в Spark и Greenplum — обе системы распределённые и отвечают всем требованиям в этом отношении. Коннектор позволяет гармонично организовать их совместную работу.
Впрочем, с использованием альтернативных вариантов на основе Flink, масштабируемость также может быть обеспечена, при условии, что у вас есть аналогичный специализированный коннектор Flink - Greenplum.
Гарантия обязательной строго однократной передачи каждой записи (Exactly Once Semantics)
Для многих задач важно, чтобы каждая запись (элемент данных), приходящая из источника, гарантировано была обработана и передана получателю. Причем строго один раз, без возникновения дублей, даже в случае преднамеренного либо аварийного перезапуска приложения потоковой обработки.
Spark и, соответственно, наш коннектор, обеспечивает такие гарантии за счёт механизма смещений (offset) и контрольных точек (checkpoint), о котором подробнее расскажем дальше. При этом коннектор может использоваться как в качестве источника, так и получателя данных. Здесь у нас опять же паритет с Apache Flink, поскольку там также имеется свой подобный механизм.
Интересная функциональность
А вот здесь-то и кроются те плюсы, ради которых всё затевалось.
Поясним на примере. Предположим, перед нами стоит задача организовать потоковую передачу свежих данных из БД Greenplum в Kafka, с ещё с какой-то переработкой этих данных на ходу. Источником данных может являться таблица, или группа таблиц в БД, которые содержат до нескольких миллионов записей. В поток должны поступать только свежие, т.е. вновь добавленные или измененные данные. Соответственно, нам требуется механизм, способный отбирать нужные записи и помещать их в поток.
На первый взгляд задача не выглядит очень уж сложной, однако на практике для её решения применяются целые специализированные фреймворки, например, Debezium. Вот здесь можно увидеть обзор решения на его основе, как раз для потоковой передачи данных из БД Postgres в Apache Kafka.
Используя наш коннектор и Spark, мы можем решить поставленную задачу, передавая данные из Greenplum, а не из Postgres, что уже само по себе лучше. И при этом не прибегая к услугам Debezium, просто написав несколько строк SQL-кода и передав его коннектору в качестве параметров. Как платформа обработки данных, Spark обладает большими возможностями, поэтому мы считаем, что наше решение является более привлекательным в данном случае.
Возможно ли реализовать подобное на основе Apache Flink с сохранением всех вышеперечисленных достоинств? Я не изучал этот вопрос глубоко, но на данный момент мне представляется, что нет, поправьте меня если кто-то знает ответ. Со временем, если будет потребность, мы можем распространить наш подход и на Flink, создав для него компонент, аналогичный нашему коннектору для Spark.
Метод
Суть подхода, который мы применили при реализации коннектора, заключается в кооперативной обработке данных в БД Greenplum и Spark. Так как Greenplum сам по себе предоставляет некоторые методы для обработки данных — SQL и PL/pgSQL, мы можем перенести часть обработки на сторону БД, в тех случаях, когда это целесообразно. Не исключено, что кому-то это покажется страшной ересью, но это работает, и работает хорошо!
Во-первых, это позволяет увеличить степень распараллеливания процессов и производительность. Во-вторых, обе платформы выигрывают в функциональности за счёт взаимодополнения.
Примеры кода и как работает коннектор
В качестве примера приведу элементы кода из пробного скрипта, идущего в комплекте с коннектором, и прокомментирую его работу. Версия коннектора с открытым исходным кодом доступна на github, там можно посмотреть пример целиком в файле streaming-example.scala, а также всю необходимую документацию.
Скрипт демонстрирует работу коннектора при использовании БД Greenplum одновременно в качестве источника и получателя данных в режиме потоковой обработки.
Фейковые данные генерируются в БД с помощью PL/SQL программного блока, через коннектор передаются в Spark, где производится их простейшая обработка — присоединяются дополнительные колонки, затем через другую копию коннектора передаются обратно в БД, где их обрабатывает ещё один PL/SQL блок. Далее он их подсчитывает и печатает на консоли среднее время задержки для сквозного прохождения данных и другую статистику. Скрипт предназначен для интерактивного запуска в программе spark-shell.
Основной элемент скрипта — код, определяющий структуру потока обработки данных и выглядит он так:
val stream = (spark.readStream.format("its-greenplum").option("url", dbUrl).
option("user", dbUser).
option("password", dbPassword).
option("dbtable","select 1::bigint id, 1::int seq_n, clock_timestamp() gen_ts, 1::bigint offset_id").
option("sqlTransfer", generator).
option("offset.select", s"select json_build_object('offset_ts', (extract(epoch from pg_catalog.clock_timestamp()) * ${1.0/secondsPerBatch})::bigint)::text").
load().
withColumn("spark_ts", com.itsumma.gpconnector.ItsMiscUDFs.getRowTimestamp()).
selectExpr("getBatchId() as batch_id", "id", "seq_n", "gen_ts", "spark_ts", "(cast(spark_ts as double) - cast(gen_ts as double)) as delay_s", "offset_id").
writeStream.
format("its-greenplum").option("url", dbUrl).
option("user", dbUser).
option("password", dbPassword).
option("sqlTransfer", aggregator).
option("dbmessages", "WARN").
option("checkpointLocation", cpDirName).
outputMode("append").
start())
Это типовой способ инициализации приложения потоковой обработки в Spark. Далее приложению ничего делать не нужно, и оно просто ожидает команды на завершение в каком-либо холостом цикле. Данные обрабатываются в заданном потоке автоматически по мере их поступления из источника.
Фрагмент format ("its-greenplum") указывает, что в качестве источника или получателя данных используется наш коннектор, параметры dbUrl, user и password задают параметры подключения к базе в том же формате, который обычно используется для JDBC подключения к PostgreSQL или Greenplum.
Параметр dbtable может быть использован для указания имени таблицы — источника данных, но так как в данном случае источником данных служит не таблица, а блок исполняемого PL/SQL кода, то нам каким-то образом нужно указать коннектору формат входящих данных. Для этого мы передаём здесь SQL запрос, который возвращает таблицу из одной строки с такими же колонками, которые будут поступать из PL/SQL блока.
Параметр "sqlTransfer" — здесь мы передаём коннектору текст PL/SQL кода, который будет генерировать для нас данные. PL/SQL блок вызывается один раз на каждый микропакет (micro-batch), генерирует заданное количество строк и завершается.
Другой интересный параметр во входной части потока - "offset.select". По сути, это любой SQL запрос, возвращающий JSON объект, который в Spark называется offset - он указывает на текущую позицию во входных данных. Что именно вы поместите в этот JSON полностью ваше дело — это может быть дата и время последней строки, добавленной в какую-то таблицу или любой другой ключ. Для Spark важно только изменился offset с момента предыдущего микропакета или нет.
Если offset изменился, Spark знает, что в вашем входном потоке появились новые данные и формирует новый микропакет (micro-batch). При этом вам не надо беспокоиться о том, как обеспечить целостность данных при перезапуске приложения или сбое — спарк делает это автоматически за счёт механизма check-point, где он сохраняет offset соответствующий последнему успешно завершенному микропакету. От вас требуется только из PL/SQL блока возвращать набор строк, соответствующий интервалу смещений, которые запрашивает у вас Spark.
Коннектор передаёт параметры в PL/SQL блок с помощью шаблонных полей, которые вам нужно в него включить, вот так:
v_start_offset bigint := ('<start_offset_json>'::json ->> 'offset_ts')::bigint;
v_end_ofsset bigint := ('<end_offset_json>'::json ->> 'offset_ts')::bigint;
В выходной части потока (после writeStream) всё куда проще. В option("sqlTransfer", aggregator) передаётся текст PL/SQL запроса, который получает данные из Spark.
Параметр checkpointLocation задаёт папку в файловой системе hdfs связанной с вашим кластером Spark, куда будут сохраняться контрольные точки вашего приложения. Важно выбирать отдельные папки для каждого приложения потоковой обработки, иначе продолжение потока с последней позиции после перезапуска будет невозможно.
Параметр option("dbmessages", "WARN") разрешает показывать сообщения, в которых PL/SQL блок aggregator выдаёт свой результат.
Интересный кейс
Экспериментируя с коннектором, мы выяснили один интересный факт:
когда micro-batch уже запущен, и PL/SQL блок передаёт строки в Spark по одной, используя для этого цикл типа FOR .. END LOOP. Каждая отдельная строка при некоторых условиях может доходить до приложения в Spark с минимальной задержкой — всего несколько миллисекунд.
В настоящий момент мы исследуем возможность использовать эту функциональность при реализации режима, который в Spark именуется Continuous Triggering. Он предполагает существенное уменьшение задержки обработки по сравнению с режимом micro-batch. А значит, мы сможем максимально приблизиться к “настоящей” потоковой обработке, а то и вовсе реализовать её на Spark. О том, получилось у нас или нет, читайте в следующих выпусках!