Сделать свою собственную SQL-базу данных или запускать SQL-запросы в NoSQL-базе данных — кажется, это очень непростая задача. А если мы говорим о распределенной БД, то сложность возрастает многократно. Но, к счастью, Apache Calcite — фреймворк с открытым кодом — поможет сделать это довольно легко.
Роман Кондаков, Software Engineer в Querify Labs, на конференции HighLoad++ Весна 2021 рассказал об опыте интеграции Apache Calcite в распределенную in-memory-платформу Apache Ignite. Если ваша система распределена, и вы хотите завести в ней SQL, читайте про то, как устроен Apache Calcite и какие есть нюансы его использования для распределенных систем. Видео его выступления можно посмотреть здесь.
Давным-давно Apache Ignite был распределённым кэшом. В эту большую распределенную «хэш-мапину» можно было что-то положить по ключу, и что-то оттуда забрать. Всё это работало быстро, но многие пользователи хотели бы делать что-то еще со своими данными в Ignite. Например, проводить какую-то аналитику, да и вообще всё то, что мы привыкли делать при помощи SQL.
И в один прекрасный момент у разработчиков Apache Ignite родилась идея прикрутить SQL к Apache Ignite. Первая попытка была сделана при помощи маленькой легковесной базы H2, написанной на Java, которую научили работать с Ignite.
Выглядело так: каждая нода кластера Ignite поднимала внутри себя базу H2. Когда SQL-запрос приходил на Ignite, он сначала попадал в базу H2, она его парсила, оптимизировала и начинала исполнять. Но не по тем данным, что лежали в базе H2, а по данным в базе Ignite. То есть Ignite научился свои данные скармливать H2.
Так это работало в рамках одной ноды. Но так как Ignite — это распределенная система, и там может быть много нод, то следующим шагом стало исполнение SQL распределенно.
Рассмотрим этот challenge на примере запроса: найти всех сотрудников, получающих ровно среднюю зарплату. Например, есть небольшой кластер, который состоит из трех нод: две ноды с данными и одна — клиентская. В каждой из них поднят инстанс H2. И каждый запрос — например, посчитать среднюю зарплату по предприятию — Apache Ignite всегда будет разбивать на две части:
Первым будет map-запрос, который исполняется на нодах, где лежат данные. Из среднего на каждой ноде невозможно вытащить глобальное среднее, поэтому в этом map-запросе сначала нужно посчитать сумму зарплат и количество сотрудников. Результаты запроса отправляются на ту ноду (в данном случае на клиентскую), откуда запрос пришел. Там результаты map-запроса обрабатывает reduce-запрос.
Reduce-запрос считает результат всего распределенного запроса. То есть суммирует все зарплаты и общее количество сотрудников, которые пришли со всех нод, делит одно на другое, чтобы получить общую по кластеру среднюю зарплата — то, что и было нужно по условиям задачи.
Но у этой схемы были свои недостатки. Например, запрос нужно разделить на две части, а не каждый запрос можно разделить так, чтобы его можно было выполнить распределенно за два прохода.
При этом в Apache Ignite каждый из этих запросов бился еще на две части (Map и Reduce). То есть получалось всего 4 запроса, а Apache Ignite такого выполнить, к сожалению, не может, потому что возможен только один Map и только один Reduce.
Можно было бы усовершенствовать старый движок, но минус этого подхода в том, что H2, на которой базируется SQL в Apache Ignite, в принципе не предназначен для распределенного исполнения. Поэтому попытка его усовершенствовать выглядела бы как попытка улучшить хак, что не всегда хорошо.
Написать свой SQL движок с нуля — тоже было бы интересной задачей. Инженерам нравится решать сложные и интересные задачи. Но эта — очень трудоемкая и долгая, с негарантированным исходом — могла бы растянуться на человеко-десятилетия работы. А пользователям нужен результат сейчас.
Третьим вариантом было исправить SQL в Apache Ignite. Образно говоря, взять PostgreSQL, вытащить из него оптимизатор и как-то прикрутить его к текущему движку.
Apache Calcite – фреймворк для создания SQL БД
Но, к счастью, PostgreSQL потрошить не пришлось, потому что нашлось готовое решение — Apache Calcite, который сам себя позиционирует как фреймворк для создания SQL БД. Что означает: при помощи Apache Calcite можно взять любую NoSQL базу данных, добавить в нее Calcite и волшебным образом получить SQL БД
Calcite — это довольно зрелый продукт. Он стартовал в начале 2000-х, тогда как Apache нам известен c 2013 года. Несмотря на то, что Calcite не на слуху у широкой аудитории, им пользуется огромное количество вендоров: и те, что делают БД, и коммерческие вендоры. То есть это взрослый продукт, который подходит для production-ready решений.
Паттерн использования Calcite довольно прост. Когда у вас есть запрос, вы его отдаете Calcite, он его парсит и оптимизирует. На выходе вы получаете план запроса в виде реляционного выражения, который вам надо исполнить. Фактически этот план можно исполнить где угодно, не только в Apache Ignite:
Остается только научить Calcite делать план, который вам нужен, а вашу систему — этот план исполнять. Как это сделать, сейчас и посмотрим.
Как работает Calcite
Внутри Calcite состоит из огромного количества компонентов, но чаще всего используются следующие три:
Кто-то ставит свой парсер или оптимизатор, но обычно берут все три, потому что в Calcite они уже подогнаны друг под друга, не надо ничего допиливать.
Парсер
Когда в парсер приходит SQL-запрос, тот парсит его в абстрактное синтаксическое дерево — или в SqlNode в терминах Calcite.
Это похоже на то, как мы в школе разбирали предложение по составу: подлежащее, сказуемое и пр. Здесь по тому же принципу мы выбираем колонки и операцию— SELECT или UPDATE. А так же — из каких таблиц взять данные, что джойнить и с какими условиями конвертировать запрос в реляционное выражение.
Конвертер
На этом этапе дерево валидируется на предмет правильности составления и конвертируется в реляционное выражение — или в RelNode в терминах Calcite.
Здесь происходит довольно важный переход от декларативного SQL, когда мы говорим: «Дай нам данные, которые так-то отсортированы, из таких-то таблиц и так-то сджойнены» — к императивному реляционному выражению, которое говорит: «Чтобы получить ответ на свой запрос, сначала отсканируй таблицы с сотрудниками департамента, затем сджойни их, отфильтруй ненужные строчки и оставь только те колонки, которые мы запрашивали».
В принципе, это уже рабочий план, который можно исполнять. Но есть нюансы. Этот план не оптимален, потому что фильтр находится вверху. Получается, что мы джойним все таблицы целиком, и только потом отфильтровываем ненужные данные. Оптимально сначала сделать фильтр, откинув лишние колонки, и потом уже оставшиеся строчки между собой сджойнить.
То же самое и с Project. Вместо того чтобы тащить все колонки с самого низа дерева до его верха, съедая память ненужными колонками, можно сначала прочитать только те колонки, которые нам нужны. И здесь приходит очередь оптимизатора для улучшения реляционного выражения.
Оптимизатор и его правила
Оптимизаторы в Calcite конфигурируются при помощи правил. Правило определяет модификацию реляционного выражения, чтобы переписать дерево на эквивалентное. Правило в Calcite — это обычный класс в Java и состоит оно из двух частей:
Паттерн — на что это правило срабатывает;
Трансформация — что это правило с деревом выполняет.
Образно говоря, оптимизатор бегает по дереву и ищет знакомые паттерны тех правил, которые в нем сконфигурированы. Когда он находит совпадение (например, Filter над Join), то вызывает трансформацию, и Filter уходит под Join.
В Calcite есть два оптимизатора. HepPlanner — эвристический (rule-based) оптимайзер. Его достоинство в том, что он быстрый. Он применяет правила, пока видит паттерны (совпадения). Но нужно строго отбирать правила, чтобы они друг друга не отменяли Если два правила будут противоречить друг другу (например, одно двигает фильтр вниз, а другое — вверх), то они зациклятся. HepPlanner используется для всегда (или почти всегда) оптимальных преобразований, например, декорреляции запроса или push-down Filter и Project.
Но есть и более интересный оптимизатор — cost based optimizer VolcanoPlanner. Cost based означает, что каждому дереву присваивается некий кост. Это скалярная величина — чем ниже кост, тем дерево для нас оптимальнее и тем проще его будет исполнить. Задача оптимизатора — найти дерево с минимальным костом.
VolcanoPlanner не переписывает дерево на месте, как эвристический оптимизатор. Вместо этого он сохраняет все модификации дерева запроса в специальную структуру под названием МЕМО.
МЕМО представляет собой следующее. Например, у нас есть какое-то дерево (здесь Filter и Project), и появляется еще один оператор (например, Join), который эквивалентен Project.
Так как Join отдает абсолютно те же строчки и тот же результат, что и Project, VolcanoPlanner помещает их обоих в один класс эквивалентности Set. И на запрос отдает всё, что находится в одном сете. Filter, соответственно, тоже имеет свой сет. И тогда не важно, что будет его дочерним узлом — мы говорим, что это будет сам сет. Теоретически любой узел может подойти на должность поставщика данных для Filter.
Так как оптимизатор знает кост каждого узла (например, у Project он условно 10, у Join — 30) и помнит самый лучших кост для каждого сета, то в нашем примере побеждает Project.
Когда мы применим все правила, прооптимизировав дерево, VolcanoPlanner выберет из каждого сета победителя и из него построит новое дерево, которое и будет оптимальным планом:
Обратите внимание, что новый Project попал в тот же самый сет, что и Filter. Это происходит потому, что у правил VolcanoPlanner такой инвариант — они переписывают дерево на идентичную конструкцию, то есть на то, что отдает тот же самый результат.
Нижний Filter попал в другой сет, потому что он отдает не то же самое, что и сет слева с Project. Сет слева отдает данные "отпроджекченные", но не отфильтрованные, а сет справа — наоборот. Поэтому они находятся в разных категориях.
Кроме того, VolcanoPlanner используется для физического планирования.
VolcanoPlanner – физическое планирование
Физическое планирование в Calcite является cost based оптимизацией. Join двух таблиц — это логическое выражение. Но есть много алгоритмов его получения — например, HashJoin, MergeJoin, NestedLoopJoin. Чтобы понять, каким алгоритмом выполнять конкретный Join, требуется физическое планирование.
Если система умеет выполнять HashJoin, то в Calcite добавляется правило HashJoinRule — оно изначальное дерево переписывает на то дерево, где верхним узлом является HashJoin. Если система умеет исполнять MergeJoin, то правило будет переписывать логический Join на конкретную физическую имплементацию с MergeJoin. Так как у MergeJoin есть требование, чтобы входы джойна должны быть отсортированы по ключам, по которым идет Join, то в этом месте надо будет добавить еще сортировку входов.
Когда физическое планирование завершено, оптимизатор оценивает каждый план, проставляя каждому узлу какое-то значение — один узел дороже, другой — дешевле. По итогу дерево получает итоговую стоимость:
В нашем примере выиграл HashJoin, несмотря на то, что сам по себе MergeJoin стоит дешевле — это простая операция, там не надо строить никаких хэш-таблиц. Это случилось из-за дорогой сортировки на входе.
Можно подумать, что HashJoin в оптимизаторе будет всегда побеждать MergeJoin, но на самом деле это не так. В случае, когда данные уже отсортированы — например, отсортированные индексы — победит MergeJoin.
Конечно, оптимизатор должен как-то понимать, вставлять сортировку или нет. Для этого в Calcite существуют так называемые трейты, где можно сказать — левый вход джойна должен быть отсортирован по x, а правый — по y.
Соответственно, в сеты слева и справа попадут и просто отсканированные данные с последующей сортировкой, и индексные сканы, которые в сортировке не нуждаются. Calcite снова вычислит косты и в конце выберет победителя. В нашем примере могут победить индексные сканы, а мы тем самым избежим дорогой сортировки.
Так можно получить готовый план. Чтобы отдать результат пользователю, настает черед рантайма, который понимает и исполняет план.
Как сделать рантайм
Подготовленный запрос в виде плана похож на некую программу на каком-то своем языке программирования. Чтобы ее исполнить, ее можно интерпретировать. Одним из способов интрепретации плана будет превращение каждого узла дерева в итератор.
Например, итератор Scan — это сущность, которая умеет читать данные из таблицы и отдавать их дальше. Итератор Filter умеет брать данные снизу, от Scan, и применять к ним предикат. Если предикат сказал true, мы отдаем строку дальше. Если она не удовлетворяет предикату, то идет поиск следующей. Итератор Project просто оставляет только те колонки, которые были в запросе.
Есть другие варианты. Можно использовать кодогенерацию, то есть превратить дерево в код на каком-то языке и начать его исполнять. Но кодогенерация может длиться очень долго (секунды), если у вас сложный развесистый запрос. Комбинацией подходов можно его ускорить.
Например, некоторые системы сначала строят дерево из итераторов, и в фоне пытаются скомпилировать запрос при помощи кодогенерации. Запрос начинает исполняться итераторами, интерпретироваться, а потом, когда появится готовый кодогенеренный код, исполнение плавно переключается на кодосгенерированный движок, потому что чаще всего он работает быстрее.
Иногда для исполнения используется дополнительное железо, например, GPU или FPGA. Очень часто используется векторная инструкция процессоров, потому что это позволяет очень быстро обрабатывать данные, особенно если они хранятся в колоночном виде.
Но как быть в распределенной системе, как Apache Ignite? Наш готовый план из примера пока может исполниться только на одной ноде. То есть в случае простого запроса как Filter и Scan, мы должны отдать его результат в клиентское приложение, например, в JDBC-драйвер. Который исполняется на одном клиентском узле, а нижняя часть дерева Scan в это время исполняется где-то в кластере, где лежат данные по нодам. То есть части дерева должны исполниться в разных местах системы.
Например, в нашем примере Scan может всегда исполняться на Data nodes, клиентское приложение будет ждать данные на клиентском узле, а Filter можно исполнить тоже на клиенте. Чтобы отправить данные, в Calcite есть оператор Exchange. Он не меняет и не фильтрует данные, он меняет распределение этих данных в кластере:
Но этот план — не самый лучший вариант. Мы отправляем абсолютно все данные, что у нас лежат в кластере, и только потом их фильтруем на клиенте. Разумнее будет делать фильтрацию в кластере, а не на клиенте. Поэтому cost based оптимизатор должен учитывать распределение данных.
Как сказать кальциту о распределенных данных
Это делается тоже при помощи трейтов. Сортировка — это трейт, физическое свойство данных, что они отсортированы определенным образом. Распределение данных — это тоже трейт. Он говорит, что данные распределены каким-то образом по кластеру. Оптимизатор в Calcite с этими трейтами умеет работать.
Здесь верхний узел имеет трейт Single, то есть он исполняется всего на одной ноде. У нижнего узла — трейт распределения Hash, то есть данные как-то партицированы по хэшу первичного ключа. Задача оптимизатора — определить, какой трейт должен иметь фильтр, и куда воткнуть Exchange.
Если упрощенно, то оптимизатор пытается вставить Exchange везде, где только может. Например, он может поставить, как здесь, после фильтра, но может поставить перед фильтром:
Теперь можно вернуться к первому запросу, который Apache Ignite изначально не мог исполнить — найти всех сотрудников, получающих ровно среднюю зарплату. Теперь такие запросы он сможет исполнять без переписывания запроса, и Calcite выдаст примерно такой план:
Apache Ignite смог!
Всё, что отмечено красным, исполняется на одной ноде, всё черное исполняется распределенно в кластере. Получается, исполнили распределенно, собрали данные, посчитали что-то, раскидали, опять посчитали и снова собрали — исполнение получается довольно сложным, но с Calcite это сделать гораздо проще, чем с той же H2.
Кастомизация Calcite
Основная прелесть Calcite заключается в том, что он создавался как раз для кастомизации, чтобы использовать его в разных проектах. Он очень хорошо настраивается.
В него можно добавлять свои реляционные выражения, меняя стандартный фильтр или джойн. Добавлять правила при переписывании реляционного дерева или переопределять косты у своих или встроенных операторов. Например, если HashJoin в вашей системе будет исполняться долго, потому что он сложнее и тяжелее, чем MergeJoin, то можно уменьшить кост у MergeJoin — и он будет чаще выбираться как победитель в плане.
Можно добавлять свои метаданные и использовать их при планировании. Например, если в таблице есть уникальная колонка и по ней — фильтр по равенству, то результатом этого фильтра будет всего одна строка. А значит, всегда можно очень точно оценить кардинальности планов и благодаря этому получать очень качественные планы.
Полезные ссылки
На сайте проекта есть немного документации. Но вообще в Calcite проблема с документацией, и всем, кто в него входит, приходится читать код. Есть презентации и блоги, но к сожалению, нет единственного централизованного хранилища с технической документацией по Calcite.
Если вы решите сделать свой прототип, лучше всего начать с презентации, где очень хорошо описываются все базовые API Calcite и есть примеры кода.
В статье «Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources» создатели фреймворка и основные пользователи рассказывают про его историю и приводят кейсы использования.
Про то, как работает оптимизатор cost based, можно узнать из статей про так называемый Volcano/Cascades фреймворк: «The Cascades Framework for Query Optimization» и «Exploiting Upper and Lower Bounds in Top-Down Query Optimization».
В этом году нас ждeт две HighLoad-конференции: 20-21 сентября в Санкт-Петербурге и 25-26 ноября — в Москве.
Открытая трансляция главного зала а Питере будет возможна благодаря поддержке ЦФТ. Для доступа к трансляции вам нужно только зарегистрироваться. За 2 дня это будут 14 докладов. Смотрите расписание.