Как стать автором
Обновить

Spark Structured Streaming graceful shutdown — Что в этом сложного и как это правильно делать?

Время на прочтение 7 мин
Количество просмотров 2.5K

Тем кто знает зачем graceful shutdown нужен и что такое Spark Structure Stream не понаслышке рекомендую сразу перейти к решению.

Введение

Что будет в статье:
1. Код с решемением проблемы
2. Описание проблемы
3. Мои пояснения о работе к Spark Structured streaming которые, как я считаю, важны для понимание проблемы.
4. В статье будут ссылки на источники на английском, и мой перевод к ним курсивом.

Чего не будет в статье:
1. Примеров кода и простых сценариев работы Spark или Spark Structured Streaming
2. Подробного пояснения работы Spark Structured Streams
3. Теории на тему потоковой обработки данных: delivery-semantic и обеспечение отказоустойчивости.

Так что если вы не знакомы с Spark на базовом уровне, то рекомендую ознакомиться заранее, иначе многие отсылки или примеры могут показаться не понятными или не очевидными.
Благо информации на эти темы хватает, это очень популярная область в Big data мире сейчас.

Определение проблемы и участников

Начну пожалуй с того, что такое Spark Structured Stream и graceful shutdown.

Graceful shutdown он же корректное/штатное/без ошибок завершение работы, с точным определением сложнее. Нет ну то есть все, я думаю, понимают что это значит, вот только как показывает практика все понимают что то свое. И это утверждение легко проверить просто вбив эти два английских слова в google. По этому для описании концепции в целом, я приведу тот вариант который мне по нраву, а для Structured Stream отдельно опишу чего именно хочется достичь. Graceful shutdown — это когда вы даете программе команду на выключение, долго ждете, а потом она выключается.

Spark Structured Stream, не путать с Spark Direct Stream. И так согласно официальной документации это масштабируемый и отказоустойчивый механизм потоковой обработки данных, построенный на поверх Spark SQL framework. Позволяет описывать обработку потока данных точно так же, как если бы это был статический набор данных. Я лично готов поспорить, насчет точно так же, но это уже для другой статьи.

Тут для не опытных Spark пользователей хочется заметить, что все spark параметры c префиксом spark.streaming.xxx, не имеют ни какого отношения к Structured Stream и являются частью DStream. В том числе и spark.streaming.stopGracefullyOnShutdown

В случае со Spark Structured Stream, хотелось бы обьяснить чего именно надо долго ждать - завершения уже начатого микро batch(группа/пакет) данных.

Для новичков в Spark, стоит обьяснить -
Что за группа и как это связанно с Stream -> Из документации

По умолчанию, Structured Streaming запросы выполняются маленькими(условно) пакетами данных которые превращают поток в серию маленьких задач, таким образом достигая задержками в 100 милисекунд и гарантирует отказоустойчивость на уровне exactly-once для каждой записи.
Для опытных пользователей замечу, что такое описание не применимо к Continuous streaming . И я его в статье рассматривать не буду, лично я, смысла не вижу делать там graceful shutdown.

Зачем вообще это нужно

  • Существует какая-то логика, привязанная к тому что stream жив или к его завершению. Простой пример, необходимо определять штатные перезапуски Structured Stream и его падения что бы настроить Alert manager, систему оповещения об ошибках.

  • Существует логика, возможно очень дорогостоящая в плане ресурсов, которая обрабатывает падения. Например возможно при падании происходят дубликаты записи или еще какие то негативные последствия, и их поиск требует full scan по базе или файловой системе. Естественно делать это на каждый перезапуск Structured Stream слишком расточительно.

  • Строгие Latency SLA, гарантии задержки данных. Которые нельзя нарушать в случае штатных операций - изменения схемы или смены конфигурации, банального restart на другом кластере. Например у меня SLA 2 минуты, это значит что данные, с момента их записи в источник до точки назначения должны добраться за 2 минуты.

В случае Spark Structured Stream, можно использовать одноминутные интервалы. Тогда 1 минуту spark собирает данные и до 1й минуты тратит на обработку. А теперь представим что в конце 2й минуты, минуты обработки, был вызван перезапуск по причине изменений. Если потушить приложение прямо так, не дождавшись окончания обработки - то всю работу прийдется выполнить повторно. Что естественно приведет к задержке нарушающей SLA.

Особенно это не желательно при изменениях схемы, ведь таких изменений может быть много.

Очень важно обратить внимание что отказоустойчивость Structured Stream не имеет никакого отношения к graceful shutdown. Отказоустойчивость достигается в результате того что все операции и сам Structured Stream являются идемпотентными. А значит можно безопасно стартовать после падений.

Особенно подчеркну что delivery semantic на уровне exactly-once гарантируется только для Spark Structured Stream операций и состояний. И если используемый Sink, точка назначения ваших данных, не поддерживает идемпотентность записи, то перезапуск упавшего Structured Stream может привести к дубликатам при записи. Например родной Kafka Sink, . Но в любом случае это проблема куда глубже чем только падание Structured Stream, потому что в таком случае дубликаты могут происходить и при повторении Spark Task, что вообще штатная для spark операция. Которая регулярно происходит.

При этом если вас интересует корректное завершение в случае sigterm сигнала, то обязательно нужно добиться завершения Structured Stream, до завершения spark context. Этот пример и будет разобран, как самый неочевидный частный случай shutdown.

И почему shutdown требует отдельной статьи, а не вызова метода stop

Ну начну с того, что за метод stop такой.

Из документации, существует 2 метода которые собственно отвечают за остановку Structured stream.

query.stop() // Остановить запрос(stream)
query.awaitTermination() // заблокировать пока запрос(stream) не прекратиться по причине вызова stop() или ошибки

Уже только по наличие 2ух методов можно предположить что stop() асинхронный. Что лично меня навело на мысль, что ни какой проблемы нет и graceful shutdown иметься из коробки.

Но я притворюсь что я умный и читал документацию, а не выгребал ошибки в production. Из документация

Останавливает выполнение этого запроса если он активен. Вызов блокируется до того как прекратиться выполнение query execution threads или по timeout.

И вот тут опытный Java/Scala разработчик по идее должен был напрячься.

Какой то Thread, уж больно напоминающий java thread по названию, прекратиться по сигналу. Да и в добавок к этому еще один, дополнительный, метод который ждет завершения. Это же thread.interupt() скажите вы, и будете правы - так что ни про какой graceful shutdown речь идти не может.

Но по правде сказать, я уже на столько свыкся с всякими Async/Rx/CSP концепциями, что мне в голову не пришло, Stream может быть реализованы как Thread(() => while(true) processNextBatch). В одном из самых популярных Big Data frameworks. И очень зря ведь это Spark, с кучей Hadoop внутри и ThreadLocal.

Так что я просто вызвал эти два метода, парой как предлагает официальная документация. И все внешне было именно так, как надо. Ни каких ошибок или warning логов. Приложение выключалось тихо и спокойно. Только очень уж быстро. За секунды, а мои batches - ни микро ни разу. А минут по 5.

Что-то явно пошло не так. Ковыряясь в исходниках spark легко узнать что:
Meтод stop() вызывает sparkSession.sparkContext.cancelJobGroup(runId.toString). cancelJobGroup отменяются все задачи которые spark запланировал или выполняет в ходе этого stream.

Идем на spark UI, и видим что на самом деле задачи были cancel job fail.

Решение

Вот github gist с полностью готовым решением, которое бежит у меня в production не первый год. Дальше в статье будет только описание работы этого кода.

Если вы не новичок в spark, возомжно, вам будет проще посмотреть сам код, чем читать описание.

Высокоуровневое описание решения:

  1. Запустить сигнал об остановке

  2. C этого момента, если наш stream активный - то ждем когда завершится текущий batch.

    1. Как узнать что batch завершился?
      В spark есть интерфейс StreamingQueryListener, который сообщает о том что batch выполнен. Важно он Асинхронный.

  3. Получив сигнал от StreamingQueryListener о том что batch закончился, выключаем Stream - stream.stop.

Для этого подойдет lock/sempahore/queue/rxStream или любая другая технология для синхронизации в многопоточной среде.

Реализация для инициализации ShutdownHook. Который будет ждать пока stream не закончиться.

class GracefulStopOnShutdownListener(streams: StreamingQueryManager) extends StreamingQueryListener {

  private val runningQuery = new ConcurrentHashMap[UUID, (Runnable, SynchronousQueue[Boolean])]()

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    val stream = streams.get(event.id)
    val stopSignalChannel = new SynchronousQueue[Boolean]()
    val shutdownHook: Runnable = () => {
      if (stream.isActive) {
        val stopSignal = true
        stopSignalChannel.put(stopSignal)
        stream.stop()
        stream.awaitTermination()
      }
    }
    ShutdownHookManager.get().addShutdownHook(shutdownHook, 100, 20, TimeUnit.MINUTES)
    runningQuery.put(stream.id, (shutdownHook, stopSignalChannel))
    log.info(s"Register shutdown hook for query ${event.id}")
  }

* runningQuery: Map[stream -> Shutdown Hook + Стоп Сигнал].
Структура которая сохраняет каждый запущенный Stream, что бы процесс работал независимо для каждого Structured Stream на Spark driver.
* SynchrnousQueue.put - чтобы заблокировать ShutdownHook до тех пор пока не закончиться batch. Этот метод положит элемент в очередь и будет ждать до тех пор, пока другой Thread его не заберет.
* Важно использовать здесь не java/scala shutdown hook. А Hadoop ShutdownHookManager. Лучше бы Spark ShutdownHookmanager, но он не имеет public API. Но Spark ShutdownHookmanager регистриурет себя в hadoop с приоритетом 40. Так что нам необходимо использовать hadoop ShutdownHook, с приоритетом выше 40. Это важно, если указать приоритет ниже чем у spark shutdown hooks, то spark context выключиться раньше чем наш stream, что приведет к падению Stream и его текущих задач.

Дальше необходимо, что бы каждый batch отпускал наш shutdown hook. Но не блокировался, если не было sigterm signal.

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val (_, stopSignalChannel) = runningQuery.get(event.progress.id)
    stopSignalChannel.poll()
  }

* SynchronousQueue.poll - не блокирующий, или заберет елемент из очереди или получит null. Это разблокирует Put из Shutdown Hook, но не заблокирует Listener если не было sigterm.

В целом все готово уже и будет работать, но есть не учтенный сценарий.
* Stream стартанул - зарегистрировал hook, но упал с ошибкой или был принудительно остановлен до того как пришел sigterm.

Значит, нужно удалить ShutdownHook в этом случае. Тут нам поможет последний метод onQueryTerminated.

override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    val (shutdownHook, stopSignalChannel) = runningQuery.remove(event.id)
    if (!ShutdownHookManager.get().isShutdownInProgress) ShutdownHookManager.get().removeShutdownHook(shutdownHook)
    stopSignalChannel.poll()
  }

* удаляем наш stream из Listener, чтобы не ресурсы почистить.
* Удаляем ShutodownHook, чтобы он нам позже не заблокировал приложение при sigterm
* На всякий случай, отпускаем синхронизацию. Вдруг кто то успел заблокироваться на нее, до того как мы удалили хук - но после того как последний batch был закончен.

Заключение

Тот же github gist что содержит решение, содержит Main class который позволяет воспроизвести ошибку и визуализировать решение используя spark в local mode.

Теги:
Хабы:
0
Комментарии 0
Комментарии Комментировать

Публикации

Истории

Работа

Data Scientist
66 вакансий
Scala разработчик
22 вакансии

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн
PG Bootcamp 2024
Дата 16 апреля
Время 09:30 – 21:00
Место
Минск Онлайн
EvaConf 2024
Дата 16 апреля
Время 11:00 – 16:00
Место
Москва Онлайн