company_banner

ZooKeeper в качестве системы гарантированной доставки для Яндекс.Почты

    Ежедневно Яндекс обрабатывает входящий поток из миллионов писем и файлов. Их количество постоянно и непрерывно увеличивается. При этом уровень входящей нагрузки неравномерный, он сильно зависит от времени суток и дня недели. Кроме того, периодически случаются ботнет-атаки.

    Задача усложняется тем, что каждое письмо и каждый файл требуют дополнительной обработки: мы классифицируем их, сохраняем, извлекаем текст, распознаем образы, создаем индексы. Благодаря этому в дальнейшем пользователю гораздо проще найти нужные письма или файлы.

    image

    Обработка может занимать разное время: от миллисекунд, требуемых для сохранения письма, до нескольких десятков секунд, затрачиваемых, например, на извлечение текста из фотографий.

    Как бы мы не рассчитывали мощность системы приема и обработки писем и файлов, мы никогда не застрахованы от ситуаций, когда входной поток значительно превышает наши ожидания.

    В этих случаях нам помогают очереди. Они позволяют сгладить всплески входной нагрузки: выиграть время и дождаться, когда кластер обработки писем станет доступен.

    Что такое очередь


    Очередь на обработку документов мало чем отличается от живой очереди в магазине или на почте. Там они тоже возникают по той же причине: обслуживать клиентов мгновенно невозможно. Однако важное преимущество очереди на обработку документов в Персональных сервисах Яндекса заключается в том, что стоят в ней только письма и файлы, пользователю ждать ничего не надо. По сути это буфер, в который можно быстро записать файлы и письма, а когда нагрузка на систему обработки спадет, быстро все это прочитать.

    Как правило с очередью работают две программы: одна записывает в нее данные, другая вычитывает их и отправляет потребителю.

    Программа записи в очередь должна быть максимально простой, чтобы не вносить никаких дополнительных задержек на приеме задачи, мы же не хотим делать еще одну очередь на помещение в основную. И программа чтения из очереди уже взаимодействует с различными функциональными элементами сервисов: Почты/Диска или других продуктов.

    Программ чтения данных может быть несколько, каждая из которых служит для специализированной обработки данных. Программа чтения может работать существенно дольше, чем программа записи, поскольку на нее возложена дополнительная функциональность. Поэтому очередь должна уметь хранить данные за промежуток времени, достаточный для обработки писем/файлов своими источниками.

    В основном очереди используются там, где есть неравномерная входная нагрузка и требуется выполнять операции над данными. В частности, на приеме писем в почту и файлов в диск.

    При использовании очередей важно учитывать такие правила:
    • Очередь нужна там, где есть неравномерная нагрузка на систему обработки.
    • Очередь помогает сгладить неравномерность и служит некоторым буфером между входом и системой обработки.
    • Сиcтема обработки должна справляться со всей входной нагрузкой на каком-то определенном ограниченном временном интервале, иначе никакие очереди не помогут.


    Архитектура системы приема писем в Почту:


    Письма влетают в почту и попадают в кластер Mx-Front, программы которого обрабатывают письма и сохраняют их в почтовые хранилища.

    image

    Если покладка письма прошла неудачно, письмо попадает на кластер Mx-Back, который состоит из очереди на PostFix и ряда программ покладки и анализа писем. Программы кластера Mx-Back делают много попыток положить письмо в хранилища в течение длительного времени, но если и здесь неудача, то письмо считается недоставленным. Если покладка письма удалась, то письмо отправляется на кластер Services, из которого оно уже доставляется в поиск.

    Архитектура системы приема данных в Почтовый офис:


    Поскольку сервис Почтовый офис является частью почты, то и система покладки встроена в почтовую. Отличие состоит лишь в том, что сам сервис не должен хранить сами письма. Сервису требуется лишь мета-информация из писем. А еще Постофис позволяет посмотреть на почту со стороны отправителя, а не со стороны получателя. Поэтому в качестве очереди мы решили попробовать другие решения, отличные от PostFix.

    Архитектура системы приема файлов в Диск:


    Файлы влетают в кладун, который складывает файлы в хранилище (mulca) и отправляет в кластер mpfs. MPFS сохраняет метаинформацию о файлах в хранилище метаданных (mongo DB) и кладет в очередь для доставки файлов в поиск. Далее файлы из очереди попадают в поиск.

    image

    Какие бывают очереди


    PostFix – очередь на входе в Почту в трех местах: mxfront, mxback, services
    Достоинство у этой очереди всего одно, она очень проста в использовании.
    Недостатков гораздо больше:
    • рассчитан только для почтовых задач и работает по протоколу smtp;
    • работает только в пределах одного сервера, значит в случае поломки сервера все данные пропадают и нет синхронизации последовательности между разными серверам;
    • нельзя читать из произвольного места в очереди, возможно только последовательное чтение;
    • возврат задачи в очередь возможен только через начало.

    RabbitMQ – используется для информирования Поиска в почте об удалении писем из почтового ящика. Основное достоинство этой очереди – возможность произвольного чтения данных. К недостаткам можно отнести все недостатки несетевой очереди, перечисленные выше.

    Очередь в Mongo DB – используется в Диске для всех операций с метаданными: копирование, перемещение файлов на Диске и индексация файлов для поиска.
    Достоинства – очередь сетевая. Если выпадает один сервер, данные сохраняются.
    К основным недостаткам относится то, что очередь разгребается разными обработчиками, которые не синхронизированы друг с другом, поэтому не выполняется очередность постановки задач.


    ZooKeeper


    Существующие решения ограничены по решаемым задачам или не дают гарантии доставки получателю. Поэтому мы решили потестировать Apache ZooKeeper – очередь с гарантией доставки. Помимо гарантированной доставки, у него есть еще два важных преимущества:
    • сохранность данных и работоспособность, когда один из серверов очереди недоступен;
    • возможность распределения голов очереди по разным дата-центрам.

    Конечно, есть и недостатки, но они не столь существенны:
    • требуется избыточное количество серверов – минимум три, а лучше пять;
    • требуется время на синхронизацию данных во всех головах очереди.

    Три сервера, необходимых в минимальной конфигурации мы разместили в разных дата-центрах, чтобы быть застрахованными от отключения одного из них целиком. Более того, недоступность одного из дата-центров для компании является типовым режимом работы. Еженедельно проходят так называемые учения, чтобы быть уверенными, что сервисы Яндекса будут продолжать работать, если один из дата-центров вышел из строя.

    Учитывая богатый опыт использования очередей в персональных сервисах Яндекса, заявленные технические характеристики выше перечисленных очередей, а также наши внутренние требования по надежности доставки данных, мы решили рискнуть и попробовать Zookeeper в новом персональном сервисе Яндекса Почтовый офис.

    Zookeeper заводится не с первого раза


    Мы провели множество экспериментов, вот некоторые из них:

    1. Попробовали через ZooKeeper передавать данные в количестве равном числу хранилищ. По умолчанию это два бэкэнда. Получили 8 тысяч RPS (requests per second) на передачу данных в одном экземпляре. И 2,5 тысячи RPS на передачу данных в двух экземплярах.
      Поняли, что нам нужно иметь минимум три бэкэнда – два поиска + логстор, в итоге передавали данные три раза через ZooKeeper и получили 500 RPS – это очень низкая производительность, поскольку сервисов, желающих получить одни и те же данные может быть гораздо больше трех, а значит мы с каждым разом будем терять производительность на копирование данных и протоколы согласования между серверами очереди.
    2. Тщательно рассмотрев все проблемы из первого этапа экспериментов, мы решили использовать ZooKeeper как кольцевой буфер. В этом случае в ZooKeeper данные всегда складываются только один раз, а каждый бэкэнд содержит свою позицию в очереди, до которой он вычитал данные из очереди. В этом случае, если бэкэнд падает, он поднимается и вычитывает из очереди данные начиная с позиции, которая у него сохранена. Такое решение имеет два важных преимущества: во-первых, высокая производительность (8 тысяч RPS) благодаря тому, что данные хранятся всего в одном экземпляре. Во-вторых, неограниченное число бэкэндов, которые пользуются одними и теми же данными. Таким образом были решены проблемы с производительностью очереди.
    3. Но тут мы узнали, что это еще не все. Поскольку ZooKeeper может работать только с данными в памяти, а недоступность одного из серверов хранилища (бэкэнда) может достигать нескольких суток, то для самого ZooKeeper-а требуется хранилище. В качестве этого хранилища стали использовать Apache Lucene.


    То, что у нас получилось, обязательно нужно было протестировать функционально и нагрузочно. Вот как мы это делали…

    Архитектура тестового стенда


    Для того, чтобы качественно провести функциональное и нагрузочное тестирование системы доставки, мы сначала описали кейсы, в которых очередь должна “доставлять”. Все возможные ситуации вытекают из продуктовых требований к надежности системы доставки и архитектурных особенностей построения систем в нашей компании.

    Во-первых, протокол гарантированной доставки требует наличия нечетного количества работающих инстансов системы гарантированной доставки – это минимум три одновременно работающих программы, а лучше пять.

    Во-вторых, в Яндексе является стандартом одновременное размещение любых систем в разных дата-центрах – это позволяет сохранить работоспособность сервиса в случае отключения питания в ДЦ или других аномальных явлениях в данном ДЦ.

    В третьих, мы умышленно усложнили себе жизнь и решили совместить требование системы доставки и требования Яндекса, в результате мы разместили программу системы доставки на трех разных серверах, расположенных в трех разных дата-центрах.

    С одной стороны, выбранное нами решение усложняет общую архитектуру и приводит к различным нежелательным эффектам в виде долгих междатацентровых пингов, с другой стороны, мы получаем систему, защищенную от атомной войны.

    Получилась такая архитектура: прокси, получающий данные, очередь, хранилище.
    Прокси размещены в пяти дата-центрах на десяти серверах. Очередь размещена в трех дата-центрах на трех серверах. В качестве хранилища может быть выбран любой SQL и noSQL storage, а самих хранилищ может быть любое количество. В нашем случае в качестве хранилища мы выбрали noSQL storage на базе Lucene, и разместили его на двух серверах в 2-х разных дата-центрах.

    Функциональное и нагрузочное тестирование


    Перед вводом в эксплуатацию мы провели функциональное и нагрузочное тестирование системы гарантированной доставки.
    Тестирование проводилось последовательно в четырех режимах:

    1. Нормальный режим работы: все сервера доступны. Отправляем данные через очередь с нагрузкой 500 RPS в течение суток, проверяем, что все данные доставляются до бэкэнда.
    2. Недоступен один из серверов очереди. Начинаем отправку данных через очередь. Отключаем одну из голов очереди на час. Продолжаем отправку писем и проверяем, что данные доставляются до бэкэндов.
    3. Недоступен один из серверов бэкэнда. Начинаем отправку данных через очередь. Отключаем один из бэкэндов на час, затем включаем и спустя определенный промежуток времени проверяем, что данные добежали до бэкэнда.
    4. Недоступен один из серверов очереди и один из серверов бэкэнда. Начинаем отправку данных через очередь. Отключаем одну из голов очереди и одну из голов бэкэнда на один час. Отправку данных не останавливаем. Проверяем, что данные доставляются в рабочий бэкэнд. Через час включаем отключенные головы очереди и бэкэнда. Проверяем что после включения данные попадают в оба бэкэнда и что данные в каждом из них одинаковые.


    Внедрение


    По результатам тестирования мы зажмурили глаза и, взяв ответственность на свою голову, решили запускать всю эту систему в продакшен. Как и ожидали, тестирование тестированием, а реальная жизнь создает такие ситуации, которые никогда не предскажешь заранее. Иногда у нас заканчивается пул соединений, иногда мы долго доставляем данные на один из бэкэндов, причем один из бэкэндов почему-то актуальный, а второй отстает.

    Одним словом, в ходе эксплуатации возникают все новые и новые ситуации, которые мы успешно фиксим, и наша очередь, запущенная еще в ноябре, из экспериментальной технологии постепенно превращается в хороший автономный продукт.
    Яндекс
    Как мы делаем Яндекс

    Comments 35

      0
      А вы пробовали потестировать для своих целей hazelcast?
        0
        На hazelcast мы смотрели. На тот момент когда выбирали технологию, hazelcast была просто библиотекой синхронизации запросов (быть может сейчас что-то изменилось). А нам нужна целая система гарантированной доставки, которая требует кучу еще всего: сохранение данных на диск, синхронизация данных с диска, восстановление данных после перезапуска и т.д. и т.п. У нас в соседнем проекте используется hazelcast поверх activemq.
        Но если прочитать все, что мы делали, то боюсь что это тоже не решение из коробки. Его нужно будет тюнить и тюнить.
          0
          Сейчас в библиотеке hazelcast точно есть очередь.
          А какими тулами Вы пользовались при работе с zookeeper?
          К примеру, Вы пробовали использовать curator.apache.org/?
            0
            curator.apache.org мы не использовали, тем более нам существенно пришлось переделать внутренности zookeeper-а, так что вероятно не будет совместимости
              0
              Интересно вы используете Zookeeper. В Сurator есть repices для всяких очередей, но большими буквами написано, что это нужно делать осторожно (не для большого количества сообщений и почему). См ZooKeeper makes a very bad Queue source. Вообще для задач схожих с вашей много раз слышал о Kafka.
        +2
        > затрачиваемых, например, на извлечение текста из фотографий

        Вы распознаете текст на фотографиях пользователей?
          +3
          пытаемся
            0
            а в аудио-файлах текст распознаете?
              0
              В аудио-файлах мы можем извлекать только теги. Одним словом все то, что записано в метаинформации. А сами песни мы не прослушиваем.
              0
              У меня поиск в почте не работает по надписям в граф. файлах прикрепленных к письму.
              Файл — это скриншот экрана, где распознать текст сложностей не представляется.
                +2
                Ну я же не написал что мы распознаем текст,
                написал что пока пытаемся :)
                  0
                  Ясно — спасибо! :)
                    0
                    Когда будет релевантный поиск в почте через веб-клиент?
                    Выдает всё, но не то что ищешь.
                    Постоянно приходится пользоваться аутлуком, чтобы найти что-нибудь в закромах.
              0
              Еженедельно проходят так называемые учения, чтобы быть уверенными, что сервисы Яндекса будут продолжать работать, если один из дата-центров вышел из строя.

              При этом вы действуете максимально мягко (например, заранее приказать GSLB начать игнорировать существование данной площадки, заранее препендами или отключением анонса BGP увести трафик), или просто рубите сплеча, измеряя реальный даунтайм? Что-то подсказывает мне, что используется первый сценарий, но вдруг?

              И есть кое-что хуже полной недоступности ЦОДа: частичная недоступность ЦОДа и/или деградация связи до него или в нем… Вероятность такого события всяко выше, чем полного обесточивания или перерубания всех (или всех кроме одной, которая сразу захлебнется от трафика) магистралей.
                0
                И есть кое-что хуже полной недоступности ЦОДа
                Это точно… ))

                Про наши учения надо как-нибудь поподробнее пост написать. Возьмем на заметку.
                  0
                  С удовольствием почитаю. Ибо еженедельные учения — это очень круто. Если они еще и проводятся по-серьезному…
                +1
                Не пытались использовать Kafka от Linkedin, учитывая, что вы уже используете zookeeper как кольцевой буфер, из которого разные бэкэнды читают одни и те же данные?
                  0
                  Для данной задачи Kafka мы не пытались использовать, но Kafka у нас используется в другом месте. Кстати, интересный вопрос, поизучаю и вероятно напишу насколько Kafka именно сюда может/не может подойти.
                    0
                    Ок, спасибо. Еще один вопрос. Со стороны консьюмеров гарантировать exactly-once обработку сообщений можно, например, хранением текущей позиции в кольцевом буфере вместе с обработанными сообщениями. А как вы гарантируете exactly-once запись сообщений в zookeeper? Гарантируете ли вообще?
                      0
                      exactly-once запись сообщений в zookeeper и гарантируем так:
                      1. перед записью в очередь для сообщения вычисляется хэш и в очередь сообщение записывается вместе с хэшем
                      2. в момент записи сообщения в очередь сначала выполняется поиск по хэшу и если сообщение с таким хэшем уже есть в очереди, то запись сообщения не производится
                      3. если в момент записи сообщения в очередь соединение обрывается, то это сообщение записывается еще раз в случае, если оно не было записано до этого
                        0
                        Понятно, спасибо.
                      0
                      Тут коллеги подсказали, что в последней kafke уже появилась репликация, но с рядом ограничений. Так что если поработать над настройкой Кафки, вероятно спустя время тоже что-то могло получиться
                        0
                        Ага, как раз хотел вам об этом написать. Но в текущей версии не возможно реализовать идемпотентное добавление сообщений в очередь, возможно дублирование. Кажется, они хотят реализовать схему, подобную вашей, в будущих версиях.
                      –1
                      Про Кафку — она не реплицирует данные, поэтому не гарантирует доставку сообщений и поэтому для данной задачи не подходит
                      +2
                      У меня в яндекс-почте накопилось в папочке 969736 писем. Как я не крутился, а удалить такое кол-во писем просто нельзя. Запрос в тех.поддержку пару лет назад тоже не дал результатов. Хотелось бы чтобы такие вещи были организованы несколько лучше…
                        0
                        Дождались бы уж юбилея.
                          0
                          Вам не помогла кнопка «Очистить» в mail.yandex.ru/neo2/#setup/folders?
                            0
                            Нет, не помогла
                              0
                              К сожалению, с операциями над очень большим количеством писем на данный момент бывают проблемы. Мы занимаемся исправлением ситуации, а пока что можем запустить процесс очистки определённой папки со своей стороны. Можно для этого обратиться в службу поддержки, или скажите мне в личку свой логин и название папки.
                          –1
                          > Apache ZooKeeper – очередь с гарантией доставки

                          Вы уверены? В apache так не считают: zookeeper.apache.org

                          > ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

                          Всё же kafka тут лучше подошел бы, мне кажется.
                            0
                            Ну мы вроде бы описали, что Zookeeper сильно видоизменен и используется в особом режиме.
                            а вот походимость kafka с учетом наших ограничений еще нужно доказывать.

                            Расскажите как бы вы настроили kafka, чтобы она гарантированно доставляла в условиях междатацентровой очереди, в условиях еженедельных отключений ДЦ, в условиях огромных объемов данных, при необходимости гарантированной доставки всего и сохранения последовательности сообщений и действий над ними?

                            Дело в том, что zookeeper содержит не только входной набор писем, но и еще для задач Постмастера он получает действия пользователей над письмами: положили в спам, удалили, прочитали и т.д. и т.п.
                            и тут очень важно, чтобы сообщение о приходе письма попало в бекэнд раньше, сообщения о действии пользователя над ним
                              0
                              > Ну мы вроде бы описали, что Zookeeper сильно видоизменен и используется в особом режиме.

                              Это вы потом написали, аж в другом параграфе, отделённом заголовком. Сначала вы написали, что зукипер для очередей, что не совсем правда.

                              У kafka вроде бы всё хорошо с объёмами данных, отключенными нодами, очередностью сообщений и вот этим всем. Если у вас уже есть kafka в другом месте, то накладные расходы на поддержу должны быть меньше, нежели с самописными надстройками над zk.

                              kafka.apache.org/08/ops.html
                              engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
                            –1
                            Постарайтесь, пожалуйста, подгонять иллюстрации по размеру. Мегабайтная картинка шириной 640 пикселей это некоторое неуважение к 3G-читателям.
                              +1
                              retina-читатели с вами в корне несогласны
                                0
                                Возможно. А что, у Хабра есть CSS для Ретины?
                                Кроме того, <img width="800"

                            Only users with full accounts can post comments. Log in, please.