Pull to refresh
46
0
Алексей @0x0FFF

Архитектор распределенных систем обработки данных

Send message
Местами статью больно читать:
дифференциальные вычисления, линейную алгебру, теорию вероятности, машинное обучение, графы и обучение на них, логистическую регрессию, линейный дискриминантный анализ и так далее
Смесь из разделов математики и конкретных алгоритмов, непонятно как взаимосвязанных. К тому же дифференциальные вычисления никаким боком к большим объемам данных не относятся. Так же как и обозначенные алгоритмы — это то, что можно делать с данными (не обязательно большими), но чтобы работать с большими данными знать это сосвем не обязательно

Но вдруг, перед релизом, или когда данных окажется больше, чем вы ожидали, вы обнаружите, что этот алгоритм не работает в «параллельном режиме», не работает через MapReduce — им можно загрузить только одно ядро процессора. Поэтому вам нужно будет экстренно и заново изобрести еще один алгоритм, который умеет работать параллельно, и придумать, как он должен работать в парадигме MapReduce.
Вы это серьезно, переделывать алгоритм для работы на кластере нужно обязательно непосредственно перед релизом? Может для начала при выборе алгоритма стоит проверить, как он работает на кластере из виртуалок? Или хотя бы в локальном режиме MapReduce?

Spark — берет, выполняет все задания, а затем выгружает результат
При этом на каждом shuffle бережно складывая данные на жесткий диск и затем вычитывая их

Можно кинуть в ответ Apache Tez или отыскать что-нибудь мелкое в зоопарке Apache — но, поверьте, для снижения рисков лучше использовать mainstream-технологии, которые развиваются в ногу с рынком.
Apache Tez — это mainstream для Apache Hive. Никто в своем уме сейчас не использует Apache Hive поверх MapReduce: либо Hive+Tez, либо Impala или аналог

Полученные результаты мы выгружаем в Apache Mahout и на выходе получаем конкретные рекомендации для клиента
Зачем вам Apache Mahout и чем не понравился Spark MLlib? К слову сказать, Apache Mahout мертв чуть более чем 3 года

Deep learning — это, простыми словами, «качественное» машинное обучение, подразумевающее очень детальное изучение проблемы машиной и, часто, использование многослойной рекуррентной нейронной сети
Deep не имеет отношения к качеству и детальности проработки, а означает «глубину» (количество слоев) обучаемой сети

Также, все более активно используются HBase, Casandra, Mahout, Spark MLLib
Как я уже написал выше, Mahout мертв и имеет скорее отрицательную динамику использования. Также странно видеть в одном ряду два Key-Value хранилища и подпроект Spark для машинного обучения

DAG (directed acyclic graph) vs Hadoop MapReduce vs Hadoop Streaming.
DAG — абстракция уровня исполнения задачи в Spark, Hadoop MapReduce — фреймворк для обработки данных на кластере, Hadoop Streaming — дефолтный job MapReduce, который передает данные в виде текста стороннему приложению и получает от него результат. Как они могут быть в одном списке?

Streaming реализован в Spark гораздо лучше, чем в Hadoop, им гораздо удобнее пользоваться и работает часто эффективнее, за счет кэширования данных в памяти.
Как уже писали выше, MapReduce Streaming и Spark Streaming — вещи абсолютно разные

Удобные коллекции: filter, map, flatMap
Постойте, filter — это коллекция, вы уверены?

Master-машины, которые контролируют вообще весь кластер. На них установлен Spark Master
Такой вещи как Spark Master нет. Spark Master — это просто jar'ник Apache Spark, стартовавший с определенными параметрами. Это штука динамическая и зависит от того, в каком режиме вы запускаете Spark

Core-машины, на которых развернута файловая система — HDFS. Их может быть несколько штук. Правда, рекомендуется только увеличивать количество core-машин, а не уменьшать, иначе теряются данные.
Вы это серьезно? Слышали ли вы о таких вещах, как HDFS Node Decommission, HDFS Balancer, replication factor?

