company_banner

Распределённый xargs, или Исполнение гетерогенных приложений на Hadoop-кластере

    enter image description here


    Привет, Хабр! Меня зовут Александр Крашенинников, я руковожу DataTeam в Badoo. Сегодня я поделюсь с вами простой и элегантной утилитой для распределённого выполнения команд в стиле xargs, а заодно расскажу историю её возникновения.


    Наш отдел BI работает с объёмами данных, для обработки которых требуются ресурсы более чем одной машины. В наших процессах ETL (Extract Transform Load) в ход идут привычные миру Big Data распределённые системы Hadoop и Spark в связке с OLAP-базой Exasol. Использование этих инструментов позволяет нам горизонтально масштабироваться как по дисковому пространству, так и по CPU/ RAM.


    Безусловно, в наших процессах ETL существуют не только тяжеловесные задачи на кластере, но и машинерия попроще. Широкий пласт задач решается одиночными PHP/ Python-скриптами без привлечения гигабайтов оперативной памяти и дюжины жёстких дисков. Но в один прекрасный день нам потребовалось адаптировать одну CPU-bound задачу для выполнения в 250 параллельных инстансов. Настала пора маленькому Python-скрипту покинуть пределы родного хоста и устремиться в большой кластер!


    Варианты манёвра


    Итак, мы имеем следующие входные условия задачи:


    1. Долгоиграющая (около одного часа) CPU-bound задача на языке Python.
    2. Требуется выполнить задачу 250 раз с различными входными параметрами.
    3. Результат выполнения получить синхронно, то есть запустить что-то, подождать, выйти с exit code согласно результатам.
    4. Минимальное время исполнения – считаем, что у нас имеется достаточное количество вычислительных ресурсов для параллелизации.

    Варианты реализации


    Один физический хост


    Тот факт, что запускаемые приложения являются однопоточными и не используют более 100% одного ядра CPU, даёт нам возможность бесхитростно осуществлять последовательность fork-/ exec-действий при реализации каждой задачи.


    C использованием xargs:


    commands.list:
    /usr/bin/uptime
    /bin/pwd
    
    krash@krash:~$ cat commands.list | xargs -n 1 -P `nproc` bash -c
    /home/krash
     18:40:10 up 14 days,  9:20,  7 users,  load average: 0,45, 0,53, 0,59

    Подход прост как валенок и хорошо себя зарекомендовал. Но в нашем случае мы его отметаем, поскольку при исполнении нашей задачи на машине с 32 ядрами результат мы получим через ~восемь часов, а это не соответствует формулировке «минимальное время исполнения».


    Несколько физических хостов


    Следующий инструмент, который можно применить для такого решения, – GNU Parallel. Помимо локального режима, схожего по функционалу с xargs, он имеет возможность выполнения программ через SSH на нескольких серверах. Выбираем несколько хостов, на которых будем исполнять задачи («облако»), делим список команд между ними и посредством parallel исполняем задачи.


    Создаём файл nodelist со списком машин и числом ядер, которые мы там можем утилизировать:


    1/  cloudhost1.domain
    1/  cloudhost2.domain

    Запускаем:


    commands.list:
    /usr/bin/uptime
    /usr/bin/uptime
    
    krash@krash:~$ parallel --sshloginfile nodelist echo "Run on host \`hostname\`: "\; {} ::: `cat commands.list`
    Run on host cloudhost1.domain:
     15:54  up 358 days 19:50,  3 users,  load average: 25,18, 21,35, 20,48
    Run on host cloudhost2.domain:
     15:54  up 358 days 15:37,  2 users,  load average: 24,11, 21,35, 21,46

    Однако этот вариант мы тоже отметаем в силу эксплуатационных особенностей: у нас нет сведений о текущей загрузке и доступности хостов кластера, и возможно попадание в ситуацию, когда параллелизация принесёт только вред, так как один из целевых хостов будет перегружен.


    Hadoop-based решения


    У нас есть проверенный инструмент BI, который мы знаем и умеем использовать, связка Hadoop+Spark. Чтобы втиснуть наш код в рамки кластера, есть два решения:


    Spark Python API (PySpark)

    Поскольку исходная задача написана на Python, а у Spark есть соответствующий API для этого языка, можно попробовать портировать код на парадигму map/reduce. Но и этот вариант нам пришлось отвергнуть, так как стоимость адаптации была неприемлемой в рамках этой задачи.


    Hadoop Streaming

    Map/reduce-фреймворк Hadoop позволяет выполнять задания, написанные не только на JVM-совместимых языках программирования. В нашем конкретном случае задача называется map-only – нет reduce-стадии, так как результаты выполнения не подвергаются какой-либо последующей агрегации. Запуск задачи выглядит так:


    hadoop jar $path_to_hadoop_install_dir/lib/hadoop-streaming-2.7.1.jar \
    -D mapreduce.job.reduces=0 \
    -D mapred.map.tasks=$number_of_jobs_to_run \
    -input hdfs:///path_for_list_of_jobs/ \
    -output hdfs:///path_for_saving_results \
    -mapper "my_python_job.py" \
    -file "my_python_job.py"

    Этот механизм работает следующим образом:


    1. Мы запрашиваем у Hadoop-кластера (YARN) ресурсы на выполнение задачи.
    2. YARN выделяет какое-то количество физических JVM (YARN containers) на разных хостах кластера.
    3. Между контейнерами делится содержимое файлов(а), лежащих в папке hdfs://path_for_list_of_jobs.
    4. Каждый из контейнеров, получив свой список строк из файла, запускает скрипт my_python_job.py и передаёт ему последовательно в STDIN эти строки, интерпретируя содержимое STDOUT как возвратное значение.

    Пример с запуском дочернего процесса:


    #!/usr/bin/python
    
    import sys
    import subprocess
    
    def main(argv):
       command = sys.stdin.readline()
       subprocess.call(command.split())
    
    if __name__ == "__main__":
       main(sys.argv)

    И вариант с «контроллером», запускающим бизнес-логику:


    #!/usr/bin/python
    
    import sys
    
    def main(argv):
       line = sys.stdin.readline()
       args = line.split()
       MyJob(args).run()
    
    if __name__ == "__main__":
       main(sys.argv)

    Этот подход наиболее полно соответствует нашей задаче, но имеет ряд недостатков:


    1. Мы лишаемся потока STDOUT выполняемой задачи (он используется в качестве канала коммуникации), а хотелось бы после завершения задачи иметь возможность посмотреть логи.
    2. Если в будущем мы захотим запускать ещё какие-то задачи на кластере, нам придётся делать для них wrapper.

    В результате анализа вышеописанных вариантов реализации мы приняли решение создать свой велосипед продукт.


    Hadoop xargs


    Требования, предъявляемые к разрабатываемой системе:


    1. Выполнение списка задач с оптимальным использованием ресурсов Hadoop-кластера.
    2. Условие успешного завершения – «все подзадачи отработали успешно, иначе – fail».
    3. Возможность сохранения подзадач для дальнейшего анализа.
    4. Опциональный перезапуск задачи при коде выхода, отличном от нуля.

    В качестве платформы для реализации мы выбрали Apache Spark – мы с ней хорошо знакомы и умеем её «готовить».


    Алгоритм работы:


    1. Получить из STDIN список задач.
    2. Сделать из него Spark RDD (распределённый массив).
    3. Запросить у кластера контейнеры для выполнения.
    4. Распределить массив задач по контейнерам.
    5. Для каждого контейнера запустить map-функцию, принимающую на вход текст внешней программы и производящую fork-exec.

    Код всего приложения до неприличия простой, и непосредственно интерес представляет, собственно, код функции:


    package com.badoo.bi.hadoop.xargs;
    
    import lombok.extern.log4j.Log4j;
    import org.apache.commons.exec.CommandLine;
    import org.apache.commons.lang.NullArgumentException;
    import org.apache.log4j.PropertyConfigurator;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    /**
    * Executor of one command
    * Created by krash on 01.02.17.
    */
    @Log4j
    public class JobExecutor implements VoidFunction<String> {
    
       @Override
       public void call(String command) throws Exception {
    
           if (null == command || command.isEmpty()) {
               throw new NullArgumentException("Command can not be empty");
           }
    
           log.info("Going to launch '" + command + "'");
           Process process = null;
           try {
    
               CommandLine line = CommandLine.parse(command);
    
               ProcessBuilder builder = getProcessBuilder();
               // quotes removal in bash-style in order to pass correctly to execve()
               String[] mapped = Arrays.stream(line.toStrings()).map(s -> s.replace("\'", "")).toArray(String[]::new);
               builder.command(mapped);
               process = builder.start();
    
               int exitCode = process.waitFor();
               log.info("Process " + command + " finished with code " + exitCode);
               if (0 != exitCode) {
                   throw new InstantiationException("Process " + command + " exited with non-zero exit code (" + exitCode + ")");
               }
           } catch (InterruptedException err) {
               if (process.isAlive()) {
                   process.destroyForcibly();
               }
           } catch (IOException err) {
               throw new InstantiationException(err.getMessage());
           }
       }
    
       ProcessBuilder getProcessBuilder() {
           return new ProcessBuilder().inheritIO();
       }
    }

    Сборка


    Сборка приложения производится стандартным для Java-мира инструментом – Maven. Единственное различие – в среде, в которой будет запускаться приложение. Если вы не используете Spark для вашего кластера, то сборка выглядит так:


    mvn clean install

    В этом случае получившийся JAR-файл будет содержать в себе исходный код Spark’а. В случае, если на машине, с которой производится запуск приложения, установлен клиентский код Spark, он должен быть исключён из сборки:


    mvn clean install -Dwork.scope=provided

    В результате такой сборки файл приложения будет существенно меньше (15 Кб против 80 Мб).


    Запуск


    Пусть у нас есть файл commands.list со списком заданий следующего вида:


    /bin/sleep 10
    /bin/sleep 20
    /bin/sleep 30

    Запускаем приложение:


    akrasheninnikov@cloududs1.mlan:~> cat log.log | /local/spark/bin/spark-submit --conf "spark.master=yarn-client" hadoop-xargs-1.0.jar
    17/02/10 15:04:26 INFO Application: Starting application
    17/02/10 15:04:26 INFO Application: Got 3 jobs:
    17/02/10 15:04:26 INFO Application: /bin/sleep 10
    17/02/10 15:04:26 INFO Application: /bin/sleep 20
    17/02/10 15:04:26 INFO Application: /bin/sleep 30
    17/02/10 15:04:26 INFO Application: Application name: com.badoo.bi.hadoop.xargs.Main
    17/02/10 15:04:26 INFO Application: Execution environment: yarn-client
    17/02/10 15:04:26 INFO Application: Explicit executor count was not specified, making same as job count
    17/02/10 15:04:26 INFO Application: Initializing Spark
    17/02/10 15:04:40 INFO Application: Initialization completed, starting jobs
    17/02/10 15:04:52 INFO Application: Command '/bin/sleep 10' finished on host bihadoop40.mlan
    17/02/10 15:05:02 INFO Application: Command '/bin/sleep 20' finished on host bihadoop31.mlan
    17/02/10 15:05:12 INFO Application: Command '/bin/sleep 30' finished on host bihadoop18.mlan
    17/02/10 15:05:13 INFO Application: All the jobs completed in 0:00:32.258

    После завершения работы через GUI YARN мы можем получить логи приложений, которые запускали (пример для команды uptime):


    enter image description here


    В случае невозможности выполнения команды весь процесс выглядит следующим образом:


    akrasheninnikov@cloududs1.mlan:~> echo "/bin/unexistent_command" | /local/spark/bin/spark-submit --conf "spark.master=yarn-client" --conf "spark.yarn.queue=uds.misc" --conf "spark.driver.host=10.10.224.14" hadoop-xargs-1.0.jar
    17/02/10 15:12:14 INFO Application: Starting application
    17/02/10 15:12:14 INFO Main: Expect commands to be passed to STDIN, one per line
    17/02/10 15:12:14 INFO Application: Got 1 jobs:
    17/02/10 15:12:14 INFO Application: /bin/unexistent_command
    17/02/10 15:12:14 INFO Application: Application name: com.badoo.bi.hadoop.xargs.Main
    17/02/10 15:12:14 INFO Application: Execution environment: yarn-client
    17/02/10 15:12:14 INFO Application: Explicit executor count was not specified, making same as job count
    17/02/10 15:12:14 INFO Application: Initializing Spark
    17/02/10 15:12:27 INFO Application: Initialization completed, starting jobs
    17/02/10 15:12:29 ERROR Application: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 1 times
    17/02/10 15:12:29 ERROR Application: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 2 times
    17/02/10 15:12:30 ERROR Application: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 3 times
    17/02/10 15:12:30 ERROR Application: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 4 times
    17/02/10 15:12:30 ERROR Main: FATAL ERROR: Failed to execute all the jobs
    java.lang.InstantiationException: Cannot run program "/bin/unexistent_command": error=2, No such file or directory
        at com.badoo.bi.hadoop.xargs.JobExecutor.call(JobExecutor.java:56)
        at com.badoo.bi.hadoop.xargs.JobExecutor.call(JobExecutor.java:16)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachAsync$1.apply(JavaRDDLike.scala:690)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachAsync$1.apply(JavaRDDLike.scala:690)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:118)
        at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:118)
        at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
        at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Заключение


    Разработанное решение позволило нам соблюсти все условия исходной задачи:


    1. Мы получаем у Hadoop ядра для запуска нашего приложения, согласно требованиям (числу ядер) – максимальный уровень параллелизации.
    2. При выдаче ресурсов учитываются загрузка и доступность хостов (за счёт API YARN).
    3. Мы сохраняем содержимое STDOUT/ STDERR всех задач, которые запускаем.
    4. Не пришлось переписывать исходное приложение.
    5. "Write once, run anywhere" © Sun Microsystems – разработанное решение теперь можно использовать для запуска любых других задач.

    Радость от полученного результата была столь велика, что мы не могли не поделиться ею с вами. Исходные коды Hadoop xargs мы опубликовали на GitHub.

    Badoo
    283,00
    Big Dating
    Поделиться публикацией

    Комментарии 10

      +1
      > Помимо локального режима, схожего по функционалу с xargs, он имеет возможность выполнения программ через SSH на нескольких серверах.
      Ого. Однозначно плюсов вам. Я даже не догадался такое поискать, хотя и parallel активно использую.

      > так как один из целевых хостов будет перегружен.
      А у него вроде есть параметр maxla? Или via ssh он не работает?
        0
        Честно говоря, мы не сильно углублялись в нутра parallel, т.к. широкого применения он у нас не имеет, а предварительные изыскания показали его неприменимость к нашей задаче.
        Помимо загруженности хоста есть ещё понятие «доступности» (выключен, например, или gracefully выводится из эксплуатации :). Также, нам не хочется держать где-то в конфиге список хостов и их технические характеристики — пусть это будет головной болью кластер-менеджера.
        0
        Ну это не распределённый xargs, а вызов hadoop из командной строки. Вот распределённый xargs: https://github.com/cheusov/paexec

        Правда документация там не очень, да и АМ.

          0
          Спасибо за комментарий и интересный инструмент!
          Из входных данных проекта https://github.com/cheusov/paexec/:
          Small program that processes a list of tasks in parallel on different CPUs, computers in a network or whatever.
          Очень похоже на то, что выполняет наша утилита.
          А здесь: https://github.com/cheusov/paexec/blob/master/paexec/paexec.pod
          Tasks are read by paexec from stdin and are represented as one line of text, i.e. one input line — one task.
          И здесь мы схожи.

          Выходит, что реализации схожи, и, с моей точки зрения, обе могут называться «распределённым xargs» :)

          Со своей колокольни обратил внимание на пару вещей, из-за которых мы бы не стали этот инструмент брать в рассмотрение:

          https://github.com/cheusov/paexec/blob/master/paexec/paexec.pod
          Remember that empty line MUST NOT appears in general result lines

          Мы ограничены форматом output'а того, что мы запускаем на удалённой стороне.

          Последовательность fork-exec
          krash@krash:~$ paexec -t '/usr/bin/ssh -x' -n 'cloud1' -c '/usr/bin/uptime; echo ""' -d
          nodes_count = 1
          nodes [0]=cloududs1
          cmd = uptime; echo ""
          start: init__read_graph_tasks
          start: init__child_processes
          running cmd: /usr/bin/ssh -x cloududs1 'env PAEXEC_EOT='\'''\''  /bin/sh -c '\''/usr/bin/uptime; echo ""'\'''
          


          Команды транспорта опускаем, рассмотим то, что запускается на удалённой стороне:
          /bin/sh -c "/usr/bin/uptime"
          

          При выполнении этой команды, мы получим на удалённой стороне последовательность fork-exec, которая сначала запустит /bin/sh, а затем — fork-exec для /usr/bin/uptime.
          Я запустил paexec, указав в качестве команды пользователя /usr/bin/sleep 1000, затем прервал выполнение paexec через SIGINT.
          Что мы получаем в результате? Правильно — на удалённом хосте у нас висит /usr/bin/sleep (аналог нашего долгоиграющего приложения).
          Т.е. при прерывании работы управляющего приложения, дочерние не прибиваются. Именно по этой причине, мы в своей реализации не используем spawn shell'а, а сразу зовём execve приложения.
            0
            Насчет ограничений на outout. Их нет. Пустая строка — значение ПО УМОЛЧАНИЮ, потому ее использовать нельзя. Но никто не мешает ее сменить. См. paexec -y
          0

          ЕМНИП python-fabric умеет параллельно запускаться на разных хостах, делать всё что скажешь и не терять stdout.

            0
            Спасибо за ещё один инструмент в копилку!
            Как я написал в комментарии ниже, за счёт spawn'а remote shell, мы рискуем оставить после себя долгоиграющий неприбитый процесс, что для нас неприемлемо. Ну и да, опять — где брать список хостов, и т.д.
            0
            Я так понимаю что это применимо только к тем задачам которые только производят трансформации над чем-то. Но не сохраняют ничего в локальную фс, но я так понял все же есть возможность использования hdfs? Просто допустим у нас есть задача сохранять файл с именем и содержимым в виде переданного аргумента, то куда в итоге все будет сохраняться если запускать такую задачу через ваш hadoop-xargs? Потому что судя по всему скормить ему можно абсолютно любую программу и он будет ее запускать с нужными аргументами.
              0
              В общем случае, любой кластер, где производятся подобного рода вычисления, является statless. Это означает, что после выполнения программы, все артефакты (временные файлы), которые она наплодила, должны быть уничтожены. Для сохранения каких-либо результатов следует использовать shared-ресурсы (база данных, HDFS).
              Конкретно в случае нашей задачи, мы на Python производим вычисления и записываем результат в файл в текущей рабочей директории. Когда бизнес-логика отработала, файл заливается в HDFS (из этого же процесса).
              В случае краха процесса/уничтожения YARN контейнера, рабочая директория контейнера уничтожается, и мы не мусорим локальную FS кластера.
                +1
                Спасибо за ответ, побольше бы статей с особенностями Hadoop/Spark etc и хорошими практиками по работе с ними.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое