Google Photon. Обработка данных со скоростью света*

    Photon – масштабируемая, отказоустойчивая и географически распределенная система обработки потоковых данных в режиме реального времени. Система является внутренним продуктом Google и используется в Google Advertising System. Research paper [5], описывающие базовые принципы и архитектуру Photon, был представлен на научной конференции ACM SIGMOD в 2013 году.

    В paper [5] заявлено, что пиковая нагрузка на систему может составлять миллионы событий в минуту со средней end-to-end задержкой менее 10 секунд.
    * 'Скорость света' в заголовке — наглая ложь гипербола.



    Photon решает вполне конкретную задачу: необходимо соединить (выполнить операцию join) два непрерывных потока данных в режиме реального времени. Так в упоминаемой уже Google Advertising System один из этих потоков – поток поисковых запросов, другой – поток переходов по рекламным объявлениям.

    Photon является географически распределенной системой и автоматически способен обрабатывать случаи деградации инфраструктуры, в т.ч. отказа дата-центра. В геораспределенных системах крайне непросто гарантировать время доставки сообщений (в первую очередь, из-за сетевых задержек), поэтому Photon допускает, что обрабатываемые потоковые данные могут быть не упорядочены по времени.

    Используемые сервисы: Google File System, PaxosDB, TrueTime.

    Базовые принципы


    В [5] объяснение принципов работы Photon идет в следующем контексте: пользователь ввел поисковый запрос (query) в момент времени t1 и перешел по рекламному объявлению (click) в момент времени t2. В этом же контексте, если не задано иного, в этой статье будут объяснены принципы работы Photon.

    Принцип объединения потоков (join) взят из мира РСУБД: поток query имеет уникальный идентификатор query_id (условно Primary Key), поток click имеет уникальный идентификатор click_id и включает в себя некоторый query_id (условно Foreign Key). Объединение потоков происходит по query_id.

    Следующий важный момент: ситуация, когда один click event посчитан дважды, либо, наоборот, не посчитан, будет вести, соответственно, либо к судебным искам со стороны рекламодателей, либо к упущенным выгодам со стороны Google. Отсюда, крайне важно обеспечить at-most-once семантику обработки событий.

    Другое требование – обеспечить near-exact семантику, т.е. чтобы большая часть событий была посчитана в режиме близкому real-time. События, не посчитанные в real-time, все равно должны быть посчитаны — exactly-once семантика.

    Кроме того, для экземпляров Photon, работающих в разных дата-центрах, необходимо синхронизированное состояние (точнее только critical state, так как между ДЦ весь state слишком «дорого» реплицировать). Таким синхронизируемым critical state выбрали event_id (по сути, click_id). Critical state храниться в структуре IdRegistry – in-memory key-value хранилище, построенное на основе PaxosDB.

    Последний – PaxosDB – реализует алгоритм Paxos для поддержки отказоустойчивости и согласованности данных.

    Взаимодействие с клиентами


    Worker-узлы взаимодействуют с IdRegistry по клиент-серверной модели. Архитектурно взаимодействие Worker-узлов с IdRegistry – это сетевое взаимодействие с очередью асинхронных сообщений.

    Так клиенты – Worker-узлы — отправляют к IdRegistry только 1) запрос на поиск event_id (если event_id найден, значит он уже был обработан) и 2) запрос на вставку event_id (для случая, если на шаге 1 event_id не был найден). На стороне сервера запросы принимают RPC-обработчики, целью которых поставить запрос в очередь. Из очереди запросы забирает специальный процесс Registry Thread (синглтон), который и выполнит запись в PaxosDB и инициализирует обратный вызов (callback) клиенту.


    Источник иллюстрации [5, Figure 3]

    Масштабируемость


    Т.к. реплика IdRegistry происходит между географическим регионами, сетевые задержки между которыми могут достигать 100 мс [5], то это автоматически ограничивает пропускную способность IdRegistry до десяти последовательных транзакций (event_id commits) в секунду, в то время как требование к IdRegistry было равно 10K транзакций в секунду. Но и отказаться от геораспределенности и/или от синхронно репликации critical state с поддержкой решений конфликтов в кворуме также нельзя.

    Тогда инженеры Google внедрили еще 2 практики, знакомые многим из мира СУБД:
    • пакетная отправка запросов (batching) – «полезная» информация по event_id занимает менее 100 байт; запросы отправляются пакетами на IdRegistry Client. Там они попадают in-memory в очередь, которую разбирает процесс Registry Thread, в обязанности которого входит решение конфликтов, связанные с тем, что в очереди может быть более одного элемента с одинаковым event_id.
    • timestamp-based sharding (+ динамический resharding) – все event_id делятся по диапазонам; транзакции по каждому из диапазонов отправляются на определенный IdRegistry.

    Пакетная отправка запросов имеет и обратную сторону: кроме смешения семантики (Photon обрабатывает данные real-time, а некоторые его части работают в batching-режиме), batching-сценарий не подойдет для систем c небольшим количеством событий – время сбора полного пакета может занимать существенный интервал времени.

    Компоненты


    В рамках одного ДЦ выделают следующие компоненты:
    • EventStore – обеспечивает эффективный поиск по queries (поток поисковых запросов в поисковой системе);
    • Dispatcher – чтение потока кликов по рекламным объявлениям (clicks) и передача (feed) прочитанного Joiner;
    • Joiner – stateless RPC-сервер, принимающий запросы от Dispatcher, обрабатывающий их и соединяющий (join) потоки queries и clicks.

    Алгоритм добавления записи представлен ниже:

    Источник иллюстрации [5, Figure 5]

    Взаимодействие между ДЦ:

    Источник иллюстрации [5, Figure 6]

    Алгоритм добавления записи в Joined Click Logs опустим, отметив, что в работы систем с частым сетевым взаимодействием применение retry-политик и асинхронных вызовов является крайне эффективным способом увеличения надежности и масштабируемости системы, соответственно, без усложнения общего алгоритма работы.

    Этими же приемами – retry-политик и асинхронных вызов – и воспользовались создатели Photon.

    Логика повтора запросов
    Как уже ранее упоминалось, ситуация, когда click_id поступил на обработку, а ассоциированный с ним query_id нет – не исключение. Все из-за того, что не обязательно поток поисковых запросов обработается к тому моменту, кода начнет обрабатываться поток кликов по контекстной рекламе.

    Для надежного обеспечения at-least-once семантики обработки всех click_id была введена логика, по которой для случая, описанного выше, применяется логика повторения. Для избегания троллинга (throttling) системы самой собой время между неудачными запросами увеличивается по экспоненте – exponential backoff algorithm. После некоторого количества неудачных запросов или по прошествии определенного времени click помечается как «unjoinable».


    Dispatcher


    Dispatcher – процесс, ответственный за чтение логов кликов — clicks. Эти логи хранятся в GFS и растут во времени непрерывно.

    Для того, чтобы эффективно их читать, Dispatcher периодически сканирует директорию с логами и идентифицирует новые файлы и/или измененные, сохраняет состояние каждого файла в локальной GFS-ячейке. Это состояние содержит список файлов и сдвиг от начала файла для данных, которые уже были обработаны. Таким образом при изменении файла, последний вычитывается не с начала, а с того момента, на котором обработка закончилась в прошлое чтение.

    Обработка новых данных осуществляется параллельно несколькими процессами, каждый из которых расшаривает свое состояние, что позволяет различным процессам бесконфликтно работать на разными частями одного и того же файла.

    Joiner


    Joiner – реализация stateless RPC-сервера, принимающего запросы от Dispatcher. Приняв запрос от Dispatcher, Joiner извлекает из него click_id и query_id. После чего по query_id пытается получить информацию из EventStore.

    В случае успеха, EventStore возвращает поисковый запрос соответствующий обрабатываемому click.

    Далее Joiner удаляет дубликаты (с помощью IdRegistry) и генерирует выходной лог, содержащий объединенные (joined) значения – Joined Click Logs.

    Если Dispatcher для обработки отказов использовал retry-логику, то в Joiner инженеры Google добавили еще один прием. Прием работает в случаях, когда Joiner отправил запрос к IdRegistry; последний успешно зарегистрировал click_id, но из-за сетевых проблем, либо по таймауту Joiner так и не получил ответ об успехе от IdRegistry.

    Для этого с каждым «commit click_id»-запросом, который Joiner отправляет на IdRegistry, ассоциируется специальный токен. Токен сохраняется в IdRegistry. В случае, если ответ от IdRegistry не был получен, Joiner повторяет запрос с тем же токеном, что и в прошлом запросе, и IdRegistry без труда «понимает», что пришедший запрос уже обрабатывался.

    Генерация уникальных токенов / Event_Id
    Еще одним интересным приемом, который следует отметить, является способом генерации уникальных event_id.

    Ясно, что гарантированная уникальность для event_id крайне важное требование для работы Photon. В то же время, алгоритм генерации уникального в рамках нескольких ДЦ значения может занять крайне значительное время и количество CPU-ресурсов.

    Инженеры Google нашли элегантное решение: event_id можно уникально идентифицировать используя IP узла (ServerIP), Id процесса (ProcessId) и временную метку (Timestamp) узла, на котором данное событие было сгенерировано.

    Как и в случае со Spanner, для минимизации несогласованности временных меток на различных узлах, используется TrueTime API.


    EventStore


    EventStore – это сервис, принимающий на вход query_id и возвращающий соответствующий query (информацию о поисковом запросе).

    В Photon для EventStore имеются 2 реализации:
    1. CacheEventStore – распределенное [sharding по hash(query_id)] in-memory хранилище, к котором хранится полная информация по query. Таким образом, для ответа на запрос не требуется чтение с диска.
    2. LogsEventStore — key-value хранилище, где key – query_id, а value – имя log-файла, в котором хранится информацию по соответствующему query, и смещение (byte offset) в этом файле.

    Так как Photon работает в режиме близком к реальному времени, то можно с уверенностью гарантировать, что вероятность нахождения query в CacheEventStore (при условии, что в query в него попадают с минимальной задержкой) будет очень высокой, а сам CacheEventStore может хранить события за относительно небольшой промежуток времени.

    В researching paper [5] приводится статистика, что только 10% запросов «проходят мимо» in-memory кэша и, соответственно, обрабатываются LogsEventStore.

    Результаты


    Конфигурация


    На момент публикации [5], т.е. в 2013 году, реплики IdRegistry развернуты в 5-ти датацентрах в 3-ех географических регионах (восточное, западное побережье и Mid-West Северной Америки), причем сетевые задержки между регионами превышают 100 мс. Другие компоненты Photon – Dispatchers, Joiners, etc. – развернуты в 2-ух географических регионах на западном и восточном побережье США.

    В каждом из ДЦ количество IdRegistry-шардов превышает сотню, а количество экземпляров процессов Dispatcher и Joiner превышает тысячи.

    Производительность


    Photon обрабатывает миллиарды joined-событий в день, в том числе, в периоды пиковых нагрузок миллионы событий в минуту. Объем clicks-логов, обрабатываемых за 24 часа, превышает терабайт, а объем суточных query-логов исчисляется десятками терабайт.

    90% всех событий обрабатываются (join'ятся в один stream) в первые 7 секунд, после их появления.


    Источник иллюстрации [5, Figure 7]. Больше графиков со статистикой (слайды 24-30).

    Простые принципы сложных систем


    В разделе «Базовые принципы» я уже упоминал, что Photon является системой с поддержкой exactly-once (at-least-once и at-most-once) и near-exact семантики, т.е. гарантирует, что любое событие, зафиксированное в логах, будет обработано один и только один раз, причем с большой вероятностью в режиме близком к реальному времени.

    PaxosDB реализует at-most-once семантику, в то время как retry-политики Dispatcher обеспечивают at-least-once семантику.
    Для обработки событий в near-real-time режиме (near-exact семантика) в архитектуре Photon заложены следующие принципы:
    • Масштабируемость:
      • Обязательный sharding для нереляционных хранилищ;
      • Все worker-узлы является stateless.
    • Задержка (latency):
      • RPC-коммуникации везде, где это возможно;
      • Перенос (transfer) данных в RAM везде, где это возможно.

    В заключении


    В заключении research paper [5], инженеры Google поделились хорошими практиками и своим планами на будущее.

    Принципы не новы, но для полноты и законченности статьи, я их перечислю:
    • Используйте RPC-коммуникации вместо записи на диск. Запросы, выходящие за физические границы узла, должны выполняться асинхронно, а клиент всегда должен рассчитывать, что не получит ответ по таймауту или из-за сетевых проблем.
    • Минимизируйте критическое состояние (critical state) системы, т.к. его, в общем случае, приходится синхронно реплицировать. В идеале в критическое critical state системы должен включать в себя только метаданные системы.
    • Sharding – друг масштабируемости :) Но и эту идею инженеры Google улучшили, сделав time-based sharding.

    В планах создателей Photon захватить мир уменьшить end-to-end задержки за счет того, что сервера, которые генерируют потоки clicks и queries, будут напрямую отправлять RPC-запросы к Joiner'ам (сейчас Dispatcher «ждет» этих событий). Также планируется Photon «научить» объединять несколько потоков данных (в текущей реализации Photon умеет объединять только 2 потока).

    Пожелаем создателям Photon удачи в реализации их планов! И ждем новых research paper!

    Список источников**


    [5] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, et al. Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams, 2013.
    ** Полный список источников, используемый для подготовки цикла.

    История изменений поста
    Chanset 01 [27.12.2013].
    Изменил иллюстрацию статьи. За новую иллюстрацию спасибо пользователю TheRaven


    Offtopic


    Это заключительная статья цикла статей о платформе Google.

    Google Platform Timeline

    Всех с наступающим Новым Годом! Удачи и упорства!

    Дмитрий Петухов,
    MCP, PhD Student, IT-зомби,
    человек с кофеином вместо эритроцитов.
    Поделиться публикацией

    Комментарии 14

      +3
      Да что такое, опять дежавю…
        +4
        Где? Cлова другие, картинки — тоже.)
          –5
          Уже хочется добавить ваши посты в игнор из-за повторяющейся картинки. И читать не хочется совсем. Напрягите фантазию, включите гугл.
            +4
            В этих постах картинки не играют основную роль
              0
              Это понятно, но первая реакция на вид статьи — вызывает отторжение.
              Встречают по одёждке.
              У меня по этой причине программируемый пульт не обновляется который год, хотя давно все поменялось. Как вспомню софт, с которым придется работать, чтобы его перешить, так вздрогну и откладываю пульт в ящик. Опенсорсных вариантов для него нет. Форум забит фразами типа — ваш софт гавно, откройте протоколы, мы сами всё напишем. Но Logitech предпочитает держать китайскую армию в сапорте, чтобы разруливать проблемы вручную. Только подумай — вручную каждому второму покупателю сранного пульта, что-нибудь настроить.
              Извиняйте уж за реакцию.
        0
        Не обнаружил существенных отличий от Шторма.
          +2
          Всех интересуют подробности…
            0
            Это неожиданно. Если Хадуп писали по следам публикаций гуглеров, то тут получилось едва ли не наоборот. Марц начал проект в 2010, кажется.
          0
          Эти технологии доступны за пределами Гугла сторонним разработчикам?
            +1
            Из всех описанных в цикле статей систем доступны (на память) сторонним разработчикам только Dremel и Colossus через PaaS-сервисы Google BigQuery и Google Cloud Storage, соответственно.
            0
            В который раз убедился, что Google не делает ничего особенного. Миллион событий в минуту это всего 16K TPS. Как-то это совсем неприлично мало для распределённой системы в 21-м веке. А уж учитывая что в приведённом примере — операция join поисковых запросов с кликами на рекламу — нет ограничения на задержку (latency), становится совсем скучно.
              0
              1) В статье сказано «миллионы».
              2) Что такое TPS?
              3) У кого прилично, много?
              4) Кому надо больше? БАК генерирует 20 млн событий в секунду. Если считать, что за день во всем интернете происходит 1 триллион пользовательских событий, или по 300 на пользователя интернета, это 11,5 млн событий в секунду.
                0
                1) Было бы сказано точнее, посчитал бы точнее. Из моего опыта, если пишут «миллионы», то их там не больше двух.
                2) Transactions per Second
                3) Много у кого. У HFT и вообще в трейдинговых системах, у MMORPG, у учёных (тот же БАК)
                4) Ключевое у вас «пользовательских». В реальности одно «пользовательское событие» порождает множество транзакций в разных системах.

                Но самое главное в моём комментарии было про отсутствие ограничения на задержку (latency). Если допустимая задержка измеряется десятками секунд, то такую систему очень легко проектировать. И скучно.
                  0
                  У HFT тоже речь идет не более чем о 10-20 млн событий в секунду в пиках по всему миру, насколько я представляю. У MMORPG вообще в разы меньше. Ученые круче БАКа вряд ли существуют. Кстати, я тут подумал, зачем БАКу онлайн-обработка? Можно и оффлайном обойтись.

                  К тому же, у HFT и ученых, думаю, очень специализированные решения, а у поисковиков и рекламных систем более обобщенные. И данных меньше: сравните время — id площадки — символ — цена и время — IP — id-кука — URL — id показанного баннера/список id контекстных объявлений — User Agent — операционная система — т. д.

                  Вы так говорите, как будто проектируете по такой системе каждый день перед завтраком :)

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое