Введение
Что будет в статье:
1. Код с решемением проблемы
2. Описание проблемы
3. Мои пояснения о работе к Spark Structured streaming которые, как я считаю, важны для понимание проблемы.
4. В статье будут ссылки на источники на английском, и мой перевод к ним курсивом
.
Чего не будет в статье:
1. Примеров кода и простых сценариев работы Spark или Spark Structured Streaming
2. Подробного пояснения работы Spark Structured Streams
3. Теории на тему потоковой обработки данных: delivery-semantic и обеспечение отказоустойчивости.
Так что если вы не знакомы с Spark на базовом уровне, то рекомендую ознакомиться заранее, иначе многие отсылки или примеры могут показаться не понятными или не очевидными.
Благо информации на эти темы хватает, это очень популярная область в Big data мире сейчас.
Определение проблемы и участников
Начну пожалуй с того, что такое Spark Structured Stream и graceful shutdown.
Graceful shutdown он же корректное/штатное/без ошибок завершение работы, с точным определением сложнее. Нет ну то есть все, я думаю, понимают что это значит, вот только как показывает практика все понимают что то свое. И это утверждение легко проверить просто вбив эти два английских слова в google. По этому для описании концепции в целом, я приведу тот вариант который мне по нраву, а для Structured Stream отдельно опишу чего именно хочется достичь. Graceful shutdown — это когда вы даете программе команду на выключение, долго ждете, а потом она выключается.
Spark Structured Stream, не путать с Spark Direct Stream. И так согласно официальной документации это масштабируемый и отказоустойчивый механизм потоковой обработки данных, построенный на поверх Spark SQL framework. Позволяет описывать обработку потока данных точно так же, как если бы это был статический набор данных.
Я лично готов поспорить, насчет точно так же
, но это уже для другой статьи.
Тут для не опытных Spark пользователей хочется заметить, что все spark параметры c префиксом spark.streaming.xxx, не имеют ни какого отношения к Structured Stream и являются частью DStream. В том числе и spark.streaming.stopGracefullyOnShutdown
В случае со Spark Structured Stream, хотелось бы обьяснить чего именно надо долго ждать - завершения уже начатого микро batch
(группа/пакет) данных.
Для новичков в Spark, стоит обьяснить -
Что за группа
и как это связанно с Stream -> Из документацииПо умолчанию, Structured Streaming запросы выполняются маленькими(условно) пакетами данных которые превращают поток в серию маленьких задач, таким образом достигая задержками в 100 милисекунд и гарантирует отказоустойчивость на уровне exactly-once для каждой записи.
Для опытных пользователей замечу, что такое описание не применимо к Continuous streaming
. И я его в статье рассматривать не буду, лично я, смысла не вижу делать там graceful shutdown.
Зачем вообще это нужно
Существует какая-то логика, привязанная к тому что stream жив или к его завершению. Простой пример, необходимо определять штатные перезапуски Structured Stream и его падения что бы настроить Alert manager, систему оповещения об ошибках.
Существует логика, возможно очень дорогостоящая в плане ресурсов, которая обрабатывает падения. Например возможно при падании происходят дубликаты записи или еще какие то негативные последствия, и их поиск требует full scan по базе или файловой системе. Естественно делать это на каждый перезапуск Structured Stream слишком расточительно.
Строгие Latency SLA, гарантии задержки данных. Которые нельзя нарушать в случае штатных операций - изменения схемы или смены конфигурации, банального restart на другом кластере. Например у меня SLA 2 минуты, это значит что данные, с момента их записи в источник до точки назначения должны добраться за 2 минуты.
В случае Spark Structured Stream, можно использовать одноминутные интервалы. Тогда 1 минуту spark собирает данные и до 1й минуты тратит на обработку. А теперь представим что в конце 2й минуты, минуты обработки, был вызван перезапуск по причине изменений. Если потушить приложение прямо так, не дождавшись окончания обработки - то всю работу прийдется выполнить повторно. Что естественно приведет к задержке нарушающей SLA.
Особенно это не желательно при изменениях схемы, ведь таких изменений может быть много.
Очень важно обратить внимание что отказоустойчивость Structured Stream не имеет никакого отношения к graceful shutdown
. Отказоустойчивость достигается в результате того что все операции и сам Structured Stream являются идемпотентными. А значит можно безопасно стартовать после падений.
Особенно подчеркну что delivery semantic на уровне exactly-once гарантируется только для Spark Structured Stream операций и состояний. И если используемый Sink, точка назначения ваших данных, не поддерживает идемпотентность записи, то перезапуск упавшего Structured Stream может привести к дубликатам при записи. Например родной Kafka Sink, . Но в любом случае это проблема куда глубже чем только падание Structured Stream, потому что в таком случае дубликаты могут происходить и при повторении Spark Task, что вообще штатная для spark операция. Которая регулярно происходит.
При этом если вас интересует корректное завершение в случае sigterm сигнала, то обязательно нужно добиться завершения Structured Stream, до завершения spark context. Этот пример и будет разобран, как самый неочевидный частный случай shutdown.
И почему shutdown требует отдельной статьи, а не вызова метода stop
Ну начну с того, что за метод stop
такой.
Из документации, существует 2 метода которые собственно отвечают за остановку Structured stream.
Уже только по наличие 2ух методов можно предположить что
query.stop() // Остановить запрос(stream)
query.awaitTermination() // заблокировать пока запрос(stream) не прекратиться по причине вызова stop() или ошибкиstop()
асинхронный. Что лично меня навело на мысль, что ни какой проблемы нет и graceful shutdown иметься из коробки.
Но я притворюсь что я умный и читал документацию, а не выгребал ошибки в production. Из документация Останавливает выполнение этого запроса если он активен. Вызов блокируется до того как прекратиться выполнение query execution threads или по timeout.
И вот тут опытный Java/Scala разработчик по идее должен был напрячься.
Какой то Thread, уж больно напоминающий java thread по названию, прекратиться по сигналу. Да и в добавок к этому еще один, дополнительный, метод который ждет завершения. Это же thread.interupt()
скажите вы, и будете правы - так что ни про какой graceful shutdown речь идти не может.
Но по правде сказать, я уже на столько свыкся с всякими Async/Rx/CSP концепциями, что мне в голову не пришло, Stream может быть реализованы как Thread(() => while(true) processNextBatch)
. В одном из самых популярных Big Data frameworks. И очень зря ведь это Spark, с кучей Hadoop внутри и ThreadLocal.
Так что я просто вызвал эти два метода, парой как предлагает официальная документация. И все внешне было именно так, как надо. Ни каких ошибок или warning логов. Приложение выключалось тихо и спокойно. Только очень уж быстро. За секунды, а мои batches - ни микро ни разу. А минут по 5.
Что-то явно пошло не так. Ковыряясь в исходниках spark легко узнать что:
Meтод stop()
вызывает sparkSession.sparkContext.cancelJobGroup(runId.toString)
. cancelJobGroup
отменяются все задачи которые spark запланировал или выполняет в ходе этого stream.
Идем на spark UI, и видим что на самом деле задачи были cancel job fail.
Решение
Вот github gist с полностью готовым решением, которое бежит у меня в production не первый год. Дальше в статье будет только описание работы этого кода.
Если вы не новичок в spark, возомжно, вам будет проще посмотреть сам код, чем читать описание.
Высокоуровневое описание решения:
Запустить сигнал об остановке
C этого момента, если наш stream активный - то ждем когда завершится текущий batch.
Как узнать что batch завершился?
В spark есть интерфейс StreamingQueryListener, который сообщает о том что batch выполнен. Важно он Асинхронный.
Получив сигнал от StreamingQueryListener о том что batch закончился, выключаем Stream -
stream.stop
.
Для этого подойдет lock/sempahore/queue/rxStream или любая другая технология для синхронизации в многопоточной среде.
Реализация для инициализации ShutdownHook. Который будет ждать пока stream не закончиться.
class GracefulStopOnShutdownListener(streams: StreamingQueryManager) extends StreamingQueryListener {
private val runningQuery = new ConcurrentHashMap[UUID, (Runnable, SynchronousQueue[Boolean])]()
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val stream = streams.get(event.id)
val stopSignalChannel = new SynchronousQueue[Boolean]()
val shutdownHook: Runnable = () => {
if (stream.isActive) {
val stopSignal = true
stopSignalChannel.put(stopSignal)
stream.stop()
stream.awaitTermination()
}
}
ShutdownHookManager.get().addShutdownHook(shutdownHook, 100, 20, TimeUnit.MINUTES)
runningQuery.put(stream.id, (shutdownHook, stopSignalChannel))
log.info(s"Register shutdown hook for query ${event.id}")
}
* runningQuery: Map[stream -> Shutdown Hook + Стоп Сигнал].
Структура которая сохраняет каждый запущенный Stream, что бы процесс работал независимо для каждого Structured Stream на Spark driver.
* SynchrnousQueue.put - чтобы заблокировать ShutdownHook до тех пор пока не закончиться batch. Этот метод положит элемент в очередь и будет ждать до тех пор, пока другой Thread его не заберет.
* Важно использовать здесь не java/scala shutdown hook. А Hadoop ShutdownHookManager. Лучше бы Spark ShutdownHookmanager, но он не имеет public API. Но Spark ShutdownHookmanager регистриурет себя в hadoop с приоритетом 40. Так что нам необходимо использовать hadoop ShutdownHook, с приоритетом выше 40. Это важно, если указать приоритет ниже чем у spark shutdown hooks, то spark context выключиться раньше чем наш stream, что приведет к падению Stream и его текущих задач.
Дальше необходимо, что бы каждый batch отпускал наш shutdown hook. Но не блокировался, если не было sigterm signal.
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val (_, stopSignalChannel) = runningQuery.get(event.progress.id)
stopSignalChannel.poll()
}
* SynchronousQueue.poll - не блокирующий, или заберет елемент из очереди или получит null. Это разблокирует Put из Shutdown Hook, но не заблокирует Listener если не было sigterm.
В целом все готово уже и будет работать, но есть не учтенный сценарий.
* Stream стартанул - зарегистрировал hook, но упал с ошибкой или был принудительно остановлен до того как пришел sigterm.
Значит, нужно удалить ShutdownHook в этом случае. Тут нам поможет последний метод onQueryTerminated.
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
val (shutdownHook, stopSignalChannel) = runningQuery.remove(event.id)
if (!ShutdownHookManager.get().isShutdownInProgress) ShutdownHookManager.get().removeShutdownHook(shutdownHook)
stopSignalChannel.poll()
}
* удаляем наш stream из Listener, чтобы не ресурсы почистить.
* Удаляем ShutodownHook, чтобы он нам позже не заблокировал приложение при sigterm
* На всякий случай, отпускаем синхронизацию. Вдруг кто то успел заблокироваться на нее, до того как мы удалили хук - но после того как последний batch был закончен.
Заключение
Тот же github gist что содержит решение, содержит Main class который позволяет воспроизвести ошибку и визуализировать решение используя spark в local mode.