Pull to refresh
1303.4
OTUS
Цифровые навыки от ведущих экспертов

Spark Essentials: Руководство по настройке и запуску проектов Spark с помощью Scala и sbt

Reading time18 min
Views896
Original author: Suffyan Asad

Введение 

В этой статье представлено подробное руководство по инициализации проекта Spark с помощью Scala Build Tool (SBT). Это руководство охватывает все этапы процесса, включая создание проектов, управление зависимостями, локальное тестирование, компиляцию и развертывание проекта Spark на кластере.

Это руководство было тщательно продумано, чтобы помочь новичкам, так что даже те, кто только начинает изучать Spark, смогут легко ему следовать. Более того, эта статья послужит ценным пособием для тех, кто хочет создавать, тестировать и развертывать пакетные задания Spark в среде JVM.

Цель этой статьи — предоставить вам подробное руководство по инициализации проекта Spark, в котором будут подробно рассмотрены все ключевые идеи. В руководстве будет рассмотрен пошаговый процесс создания проектов с помощью Scala Build Tool (SBT), а также продемонстрировано управление зависимостями, локальное тестирование, компиляция и развертывание проекта Spark на кластере.

К концу этого руководства читатели должны научиться:

  • Настраивать проект Spark под среду JVM с помощью SBT

  • Тестировать проект локально

  • Компилировать проект в JAR-файл

  • Развертывать и запускать JAR-файла на кластере

Если вам интересно узнать, как настраивать проекты PySpark, рекомендую ознакомиться с моей исчерпывающей статьей на эту тему. Найти статью можно здесь:

Spark Essentials: Руководство по настройке, упаковке и запуску проектов PySpark

Без лишних отлагательств, давайте начнем наше путешествие с инициализации проекта Spark!

Установка предварительных зависимостей: Java, Scala и sbt

Прежде чем начать, убедитесь, что в системе, в которой вы работаете, установлены все необходимые инструменты и языки. Мы будем использовать:

Java 1.8 JDK:

Я предпочитаю использовать дистрибутив Java Developmnt Kit (JDK) Amazon Corretto 8. Его можно скачать для Windows, Mac и Linux, перейдя по этой ссылке:

Загрузка Amazon Corretto 8

Scala 2.12.18:

Scala 2.12.18 можно получить, перейдя по ссылке:

Scala 2.12.18

Scala Built Tool (sbt):

Последняя версия sbt на момент написания этой статьи — 1.9.8. sbt можно загрузить с сайта:

Скачать sbt

Специфика установки этих зависимостей на различных платформах, Windows, Linux или MacOS, здесь рассматриваться не будет, поскольку это выходит за рамки данной статьи. Обязательно следуйте инструкциям, приведенным для платформы, на которой вы работаете. После установки, чтобы убедиться, что все настроено правильно, можно выполнить в терминале следующие команды:

java -version
scala -version
sbt -version

Команда для проверки версий java, scala и sbt

И вывод вышеуказанных команд должен быть следующим:

Выводы команд для проверки версий java, scala, sbt
Выводы команд для проверки версий java, scala, sbt

Пример задачи с Scala и sbt

Теперь, когда все установлено, мы можем начать.

Scala Built Tool (sbt) — это инструмент для сборки и управления зависимостями для проектов на Scala и Java. Среди других подобных инструментов для проектов на Java можно выделить Maven, Ant и т. д. Цитата с сайта sbt:

sbt создан для проектов на Scala и Java. Его выбирают 93,6% Scala-разработчиков (2019).

sbt, простой инструмент для сборки

В этом разделе sbt используется для настройки проекта Spark. Для начала создайте новую папку spark-example. Scala Built Tool (SBT) полагается на соглашение, придерживаясь структуры каталогов, подобной Maven. Так, по умолчанию файлы с кодом находятся в папке src/main/scala, а тесты — в папках src/test/scala. Итак, внутри папки создайте spark-example следующие вложенные каталоги:

Структура каталогов проекта

Структуру каталогов можно задать с помощью команд:

mkdir spark_example
cd spark_example
mkdir src
mkdir src/main
mkdir src/main/scala
mkdir src/main/scala/com
mkdir src/main/scala/com/example
mkdir src/main/scala/com/example/sparktutorial
mkdir src/test
mkdir src/test/scala
mkdir src/test/scala/com
mkdir src/test/scala/com/example
mkdir src/test/scala/com/example/sparktutorial
mkdir project
mkdir data

Если вам интересно узнать больше о структуре каталогов sbt, то вы можете почитать этот раздел документации:

Справочное руководство sbt — Структура каталогов

Следующий шаг — создание файла build.sbt. Файл build.sbt в определяет настройки и зависимости проекта Scala. Кроме того, он содержит информацию о сборке, включая имя проекта, версию, версию Scala, а также дополнительные настройки по сборке проекта, запуску тестов и т. д.

Создайте файл build.sbt в корне папки spark-example и добавьте в него следующее содержимое:

scalaVersion := "2.12.18"

name := "sparktutorial"
organization := "com.example"
version := "1.0"

В этом файле build.sbt нашему проекту присвоено имя sparkexample, а организации — com.tutorial. Он также настраивает Scala версии 2.12.18 для компиляции проекта.

Чтобы создать файл build.sbt, выполните следующие команды:

echo '
scalaVersion := "2.12.18"

name := "sparktutorial"
organization := "com.example"
version := "1.0"
' > build.sbt

Затем в папке project создайте файл build.properties и добавьте в него следующее:

sbt.version=1.9.8

Этот файл задает версию sbt 1.9.8. Это необязательный шаг, но задать версию sbt не помешало бы, так как в противном случае будет использоваться установленная версия sbt, а это может привести к сбою сборки из-за несовместимостей.

Выполните следующие команды, чтобы создать в папке properties файл build.properties:

echo 'sbt.version=1.9.8' > project/build.properties

После этого создайте простой файл Scala-кода, содержащий main-функцию, чтобы убедиться, что все настроено правильно, успешно компилируется и запускается. Создайте файл с именем SparkExampleMain.scala в папке src/main/scala и добавьте в него следующее содержимое:

package com.example.sparktutorial

object SparkExampleMain extends App {
    println("Hello world")
}

Код просто выводит на консоль сообщение Hello world. Объект Main расширяет трейт App, который

Можно использовать для быстрого превращения объектов в исполняемые программы

и

Не требует явного метода main. Вместо этого все тело класса становится "main-методом".

Позже этот файл будет изменен для инициализации задачи Spark, и он вместе с другими файлами с кодом будет представлять из себя простой пример задачи.

Файл с кодом можно создать с помощью следующей команды:

echo '
package com.example.sparktutorial

object SparkExampleMain extends App {
println("Hello world")
}' > src/main/scala/com/example/sparktutorial/SparkExampleMain.scala

После выполнения этих шагов в каталоге spark_example у нас будут следующие файлы и папки:

Файлы и папки в каталоге spark_example
Файлы и папки в каталоге spark_example

Эти шаги также можно выполнить, используя функции IDE, и, как мы увидим позже, в IDE может быть реализована поддержка Scala и sbt, которая может сделать этот процесс проще и удобнее. Но этот первый раздел призван показать вам, как это делать самостоятельно, и поэтому демонстрирует эти шаги из командной строки. Скелет проекта с простым кодом на Scala готов, и следующим шагом будет компиляция с помощью sbt и запуск.

Чтобы запустить сервер и консоль sbt, запустите sbt в командной строке:

запуск консоли sbt
запуск консоли sbt

Проект можно скомпилировать с помощью команды compile. После этого мы запустим объект Main, выполнив команду run:

Скомпилируйте и запустите проект
Скомпилируйте и запустите проект

Чтобы выйти из консоли sbt, выполните команду exit.

После успешной компиляции и запуска кода, мы наконец можем немного пописать Spark-код!!! Во-первых, нам нужны данные, и для этого демо мы используем данные TLC Trip Record за октябрь 2023 года.

Пример данных — TLC Trip Record Data

TLC Trip Record Data — это полный набор данных, содержащий подробную информацию о поездках на такси в Нью-Йорке. Данные можно загрузить с сайта.

В этом руководстве мы будем использовать записи о поездках на желтом такси (Yellow Taxi Trip) за октябрь 2023 года. Загрузите файл parquet и поместите его в папку data внутри папки проекта spark_example.

Написание простой задачи Spark

Затем обновите файл build.sbt, чтобы включить Spark 3.5.0 в качестве зависимости. Для этого будут добавлены два пакета, spark-core и spark-sql. На сайте репозитория Maven на вкладке SBT в каждой библиотеке содержится код для добавления библиотеки в качестве зависимости в файл build.sbt:

Добавление зависимостей - код sbt в репозиторий Maven для spark-core
Добавление зависимостей - код sbt в репозиторий Maven для spark-core

Spark core: https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0

Spark SQL: https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.12/3.5.0

В файл build.sbt необходимо добавить следующий код:

libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "3.5.0",
	"org.apache.spark" %% "spark-sql" % "3.5.0"
)

fork := true

В build.sbt добавлено еще одно выражение. Fork := true требуется для запуска проекта из командной строки, иначе после завершения работы программа завершится с исключением. При этом код запускается в новом инстансе JVM. Более подробное обсуждение можно найти в этих ответах на StackOverflow:

Затем добавьте файл Scala под названием package.scala, содержащий package object. Внутри package object нужно написать функцию для создания сессии Spark, а также функцию для парсинга аргументов командной строки. Код выглядит следующим образом:

package com.example


import org.apache.spark.sql.SparkSession


package object sparktutorial {
  def createSparkSession(appName: String, isLocal: Boolean): SparkSession = {
    if (isLocal) {
      SparkSession
        .builder()
        .config("spark.sql.caseSensitive", value = true)
        .config("spark.sql.session.timeZone", value = "UTC")
        .config("spark.driver.memory", value = "8G")
        .appName(appName)
        .master("local[*]")
        .getOrCreate()
    } else {
      SparkSession
        .builder()
        .config("spark.sql.caseSensitive", value = true)
        .config("spark.sql.session.timeZone", value = "UTC")
        .appName(appName)
        .getOrCreate()
    }
  }


	def parseArgs(args: Array[String]): (String, String) = {
    val parsedArgs = args.map(arg => arg.split(" ")).map(splitArg => splitArg(0) -> splitArg(1)).toMap
    val inputPath = parsedArgs("--input-path")
    val outputPath = parsedArgs("--output-path")


    println(s"input path: ${inputPath}")
    println(s"output path: ${outputPath}")


    (inputPath, outputPath)
  }
}

Package object в Scala — это специальный объект, который объявляется в файле package.scala внутри каждого пакета. Элементы, объявленные в package object, доступны в пакете так же, как если бы они были членами самого пакета, не требуя оператора import.

Это полезная функция для инкапсуляции утилит, связанных с пакетом, обеспечивающая удобное место для размещения общих констант, псевдонимов типов, неявных преобразований и т. д. Подробнее об package object в Scala можно узнать из документации:

Определения верхнего уровня в пакетах

Наш package object содержит функцию createSparkSession, задача которой заключается в создании сессии Spark. Эта функция доступна в пакете com.example.sparktutorial без необходимости импорта.

Кроме того, функция parseArgs парсит аргументы командной строки. Задача принимает два аргумента: первый — входной путь для чтения данных, второй — выходной путь для записи данных.

Затем добавьте еще один файл Analysis.scala в тот же пакет по адресу src/main/scala/com/example/sparktutorial со следующим кодом:

package com.example.sparktutorial


import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, count, lit}


object Analysis {
  def calculateAverageTipByPickupLocation(data: DataFrame): DataFrame = {
    val result = data
      .groupBy("PULocationID")
      .agg(
        avg("tip_amount"),
        count(lit(1)).as("num_rows")
      )


    result.sort(col("num_rows").desc).show(truncate = false, numRows = 10)
    result
  }
}

Код содержит функцию для поиска средней суммы чаевых для каждого места подбора пассажиров (PULocationID). Помимо возврата данных, код также отображает места подбора, в которых средняя сумма чаевых была наибольшей. Остается только прочитать данные, вызвать функцию для анализа и записать данные в выходную локацию. Путь к данным и местоположение вывода задаются аргументами командной строки.

Наконец, обновите главный объект SparkExampleMain:

package com.example.sparktutorial


import com.example.sparktutorial.Analysis.calculateAverageTipByPickupLocation


object SparkExampleMain extends App {
  val spark = createSparkSession("Spark test", isLocal = true)


  val (inputPath, outputPath) = parseArgs(args = args)
  val data = spark.read.parquet(s"${inputPath}/*.parquet")


  val analysisResult = calculateAverageTipByPickupLocation(data = data)


  analysisResult.write.option("header", "true").csv(outputPath)
}

Компиляция и выполнение задачи в локальном режиме

После внесения этих изменений в код следующим шагом будет компиляция и запуск проекта. Не забудьте выйти из консоли sbt и перезапустить ее, чтобы она зафиксировала обновления в файле build.sbt. В качестве альтернативы для сборки и запуска кода можно использовать команду sbt run, которая также выйдет из консоли sbt сразу после завершения запуска.

Также нам понадобятся некоторые аргументы командной строки, поэтому полная команда выглядит так: sbt "run -input-path data --output-path output". Команда выводит переданные аргументы, а также вывод задачи и логи Spark. Она запускает задачу в локальном режиме.

Локальный режим Spark позволяет запускать программы Spark на одной машине, используя зависимости Spark (spark-core и spark-sql), включенные в проект. Локальный режим использует ресурсы машины, на которой он запущен, и выполняет все задачи в процессе Driver. Этот режим часто используется для разработки, тестирования и отладки, так как он не требует распределенного кластера, что позволяет легко тестировать Spark-приложения на отдельных машинах.

Ранее, при создании локальной сессии Spark, для мастера было установлено значение local[*], что указывает на локальный режим, использующий все доступные ядра. Поскольку локальный режим использует driver для выполнения всех задач, память driver’а также установлена на 8 ГБ с помощью конфигурации spark.driver.memory. Ничего из этого не требуется при выполнении задачи на кластере, так как мастер предоставляется командой spark-submit, либо берется из кластера Spark, на который отправляется задача, если кластер настроен таким образом. Конфигурации исполнителей также могут быть предоставлены командой spark-submit.

Чтобы узнать больше о локальном режиме Spark, почитайте следующую (подробную) статье:

Spark local — Внутренние компоненты Spark Core

Перед запуском убедитесь, что файл с данными находится в папке data внутри папки проекта.

Выходные данные задачи:

Запуск задачи — выведенные переданные параметры
Запуск задачи — выведенные переданные параметры

Задача также выводит 10 лучших точек подбора по среднему размеру чаевых:

10 лучших точек подбора по среднему размеру чаевых
10 лучших точек подбора по среднему размеру чаевых

Несмотря на то, что большинство из них находятся на уровне INFO, логи Spark отображаются как ошибки. Это происходит потому, что по умолчанию Spark выводит свои логи в SYSTEM_ERR.

Кроме того, логов Spark ну уж очень много. Их можно ограничить, настроив Spark на отображение только сообщений выше уровня WARN. Spark 3.5.0 использует log4j2 для логирования. Поэтому это поведение по умолчанию можно изменить, предоставив файл log4j2.properties с соответствующим содержимым:

rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console


appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

Этот файл выполняет две функции. Во-первых, он настраивает уровень логгера на WARN или выше, предотвращая вывод логов Spark уровня INFO. Во-вторых, он изменяет цель консоли на SYSTEM_OUT, заменяя стандартное SYSTEM_ERR, что предотвращает появление логов Spark в виде [error].

Добавьте этот файл с именем log4j2.properties в каталог src/main/resources и повторно запустите проект. Вывод будет следующим:

Обновленный вывод после добавления файла log4j2.properties

Перед повторным запуском не забудьте либо удалить папку output, либо указать другое значение параметра output-path, иначе задача завершится неудачей.

Теперь в логах тихо! При необходимости файл log4j2.properties можно настроить так, чтобы выводить на консоль только логи уровня INFO, а остальные направлять в файл. Представленный здесь файл log4j2.properties был создан с помощью следующей ссылки.

Отображение пользовательского интерфейса Spark

Чтобы получить доступ к пользовательскому интерфейсу Spark добавьте Thread.*sleep*(100000000) в конец функции main, а затем откройте localhost:4040 в браузере:

Пользовательский интерфейс Spark 
Пользовательский интерфейс Spark 

Информацию о среде можно просмотреть на вкладке Environment. Мы видим версии Java и Scala и то, что и мастер является  local[*], и spark.app.id также является локальным.

Среда Spark
Среда Spark

Доступ к планам запросов можно получить на вкладке SQL/DataFrame:

План запросов Spark
План запросов Spark

Компиляция и отправка в Spark

Следующий шаг — компиляция JAR и запуск на кластере Spark. Прежде чем подготовить JAR-файл, давайте настроим локальный кластер Spark с помощью docker compose.

Я адаптировал отличный пример создания трехузлового кластера Spark (1 мастер, 2 рабочих узла) для использования Spark 3.5.0. Его можно взять из следующего репозитория.

Ветка — adapt-for-spark-3.5.0. Перед запуском убедитесь, что в вашей системе установлено следующее:

После этого можно создать трехузловой кластер Spark, выполнив следующие команды:

git clone https://github.com/SA01/docker-spark-cluster.git
cd docker-spark-cluste
git checkout adapt-for-spark-3.5.0
docker build -t spark-docker-image-3.5.0:latest .
docker-compose up

Кластер запущен, и мастера и рабочих можно увидеть на рабочем столе Docker или выполнив команду docker ps:

Мастер и рабочие узлы Spark в виде контейнеров Docker
Мастер и рабочие узлы Spark в виде контейнеров Docker

Пользовательский интерфейс кластера Spark доступен по адресу http://localhost:9000:

Пользовательский интерфейс кластера Spark
Пользовательский интерфейс кластера Spark

В папке репозитория есть две папки, которые монтируются на узле-мастере и могут быть использованы для размещения данных и JAR-файлов в кластере:

  • apps в /opt/spark-apps

  • data в /opt/spark-data

URL мастера Spark очень важен, поскольку он передается команде spark-submit, чтобы предоставить его сессии Spark. Этот URL меняется каждый раз, когда кластер Docker compose останавливается и перезапускается, поэтому не забывайте записывать его при каждом запуске кластера.

Упаковка и выполнение на тестовом кластере

На следующем этапе задача упаковывается в Uber JAR или Fat JAR, который включает все классы и зависимости. Для этого используется плагин sbt под названием sbt-assembly. Однако перед этим задачу необходимо подготовить. Для этого необходимо внести следующие изменения:

  • Обновите вызов функции createSparkSession в объекте SparkExampleMain, установив значение isLocal в false.

  • В том же объекте SparkExampleMain удалите Thread.sleep

Следующим шагом будет подключение плагина sbt-assembly. Его можно получить, следуя процедуре, указанной на его странице Github.

Во-первых, добавьте в папку project файл plugins.sbt со следующим содержимым:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")

Затем обновите файл build.sbt следующим образом:

scalaVersion := "2.12.18"


name := "sparktutorial"
organization := "com.example"
version := "1.0"


libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "3.5.0" % "provided",
	"org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
)


fork := true


assembly / assemblyMergeStrategy := {
    case PathList("META-INF", "services", _*) => MergeStrategy.concat
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
}

Как видите, в стратегии слияния произошли некоторые изменения. Это связано с тем, что стратегия слияния fat jar обновлена для обработки проблем слияния из-за файлов в каталоге META-INF. Дальнейшие подробности выходят за рамки этой статьи, но если вам интересно, пройдитесь по следующим ссылкам, которые я использовал для решения проблем со сборкой при подготовке кода для этой статьи:

Во-вторых, spark-core и spark-sql помечены как provided, чтобы исключить их из сборки. Это сделано потому, что JAR будет отправлен на кластер и будет использовать Spark, присутствующий и работающий на этом кластере, поэтому их включение излишне. Однако если есть дополнительные зависимости, их нужно включить в fat JAR, чтобы код мог их использовать. Обратите внимание, что после того, как мы пометим spark-core и spark-sql как provided, JAR не сможет быть запущен напрямую в локальном режиме из-за отсутствия этих зависимостей. Для этого удалите provided, соберите сборку заново и запустите.

Кроме того, в установите  main-классе SparkExampleMain isLocal = false в вызове функции createSparkSession, которая создает сессию Spark.

После этого выполните команду sbt clean, а затем команду sbt assembly, чтобы собрать FAT JAR.

Команды для создания fat JAR
Команды для создания fat JAR

При сборке JAR с помощью sbt assembly в папке target/scala-2.12 создается файл под названием sparktutorial-assembly-1.0.jar. Скопируйте этот файл в каталог apps в папке проекта docker-compose. Также скопируйте данные из каталога data и файл log4j2.properties, чтобы генерировать меньше логов Spark.

Затем подключитесь к мастер-узлу с помощью команды docker exec, имя образа - docker-spark-cluster-spark-master-1:, а затем запустите задачу spark с помощью команды spark-submit:

cp target/scala-2.12/sparktutorial-assembly-1.0.jar <path to docker spark cluster repository>/apps/
cp <path to data>/yellow_tripdata_2023-10.parquet <path to docker spark cluster repository>/data/
cp src/main/resources/log4j2.properties <path to docker spark cluster repository>/apps/
docker exec -it docker-spark-cluster-spark-master-1 /bin/bash
/opt/spark/bin/spark-submit --class com.example.sparktutorial.SparkExampleMain --files /opt/spark-apps/log4j2.properties --conf "spark.driver.extraJavaOptions=-Dlog4j2.configurationFile=file:/opt/spark-apps/log4j2.properties" --master <spark master URL> /opt/spark-apps/sparktutorial-assembly-1.0.jar --input-path /opt/spark-data --output-path /opt/spark-apps/output

Убедитесь, что вы получили последний URL мастера Spark с localhost:9000, чтобы избежать исключения "failed to connect to master". После этого задача должна запуститься и успешно завершиться:

Выходные данные задачи:

Чтобы настроить Spark на использование предоставленного файла log4j2.properties, воспользуйтесь этим вопросом на Stack Overflow

Как использовать определенный файл log4j2.properties в моем приложении / драйвере Spark?

Задача также отображается в пользовательском интерфейсе мастера Spark, сначала как запущенная, а затем как завершенная. Во время выполнения задачи пользовательский интерфейс можно просматривать по адресу localhost:4040.

Пользовательский интерфейс мастера Spark - Выполнение задачи
Пользовательский интерфейс мастера Spark - Выполнение задачи
Пользовательский интерфейс мастера Spark - Выполненная задача
Пользовательский интерфейс мастера Spark - Выполненная задача

Папку с результатами можно найти по адресу /opt/spark-apps/output на мастер-узле. Поскольку это смонтированная папка, доступ к ней можно получить и с машины-хоста.

На этом основной процесс создания, упаковки и запуска задачи Spark с помощью sbt завершен. Детали процесса могут отличаться в зависимости от дополнительных зависимостей, которые должны быть упакованы вместе с JAR, и особенностей среды выполнения. Например, отправка JAR-файла Spark на кластере Amazon Elastic MapReduce в качестве одного из шагов требует настройки в соответствии с документацией EMR. В этой статье представлен общий обзор шагов. Конкретные шаги могут отличаться в зависимости от множества факторов.

Более того, в реальных сценариях процесс сборки будет интегрирован в конвейер CI/CD. Отправка задач будет управляться инструментом оркестровки, например Apache Airflow. Также очень важную роль играют юнит-тесты. Для написания и выполнения тестов можно использовать такой фреймворк, как scalatest.

Настройка проекта Spark в IDE IntelliJ IDEA

Использование такой IDE, как IntelliJ IDEA с плагинами Scala и sbt, может похвастаться более плавной настройки и запуском проекта благодаря интеграции с sbt и Scala. Если вы хотите использовать ее, скачайте и установите следующее:

Затем откройте IntelliJ IDEA и выберите New Project, чтобы открыть мастер создания нового проекта.

Создание нового проекта
Создание нового проекта

Далее в окне нового проекта выберите местоположение проекта, версию Scala (2.12.18), версию JDK (1.8) и тип проекта sbt.

Новый проект — установите версии JDK, sbt, Scala и другие детали
Новый проект — установите версии JDK, sbt, Scala и другие детали

Это создаст новый проект со структурой папок, файлами build.sbt, project/build.properties и project/plugins.sbt.

Содержание только что созданного проекта
Содержание только что созданного проекта

Кроме того, в IDEA встроены терминал и консоль sbt. Далее скопируйте код и конфигурации из примера задачи, приведенного выше, включая содержимое файла build.sbt. В файле plugins.sbt сохраните плагин sbt-ide-settings, добавленный IDE. После внесения изменений в файл build.sbt и другие конфигурационные файлы sbt проект можно легко обновить, нажав кнопку reload all sbt projects.

После копирования всего этого кода проект можно выполнить локально. Для этого удалите конфигурацию provided для spark-core и spark-sql в файле build.sbt. Обратите внимание, что это лишь демо, а в реальных проектах правильным способом локального тестирования кода является использование юнит-тестов. Узнать подробнее можно на сайте.

Чтобы запустить задачу локально, убедитесь, что все конфигурации sbt были подхвачены и все зависимости загружены. Также установите в функции createSparkSession значение 'isLocal' в 'true'. Затем выберите в меню сборки 'Rebuild Project'.

Rebuild project - подготовка к локальному запуску задачи
Rebuild project - подготовка к локальному запуску задачи

Затем нажмите на зеленую кнопку Run рядом с объявлением объекта SparkExampleMain в файле SparkExampleMain.scala и выберите ‘Run SparkExampleMain’.

Запуск проекта
Запуск проекта

Это создает конфигурацию выполнения, а также запускает задачу. Первое выполнение не удается, потому что не были переданы аргументы. Чтобы передать аргументы, выберите конфигурацию запуска в меню запуска, которое по умолчанию находится в правом верхнем углу пользовательского интерфейса. В открывшемся меню выберите "Edit Configuration".

Обратите внимание, что вместо запуска main-функции из этого меню можно создать конфигурацию запуска, а также конфигурации запуска для выполнения юнит-тестов.

Доступ к конфигурациям запуска
Доступ к конфигурациям запуска
Запуск конфигурации для локального выполнения задачи
Запуск конфигурации для локального выполнения задачи

Здесь в поле Program Arguments введите аргументы -input-path data --output-path output, нажмите Ok, а затем запустите задачу:

Результат локального выполнения тестовой задачи
Результат локального выполнения тестовой задачи

Далее, чтобы подготовить сборку, сначала верните изменения, сделанные для подготовки к локальному выполнению:

  • Установите значение isLocal равным false в вызове функции createSparkSession, чтобы создать сессию Spark для кластера.

  • Отметьте sprk-core и spark-sql, как provided в файле build.sbt.

Затем перезагрузите конфигурации проекта из sbt. После внесения изменений в файл build.sbt появится кнопка refresh, нажмите ее или выберите кнопку 'Reload all projects' из меню sbt в правой части интерфейса.

Далее откройте консоль sbt в нижней части пользовательского интерфейса. Это приведет к запуску оболочки sbt. Когда она загрузится, введите assembly, чтобы собрать сборку fat JAR, который можно запустить на кластере. Если консоль sbt уже запущена, остановите и перезапустите ее, чтобы получить последние конфигурации перед выполнением команды assembly.

Сборка fat JAR-файла
Сборка fat JAR-файла

После копирования JAR-файла и подключения к узлу-мастеру его можно отправить в кластер с помощью того же процесса и той же команды, которые были продемонстрированы ранее:

/opt/spark/bin/spark-submit --class com.example.sparktutorial.SparkExampleMain --files /opt/spark-apps/log4j2.properties --conf "spark.driver.extraJavaOptions=-Dlog4j2.configurationFile=file:/opt/spark-apps/log4j2.properties" --master <spark master URL> /opt/spark-apps/sparktutorial-assembly-1.0.jar --input-path /opt/spark-data --output-path /opt/spark-apps/output
Вывод задачи на кластере
Вывод задачи на кластере

На этом мы завершаем раздел о создании проекта с помощью IntelliJ IDEA.

Код примера

Код примера, о котором идет речь в статье, можно загрузить из этого Github-репозитория.

Заключение

В заключение хочу сказать, что в этой статье изложены основные принципы настройки, упаковки и запуска задач Spark с помощью sbt. Она проведет вас через каждый шаг: создание нового проекта, компиляция, упаковка, локальное тестирование и отправка задания Spark на кластер с помощью команды spark-submit. Ее целью является охватить весь процесс.

Имейте в виду, что некоторые этапы этого процесса могут отличаться в зависимости от среды выполнения (например, самоуправляемый кластер или Amazon EMR), а процесс настройки и отправки может потребовать некоторых корректировок. Однако принципы, изложенные здесь, в целом остаются неизменными. Главная цель этой статьи — дать вам знания, которые позволят модифицировать процесс в соответствии с вашими потребностями.

Пожалуйста, не стесняйтесь оставлять отзывы или задавать вопросы. Надеюсь, эта статья будет вам полезна.

Лучшие практики по Scala, подходы функционального программирования и самые мощные Scala-библиотеки можно изучить на практическом онлайн-курсе «Scala-разработчик».

Tags:
Hubs:
Total votes 8: ↑7 and ↓1+10
Comments0

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS