Uber держит огромные объёмы данных сразу в своих дата-центрах и в облаке, поэтому их нужно постоянно и быстро копировать между регионами для аналитики и аварийного восстановления.

Когда объём таких копирований вырос до петабайта в день, оказалось, что система тормозит не на самой передаче данных, а на подготовке задач и служебных накладных расходах. Команда переработала процесс так, чтобы тяжелая подготовительная рутина выполнялась ближе к месту запуска задач, а ключевые этапы шли параллельно.

Для маленьких копирований они убрали лишние запуски отдельных процессов/контейнеров, чтобы не тратить время на старт пустой работы. В итоге пропускная способность репликации выросла примерно в 5 раз и стала стабильнее, а перенос данных в облако пошёл заметно быстрее

Введение

Для Uber приоритетом является надежное озеро данных, распределенное между локальной инфраструктурой и облачными средами. Такая многорегиональная архитектура создает сложности в обеспечении надежного и своевременного доступа к данным из-за ограниченной пропускной способности сети и необходимости бесперебойной доступности данных, особенно для аварийного восстановления. Uber использует сервис Hive Sync, который применяет Apache Hadoop® Distcp (Distributed Copy) для репликации данных. Однако по мере того как озеро данных Uber превысило 350 ПБ, ограничения Distcp стали очевидны. В этом материале рассматриваются оптимизации, внесенные в Distcp, чтобы повысить его производительность и удовлетворить растущие потребности Uber в репликации данных и аварийном восстановлении в рамках распределенной инфраструктуры.

Понимание Distcp

Distcp — это фреймворк с открытым исходным кодом для копирования больших наборов данных между различными локациями в распределенном режиме. Он использует Hadoop-овский фреймворк в для распараллеливания и распределения задач копирования по множеству узлов, что обеспечивает более быстрые и масштабируемые передачи данных, особенно в крупномасштабных средах.

Рисунок 1: Высокоуровневая архитектура Distcp.
Рисунок 1: Высокоуровневая архитектура Distcp.

Архитектура Distcp


Архитектура Distcp включает несколько ключевых компонентов:

  • Инструмент Distcp: определяет файлы, группирует их в блоки (Copy Listing), задает распределение по мапперам и отправляет настроенное задание Hadoop в YARN.

  • Hadoop Client: настраивает среду выполнения задания, определяет, какие мапперы обрабатывают конкретные блоки (Input Splitting), и отправляет джобу в YARN.

  • RM (Resource Manager): компонент YARN, который планирует задачи, принимает задание Distcp, выделяет ресурсы и делегирует выполнение Application Master.

  • AM (Application Master): отслеживает жизненный цикл задания MapReduce, запрашивает у RM ресурсы (контейнеры) для задач Copy Mapper и консолидирует файловые сплиты в месте назначения.

  • Copy Mapper: выполняет непосредственное копирование данных назначенных файловых блоков, работая в контейнере под управлением YARN Node Manager.

  • Copy Committer: объединяет скопированные блоки, чтобы собрать итоговые файлы в файловой системе назначения.

Рисунок 2: Диаграмма, показывающая работу Distcp при копировании из каталога /src/ в каталог /dest/.
Рисунок 2: Диаграмма, показывающая работу Distcp при копировании из каталога /src/ в каталог /dest/.

На рисунке 2 показано, как Distcp реплицирует три файла из исходного каталога /src/ в целевой каталог /dest/ с использованием компонентов, описанных ранее. Исходный каталог содержит три файла — File 1, File 2 и File 3 — одинакового размера. Задача Copy Listing, выполняемая на стороне клиента, обнаруживает эти файлы и разбивает каждый из них на два блока (chunks). Затем задача Input Splitting распределяет эти файловые блоки между тремя мапперами.

  • Map 1 получает один блок из File 1 и еще один — из File 2

  • Map 2 обрабатывает один блок из File 3 и один — из File 1

  • Map 3 обрабатывает один блок из File 2 и еще один — из File 3

Затем задачи Copy Mapper копируют эти блоки из источника в целевой каталог. После репликации в AM запускается задача Copy Committer, которая объединяет соответствующие блоки каждого файла, воссоздавая итоговые три файла в целевом каталоге.

Как HiveSync использует Distcp

HiveSync изначально был построен на основе пр��екта с открытым исходным кодом Airbnb® ReAir. Он поддерживает как массовую репликацию (позволяющую копировать большие объемы данных за один проход), так и инкрементальную репликацию (синхронизацию инкрементальных обновлений по мере поступления новых данных), поддерживая согласованность озер данных Uber между HDFS™ (Hadoop Distributed File System) и облачными объектными хранилищами. Для крупномасштабной репликации данных HiveSync использует Distcp.

Рисунок 3: Архитектура HiveSync: рабочий процесс репликации данных с использованием Distcp.
Рисунок 3: Архитектура HiveSync: рабочий процесс репликации данных с использованием Distcp.

На рисунке 3 показано, как сервер HiveSync прослушивает запросы на копирование от исходного кластера Hive. Для объемов более 256 МБ он передает задания Distcp в исполнительный компонент. Затем несколько воркеров (асинхронных потоков) параллельно подготавливают и отправляют эти задания в YARN через клиент Hadoop. Поток мониторинга отслеживает прогресс каждого задания, и после успешного завершения задания данные становятся доступными в целевом кластере.

Проблемы масштабирования HiveSync: точка перелома

К третьему кварталу 2022 года HiveSync столкнулся с серьезными трудностями масштабирования: всего за один квартал суточный объем репликации данных вырос с 250 ТБ до 1 ПБ.

Одним из факторов, вызвавших столь быстрый рост, стала концентрация записей данных в одном дата-центре. В 2022 году Uber ради экономии перешла к активнопассивной архитектуре озера данных, отказавшись от равномерно распределенной генерации данных в пользу модели, при которой основной локальный дата-центр обеспечивал 90% генерации данных и выполнял большинство батчевых вычислительных задач. Это существенно увеличило нагрузку на сервер HiveSync, реплицирующий данные из основной области во вторичную. Влияние проекта SRC (Single Region Compute) будет рассмотрено отдельно.

Еще одним фактором стало подключение к HiveSync всех локальных наборов данных Hive. В новой активнопассивной модели HiveSync стал критически важным для аварийного восстановления, обеспечивая репликацию данных, создаваемых в одном регионе, в другой географический регион. Это потребовало масштабирования HiveSync на все озеро данных Uber. Всего за один квартал число наборов данных под управлением HiveSync выросло с 30 000 до 144 000 за счет подключения новых датасетов. Это более чем вдвое увеличило количество запросов на репликацию.

В результате число ежедневных заданий репликации взлетело с 10 000 до среднего значения 374 000, что значительно превышало возможности системы по их обработке. Это привело к существенному накоплению очереди, из-за чего становилось все сложнее выполнять заявленные SLA по задержкам репликации. В частности, при таком масштабе стало трудно поддерживать SLA по лагу репликации P100 в 4 часа и SLO P99.9 в 20 минут.

Кроме того, ожидалось, что масштаб запросов на репликацию значительно вырастет, поскольку HiveSync получил ключевую роль в миграции озера данных Uber из локальной инфраструктуры в облачные регионы. По прогнозам, этот переход должен был почти удвоить масштаб и объем запросов на копирование, усилив давление на HiveSync и потребовав оптимизации процессов репликации данных для работы в условиях облачной инфраструктуры.

Ключевые улучшения

Мы внедрили следующие доработки Distcp, чтобы адаптировать его под наши требования к масштабированию. Эти оптимизации заметно повысили масштаб и эффективность репликации данных в Uber.

В их число входят:

  • Перенос ресурсоемких задач Copy Listing и Input Splitting в Application Master, что снизило задержку отправки задания на 90% за счет уменьшения конкуренции на стороне HDFS-клиента.

  • Параллелизация задач Copy Listing и Copy Committer, которая существенно сократила время планирования и завершения заданий.

  • Реализация заданий Uber для небольших переносов, что позволило устранить 268 000 запусков контейнеров в день и оптимизировать использование ресурсов YARN.

Рассмотрим каждое из этих изменений подробнее.

Перенос подготовительных задач Distcp в AM


Во время одного из инцидентов мы заметили, что при высокой нагрузке на систему рост задержек файловой системы приводил к соответствующему увеличению задержек Distcp на этапе формирования списка копирования (copy-listing).

Рисунок 4: Рост задержек HDFS FSUtils напрямую влияет на задачу Distcp Copy Listing.
Рисунок 4: Рост задержек HDFS FSUtils напрямую влияет на задачу Distcp Copy Listing.

Когда мы проанализировали дампы потоков во время всплеска, выяснилось, что большинство потоков зависали в ожидании блокировки, удерживаемой HDFS-клиентом для удаленных вызовов процедур (RPC). Такой подход плохо масштабируется в среде с высокой степенью многопоточности.

Рисунок 5: Потоки, заблокированные на RPC-вызовах.
Рисунок 5: Потоки, заблокированные на RPC-вызовах.

В типичном потоке отправки задания Distcp несколько компонентов зависят от HDFS-клиента: воркер Distcp для сравнения данных, инструмент Distcp для формирования Copy Listing и Hadoop Client для выполнения Input Splitting. По мере роста числа потоков исполнителя Distcp увеличивается и количество параллельных пользователей HDFS-клиента.

Рисунок 6: Множество параллельных вызовов из разных запросов на копирование создают конкуренцию на HDFS-клиенте.
Рисунок 6: Множество параллельных вызовов из разных запросов на копирование создают конкуренцию на HDFS-клиенте.

Мы выяснили, что, хотя Distcp хорошо масштабирует само копирование данных, задачи планирования и построения списка файлов он обрабатывает на стороне клиента. Эта подготовительная фаза, в рамках которой определяются файлы для копирования (Input Splitting), становилась узким местом, поскольку опиралась на общий HDFS-клиент, который также использовали другие компоненты HiveSync. По мере роста объемов данных и числа воркеров Distcp блокировки уровня JVM в HDFS-клиенте превратились в серьезную проблему, приводя к конкуренции потоков при увеличении параллелизма. Это вызывало задержки: на один только этап Copy Listing приходилось 90% латентности при отправке задания.

Проблему усугубляло большое число вызовов к NameNode, пропорциональное количеству копируемых файлов, — особенно болезненное при работе с крупными каталогами.

Чтобы снизить нагрузку на один HDFS-клиент, мы перенесли ресурсоемкие задачи Copy Listing и Input Splitting с сервера HiveSync в AM.

Рисунок 7: Процессы Copy Listing и Input Splitting перенесены с сервера HiveSync (клиента) в AM
Рисунок 7: Процессы Copy Listing и Input Splitting перенесены с сервера HiveSync (клиента) в AM

Теперь каждое задание Distcp выполняет Copy Listing в собственном контейнере AM, что существенно снижает конкуренцию за блокировки на Hadoop-клиенте HiveSync. Это позволило добиться 90-процентного сокращения задержки при отправке заданий Distcp.

Рисунок 8: Зафиксировано 90-процентное сокращение времени отправки заданий Distcp
Рисунок 8: Зафиксировано 90-процентное сокращение времени отправки заданий Distcp

Параллелизация задачи Copy Listing

Инструмент Distcp выполняет задачу Copy Listing, чтобы сформировать блоки файловой системы для файлов, подлежащих копированию. Эти блоки записываются в sequence file, образуя список фрагментов файлов, которые задача Copy Mapper затем копирует из исходного кластера в целевой. В ходе этого процесса основной поток последовательно обращается к NameNode через API getFileBlockLocations для файлов, размер которых превышает заданный размер фрагмента, чтобы сформировать файловые сплиты (chunks). Он также выполняет повторные попытки, когда проверка статуса файла завершается неудачей, что делает этот этап самой ресурсоемкой частью Distcp.

Рисунок 9: Даже после переноса этой задачи в Application Master p99 задержки на самом загруженном сервере репликации в среднем составляла около 10 минут
Рисунок 9: Даже после переноса этой задачи в Application Master p99 задержки на самом загруженном сервере репликации в среднем составляла около 10 минут

Мы заметили, что несколько файлов можно листить параллельно и записывать в sequence file в любом порядке. Однако фрагменты каждого файла должны оставаться вместе и в правильной последовательности, поскольку алгоритм Copy Committer использует их для объединения скопированных файловых сплитов в месте назначения. Опираясь на эту идею, мы распараллелили вызовы к NameNode файловой системы, чтобы снизить задержку Copy Listing: для каждого файла выделили отдельный поток, создающий сплиты и добавляющий их в блокирующую очередь, а отдельный поток-писатель последовательно записывал блоки в sequence file. Такой подход помог улучшить время завершения заданий Distcp.

Рисунок 10: Рабочий процесс задачи Copy Listing V2.
Рисунок 10: Рабочий процесс задачи Copy Listing V2.

На рисунке 10 показано, как функция листинга использует многопоточность для получения файлов из исходного кластера через вызовы к NameNode. Каждый поток отвечает за создание блоков для одного файла, что позволяет параллельно обрабатывать несколько файлов. Например, /src/file1 (1684 МБ) разбивается на два фрагмента: первый фрагмент (/src/file1/part0) содержит 4 HDFS-блока по 256 МБ каждый, а второй фрагмент (/src/file1/part1) включает 3 блока (2 по 256 МБ и 1 на 128 МБ). Поток листинга синхронно добавляет эти фрагменты в блокирующую очередь, тогда как отдельный поток-писатель регулярно опрашивает очередь и последовательно записывает оба фрагмента в sequence file. Для быстрого реагирования на сбои: если какой-либо поток завершается ошибкой, главный поток останавливает процесс и повторно запускает задание Distcp. После завершения функции листинга и записи всех элементов очереди в sequence file она обновляет статус задания с помощью Status Updater.

Использование 6 потоков позволило нам добиться снижения p99 среднего времени листинга Distcp на 60% и уменьшить максимальную задержку на 75% по всем серверам HiveSync.

Рисунок 11: Улучшение задержки Copy Listing на сервере HiveSync при использовании 6 потоков.
Рисунок 11: Улучшение задержки Copy Listing на сервере HiveSync при использовании 6 потоков.

Параллелизация задачи Copy Committer


После того как задачи Distcp Copy Mapper завершают копирование файловых сплитов из источника в целевой каталог, задача Copy Committer в AM объединяет эти сплиты в полноценные файлы. Для каталогов, содержащих более 500 000 файлов, этот процесс может занимать до 30 минут. В версии с открытым исходным кодом фрагменты файлов объединяются последовательно, что приводит к более низкой производительности.

Чтобы решить эту проблему, мы распараллелили процесс конкатенации файлов: каждый поток отвечает за объединение одного файла за раз. Sequence file, созданный на этапе Copy Listing, используется для определения порядка блоков отдельных файлов, которые необходимо объединить в месте назначения.

Рисунок 12: Рабочий процесс задачи Copy Committer V2.
Рисунок 12: Рабочий процесс задачи Copy Committer V2.

На рисунке 12 мапперы извлекают файловые сплиты, созданные на этапе Copy Listing, из Sequence File и копируют их в целевой каталог /dest/. Каждый поток конкатенации (Concatenator) собирает сплиты для конкретного файла и объединяет их, формируя итоговый файл. Три сплита файла 1 (/dest/file_part0, /dest/file_part1 и /dest/file_part2) объединяются, чтобы получить /dest/file1 в месте назначения. Аналогично — для файла 2 и файла 3. Для быстрого реагирования на сбои: если какой-либо поток сталкивается с проблемой, главный поток останавливает процесс и повторно запускает задание Distcp.

Рисунок 13: Средняя задержка конкатенации снизилась на 97,29% при использовании 10 потоков.
Рисунок 13: Средняя задержка конкатенации снизилась на 97,29% при использовании 10 потоков.

«Уберизация» однопроцессных заданий: улучшение использования YARN

Около 52% заданий Distcp, отправляемых серверами HiveSync, требуют всего одного маппера, чтобы скопировать менее 512 МБ и менее 200 файлов. Хотя такие небольшие задания завершаются быстро, значительная часть времени уходит на подготовку окружения (выделение нового контейнера в YARN и время запуска JVM), а не на само копирование.

Рисунок 14: Более 50% заданий Distcp получают по одному мапперу каждое.
Рисунок 14: Более 50% заданий Distcp получают по одному мапперу каждое.

Чтобы устранить эти накладные расходы, мы задействовали возможность Hadoop под названием «Uber jobs», которая устраняет необходимость выделять и запускать задачи в отдельных контейнерах. Вместо этого задачи Copy Mapper выполняются непосредственно в JVM Application Master, что снижает лишние затраты на выделение контейнеров.

Рисунок 15: Рабочий процесс Uber job
Рисунок 15: Рабочий процесс Uber job

На рисунке 15 показано, как AM определяет, соответствует ли задание критериям Uber job. Если соответствует, задача Copy Mapper выполняется локально внутри JVM Application Master. В противном случае AM запрашивает контейнер через Node Manager и запускает задачу Copy Mapper там. После завершения задачи AM инициирует задачу Copy Committer для объединения файловых сплитов в месте назначения.

Мы включили Uber jobs со следующими настройками:

mapreduce.job.ubertask.enable: true
mapreduce.job.ubertask.maxmaps: 1 (гарантирует использование только 1 маппера)
mapreduce.job.ubertask.maxbytes: 512 MB (ограничивает объем копирования данными до 512 МБ)

Реализовав этот подход, мы устранили необходимость примерно в 268 000 ежедневных запусков однопроцессорных контейнеров, что существенно улучшило использование ресурсов YARN и эффективность выполнения заданий.

Существенный пятикратный рост пропускной способности инкрементальной репликации данных

Улучшения, которые мы внедрили в инструмент Distcp в Uber, значительно усилили наши возможности по инкрементальной репликации данных как в локальных, так и в облачных дата-центрах. Благодаря этим изменениям всего за один год мы увеличили пропускную способность обработки данных в локальной инфраструктуре в 5 раз, не столкнувшись ни с одним инцидентом, связанным с масштабированием.

Рисунок 16: Масштаб HiveSync в локальных и облачных дата-центрах.
Рисунок 16: Масштаб HiveSync в локальных и облачных дата-центрах.

Бесшовная массовая миграция данных из локальной инфраструктуры в облако
В последние месяцы мы расширили возможности HiveSync, чтобы поддержать репликацию локального озера данных в облачное озеро данных, как отмечено здесь. Улучшения Distcp сыграли ключевую роль в работе с масштабом этой миграции. На сегодняшний день мы успешно перенесли в облако более 306 ПБ данных.

Рисунок 17: Данные, мигрированные из локальной инфраструктуры в облако с помощью сервиса HiveSync
Рисунок 17: Данные, мигрированные из локальной инфраструктуры в облако с помощью сервиса HiveSync

Улучшенная наблюдаемость

Мы внедрили несколько ключевых метрик, которые существенно повысили наблюдаемость. Эти метрики дали представление о времени отправки заданий Distcp как на стороне клиента, так и на стороне Yarn AM, о темпах отправки заданий, а также о производительности ключевых компонентов Distcp, таких как задачи Copy Listing и Copy Committer. Мы также отслеживали такие показатели, как максимальное использование heap-памяти контейнерами Hadoop, p99 скорости копирования Distcp на одно задание и общую скорость копирования. Повышенная прозрачность позволила нам лучше контролировать и понимать скорость репликации сервиса и сыграла ключевую роль в предотвращении и диагностике нескольких инцидентов.

Проблемы

В ходе вывода изменений на промышленные серверы мы столкнулись с рядом трудностей. Одна из них — исключения OOM (out of memory) в AM. Тщательное стресс-тестирование помогло определить оптимальные параметры использования памяти и ядер. Мы добавили метрики для выявления OOM-проблем, что впоследствии помогло подобрать оптимальные конфигурации ресурсов YARN для запросов на копирование, требовательных к памяти.

Еще одной проблемой стала высокая частота отправки заданий из HiveSync. Снижение задержки отправки увеличило скорость подачи заданий, что нередко приводило к ошибкам «Yarn Queue Full». Чтобы не перегружать YARN, мы внедрили в HiveSync circuit breaker, который временно приостанавливает новые отправки до тех пор, пока повторные попытки не начнут проходить успешно. Мы добавили метрики для фиксации таких событий, что позволило в режиме реального времени мониторить ситуацию и при необходимости корректировать емкость очереди YARN. Управление высокими скоростями копирования, хотя и было эффективным, приводило к высокой загрузке сетевой полосы и требовало тонкой настройки, чтобы сбалансировать производительность с ресурсными ограничениями.

Мы также сталкивались со сбоями AM из-за длительно выполняющейся задачи Copy Listing. Изначально части Copy Listing и Input Splitting были перенесены в фазу setup в AM. Это создавало проблемы, потому что RM ожидает, что AM будет регулярно отправлять heartbeat-сигналы. Поскольку отправка heartbeat запускается только после завершения setup, а задача Copy Listing иногда занимает более 10 минут, это приводило к тайм-аутам. Чтобы исправить ситуацию, задачу Copy Listing перенесли в setup выходного коммиттера (output committer), который выполняется уже после запуска отправителя heartbeat, что предотвратило тайм-ауты.

Заключение

В перспективе команда сосредоточится на ряде улучшений, связанных с параллелизацией, более эффективным использованием ресурсов и управлением сетью. В их числе:

  • Параллелизация выставления прав доступа к файлам

  • Параллелизация input splitting

  • Перенос вычислительно емких задач коммита в фазу Reduce для повышения масштабируемости

  • Реализация динамического ограничителя пропускной способности

Кроме того, мы планируем внести для этих оптимизаций патч в open source. Команда Uber HiveSync продолжает фокусироваться на решении задач репликации данных, где даже небольшие улучшения на нашем масштабе способны дать значительный эффект.


Уже сейчас OpenIDE позволяет разрабатывать проекты на Java, Spring, Python, Go, JavaScript и TypeScript! А поддержка Docker и 300+ плагинов доступны абсолютно бесплатно в маркетплейсе. Пробуйте российскую IDE в деле и подписывайтесь на нас в Telegram, чтобы не пропустить свежие обновления и полезные материалы.