company_banner

К порядку: правила создания конвейеров обработки данных

    К 2020 году вы не могли не заметить, что миром правят данные. И, как только речь заходит о работе с ощутимыми объёмами, появляется необходимость в сложном многоэтапном конвейере обработки данных

    Сам по себе конвейер обработки данных — это комплект преобразований, которые требуется провести над входными данными. Сложен он, например, потому, что информация всегда поступает на вход конвейера в непроверенном и неструктурированном виде. А потребители хотят видеть её в лёгкой для понимания форме.

    В наших приложениях Badoo и Bumble конвейеры принимают информацию из самых разных источников: генерируемых пользователями событий, баз данных и внешних систем. Естественно, без тщательного обслуживания конвейеры становятся хрупкими: выходят из строя, требуют ручного исправления данных или непрерывного наблюдения.

    Я поделюсь несколькими простыми правилами, которые помогают нам в работе с преобразованием данных и, надеюсь, помогут и вам. 

    Правило наименьшего шага

    Первое правило сформулировать легко: каждое отдельное взятое преобразование должно быть как можно проще и меньше.  

    Допустим, данные поступают на машину с POSIX-совместимой операционной системой. Каждая единица данных — это JSON-объект, и эти объекты собираются в большие файлы-пакеты, содержащие по одному JSON-объекту на строку. Пускай каждый такой пакет весит около 10 Гб. 

    Над пакетом надо произвести три преобразования: 

    1. Проверить ключи и значения каждого объекта. 

    2. Применить к каждому объекту первую трансформацию (скажем, изменить схему объекта). 

    3. Применить вторую трансформацию (внести новые данные).

    Совершенно естественно всё это делать с помощью единственного скрипта на Python:

    python transform.py < /input/batch.json > /output/batch.json

    Блок-схема такого конвейера не выглядит сложной:

    Проверка объектов в transform.py занимает около 10% времени, первое преобразование — 70%, на остальное уходит 20% времени. 

    Теперь представим, что ваш стартап вырос и вам уже приходится обрабатывать сотни, а то и тысячи пакетов. И тут вы обнаружили, что в финальный этап логики обработки данных (занимающий 20% времени) закралась ошибка, — и вам нужно всё выполнить заново.

    В такой ситуации рекомендуется собирать конвейеры из как можно более мелких этапов:

    python validate.py < /input/batch.json > /tmp/validated.json
    python transform1.py < /tmp/validated.json > /tmp/transformed1.json
    python transform2.py < /tmp/transformed1.json > /output/batch.json

    Блок-схема превращается в симпатичный паровозик:

    Выгоды очевидны:

    • конкретные преобразования проще понять;

    • каждый этап можно протестировать отдельно;

    • промежуточные результаты отлично кешируются;

    • систему легко дополнить механизмами обработки ошибок;

    • преобразования можно использовать и в других конвейерах.

    Правило атомарности

    К правилу наименьшего шага прилагается второе — правило атомарности. Оно звучит так: каждый шаг-преобразование либо должен случиться, либо нет. Никаких промежуточных состояний данных быть не должно.

    Давайте вернёмся к первому примеру. Есть входные данные, над которыми мы проводим преобразование:

    python transform.py < /input/batch.json > /output/batch.json

    Что будет, если в процессе работы скрипт упадёт? Выходной файл будет повреждён. Или, что ещё хуже, данные окажутся преобразованы лишь частично, а следующие этапы конвейера об этом не узнают. Тогда на выходе вы получите лишь частичные данные. Это плохо.

    В идеале данные должны быть в одном из двух состояний: готовые к преобразованию или уже преобразованные. Это называется атомарностью: данные либо переходят в следующее правильное состояние, либо нет:

    Если какие-то этапы конвейера расположены в транзакционной базе данных, то атомарность легко достигается использованием транзакций. Если вы можете использовать такую базу данных, то не пренебрегайте этой возможностью.

    В POSIX-совместимых файловых системах всегда есть атомарные операции (скажем, mv или ln), с помощью которых можно имитировать транзакции: 

    python transform.py < /input/batch.json > /output/batch.json.tmp
    mv /output/batch.json.tmp /output/batch.json

    В этом примере испорченные промежуточные данные окажутся в файле *.tmp, который можно изучить позднее при проведении отладки или просто удалить. 

    Обратите внимание, как хорошо это правило сочетается с правилом наименьшего шага, ведь маленькие этапы гораздо легче сделать атомарными.

    Правило идемпотентности 

    В императивном программировании подпрограмма с побочными эффектами является идемпотентной, если состояние системы не меняется после одного или нескольких вызовов.

    Википедия

    Наше третье правило более тонкое: применение преобразования к одним и тем же данным один или несколько раз должно давать одинаковый результат. 

    Повторюсь: если вы дважды прогоните пакет через какой-то этап, результаты должны быть одинаковы. Если прогоните десять раз, результаты тоже не должны различаться. Давайте скорректируем наш пример, чтобы проиллюстрировать эту идею:

    python transform.py < /input/batch.json > /output/batch1.json
    python transform.py < /input/batch.json > /output/batch2.json
    diff /input/batch1.json /output/batch2.json
    # файлы те же
    python transform.py < /input/batch.json > /output/batch3.json
    diff /input/batch2.json /output/batch3.json
    # никаких изменений

    На входе у нас /input/batch.json, а на выходе — /output/batch.json. И вне зависимости от того, сколько раз мы применим преобразование, мы должны получить одни и те же данные:

    Так что если только transform.py не зависит от каких-то неявных входных данных, этап transform.py является идемпотентным (своего рода перезапускаемым). 

    Обратите внимание, что неявные входные данные могут проявиться самым неожиданным образом. Если вы слышали про детерминированную компиляцию, то главные подозреваемые вам известны: временные метки, пути в файловой системе и другие разновидности скрытого глобального состояния.

    Чем важна идемпотентность? В первую очередь это свойство упрощает обслуживание конвейера. Оно позволяет легко перезагружать подмножества данных после изменений в transform.py или входных данных в /input/batch.json. Информация будет идти по тем же маршрутам, попадёт в те же таблицы базы данных, окажется в тех же файлах и т. д.

    Но помните, что некоторые этапы в конвейерах по определению не могут быть идемпотентными. Например, очистка внешнего буфера. Однако, конечно же, подобные процедуры всё равно должны оставаться маленькими™ и атомарными™. 

    Правило избыточности

    Четвёртое правило: насколько возможно откладывайте удаление промежуточных данных. Зачастую это подразумевает использование дешёвого, медленного, но ёмкого хранилища для входных данных:

     

    Пример: 

    python transform1.py < /input/batch.json > /tmp/batch-1.json
    python transform2.py < /tmp/batch-1.json > /tmp/batch-2.json
    python transform3.py < /tmp/batch-2.json > /tmp/batch-3.json
    cp /tmp/batch-3.json /output/batch.json.tmp # не атомарно!
    mv /output/batch.json.tmp /output/batch.json # атомарно

    Сохраняйте сырые (input/batch.json) и промежуточные (/tmp/batch-1.json, /tmp/batch-2.json, /tmp/batch-3.json) данные как можно дольше — по меньшей мере до завершения цикла работы конвейера. 

    Вы скажете мне спасибо, когда аналитики решат поменять алгоритм вычисления какой-то метрики в transform3.py и вам придётся исправлять данные за несколько месяцев. 

    Другими словами: избыточность избыточных данных — ваш лучший избыточный друг.

    Заключение

    Давайте подведём итоги:

    • разбивайте конвейер на изолированные маленькие этапы;

    • стремитесь делать этапы атомарными и идемпотентными;

    • сохраняйте избыточность данных (в разумных пределах). 

    Так обрабатываем данные и мы в Badoo и Bumble: они приходят через сотни тщательно подготовленных этапов преобразований, 99% из которых атомарные, небольшие и идемпотентные. Мы можем позволить себе изрядную избыточность, поэтому держим данные в больших холодном и горячем хранилищах, а между отдельными ключевыми преобразованиями имеем и сверхгорячий промежуточный кеш.

    Оглядываясь назад, могу сказать, что эти правила выглядят очевидными. Возможно, вы даже интуитивно уже следуете им. Но понимание лежащих в их основе причин помогает видеть границы применимости этих правил и выходить за них при необходимости.

    А у вас есть свои правила обработки данных?

    Badoo
    Big Dating

    Комментарии 6

      +1
      Спасибо, очень кстати!
        0

        А потом начинается хайлоад и выясняется что терабайты данных в день дорого по 10 раз записывать в промежуточные хранилища. И сеть и хранилища в никуда уходят и стоят много денег.


        И начинается слияние всех маленьких этапов обработки в один большой.

          0

          Я точно не знаю, что означает термин "хайлоад", но событий в Badoo/Bumble для хранения поступает несколько миллионов в секунду. Это именно что терабайты в день.


          Обычно такой объем данных означает, что фирма уже может себе позволить некоторое резервирование данных.

            0
            Допустим 100 терабайт в день. Если у нас в конвеере 10 стадий это хранение и передача по сети лишнего петабайта. Петабайт это дорого. Позволить можно, но оптимизация в этом месте просто напрашивается. Эффект в деньгах будет очень заметный.

            Можно посчитать для любых цифр. Почем оно выходит в месяц.
            Гигабайты в день точно нет.
            100 терабайт в день точно да.
            1 терабайт — вопрос.
              0

              Никто никуда петабайты не двигает :-)


              Суточные данные приходят и обрабатываются большими пакетами. Каждый пакет оказывается в 3-5 хранилищах. В разных хранилищах этот пакет занимает разный объем места. Сырые и неэффективно сжатые данные перепаковываются под эффективные колоночные форматы, и дальше по конвейеру занимают уже меньше места.


              Опять же, у разных данных разная политика удаления. Какие-то данные сохраняются до конца обработки пакета — минуты, другие защищаются в конце дня и т.д.


              И, кстати, именно место на медленных дисках в наши дни необычайно дёшево, дороже стали лишние вычисления.

                0
                Я про поток данных. Сотня терабайт это в сумме за сутки. Без резервирования, это чистый входящий поток в каком-нибудь джейсоне или что там у вас.
                Понятно что он идет пакетами, но это поток и пакеты сравнительно небольшие.

                Далеко не все эффективно жмется. Раза в 2 конечно. Но это как раз покрывает необходимость записи и хранения двух копий данных. То на то и выходит.

                Хранить на обычных дисках дешево. А вот писать и читать уже проблема. Иопсов мало. В них все и упирается как правило в сценарии поэтапной обработки и быстрых оперативных хранилищ. ssd еще и дешевле могут выйти, за счет уменьшения количества необходимых дисков.

                дороже стали лишние вычисления.

                При красивом слиянии всех этапов обработки вычислений особо больше не становится. Делаем то тоже самое. С чего бы там сильно больше ЦПУ тратить?
                Некий оверхед есть, но он вполне покрывается за счет уменьшения ожидания io, все уже в памяти.
                А оперативку так и вообще сэкономить можно. Когда все в одном месте в сумме выходит меньше копий одних и тех данных в памяти на весь конвеер. Тут уже думать надо, в среднем получается.

                Опять же, у разных данных разная политика удаления. Какие-то данные сохраняются до конца обработки пакета — минуты, другие защищаются в конце дня и т.д.

                Запас минимум на день в любом случае нужен во всех местах.
                Факапы бывают. Надо иметь возмжность сохранить данные и не останавливать еще работающую обработку.
                Можно смело и делать хранение минимум 24 часа везде, будет гарантия что места и производительности хранилищ хватит на любой типовой факап.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое