В разрезе: новостной агрегатор на Android с бэкендом. Распределённые системы обработки сообщений (Spark, Storm)

    Вводная часть (со ссылками на все статьи)

    image

    Основным компонентом системы, который обрабатывает сырые данные с «пауков», выполняет обогащение данных, их индексацию и последующий поиск является система обработки сообщений, т.к. только подобные системы могут адекватно реагировать на пиковые нагрузки входных данных, недостачу некоторых видов ресурсов и могут быть легко горизонтально масштабируемы.

    Когда анализировалось будущее использование системы, обрабатывающей запросы или входящие данные, были выделены следующие требования:

    • Низкая задержка (latency) обработки сообщения;
    • Возможность получения данных из разных источников (БД, message middleware);
    • Возможность обработки данных на нескольких узлах;
    • Отказоустойчивость к ситуациям выхода из строя узлов;
    • Поддержка уровня гарантированной обработки сообщения «at-least-once»;
    • Наличие интерфейса для мониторинга состояния кластера и для управления им (хотя бы частично).

    В качестве итогового решения был выбран фреймворк Apache Storm. Для поклонников Apache Spark: с учётом широкой распространённости этого фреймворка (с использованием Spark Streaming или сейчас Spark Structured Streaming), всё дальнейшее повествование будет строиться в сравнении с функционалом Apache Spark.

    С учётом того, что обе системы обладают сильно пересекающимися множествами функций, выбор был не прост, однако т.к. больший контроль над процессом обработки каждого сообщения остаётся за Apache Storm – выбор был сделан в его пользу. Дальше я попытаюсь пояснить, в чём различие, в чём схожесть фреймворков и что означает «больший контроль над процессом обработки каждого сообщения».

    Основные концепции


    Далее попытаюсь вкратце рассказать про основные шаги размещения и выполнения вашего кода в кластере.

    Готовый код для каждой системы загружается в кластер посредством специальных утилит (специфичных для каждого фреймворка), при этом готовый код в обоих случаях это uberjar/shadowJar (т.е. jar-файл, содержащий все необходимые зависимости, кроме самого фреймворка естественно). При этом в обоих случаях указываются классы точек входа и параметры работы кода (в виде одного из возможных ключей утилиты).
    Дальше ваш код преобразуется в топологию (topology) для Apache Storm и приложение (application) для Apache Spark.

    Топология Apache Storm


    Вы объявляете топологию в вашей точке входа подобным образом:

    public class HeatmapTopologyBuilder {
      public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("checkins", new Checkins());
        builder.setBolt("geocode-lookup", new GeocodeLookup()).shuffleGrouping("checkins");
        builder.setBolt("heatmap-builder", new HeatMapBuilder()).globalGrouping("geocode-lookup");
        builder.setBolt("persistor", new Persistor()).shuffleGrouping("heatmap-builder");
        return builder.createTopology();
      }
    }
    

    всё преобразуется в нечто подобное (топология Apache Spark):

    image

    В итоге мы имеем граф (возможно содержащий циклы), по ветвям которого бегают Tuple’ы (пакеты с данными или сообщения, указаны на графике), где каждый узел это либо источник Tuple – Spout, либо их обработчик Bolt.

    При создании топологии вы определяете: какие Spout/Bolt будут участвовать в его работе, как они соединены между собой, как группируются сообщения внутри кластера на основании ключей (или не группируются вовсе). В итоге вы можете ваши сообщения объединять, разделять, преобразовывать (выполняя внешнее взаимодействия или нет), «проглатывать», пускать по циклу, запускать в именованные потоки (связи между элементами кластера имеют поток «по-умолчанию», но вы можете создавать свои именованные, например, для пакетов требующих продолжительной обработки).

    При этом всё это движение отражается и учитывается в статистике и метриках «Storm UI» — веб-приложения для отслеживания состояния кластера. Далее несколько скриншотов:







    Приложение Apache Spark


    Вы объявляете приложение в вашей точке входа подобным образом (тут будем говорить только про Spark Streaming):

    // Create a StreamingContext with a 1-second batch size from a SparkConf
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create a DStream using data received after connecting to port 7777 on the
    // local machine
    val lines = ssc.socketTextStream("localhost", 7777)
    // Filter our DStream for lines with "error"
    val errorLines = lines.filter(_.contains("error"))
    // Print out the lines with errors
    errorLines.print()
    

    Это всё преобразуется в нечто подобное «Получатель (Receiver) + Apache Spark код + Операторы вывода (Output operations)»:



    В итоге мы имеем:

    • Ваш управляющий код (driver), занимающийся координацией выполнения Apache Spark приложения;
    • Receiver (в случае Spark Streaming), читающий данные из источника и формирующий серии RDD;
    • ацикличные графы (порождённые кодом на driver), выполняющиеся на узлах кластера, представляющие собой схему обработки и порождения новых RDD.

    При этом всё это отражается и учитывается в статистике и метриках «Spark UI» — веб-приложения для отслеживания состояния кластера. Далее несколько скриншотов:







    Как видно, несмотря на общую цель, способы формирования путей обработки и итоговые структуры совершенно разные по идеологии, что в итоге ведёт к ещё большим отличиям при выполнении.

    Выполнение топологии Apache Storm и приложения Apache Spark в кластере


    Кластер Apache Storm


    После загрузки вашего кода на управляющие машины Apache Storm кластера (nimbus, аналог Cluster Manager) из Java кода точки входа с использованиеи APache Storm API формируется топология и информация о её выполнении:

    • какое количество задач;
    • какой уровень параллелизма;
    • какие связи между обработчиками сообщений должны быть сделаны;
    • какие параметры работы кластера должны быть специфичными именно для данной топологии;
    • какое количество Worker’ов должно создаться и т.д.

    Вся эта информация и ваш код (jar-файлы) отправляется на другие узлы, где создаются необходимые элементы вашей топологии (экземпляры классов, реализующих генераторы сообщений (Spout) и обработчики (Bolt)) и осуществляется дополнительное конфигурирование узлов. После этого топология считается развёрнутой и начинается работа в соответствии с заданной конфигурацией кластера и топологии.

    Скриншот рабочей топологии из Storm UI:



    Из рычагов настройки кластера/топологии у вас имеются параметры, влияющие на:

    • какое количество экземпляров Spout/Bolt в рамках терминов Apache Storm топологии (worker/executor/task) будет запущено в кластере;
    • как они могут быть распределены между узлами (не непосредственно конечно, а через свои абстракции);
    • каковы размеры буферов в элементах;
    • каково количество необработанных сообщений и как долго может «гулять» в кластере;
    • параметры включения торможения кластера при резком заполнении его сообщениями «backpressure»;
    • регулирование уровня отладки;
    • частота сбора статистики (при входящем потоке в 100 000 сообщений в секунду учитывать каждое – лишняя нагрузка) и т.д.

    Кластер Apache Spark


    После загрузки вашего кода в кластер создаётся приложение, выполняемое драйвером (driver), которое формирует большое количество графов обработки RDD (directed acyclic graph, DAG)-> DAG разбивается на job (действия (action) в виде «collect», «saveAsText», etc) -> из job формируются этапы (stage) (преобразования (transformation) в большинстве своём) –> stage разбивается на task (минимальные единицы работы над разделом (partition)).



    Далее в дело вступает планировщик, который распределяет задачи и данные по узлам кластера. Ваш прикладной код присутствует в driver’е и фрагментах кода, передаваемых в узлы для выполнения трансформаций/действий.

    Из рычагов настройки кластера/топологии у вас имеются:

    • лимиты на использование памяти;
    • количества ядер;
    • таймауты по доступности данных (data locality);
    • алгоритмы работы планировщиков (FIFO/FAIR) и т.д.

    Конечно, возможности по работе с ресурсами могут отличаться от менеджера ресурсов в зависимости от типа кластера.

    В итоге получается, что Apache Spark ориентирован на обработку потоков данных с учётом рассмотрения их как совокупности, с анализом и обработкой их в контексте других сообщений за определённый период времени или вообще всех полученных за время обработки. В то время как Apache Storm рассматривает каждое сообщение как отдельную сущность и обрабатывает её так же. В случае Trident-топологий, где осуществляется формирование микро пакетов (micro batching), это утверждение не сильно меняется т.к. batching – это средство минимизации служебного трафика и лишних соединений на каждое сообщение. Отсюда и получаются разные архитектуры кластеров, разные сущности по обработке сообщений и принципы их работы в Apache Storm и Apache Spark.

    В итоге получается, что подходы к управлению ресурсами в кластере отражают уровень абстракции над процессом обработки («выше абстракция — меньше влияния»).

    Текущие топологии проекта




    Как видно, текущие топологии так же линейны (алгоритм достаточно прост) и разбиты по принципу «отдельный источник – отдельная топология» с целью простоты управления и обновления топологий на кластере.

    Дополнительны особенности Apache Storm


    DPRC (Distributed RPC)


    image
    Интересной особенностью Apache Storm является DPRC (Distributed RPC): возможность делать вызовы методов, которые фактически обрабатываются кластером. Указанный функционал используется при реализации REST клиента, ответы которого в последующем кэшируются Nginx.

    Схема работы проста: демон получающий запросы и буферизирующий их -> специальные Spout, отправляющие запросы в топологию -> невидимые коллекторы, отправляющие данные и убирающие запросы из входного буфера. В итоге получаем простой, но мощный инструмент, благодаря которому обработку запроса можно выполнять на кластере, в то время как для вызывающей стороны это всего лишь вызов RPC.

    Несмотря на отсутствие такого решения в Apache Spark (мне известного) думаю, что реализация не слишком сложна.

    Trident


    Trident – это абстракция высокого уровня для выполнения вычислений в реальном времени с использованием примитивов Apache Storm. Это позволяет легко совмещать высокую пропускную способность (миллионы сообщений в секунду), обработку потоков с отслеживанием состояния с низкой задержкой распределенных запросов. Если вы знакомы с инструментами пакетной обработки высокого уровня, такими как Pig или Cascading, понятия Trident будут очень знакомы – Trident имеет соединения, агрегаты, группировку, функции и фильтры (по сути примерно те же абстракции, что и Apache Spark RDD). В дополнение к этому, Trident добавляет примитивы для выполнения инкрементной обработки с отслеживанием состояния поверх любой базы данных или хранилища. Trident имеет последовательную, семантику «exactly-once», поэтому понять работу топологии, реализованной на нём достаточно легко.

    Минусами Trident можно назвать лишь более сложную взаимосвязь между кодом вашей топологии и тем какие Spout/Bolt будут созданы.

    Trident позволяет на писать примерно такой код:

    TridentState urlToTweeters =
           topology.newStaticState(getUrlToTweetersState());
    TridentState tweetersToFollowers =
           topology.newStaticState(getTweeterToFollowersState());
    
    topology.newDRPCStream("reach")
           .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
           .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
           .shuffle()
           .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
           .parallelismHint(200)
           .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
           .groupBy(new Fields("follower"))
           .aggregate(new One(), new Fields("one"))
           .parallelismHint(20)
           .aggregate(new Count(), new Fields("reach"));
    

    и получать примерно такую топологию:

    image

    Преимущества Apache Storm:


    • Интеграции с большим количеством источников данных (базы данных, брокеры сообщений – Kafka, HBase, HDFS, Hive, Solr, Cassandra, JDBC, JMS, Dredis, Elasticsearch, Kinesis, Kestrel, MongoDB….);
    • Наличие специального языка с высокоуровневыми функциями по работе с сообщениями (Trident);
    • Наличие средств для контроля нагрузки (Resource Aware Scheduler);
    • Детальный контроль над уровнем параллелизма (что с другой стороны влечёт за собой наличие понимания работы элементов топологии и реакции системы на резкое повышение объёма входящих данных);
    • Поддержка SQL запросов для обрабатываемых данных (экспериментальная функция, аналогичная Apache Spark SQL);
    • Поддержка других не-JVM языков;
    • Поддержка кластерного развёртывания (YARN, Mesos, Docker, Kubernetes).

    Недостатки Apache Storm:


    • Реализация на Clojure (это я думаю и плюс и миинус этого аспекта). Однако в планах по дальнейшему развитию Apache Storm говорится о планах по переходу в версии 2.0 на реализацию на Java. В первую очередь для увеличения базы commiter’ов (чем больше, тем качественнее каждая версия и быстрее происходит развитие продукта);
    • Недостаток информации о фреймворке — информации (статей, книг, видео) откровенно значительно меньше, чем по Apache Spark;
    • Более сложная с моей точки зрения архитектура – как следствие более крутая кривая обучения разработчиков: более высокая вероятность накодить ошибку, у кого-то может вовсе не хватить знаний/настойчивости одолеть фреймворк.

    Итоги сравнения Apache Spark и Apache Storm


    Как уже стало понятно Apache Spark (Spark Streaming) и Apache Storm вещи разные и сравнивать их «в лоб» неверно, по причине отличия в массе аспектов, в первую очередь в способох формирования входных данных для обработки: Apache Spark (micro-batch) и Apache Storm Core (per-message) (тут скорее уместно сравнение Apache Spark и Apache Storm’s Trident).

    • Скорость реакции (latency, не throughput): нет официальных сравнений производительности, признанных обеими сторонами, но большинство говорит о секундах для Apache Spark и долях секунд для Apache Storm;
    • Принципы обработки: Apache Storm – фреймворк для потоковой обработки, который так же осуществляет micro-batch’инг (Apache Storm’s Trident), Apache Spark — фреймворк для пакетной обработки, который так же осуществляет micro-batch’инг (Spark Streaming);
    • Языки: Apache Storm – более разнообразен в количестве языков для реализации обработчиков, не только JVM-based, Python и R – как в случае с Apache Spark;
    • Гарантии обработки сообщений: Из трёх семантик «at-most-once», «at-least-once» и «exactly-once» Apache Spark поддерживает «из коробки» только «exactly-once»;
    • Отказоустойчивость: ни один из фреймворков не даёт 100% гарантии от пропуска сообщений для всех источников (для этого к источники должны удовлетворять серии условий, должны быть reliable и durable, как, например, Kafka. Так же необходимо учесть, что checkpoint’ы на HDFS вносят свою собственную задержку — уменьшают latency, что в может привести к решению их отключения для некоторых сценариев).

    В сухом остатке имеем 2 фреймворка со своими особенностями, специфику которых необходимо учитывать для принятия правильного решения.

    Спасибо за внимание!
    Share post

    Similar posts

    Comments 0

    Only users with full accounts can post comments. Log in, please.