Запуск регулярных задач на кластере или как подружить Apache Spark и Oozie


    Давно уже витала в воздухе необходимость реализовать запуск регулярных Spark задач через Oozie, но всё руки не доходили и вот наконец свершилось. В этой статье хочу описать весь процесс, возможно она упростит Вам жизнь.


    Содержание


    • Задача
    • Оборудование и установленное ПО
    • Написание Spark задачи
    • Написание workflow.xml
    • Написание coordinator.xml
    • Размещение проекта на hdfs
    • Запуск регулярного выполнения
    • Заключение

    Задача


    Мы имеем следующую структуру на hdfs:


    hdfs://hadoop/project-MD2/data
    hdfs://hadoop/project-MD2/jobs
    hdfs://hadoop/project-MD2/status

    В директорию data ежедневно поступают данные и раскладываются по директориям в соответствие с датой. Например, данные за 31.12.2017 запишутся по следующему пути: hdfs://hadoop/project/data/2017/12/31/20171231.csv.gz.


    Формат входных данных

    • Разделитель строк: “\n”
    • Разделитель столбцов: “;”
    • Способ сжатия: gzip
    • Количество столбцов: 5 (device_id; lag_A0; lag_A1; flow_1; flow_2)
    • Заголовок в первой строке отсутствует
    • Данные за предыдущие сутки гарантированно записывается в соответствующую директорию в интервал времени с 00:00 до 03:00 следующих суток.

    В директории jobs располагаются задачи, которые имеют непосредственное отношение к проекту. Нашу задачу мы также будем размещать в этом каталоге.
    В директорию status должна сохраняться статистика по количеству пустых полей (со значением null) за каждый день в формате json. Например, для данных за 31.12.2017 должен будет появиться файл hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json


    Примет json файла:

    {
       "device_id_count_empty" : 0, 
       "lag_A0_count_empty" : 10, 
       "lag_A1_count_empty" : 0, 
       "flow_1_count_empty" : 37, 
       "flow_2_count_empty" : 100
    }

    Оборудование и установленное ПО


    В нашем распоряжение есть кластер из 10 машин, каждая из которых имеет 8-и ядерный процессор и оперативную память в размере 64 Гб. Общий объём жёстких дисков на всех машинах 100 Тб. Для запуска задач на кластере отведена очередь PROJECTS.


    Установленное ПО:

    • Apache Hadoop 2.7.3 (Hortonworks)
    • Apache Spark 2.0.0
    • Apache Oozie 4.2.0
    • Scala 2.11.11
    • Sbt 1.0.2

    Написание Spark задачи


    Создадим структуру проекта, это можно очень просто сделать в любой среде разработки, поддерживающей scala или из консоли, как показано ниже:


    mkdir -p daily-statistic/project
    echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
    echo "" > daily-statistic/project/plugins.sbt
    echo "" > daily-statistic/build.sbt
    mkdir -p daily-statistic/src/main/scala

    Замечательно, теперь добавим плагин для сборки, для этого в файле daily-statistic/project/plugins.sbt добавляем следующую строку:


    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

    Добавим описание проекта, зависимости и особенности сборки в файл daily-statistic/build.sbt:


    name := "daily-statistic"
    
    version := "0.1"
    
    scalaVersion := "2.11.11"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
    )
    
    assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

    Перейдём в директорию daily-statistic и выполним команду sbt update, для обновления проекта и подтягивания зависимостей из репозитория.
    Создаём Statistic.scala в директории src/main/scala/ru/daily


    Код задачи:

    package ru.daily
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    
    object Statistic extends App {
    
       // инициализация   
       implicit lazy val spark: SparkSession = SparkSession.builder()
         .appName("daily-statistic")
         .getOrCreate()
    
       import spark.implicits._
    
       val workDir = args(0)
       val datePart = args(1)
       val saveDir = args(2)
    
       try {
    
          val date = read(s"$workDir/$datePart/*.csv.gz")
             .select(
                '_c0 as "device_id",
                '_c1 as "lag_A0",
                '_c2 as "lag_A1",
                '_c3 as "flow_1",
                '_c4 as "flow_2"
             )
    
             save(s"$saveDir/$datePart", agg(date))
    
       } finally spark.stop()
    
       // чтение исходных данных   
       def read(path: String)(implicit spark: SparkSession): DataFrame = {
    
          val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")
    
          spark.read
             .options(inputFormat)
             .csv(path)
       }
    
       // построение агрегата
       def agg(data: DataFrame):DataFrame = data
          .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
          .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
          .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
          .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
          .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
          .agg(
             sum('device_id_empty) as "device_id_count_empty",
             sum('lag_A0_empty) as "lag_A0_count_empty",
             sum('lag_A1_empty) as "lag_A1_count_empty",
             sum('flow_1_empty) as "flow_1_count_empty",
             sum('flow_2_empty) as "flow_2_count_empty"
          )
    
       // сохранение результата
       def save(path: String, data: DataFrame): Unit = data.write.json(path)
    
    } 

    Собираем проект командой sbt assembly из директории daily-statistic. После успешного завершения сборки в директории daily-statistic/target/scala-2.11 появится пакет с задачей daily-statistic-0.1.jar.


    Написание workflow.xml


    Для запуска задачи через Oozie нужно описать конфигурацию запуска в файле workflow.xml. Ниже приведён пример для нашей задачи:


    <workflow-app name="project-md2-daily-statistic" xmlns="uri:oozie:workflow:0.5">
    
       <global>
          <configuration>
             <property>
                <name>oozie.launcher.mapred.job.queue.name</name>
                <value>${queue}</value>
             </property>
          </configuration>
       </global>
    
       <start to="project-md2-daily-statistic" />
    
       <action name="project-md2-daily-statistic">
          <spark xmlns="uri:oozie:spark-action:0.1">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <master>yarn-client</master>
             <name>project-md2-daily-statistic</name>
             <class>ru.daily.Statistic</class>
             <jar>${nameNode}${jobDir}/lib/daily-statistic-0.1.jar</jar>
             <spark-opts>
                --queue ${queue}
                --master yarn-client
                --num-executors 5
                --conf spark.executor.cores=8
                --conf spark.executor.memory=10g
                --conf spark.executor.extraJavaOptions=-XX:+UseG1GC
                --conf spark.yarn.jars=*.jar
                --conf spark.yarn.queue=${queue}
             </spark-opts>
             <arg>${nameNode}${dataDir}</arg>
             <arg>${datePartition}</arg>
             <arg>${nameNode}${saveDir}</arg>
           </spark>
    
           <ok to="end" />
           <error to="fail" />
    
       </action>
    
       <kill name="fail">
          <message>Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]</message>
       </kill>
    
       <end name="end" />
    
    </workflow-app>

    В блоке global устанавливается очередь, для MapReduce задачи которая будет находить нашу задачи и запускать её.
    В блоке action описывается действие, в нашем случае запуск spark задачи, и что нужно делать при завершении со статусом ОК или ERROR.
    В блоке spark определяется окружение, конфигурируется задача и передаются аргументы. Конфигурация запуска задачи описывается в блоке spark-opts. Параметры можно посмотреть в официальной документации
    Если задача завершается со статусом ERROR, то выполнение переходит в блок kill и выводится кратное сообщение об ошибки.
    Параметры в фигурных скобках, например ${queue}, мы будем определять при запуске.


    Написание coordinator.xml


    Для организации регулярного запуска нам потребуется ещё coordinator.xml. Ниже приведён пример для нашей задачи:


    <coordinator-app name="project-md2-daily-statistic-coord" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
        <action>
            <workflow>
                <app-path>${workflowPath}</app-path>
                <configuration>
                    <property>
                        <name>datePartition</name>
                        <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value>
                    </property>
                </configuration>
            </workflow>
        </action>
    </coordinator-app>

    Здесь из интересного, параметры frequency, start, end, которые определяют частоту выполнения, дату и время начала выполнения задачи, дату и время окончания выполнения задачи соответственно.
    В блоке workflow указывается путь к директории с файлом workflow.xml, который мы зададим позднее при запуске.
    В блоке configuration определяется значение свойства datePartition, которое в данном случае равно текущей дате в формате yyyy/MM/dd минус 1 день.


    Размещение проекта на hdfs

    Как уже было сказано ранее нашу задачу мы будем размещать в директории hdfs://hadoop/project-MD2/jobs:


    hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
    hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
    hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
    hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib

    Здесь в принципе всё понятно без комментариев за исключением директории sharelib. В эту директорию мы положим все библиотеки, которые использовались в процессе создания зашей задачи. В нашем случае это все библиотеки Spark 2.0.0, который мы указывали в зависимостях проекта. Зачем это нужно? Дело в том, что в зависимостях проекта мы указали "provided". Это говорит системе сборки не нужно включать зависимости в проект, они будут предоставлены окружением запуска, но мир не стоит на месте, администраторы кластера могут обновить версию Spark. Наша задача может оказаться чувствительной к этому обновлению, поэтому для запуска будет использоваться набор библиотек из директории sharelib. Как это конфигурируется покажу ниже.


    Запуск регулярного выполнения


    И так всё готово к волнительному моменту запуска. Мы будем запускать задачу через консоль. При запуске нужно задать значения свойствам, которые мы использовали в xml файлах. Вынесем эти свойства в отдельный файл coord.properties:


    # описание окружения
    nameNode=hdfs://hadoop
    jobTracker=hadoop.host.ru:8032
    
    # путь к директории с файлом coordinator.xml
    oozie.coord.application.path=/project-MD2/jobs/daily-statistic
    
    # частота в минутах (раз в 24 часа)
    frequency=1440
    startTime=2017-09-01T07:00Z
    endTime=2099-09-01T07:00Z
    
    # путь к директории с файлом workflow.xml
    workflowPath=/project-MD2/jobs/daily-statistic
    
    # имя пользователя, от которого будет запускаться задача
    mapreduce.job.user.name=username
    user.name=username
    
    # директория с данными и для сохранения результата
    dataDir=/project-MD2/data 
    saveDir=/project-MD2/status
    jobDir=/project-MD2/jobs/daily-statistic 
    
    # очередь для запуска задачи
    queue=PROJECTS
    
    # использовать библиотеке из указанной директории на hdfs вместо системных
    oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
    oozie.use.system.libpath=false

    Замечательно, тереть всё готово. Запускаем регулярное выполнение командой:


    oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run

    После запуска в консоль выведется job_id задачи. Используя этот job_id можно посмотреть информацию о статусе выполнения задачи:


    oozie job -info {job_id}

    Остановить задачу:


    oozie job -kill {job_id}

    Если Вы не знаете job_id задачи, то можно найти его, показав все регулярные задачи для вашего пользователя:


    oozie jobs -jobtype coordinator -filter user={user_name}

    Заключение


    Получилось немного затянуто, но на мой взгляд лучше подробная инструкция чем квест-поиск по интернету. Надеюсь описанный опыт будет Вам полезен, спасибо за внимание!

    Поделиться публикацией
    Комментарии 3
      0
      Класс!
        0
        все хорошо, но что за задача то? зачем читаются эти файлы?
        как я понимаю Oozie будет запускать каждый джоб через spark-submit скрипт, т.е. стартовать на каждый джоб отдельный jvm, соответственно каждый джоб будет иметь свой sparkSession. многим задачам такое не подойдет…
          0
          В данном случае описана часть задачи для сбора статистики по приходящим данным. Основной упор я делал на описание запуска. Да у каждой задачи будет свой sparkSession.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое