ZooKeeper или пишем сервис распределенных блокировок

    disclaimer Так получилось, что последний месяц я разбираюсь с ZooKeeper, и у меня возникло желание систематизировать то, что я узнал, собственно пост об этом, а не о сервисе блокировок, как можно было подумать исходя из названия. Поехали!

    При переходе от многопоточного программирования к программированию распределенных систем многие стандартные техники перестают работать. Одной из таких техник являются блокировки (synchronized), так как область их действия ограничена одним процессом, следовательно, они не только не работают на разных узлах распределенной системы, но так же не между разными экземплярами приложения на одной машине; получается, что нужен отдельный механизм для блокировок.

    От распределенного сервиса блокировок разумно требовать:
    1. работоспособность в условиях моргания сети (первое правило распределенных систем — никому не говорить о распределенных системах сеть ненадежна)
    2. отсутствие единой точки отказа

    Создать подобный сервис нам поможет ZooKeeper

    image В википедии написано, что ZooKeeper — распределенный сервис конфигурирования и синхронизации, не знаю как вам, но мне данное определение мало что раскрывает. Оглядываясь на свой опыт, могу дать альтернативное определение ZooKeeper, это распределенное key/value хранилище со следующими свойствами:
    • пространство ключей образует дерево (иерархию подобную файловой системе)
    • значения могут содержаться в любом узле иерархии, а не только в листьях (как если бы файлы одновременно были бы и каталогами), узел иерархии называется znode
    • между клиентом и сервером двунаправленная связь, следовательно, клиент может подписываться как изменение конкретного значения или части иерархии
    • возможно создать временную пару ключ/значение, которая существует, пока клиент её создавший подключен к кластеру
    • все данные должны помещаться в память
    • устойчивость к смерти некритического кол-ва узлов кластера

    Знакомство с незнакомой системой нужно начинать прежде всего с API, которое она предлагает, итак

    Поддерживаемые операции


    exists проверяет существование znode и возвращает его метаданные
    create создает znode
    delete удаляет znode
    getData получает данные ассоциированные с znode
    setData ассоциирует новые данные с znode
    getChildren получает детей указанного znode
    sync дожидается синхронизации узла кластера, к которому мы подсоединены, и мастера.

    Эти операции можно разделить по следующим группам
    callback CAS
    exists delete
    getData setData
    getChildren create
    sync

    Callback — read-only операции, к которым можно указать коллбеки, коллбек сработает, когда запрашиваемая сущность измениться. Коллбек сработает не более одного раза, в случае, когда нужно постоянно мониторить значение, в обработчике события нужно постоянно переподписываться.

    CAS — write запросы. Проблема конкурентного доступа в ZooKeeper'е решена через compare-and-swap: с каждым znode храниться его версия, при изменении её нужно указывать, если znode уже был изменен, то версия не совпадает и клиент получит соответственное исключение. Операции из этой группы требуют указания версии изменяемого объекта.

    create — создает новый znode (пару ключ/значение) и возвращает ключ. Кажется странным, что возвращается ключ, если он указывается как аргумент, но дело в том, что ZooKeeper'у в качестве ключа можно указать префикс и сказать, что znode последовательный, тогда к префиксу добавиться выровненное число и результат будет использоваться в качестве ключа. Гарантируется, что создавая последовательные znode с одним и тем же префиксом, ключи будут образовывать возрастающую (в лексико-графическом смысле) последовательность.

    Помимо последовательных znode, можно создать эфемерные znode, которые будут удалены, как только клиент их создавший отсоединиться (напоминаю, что соединение между кластером и клиентом в ZooKeeper держится открытым долго). Эфемерные znode не могут иметь детей.

    Znode может одновременно быть и эфемерным, и последовательным.

    sync — синхронизует узел кластера, к которому подсоединен клиент, с мастером. По хорошему не должен вызываться, так как синхронизация происходит быстро и автоматически. О том, когда её вызывать, будет написано ниже.

    На основе последовательных эфемерных znode и подписках на их удаление можно без проблем создать систему распределенных блокировок.

    Система распределенных блокировок


    На самом деле все придумано до нас — идем на сайт ZooKeeper в раздел рецептов и ищем там алгоритм блокировки:

    1. Создаем эфемерный последовательный znode используя в качестве префикса "_locknode_/guid-lock-", где _locknode_ — имя ресурса, который блокируем, а guid — свежесгенерированный гуид
    2. Получаем список детей _locknode_ без подписки на событие
    3. Если созданный на первом шаге znode в ключе имеет минимальный числовой суффикс: выходим из алгоритма — мы захватили ресурс
    4. Иначе сортируем список детей по суффиксу и вызываем exists с коллбеком на znode, который в полученном списке находиться перед тем, что создан нами на шаге 1
    5. Если получили false переходим на шаг 2, иначе ждем события и переходим на шаг 2

    Для проверки усвоения материала попробуйте понять самостоятельно, по убыванию или по возрастанию нужно сортировать список в шаге 4.

    Так как, в случае падения любой операции при работе ZooKeeper мы не можем узнать прошла операция или нет, нам нужно выносить эту проверку на уровень приложения. Guid нужен как раз для этого: зная его и запросив детей, мы можем легко определить создали мы новый узел или нет и операцию стоит повторить.

    Кстати я не говорил, но думаю вы уже догадались, что для вычисления суффикса для последовательного znode используется не уникальная последовательность на префикс, а уникальная последовательность на родителя, в котором будет создан znode.

    WTF


    В теории можно было бы и закончить, но как показала практика, начинается самое интересное — wtf'ки. Под wtf'ами я имею ввиду расхождение моих интуитивных представлений о системе с её реальном поведением, внимание, wtf не несет оценочного суждения, кроме того я прекрасно понимаю, почему создатели ZooKeeper'а пошли на такие архитектурные решения.

    WTF #1 — выворачиваем код на изнанку


    Любой метод API может кинуть checked exception и обязать вас его обработать. Это не привычно, но правильно, так как первое правило распределенных систем — сеть не надежна. Одно из исключений, которое может полететь — пропажа соединения (моргание сети). Не стоит путать пропажу соединения с узлом кластера (CONNECTIONLOSS), при который клиент сам его восстановит с сохраненной сессией и коллбеками (подключится к другому или будет ждать), и принудительное закрытие соединения со стороны кластера и потерей всех коллбеков (SESSIONEXPIRED), в данном случае задача по восстановлению контекста ложится на плечи программиста. Но мы отошли от темы…

    Как обрабатывать подмигивания? На самом деле при открытии соединения с кластером мы указываем коллбек, который вызывается многократно, а не только раз, как остальные, и который доставляет события о потери соединения и его восстановлении. Получается при потери соединения нужно приостановить вычисления и продолжить их, когда придет нужное событие.

    Вам это ничего не напоминает? C одной стороны — события, с другой — необходимость «играть» с потоком выполнения программы, по-моему где-то рядом continuation и монады.

    В общем, я оформил шаги программы в виде:

    public interface Task {
        Task continueWith(Task continuation); // объединяем шаги в цепочку
    
        void run(Executor context, Object arg); // нормальное выполнение 
        void error(Executor context, Exception error); // вместо того, чтобы кидать исключение - передаем его
    }
    

    где Executor

    public interface Executor {
        void execute(Task task, Object arg, Timer timeout); // timeout ограничивает время выполнения таски передавая/кидая TimeoutException в error таски, создавая мягкий real-time
    }
    

    Добавив нужные комбинаторы можно строить следующие программы, где-то используя идемпотентность шагов, где-то явно подчищая мусор:

    image

    • квадрат — полезная операция, а стрелка — поток выполнения и/или поток ошибок
    • ромб — комбинатор, которые игнорирует указанные ошибки и повторяет последнюю полезную операцию
    • сота — комбинатор, которые выполняет операцию A в случае нормального потока выполнения, и операцию B в случае указанной ошибки
    • скругленный параллелепипед — комбинатор который успешный поток выполнения пускает в себя, а ошибочный сразу прокидывает дальше

    Реализовывая Executor, я добавил в него функции обертки над классом ZooKeeper так, что он сам является обработчиком всех событий и сам решает, какой Watcher (обработчик события вызвать). Внутри реализации я поместил три BlockingQueue и три потока, которые их читают, в итоге получилось так, что при приходе события оно добавляется в eventQueue, тем самым поток практически моментально возвращается во внутренности ZooKeeper, кстати, внутри ZooKeeper все Watcher'ы работают в одном потоке, поэтому возможна ситуация, когда обработка одного события блокирует все остальные и сам ZooKeeper. Во вторую очередь taskQueue добавляются Task'и вместе с аргументами. На обработку этих очередей (eventQueue и taskQueue) отведено по потоку, eventThread и taskThread соответственно, эти потоки читают свои очереди и заворачивают каждый поступивший объект в Job'у и кладет в jobQueue, с который связан свой поток, собственно и запускающий код таски или обработчик сообщения. В случае падения соединения поток taskThread приостанавливается, а в случае поднятия сети возобновляется. Выполнение кода тасок и обработчиков в одном потоке позволяет не беспокоиться о блокировках и облегчает бизнес-логику.

    WTF #2 — сервер главный


    Можно сказать, что в ZooKeeper сервер (кластер) является главным, а у клиентов практически нет прав. Иногда это доходит до абсолюта, например… В конфигурации ZooKeeper есть такой параметр как session timeout, он определяет на сколько максимум может пропадать связь между кластером и клиентом, если максимум превышен, то сессия этого клиента будет закрыта и все эфемерные znode этого клиента удаляться; если связь все-таки восстановиться — клиент получит событие SESSIONEXPIRED. Так вот, клиент при пропажи соединения (CONNECTIONLOSS) и превышении session timeout тупо ждет и нечего не делает, хотя, по идеи он мог догадаться о том, что сессия сдохла и сам своим обработчикам кинуть SESSIONEXPIRED.

    Из-за такого поведения разработчик в какие-то моменты рвать на себе волосы, допустим вы подняли сервер ZooKeeper и пытаетесь к нему подключиться, но ошиблись в конфиге и стучитесь не по тому адресу, или не по тому порту, тогда, согласно описанному выше поведению, вы просто будете ждать, когда клиент перейдет в состояние CONNECTED и не получите никакого сообщения об ошибке, как это было бы в случае с MySQL или чем-нибудь подобным.

    Интересно, что такой сценарий позволяет безболезненно обновлять ZooKeeper на продакшене:
    • выключаем ZooKeeper — все клиенты переходят в состояние CONNECTIONLOSS
    • обновляем ZooKeeper
    • включаем ZooKeeper, связь с сервером восстановилась, но сервер не посылает SESSIONEXPIRED, так как относительно сервера время на время отключения было остановлено

    Кстати, именно из-за такого поведения я передаю в Executor Timer, который отменяет выполнение Task'и, если мы слишком долго не можем подключиться.

    WTF #3 — переполняется int


    Допустим вы реализовали блокировки согласно описанному выше алгоритме и запустили это дело в highload продакшен, где, допустим вы берете 10MM блокировок в день. Где-то через год вы обнаружите, что попали в ад — блокировки перестанут работать. Дело в том, что через год у znode _locknode_ счетчик cversion переполниться и нарушиться принцип монотонно возрастающей последовательности имен последовательных znode, а на этом принципе основана наша реализация блокировок.

    Что делать? Нужно периодически удалять/создавать заново _locknode_ — при этом счетчик ассоциированный с ним сброситься и принцип монотонной последовательности снова нарушится, но дело в том, что znode можно удалить только когда у него нет детей, а теперь сами догадайтесь, почему сброс cversion у _locknode_, когда в нем нет детей не влияет на алгоритм блокировки.

    WTF #4 — quorum write, но не read


    Когда ZooKeeper вернул ОК на запрос записи это означает что данные записались на кворум (большинство машин в кластере, в случае 3х машин, кворум состоит из 2х), но при чтении пользователь получает данные с той машины, к которой он подключился. То есть возможна ситуация, когда пользователь получает старые данные.

    В случае, когда клиенты не общаются иначе как только через ZooKeeper это не составляет проблемы, так как все операции в ZooKeeper строго упорядочены и тогда нет возможности узнать о том, что событие произошло кроме, как дождаться его. Догадайтесь сами, почему из того следует, что все хорошо. На самом деле, клиент может знать, что данные обновились, даже если никто ему не сказал — в случае, когда он сам произвел изменения, но ZooKeeper поддерживает read your writes consistency, так что и это не проблема.

    Но все-же, если один клиент узнал об изменении части данных через канал связи все ZooKeeper, ему может помочь принудительная синхронизация — именно для этого нужна команда sync.

    Производительность


    Большинство распределенных key/value хранилищ используют распределенность для хранения большого объема данных. Как я уже писал, данные которые хранит в себе ZooKeeper не должны превышать размер оперативной памяти, спрашивается, зачем ему распределенность — она используется для обеспечения надежности. Вспоминая про необходимость набрать кворум на запись, не удивительно падение производительности на 15% при использовании кластера из трех машин, по сравнению с одной машиной.

    Другая особенность ZooKeeper состоит в том, что он обеспечивает персистентность — за неё тоже нужно так как во время обработки запроса включено время записи на диск.

    И последний удар по производительности происходит из-за строгой упорядоченности запросов — чтобы её обеспечить все запросы на запись идут через одну машину кластера.

    Тестировал я на локальном ноуте, да-да это сильно напоминает:
    image
    DevOps Borat
    Big Data Analytic is show 90% of devops which are use word 'benchmark' in sentence are also use word 'laptop' in same sentence.
    но очевидно, что ZooKeeper лучше всего себя показывает в в конфигурации из одной ноды, с быстрым диском и на небольшом кол-ве данных, поэтому мой X220 c SSD и i7 идеально для этого подходил. Тестировал я преимущественно запросы на запись.

    Потолок по производительности был где-то около 10K операций в секунду при интенсивной записи, на запись уходит от 1ms, следовательно, с точки зрения одного клиента, сервер может работать не быстрее 1K операций в секунду.

    Что это значит? В условиях, когда мы не упираемся в диск (утилизация ssd на уровне 10%, для верности попробовал так же разместить данные в памяти через ramfs — получил небольшой прирост в производительности), мы упираемся в cpu. Итого, у меня получилось, что ZooKeeper всего в 2 раза медленнее, чем те числа, которые указали на сайте его создатели, что не плохо, если учесть, что они знают, как из него выжать все.

    Резюме


    Не смотря на все, что я здесь написал, ZooKeeper не так плох, как может показаться. Мне нравится его лаконичность (всего 7 команд), мне нравиться то, как он подталкивает и направляет своим API программиста к правильному подходу при разработке распределенных систем, а именно, в любой момент все может упасть, потому каждая операция должна оставлять систему в консистентном состоянии. Но это мои впечатления, они не так важны, как то, что ZooKeeeper хорошо решает задачи, для которых он был создан, среди которых: хранение конфигов кластера, мониторинг состояние кластера (кол-во подключенных нод, статус нод), синхронизация нод (блокировки, барьеры) и коммуникация узлов распределенной системы (a-la jabber).

    Перечислю еще раз то, что стоит иметь ввиду при разработке с помощью ZooKeeper:
    • сервер главный
    • клиент получает уведомление о записи, только когда данные попали на диск
    • quorum write + read your writes consistency
    • строгая упорядоченность
    • в любой момент времени все может упасть, поэтому после каждого изменения система должна находиться в консистентном состоянии
    • в случае пропажи связи мы находимся в состоянии, в котором состояние последней операции записи неизвестно
    • явная обработка ошибок (по мне лучшая стратегия — использовать CPS)


    О распределенных блокировках


    Возвращаясь к алгоритму блокировки, описанному выше, могу сказать, что он не работает, точнее работает ровно до тех пор, пока действия внутри критической секции происходят над тем же и только тем же кластером ZooKeeper, что используется для блокировки. Почему так? — Попробуйте догадаться сами. А в следующей статье я напишу как сделать распределенные блокировки более честными и расширить класс операций внутри критической секции на любое key/value хранилище с поддержкой CAS.

    Несколько ссылок на информацию по ZooKeeper


    zookeeper.apache.org
    outerthought.org/blog/435-ot.html
    highscalability.com/zookeeper-reliable-scalable-distributed-coordination-system
    research.yahoo.com/node/3280
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 20

      0
      Команда sync всё портит, как мне кажется. Насчёт WTF #4 непонятно — так можно вызывать getData для получения правильных данных?
        0
        Можно, но осторожно)

        Допустим, что у нас есть два клиента — A и B, и где-то в один момент времени A решил изменить данные, а B прочитать. Если B не знает, что данные изменились, то нет никакой разницы получит он старые данные или новые, но если он ожидает получить новые могут быть проблемы. Как B может узнать?

        1. Через обработчик события, который он поставил (хорошо)
        2. Через запись в другом znode о том, что в нужном данные изменились (хорошо)
        3. Если A и B связались друг с другом напрямую (плохо)

        В первом случае B получит правильные данные, из-за гарантий zk о том, что если клиенту пришло событие об изменении данных, то запрашивая данные он должен получить новую версию. Во-втором случае тоже будут новые данные из-за глобального порядка — если мы видим новые данные у одного znode, то мы гарантировано видим все изменения, которые были сделаны до него. В ситуации 3 у нас нет гарантий, поэтому, как поведет.

        На самом деле, опыт показал, что такое поведение не большая проблема. Кстати в конфиге ZooKeeper можно указать максимальное время за которое должна происходить синхронизация узлами кластера и лидером (syncLimit).
        +1
        Добро пожаловать в ад :)

        Полезный совет: если у вас много нод (например 5) и вы верите, что метеорит не фиганёт по дискам трёх машин сразу, то в конфиге можно указать forceSync=no, тогда zookeeper перестанет на каждый чих делать fdatasync и io упадёт в несколько тысяч раз.
          0
          Да, ад это подходящие слово, да и маскот намекает, что нам предстоит чистить авгиевы конюшни.

          Спасибо, но я уже нашел это в документации, а до того как нашел — игрался с ramfs.
            0
            Хороший проект в apache не отдадут.
          0
          В сторону hazelcast/jgroups не смотрели, как сервисов распределенной блокировок?
            0
            Сильно утрирую: раньше в документацию по телевизорам входила и электросхема, сейчас там только инструкция по замене батареек в пульте, так вот, документация по ZooKeeper это электросхема, а по hazelcast — современная версия.

            При разработке распределенных систем очень важно знать как система работает в нештатном режиме, при смерти нод, разрывах в сети и так далее, документация по ZooKeeper описывает это поведение, а бегло просматривая документацию по hazelcast я не смог этого найти.

            Другой аспект это доверие, я верю в работоспособност систем в составе которых используется ZooKeeper, таких как Hadoop и HBase, а про использование hazelcast я ничего не знаю, может его используют в основном в качестве кеша и тогда его надежность не в приоритете у разработчиков.
              0
              hazelcast работает по мастер-слейв схеме, самая старая нода является мастером, когда она умирает, ей на смену приходит самая старая живая нода, старая восстанавливается и становится слейвом
            0
            Идемпотентность, а не то что вы написали ;-)
              +1
              Идея проверять автоматически орфографию ночью оказалась не очень хорошей идей) а вообще о таких ошибках принято писать в личку
              0
              Нужно больше минера красивых графиков производительности при разных условиях. Я же помню, что ты как минимум на трёх машинах тестил, и данные собирал. Или они утеряны? :)
                0
                Не проявилась ли проблема с тем, что элементы размером больше 128к не сохраняются или обрезаются?
                  0
                  Это проблема клиента, думается мне. Сишный клиент выдавал не больее 512 байт в несвежей версии, которая по номеру почему-то совпадала с последней.
                    0
                    Тесты проводились с разных клиентов, результат везде одинаковый.
                      0
                      Есть два клиента (на самом деле): java и c, остальные — обёртки поверх, насколько мне известно.
                        0
                        Я допускаю, что у меня что-то не так, поэтому был и вопрос, не проявлялась ли проблема.
                        У вас проявлялась?
                  0
                  В распределенных системах вместо synchronized используется понятие транзакции. В J2EE этому соотсветствуют стандарты JTA, XA, JTS, JCA. Насколько я понял, ZooKeeper не интегрируется с JTA. Поэтому не совсем понимаю почему выбор пал на ZooKeeper, а не на стандартные решения типа EHCache или Infinispan (Oracle Coherence трогать не будем)? Работать с блокировками вручную — довольно сложное и ненадежное занятие, приводящее к трудноотлавливаемым ошибкам.
                    +1
                    Если работать с внешними key/value хранилищами, которые не поддерживат транзакции, может понадобиться реализовывать их вручную. Да, для этого можно взять готовые средства, но и они в редких случаях будут давать сбои (в критической секции будет находиться два потока), только если не замкнут весь поток данных на себя. Если интересно — могу расписать подробнее.
                      0
                      Очень интересно — распишите :)
                        0
                        Если необходимо синхронизировать только данные, то вместо блокировок все чаще используется MVCC. При помощи него легко отслеживаются коллизии.

                    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                    Самое читаемое