Для всего остального используются task-машины. Это обычные Spark-серверы, на которых работают воркеры
Вот это — sparc-серверы, а то, о чем вы пишете, это просто ноды вашего кластера, предназначенные для запуска процессов Spark

В Yarn-кластерах, как и в Oracle, используется множество настроек, и, по хорошему, нужен админ, который в этом очень хорошо разбирается
Да, для работы с кластером Hadoop нужны определенные знания. Но если вы пишете статью, то как бы подразумевается, что вы этими знаниями обладаете

Что такое Reduce? Когда в один worker собираются сгруппированные по одному ключу данные
В один worker… Коллеги, вы видели когда-нибудь настройку mapreduce.job.reduces? Один — это значение по умолчанию, их может быть сколько угодно. В Apache Spark же это задается дефолтным уровнем паралелизма и количеством партиций в целевом RDD (практически все трансформации принимают как параметр количество партиций в целевом RDD). При этом значение по умолчанию — не 1, а количество партиций в исходном RDD

Допустим, вам нужно выгрузить из Spark данные в модель. Если объем велик, то это будет выполняться очень долго
А сохранить в ту же HDFS и вычитать оттуда? А записать через тот же JdbcRDD в любимый MySQL?
Было бы интересно прочитать побольше о архитектуре ClickHouse, а также за счет чего и на каких тестах она работает в 2,8-3,4 раза быстрее HP Vertica
Плюс примера с Python в том, что он интерактивный, то есть вы просто поднимаете процесс PySpark и контекст уже создан для вас. Если вас интересует вариант с поднятием кластера, можно сделать, допустим, так:
pyspark --master yarn-client --num-executors 6 --executor-memory 4g --executor-cores 12

Запуск через spark-submit хорошо описан в официальной документации, нужно просто вместо jar-файла передать py-скрипт
Всё дело в новом архитектурном подходе, который значительно выигрывает в производительности у классических MR приложений
Основные преимущества Spark:
  • Облегчение процесса разработки – меньше кода, код проще, интерактивный интерпретатор для Scala, Python, R
  • Удобное кэширование данных – ускоряет итеративные алгоритмы
  • Интеграция в одном проекте как пакетное обработки, так и потоковой (micro-batch)
  • Большое community – более 700 контрибьюторов

все промежуточные данные между Map и Reduce фазами, сбрасывались в HDFS
Промежуточные данные MapReduce кладутся на локальные диски серверов, выполняющих mapper'ы, в единственной копии. HDFS не используется. Данные перед reduce-фазой также собираются на локальных дисках без использования HDFS. Подробности можете посмотреть в моей статье тут

Теперь промежуточные данные сериализуются и хранятся в оперативной памяти, а обмен данными между узлами происходит напрямую, через сеть, без лишних абстракций. Стоит сказать что дисковый ввод/вывод всё таки используется (на этапе shuffle). Но его интенсивность значительно меньше.
Все промежуточные данные в Spark во время shuffle сбрасываются на диск точно так же, как в MapReduce. Если данные обрабатываются по одному алгоритму, то интенсивность ввода-вывода будет одинаковой

Ну и наконец Spark оперирует RDD абстракциями (Resilient Distributed Dataset), которые более универсальны чем MapReduce. Хотя для справедливости надо сказать, что есть Cascading. Это обёртка над MR, призванная добавить гибкости.
Также вы забыли упомянуть, что есть и другие обертки над MR, вроде Pig и Hive, которые на текущий момент популярнее Spark

Да, стоит сказать, что Spark API доступно для Scala, Java и Python
а также для R

Приведенный вами пример кода является скорее контрпримером. В презентации по архитектуре Spark я привожу для контраста пример классического «word count» на MapReduce и PySpark, чтобы показать, насколько громоздско и неудобно писать сразу в MR. Вот реализация примера из вашей статьи на PySpark:
data = ["user_id:0000, habrahabr.ru",
        "user_id:0001, habrahabr.ru",
        "user_id:0002, habrahabr.ru",
        "user_id:0000, abc.ru",
        "user_id:0000, yxz.ru",
        "user_id:0002, qwe.ru",
        "user_id:0002, zxc.ru",
        "user_id:0001, qwe.ru"]
rdd = sc.parallelize(data)
counts = rdd.map(lambda x: x.split(',')).map(lambda x: (x[1],1))
tops = counts.reduceByKey(lambda x,y: x+y).takeOrdered(10, key = lambda x: -x[1])
print tops

Согласитесь, в разы более читабельно
Как раз самое время начать сейчас, когда исходный код был открыт: link
И не просто открыт, они мигрировали в github историю всех 23к коммитов начиная с форка Postgres
На текущий момент перевод Greenplum в open source планируется выполнить в последних числах октября

В целом по статье одно замечание — Primary и Mirror синхронизируются не через WAL, а с помощью кастомной синхронной репликации на уровне файлов. Именно поэтому возможно переключение на Mirror в режиме online — благодаря синхронной репликации зеркало будет гарантированно содержать ту же информацию. И именно поэтому производительность падает при выпадании одного сегмента из пары — пресловутый WAL начинает писаться в полном объеме

А касательно фич новых PostgreSQL — работы по их бэкпорту ведутся полным ходом. Будет и jsonb, и hstore, и anonymous code blocks, и еще много всего. Пока релиз GPDB 5.0 планируется на первую половину следующего года
Не буду критиковать ваши замечания. Просто приведите пример алгоритма обработки данных, скорость работы которого зависит от объема входных данных сублинейно (допустим, по тому же логарифму)
У вас есть процессор. Максимально он может выполнить X инструкций в секунду, чем больше ядер тем больше X. У вас есть входные данные в количестве Y записей. Если на обработку каждой записи тратится C операций процессора, то на этом процессоре максимум вы сможете обработать Y*C/X операций в секунду. Не важно, 20 или 20000 потоков у вас, процессор быстрее не станет и выполнять больше X операций он не сможет физически. Итого Runtime = Y*C/X. Увеличив Y в 2 раза получим Runtime = 2*Y*C/X, что в два раза больше. Если у нас не один процессор, а 1000, это не изменит картины. Будет время выполнения Y*C/(1000*X) и 2*Y*C/(1000*X), то есть при росте количества данных в 2 раза все равно производительность упадет в 2 раза.

Это называется асимптотическая сложность алгоритма O(N). К таким алгоритмам и относится чаще всего алгоритм парсинга логов — вы не сможете получить меньше O(N), т.к. вам нужно как минимум один раз прочитать каждую строчку входных данных. Я не отрицаю, есть алгоритмы сложностью меньше O(N), допустим тот же поиск в сбалансированном дереве с характеристикой O(logN), или даже на дереве ван Эмде Боаса со сложностью O(log(logN)), но все алгоритмы с sublinear complexity не являются не алгоритмами обработки данных, а алгоритмами поиска.

Да, и не «вы» — система-то в общем не моя, я скорее сам критикую их архитектуру.
График никакого отношения к MapReduce не имеет. Он лишь показывает, что используемый алгоритм обработки данных линейно зависит от количества входных данных. Тут не важно, работает ли он на кластере или на одной машине.
Почему 3 прохода обработки? При записи данных в HDFS вы действительно записываете данные 3 раза на 3 разных машинах, но делается это не последовательно, а параллельно. При чтении вычитывается только одна копия и чаще всего локальная для обработчика, другие копии хранятся для отказоустойчивости и speculative execution
Понятно. Но Flume умел то, что я говорю, еще в 2012 году. Cassandra изначально тут использовать не надо было, т.к. это система совсем для другой задачи. Hive с partitioning вполне мог справиться, опять же уже в 2012-м. Тот же Hive+Tez доступен с февраля 2013-го и пользоваться им проще, чем Pig
С появлением в картинке Exadata и разворачиванием Cassandra+Hadoop на виртуальных машинах ситуация начинается проясняться. Я не в курсе истории, но скорее всего она выглядела приблизительно так:
  1. Заказчик играет тендер на ПАК и закупает Exadata. Счастливые закупщики на полученные откаты идут строить дачи и покупать новые машины (7 кластеров Exadata — это не шутка, десятки миллионов долларов)
  2. Затем играется тендер на разработку софта, на остатки проектного бюджета. Исполнителям понятно, что хранить и обрабатывать логи в Exadata — это дикость, да и места может не хватить, поэтому для снижения расходов на имеющейся у заказчика инфраструктуре разворачивается ряд open source продуктов
  3. Естественно, имеющаяся инфраструктура — это отрезать кусочек от какого-нибудь VDI и недорогого СХД и запустить там виртуалки с Cassandra/Hadoop поверх shared storage
  4. У исполнителя опыта немного да и бюджет не резиновый, поэтому берется софт, пусть и не подходящий, зато умеющий много «из коробки»
  5. С горем пополам и из рук вон плохой производительностью система сдается и идет в production

Если серьезно:
  • Cassandra для накопления и batch processing логов вообще никаким боком не нужна
  • Такая система собирается за неделю на том же Flume. Вы удивитесь, но данные в случае обрыва канала он тоже хранить умеет
  • Вместо одной стойки Exadata за те же деньги закупается кластер Hadoop на 100 нод с поддержкой Hortonworks/Cloudera
  • Для запросов с 9 group by существуют отдельный класс систем поверх Hadoop, которые умеют делать pipelining лучше чистого MapReduce — это Hive+Tez, Cloudera Impala, SparkSQL, Apache HAWQ
  • По производительности — на скромном кластере в 20 нод агрегация не 300 миллионов, а 300 миллиардов записей лога делается за <30 минут
Статья оставляет впечатление недосказанности:
  • Во-первых, зачем для сервиса накопления и анализа логов Cassandra? Почему нельзя было взять тот же Flume и спокойно грузить данные в HDFS, а там их с тем же успехом обрабатывать в MR?
  • Картинки исходной и целевой архитектуры не имеют общих компонент, сложно понять куда именно (и как) вы подключили Cassandra
  • 300млн записей за 100 минут — какого рода обработка проводится? Мой ноутбук может спокойно распарсить 300млн строк лога за 7-8 минут на одном ядре. Обработка очень сложная с подтягиванием данных из внешних систем?
  • Показатели производительности без указания характеристик кластера смотрятся немного странно
  • На графиках не подписаны оси и что где меряется непонятно
  • Основной показатель производительности для Cassandra — это обычно количество операций в секунду и latency операций (average и перцентили p99, p995, p999, p9999), эти графики были бы самыми интересными

Логистическая модель склонна к overfitting. Всегда нужно смотреть на результат cross-validation, я уверен что для логистической модели он будет ниже, поэтому вы так сильно опустились в итоговой таблице. По опыту, для текста хорошо работает LSA + XGBoost/RandomForest. Также не вижу у вас tf-idf взвешивания, крайне полезная штука.

Если запускать RandomForest напрямую на DT-матрице, стоит потюнинговать его параметры: 1000+ деревьев и количество фич, отобранных для каждого дерева, взять приблизительно равным квадратному корню из количества столбцов (кол-ва уникальных термов)
Хорошая архитектура. Я бы сделал почти так же
Приятно, что и российские компании постепенно приходят к идее использования современных open-source инструментов и архитектур, отказываясь от тех же исторических WebLogic + Oracle DB в подобных системах

Небольшой вопрос касательно архитектуры: у вас используются Aerospike, MongoDB и HBase, все они в той или иной мере key-value store. Можете пояснить, в чем идея такого разбиения и какие принципиальные кейсы вынесены на Aerospike и MongoDB, с которыми не справился бы HBase?
Сначала нам предлагают запускать Hadoop на блейд-серверах с размещением данных в СХД. Теперь нам предлагают запускать Hadoop и Spark на мейнфреймах. Самое печальное здесь то, что люди, принимающие решение о внедрении конкретных продуктов, зачастую не являются экспертами и доверяют крупным брендам вроде IBM и аналитическим агентствам их поддерживающим, вроде Gartner. В итоге внедряются нежизнеспособные системы, которые просто не взлетают
Тут скорее всего они имели в виду не Apache Hadoop, а CDH, который включает в себя Apache HBase (к которому можно сделать отсылку «NoSQL»), Apache Solr (aka «быстрый поиск по данным»), Apache Hive (aka «подобие SQL-языка доступа к данным», то есть HiveQL)
Похоже, что на волне популярности «Big Data» этот пост призван показать, что «смотрите, мы в AT Consulting тоже умеем Hadoop, у нас есть реальный проект и 14 специалистов, прошедших курсы Cloudera».

В целом же были бы интересны подробности: диаграма архитектуры решения, достигнутые показатели производительности с указанием характеристик железа, проблемы интеграции, с которыми вы столкнулись и как вы их решали. Также вы говорите про машинное обучение — тоже интересно, что за модель обучаете и на каких данных
1 — Читать логи СУБД CDC умеет уже 10 лет, и никаких модных Kafka и Spark Streaming для этого не нужно
2 — Тут вопрос в том, что real-time обработка данных зачастую дороже в плане ресурсов CPU и IO, чем batch-обработка (если имеется в виду именно одинаковая обработка). То есть растянуть нагрузку на весь день — плохой вариант. А хороший — это оставить процесс ETL для OLTP системы в покое, пусть он собирает данные из Kafka и раз в день запускает ту же логику на 4 часа. Другой же процесс, читающий те же самые данные, производит агрегацию в реальном времени. Допустим, пример из телекома: с предбиллинга сыпятся данные о вызовах, в real-time системе производится их агрегация по базовым станциям для показа статистики обрывов соединений в реальном времени на карте, а в batch-систему данные складываются «as is» и обрабатываются ночными ETL-процессами, строящими разные витрины и считающими KPI
3 — Конкретно на последней картинке двухсторонняя связь означает, что данные загружаются в HDFS, после чего кем-то пушатся в Spark Streaming, который их затем складывает в OLTP систему. Вопрос в том, что между HDFS и OLTP системами Spark Streaming не нужен: какой смысл в real-time системе, находящейся между двумя batch-системами?
4 — Я о том, что «горячие» данные должны лежать в OLTP, а «холодные» — в HDFS. Tachyon здесь — попытка работать с «горячими» данными в HDFS, где их по определению быть не должно. Уверяю вас, Tachyon + Hive будет в разы медленнее среднего MPP, а так как данные лежат только в памяти, эта связка будет еще и дороже и сложнее в управлении
Их целевая архитектура соответствует современным тенденциям в проектировании систем обработки данных, этакая смесь Data Lake + Lambda Architecture. Но вот сделана она не совсем так как надо, да и возможно человек, читавший этот доклад, не является автором предлагаемой архитектуры

По проблемам (последний слайд с архитектурой):
1. Продюссер «P» должен находиться перед OLTP системой, а не после нее. После OLTP уже можно делать стандартную выгрузку в хранилище или же CDC
2. Архитектура с Spark Streaming не имеет особых преимуществ при отсутствии потребителя real-time данных, вроде каких-либо real-time отчетов или других аналогичных систем
3. Стрелка между HDFS и Spark Streaming двухсторонняя, это вообще непонятно зачем. Данные тут должны идти только в одну сторону — от источника в виде Kafka в HDFS
4. Непонятно, зачем сюда запихнули Tachyon — для быстрой отчетности есть OLAP в виде какой-нибудь MPP RDBMS, зачем поднимать данные в память из HDFS и получать к ним доступ через Hive — непонятно, все равно ведь быстрее MPP не получится, да и дорого. Тут скорее нужна еще стрелка из ETL, работающего на Hadoop, в их OLAP-хранилище

В целом же можно реализовать подобную архитектуру и более красиво,
как-то так

RTI — real-time intelligence, STG — staging, FE — front-end, BE — back-end, SP — stored procedure, ES — Event Store, Srv — Web Service

Проблема только в том, что подобный дизайн нужно закладывать изначально, иначе миграция будет по сути равносильна переписыванию всего с нуля, чему бизнес явно не обрадуется. Скоро должен выйти подкаст от Pivotal, где я буду рассказывать об этой архитектуре

Information

Rating
Does not participate
Location
Dublin, Dublin, Ирландия
Registered
Activity