По долгу работы мне приходится сталкиваться с проектированием и разработкой распределенных приложений. Такие приложения часто используют различные средства межпроцессного взаимодействия для организации взаимодействия компонентов. Особые сложности возникают в процессе реализации алгоритмов, обрабатывающих связанные данные распределенно. Для поддержки таких задач используются специализированные системы распределенной координации. Самым популярным и широко используемым продуктом является Apache Zookeeper.
Zookeeper — продукт сложный. Несмотря на солидный возраст, периодически в нем обнаруживаются те или иные ошибки. Однако, это лишь следствие его возможностей, которые помогают сделать жизнь легче многим разработчикам распределенных систем. Далее, я рассмотрю некоторые особенности Zookeeper, которые помогут понять лучше его возможности, а затем перейдем к библиотеке Apache Curator (Netflix), которая делает жизнь разработчиков распределенного ПО приятной и предлагает множество готовых рецептов для реализации распределенных объектов координации.
Apache Zookeeper
Как уже ранее было отмечено, Zookeeper — жизненно важный компонент распределенных систем. Базу данных Zookeeper проще всего представить в виде дерева, похожего на файловую систему, при этом каждый элемент дерева идентифицируется путем (/a/path/to/node) и хранит в себе произвольные данные. Таким образом, с помощью Zookeper вполне можно организовать иерархическое распределенное хранилище данных, а также другие интересные конструкции. Полезность и широкая распространенность Zookeeper-а обеспечивается рядом важнейших свойств, которые перечислены далее.
Распределенный консенсус
Консенсус обеспечивается с помощью алгоритма ZAB, данный алгоритм обеспечивает свойства C(consistency) и P(partition tolerance) CAP-теоремы, что означает целостность и устойчивость к разделению, жертвуя доступностью. На практике это приводит к следующим эффектам:
- Все клиенты видят одно и то же состояние, неважно на каком сервере они запрашивают это состояние.
- Изменение состояния происходит упорядоченно, "гонка" невозможна (для операций set, операции get-set не атомарные).
- Кластер Zookeepr может "развалиться" и стать полностью недоступным, но при этом он станет недоступным для всех.
Консенсус — способность распределенной системы каким-то образом прийти к соглашению о ее текущем состоянии. Zookeeper использует алгоритм ZAB, часто применяются и другие алгоритмы — Raft,
Raft.
Эфемерные узлы
Клиент, устанавливая соединение с кластером Zookeeper, создает сессию. В рамках сессии существует возможность создавать узлы, которые будут видны другим клиентам, но, время существования которых равно времени жизни сессии. При завершении сессии данные узлы будут удалены. Такие узлы имеют ограничения — они могут быть только терминальными и не могут иметь потомков, то есть, нельзя иметь эфемерные поддеревья. Эфемерные узлы часто применяются с целью
реализации систем обнаружения сервисов.
Представим, что у нас есть несколько экземпляров сервиса, между которыми производится балансировка нагрузки. Если какой-то из экземпляров появляется, то для него создается эфемерный узел, в котором находится адрес сервиса, а при аварии сервиса этот узел удаляется и более не может использоваться для балансировки. Эфемерные узлы применяются очень часто.
Подписка на события узла
Клиент может подписаться (watch) на события узлов и получать обновления при возникновении каких-либо событий, связанных с данными узлами. Однако, тут тоже есть ограничение — после возникновения события на узле, подписка снимается и ее необходимо восстанавливать заново, при этом, очевидно, существует возможность пропуска других событий, которые возникают на данном узле. В связи с данным фактом, возможность использования данной функции достаточно ограничена.
Например, в рамках сервисов обнаружения ее применять можно, для реакции на изменение конфигурации, но необходимо помнить, что после установки подписки необходимо выполнить операцию "вручную", чтобы убедиться, что пропуска изменения состояния не произошло.
Последовательные узлы
Zookeeper позволяет создавать узлы, имена которых формируются с добавлением последовательно возрастающих чисел, при этом данные узлы могут быть эфемерными. Эта возможность широко применяется как для решения прикладных задач (например, все однотипные сервисы, регистрируют себя как эфемерные узлы), так и для реализации "рецептов" Zookeeper, к примеру, справедливой распределенной блокировки.
Версии узлов
Версии узлов позволяют определить было ли изменение узла между чтением и записью, то есть при операции set можно указать ожидаемую версию узла, в том случае, если она не совпадет, значит, что изменение узла было произведено другим клиентом и требуется заново вычитать состояние. Данный механизм позволяет реализовать упорядоченное изменение состояния данных, например, при реализации "рецепта" распределенный счетчик.
ACL на узлы
Существует возможность задавать для узлов ограничения доступа, определяемые ACL, что предназначено для защиты данных от недоверенных приложений. Стоит отметить, что, конечно, ACL не защищают от перегрузок, которые может создать вредоносный клиент, предоставляя только механизм ограничения доступа к содержимому.
TTL на узлы
Zookeeper позволяет устанавливать узлам TTL, по истечении которого (если нет обновлений) узел будет удален. Данная функциональность появилась сравнительно недавно.
Серверы-наблюдатели
Существует возможность подключения к кластеру серверов в режиме наблюдатель (observer), которые могут использоваться для выполнения операций чтения, что очень полезно в тех случаях, когда нагрузка на кластер, генерируемая операциями записи является высокой. С использованием серверов-наблюдателей проблема может быть решена. Может возникнуть вопрос, почему бы просто в кластер не добавлять обычные узлы? Ответ кроется в алгоритме консенсуса — чем больше узлов, позволяющих писать данные, тем дольше будет тратиться времени на достижение консенсуса и тем меньше будет производительность кластера на запись. Серверы-наблюдатели не участвуют в консенсусе, а поэтому не влияют на производительность операций записи.
Синхронизация времени на узлах
Zookeeper не использует внешнее время для синхронизации узлов. Это достаточно полезное свойство, системы, которые ориентируются на точное время более подвержены ошибкам, связанным с его рассогласованием.
Конечно, в бочке меда должен быть деготь и он действительно есть — Zookeeper имеет свойства, которые могут ограничивать его применение. Есть даже выражение, которое достаточно иронично описывает сложности работы с Zookeeper — Single Cluster of Failure © Pinterest, что саркастически демонстрирует тот факт, что, стремясь избавиться от единой точки отказа с помощью распределенной системы, используя Zookeeper, можно столкнуться с ситуацией, когда он станет той самой точкой отказа.
База данных Zookeeper должна помещаться в RAM
Zookeeper загружает базу в память и держит ее там. Если база данных не помещается в RAM, то она будет помещена в Swap, что приведет к существенной деградации производительности. Если БД большая, требуется сервер с достаточно большим объемом RAM (что, впрочем, не является проблемой в настоящее время, когда 1TB RAM на сервере — далеко не предел).
Время таймаута сессии
Если при настройке клиента выбрать неверно время таймаута сессии, то это может вести к непредсказуемым последствиям, которые будут обостряться при увеличении нагрузки на кластер и выходе из строя части узлов кластера. Пользователи стремятся уменьшить время сессии (по умолчанию 30 секунд), чтобы увеличить сходимость системы, поскольку эфемерные узлы будут удаляться быстрее, но это ведет к меньшей стабильности системы под нагрузкой.
Деградация производительности от количества узлов в кластере
Обычно, в кластере используют 3 узла, которые участвуют в достижении консенсуса, желание добавить дополнительные узлы существенно снизит производительность операций записи. Количество узлов должно быть нечетным (требование алгоритма ZAB), соответственно, расширение кластера до 5, 7, 9 узлов будет негативно влиять на производительность. Если проблема именно в операциях чтения — используйте узлы-наблюдатели.
Максимальный размер данных в узле
Максимальный размер данных в узле ограничен 1MB. В случае, если требуется хранить большие объемы данных, Zookeeper не подойдет.
Максимальное количество узлов в листинге потомков
Zookepeer не накладывает на то, сколько у узла может быть потомков, однако, максимальный размер пакета данных, который сервер может отправить клиенту составляет 4МБ (jute.maxbuffer). Если у узла такое количество потомков, что их перечень не помещается в один пакет, то, к сожалению, не существует способа получить сведения о них. Данное ограничение обходится с помощью организации иерархических "псевдоплоских" списков таким же образом, каким строятся кэши в файловой системе, имена или дайджесты объектов разбиваются на части и организуются в иерархическую структуру.
Несмотря на недостатки, достоинства их перевешивают, что делает Zookeeper важнейшим компонентом многих распределенных экосистем, например, Cloudera CDH5, или DC/OS, Apache Kafka и других.
Zookeeper для разработчика
Поскольку Zookeeper реализован с использованием языка Java, то в средах JVM его использование является органичным, к примеру, достаточно легко запустить сервер или даже кластер серверов из Java и использовать его для реализации интеграционных или smoke-тестов приложения без необходимости развертывания стороннего сервера. Однако, API клиента Zookeeper достаточно низкоуровневый, что, хотя и позволяет выполнять операции, но напоминает заплыв против течения реки. Кроме того, требуется глубокое понимание основ Zookeeper, чтобы правильно реализовать обработку исключительных ситуаций. К примеру, когда я использовал для работы с Zookeeper базовый интерфейс, отладка и поиск ошибок в коде распределенной координации и обнаружения доставляли достаточно большие проблемы и требовали существенное время.
Однако, решение существует и оно было подарено сообществу разработчиком Netflix Джорданом Циммерманом. Знакомьтесь, Apache Curator.
Apache Curator
На главной странице проекта расположена цитата:
Это утверждение на 100% отражает суть Curator. Начав использовать данную библиотеку, я обнаружил, что код работы с Zookeeper стал простым и понятным, а количество ошибок и время на их устранение снизилось кратно. Если, как ранее было сказано — стандартный клиент напоминает заплыв против течения, то с куратором ситуация меняется на 180 градусов. Кроме того, в рамках Curator-а реализовано большое количество готовых рецептов, которые я обзорно рассмотрю далее.
Базовый API
API выполнен в форме исключительно удобного текучего интерфейса, что позволяет просто и лаконично определять требуемые действия. К примеру (далее, примеры приводятся на языке Scala):
client
.create()
.orSetData()
.forPath("/object/path", byteArray)
что может быть переведено как "создай узел или, если существует, просто установи данные для пути "/object/path" и запиши в него byteArray".
Или, к примеру:
client
.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/head/child", byteArray)
"создай узел типа последовательный и эфемерный для пути "/head/child000000XXXX" и запиши в него byteArray". Еще несколько примеров могут быть найдены на этой странице руководства.
Асинхронные операции
Curator поддерживает как синхронный, так и асинхронный режим выполнения операций. В случае асинхронного использования клиент имеет тип AsyncCuratorFramework
, в отличие от синхронного CuratorFramework
. А каждая цепочка вызовов принимает метод thenAccept
, в котором указывается Callback, который вызывается при завершении операции. Более подробно об асинхронном интерфейсе можно узнать на посвященной ему странице руководства.
val async = AsyncCuratorFramework.wrap(client);
async.checkExists().forPath(somePath).thenAccept(stat -> mySuccessOperation(stat))
При использовании Scala использование асинхронного интерфейса не кажется оправданным, поскольку функциональность может быть легко реализована с использованием Scala Future, что позволяет коду сохранить особенности scala-way разработки. Однако, в случае Java и других JVM языков, данный интерфейс может быть полезным.
Поддержка схем данных
Zookeeper не поддерживает семантику хранимых данных. Это означает, что разработчики самостоятельно несут ответственность за то, в каких форматах хранятся данные и по каким путям они расположены. Это может стать неудобным во многих случаях, например, когда в проект приходят новые разработчики. Для решения данных проблем Curator поддерживает схемы данных, которые позволяют задавать ограничения на пути и типы узлов, в рамках данных путей. Схема, создаваемая из конфигурации, может быть представлена в формате Json:
[
{
"name": "test",
"path": "/a/b/c",
"ephemeral": "must",
"sequential": "cannot",
"metadata": {
"origin": "outside",
"type": "large"
}
}
]
Поддержка миграций
Миграции Curator чем-то напоминают Liquibase, только для Zookeeper. С их помощью возможно отражать эволюцию базы данных в новых версиях продукта. Миграция состоит из набора последовательно выполняемых операций. Каждая операция представлена некоторыми преобразованиями над БД Zookeeper. Curator самостоятельно отслеживает примененность миграций с помощью Zookeeper. Данная функция может быть использована в процессе развертывания новой версии приложения. Подробно миграции описаны на соответствующей странице руководства.
Тестовый сервер и тестовый кластер
Для упрощения тестирования, Curator позволяет встроить сервер или даже кластер серверов Zookeeper в приложение. Данную задачу можно достаточно просто решить и без использования Curator, только с Zookeeper, но Curator предоставляет более лаконичный интерфейс. К примеру, в случае Zookeeper без Curator:
class ZookeeperTestServer(zookeperPort: Int, tmp: String) {
val properties = new Properties()
properties.setProperty("tickTime", "2000")
properties.setProperty("initLimit", "10")
properties.setProperty("syncLimit", "5")
properties.setProperty("dataDir", s"$tmp")
properties.setProperty("clientPort", s"$zookeperPort")
val zooKeeperServer = new ZooKeeperServerMain
val quorumConfiguration = new QuorumPeerConfig()
quorumConfiguration.parseProperties(properties)
val configuration = new ServerConfig()
configuration.readFrom(quorumConfiguration)
private val thread = new Thread() {
override def run() = {
zooKeeperServer.runFromConfig(configuration)
}
}
def start = {
thread.start()
}
def stop = {
thread.interrupt()
}
}
...
val s = new ZookeeperTestServer(port, tmp)
s.start
...
s.stop
В случае Curator:
val s = new TestingServer(port)
s.start()
...
s.stop()
Рецепты Curator
Рецепты Curator — основной мотив использования данной библиотеки для реализации распределенных механизмов взаимодействия процессов. Далее, перечислим основные рецепты, которые поддерживаются Curator и как они могут применяться. Некоторые рецепты я не применял на практике, поэтому для них дан максимально приближенный к руководству перевод.
Выбор лидера
Данные рецепты предназначены для реализации отказоустойчивой модели выполнения процессов, в рамках которой существует текущий лидер и несколько процессов находится в горячем резерве. Как только лидер перестает выполнять свои функции, другой процесс становится лидером. Существует два подходящих рецепта:
- Leader Latch, который представляет собой аналог CountDownLatch, который заблокирован до тех пор, пока процесс не стал лидером;
- Leader Election, которые реализует выбор лидера через вызов метода. В момент, когда процесс становится лидером, вызывается метод, выход из которого свидетельствует об утрате лидерства.
Блокировки
Блокировки — один из важнейших механизмов распределенной межпроцессной синхронизации. Curator предоставляет широкий набор объектов блокировок:
- Shared Reentrant Lock — распределенная блокировка, в которую может повторно входить клиент, который имеет к ней доступ;
- Shared Lock — распределенная блокировка;
- Shared Reentrant Read Write Lock — объект, который позволяет осуществлять раздельную блокировку на чтение и на запись, при этом заблокировать объект на чтение могут несколько клиентов одновременно, блокировка на запись является эксклюзивной;
- Shared Semaphore — считающий семафор, с помощью которого легко осуществить работу с ограниченным количеством ресурсов, которое задается 32-битным целым числом;
- Multi Shared Lock — высокоуровневый объект, который позволяет выполнять операции над несколькими распределенными блокировками атомарно.
Барьеры
- Barrier — объект, который позволяет некоторому клиенту заблокировать доступ к участку кода для остальных участников до выполнения определенных условий, а при их наступлении — разблокировать доступ, что приводит к тому, что все участники могут продолжить свое исполнение;
- Double Barrier — объект позволяет синхронизовать вход некоторого количества клиентов в сегмент кода и их выход из него.
Счетчики
- Shared Counter — обычный целочисленный счетчик (32 bit) с защитой от гонки;
- Distributed Atomic Long — счетчик типа Long (64 bit).
Кэши
- Path Cache — объект, который наблюдает за узлом и обновляет локальный кэш о его дочерних узлах и опционально об их данных при его изменении;
- Node Cache — объект, который наблюдает за узлом и обновляет локальный кэш о нем и его данных;
- Tree Cache — объект, который наблюдает за всем деревом потомков узла и обновляет локальный кэш при изменении в дереве;
Узлы
- Persistent Node — данный рецепт позволяет создать узел с данными, для которого Curator будет стремиться обеспечить его присутствие и неизменность, даже при внешних воздействиях;
- Persistent TTL Node — рецепт для создания узла, время жизни которого определяется TTL, который поддерживает те же свойства, что и Persistent Node;
- Group Member — позволяет организовать группу участников.
Очереди
Хочу заметить, что Zookeeper — не лучший кандидат для организации интенсивных распределенных очередей, если требуется обеспечить пропуск большого количества сообщений, то рекомендую воспользоваться специально предназначенным решением, например, Apache Kafka, RabbitMQ или другими. Тем не менее, Curator предоставляет набор рецептов для поддержки очередей:
- Distributed Queue — обычная распределенная очередь, позволяет класть и извлекать сообщения в порядке очередности;
- Distributed Id Queue — распределенная очередь, которая с каждым сообщением сохраняет идентификатор и позволяет извлечь сообщение из очереди по идентификатору с его немедленным удалением;
- Distributed Priority Queue — очередь с приоритетами;
- Distributed Delay Queue — очередь позволяет задать для каждого добавляемого элемента время, в формате Unixtime, когда он станет доступен для чтения из очереди;
- Simple Distributed Queue — аналог очереди, которая предоставляется стандартным API Zookeeper.
Заключение
Библиотека Apache Curator безусловно стоит того, чтобы рассмотреть ее к применению, она является выдающимся образцом инженерного труда и позволяет значительно упростить взаимодействие с Apache Zookeeper. К недостаткам библиотеки можно отнести малый объем документации, что повышает входной барьер для начинающих разработчиков. В своей практике мне не раз требовалось изучать исходные коды библиотеки, чтобы понять как именно работает тот или иной рецепт. Однако, это дает и положительный эффект — глубокое понимание реализации позволяет совершать меньше логических ошибок, основанных на предположениях.
Необходимо отметить, что разработчики Curator рекомендуют изучить документацию Zookeeper до того, как начать использовать библиотеку. Это очень разумный совет, поскольку Zookeeper является продуктом, для эффективного использования которого необходимо понимать как именно он функционирует, а не только знать его API. Эти затраты безусловно окупятся, а в руках опытного инженера возможности Zookeeper позволяет создавать надежные и производительные распределенные системы.