Допустим есть 2 таблицы в любой реляционной базе данных.
Таблица users
весом 4TB
id | firstname | lastname |
1 | Egor | Myasnik |
2 | Pavel | Hvastun |
3 | Mitya | Volk |
Таблица domains
2TB
id | user_id | domain_name |
1 | 1 | Approval |
2 | 1 | Rejection |
3 | 1 | Stoppage |
4 | 3 | Cancellation |
Задача
В один летний день к вам приходит бизнес и требует результат выполнения запроса в реальном времени.
SELECT d.user_id, u.firstname, u.lastname, d.domain_name
FROM users u
INNER JOIN domains d
ON u.user_id = d.user_id
Первые решения, приходящие в голову
Первое, что придет на ум - это просто создать обычный
view
с содержимым запроса и попробовать отдать на проверку, но это не будет работать быстро и будет нагружать систему источника ( далее СИ ) данных.Что обычно делают и мне довелось увидеть в разных компаниях - это приземление CDC данных в кафку и последующей дедупликацией на основе служебных метаданных CDC сообщения в различной системе приемника ( далее СП ) на уровне DWH или витрином слое.
Например делаем CDC потоки в кафку для таблицusers
domains
и далее какой либо стриминг джобой вычитываем CDC поток из кафки и дедуплицируя кладем в СП в две разные таблицы дляusers
иdomains
. Далее создаемview
с запросом бизнеса и отдаем на проверку. Работать по скорости будет примерно так же, как и в 1 случае с небольшой погрешностью в зависимости от выбранного СП, однако таким образом удалось избавиться от нагрузки СИ. Из минусов - усложнили систему добавив еще 2 слоя.
Эффективное решение на Apache Flink
Для достижения реального времени выполнения запроса с уменьшенной нагрузкой нам также нужен будет слой кафки с CDC сообщениями и витриный слой, но в данной статье я опущу выбор и стратегию подхода на слое СП. У нас также возникнет слой обработки данных с внедрением Apache Flink, в качестве sink
я применю для простоты вывод в консоль.
Создайте
maven
проект с Java 11 и добавьте туда следующие Flink зависимости:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>3.2.0-1.19</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-client</artifactId> <version>1.19</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-client</artifactId> <version>1.19</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> </dependency>
Вам также понадобится настроенный паттерн CDC, а именно CDC сообщения для наших двух таблиц в кафке.
Далее с помощью Table API или DataStream API подпишитесь на кафка топики.
Пример метода.public class User implements Serializable { public Integer id; public String firstname; public String lastname; // getters and setters omitted public static User fromRow(Row row) { // создайте маппер } } public class Domain implements Serializable { public Integer id; public Integer user_id; public String domain_name; // getters and setters omitted public static Domain fromRow(Row row) { // создайте маппер } }
tableEnv.executeSql("CREATE TABLE users (" + "`before` ROW<id: INT, firstname: STRING, lastname: STRING>," + "`op` STRING," + "`after` ROW<id: INT, firstname: STRING, lastname: STRING>," + ") WITH (" + "'connector' = 'kafka'," + "'topic' = 'users_topic'," + "'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки "'properties.group.id' = 'users_consumer_group'," + "'scan.startup.mode' = 'earliest'"); DataStream<User> users = tableEnv.toDataStream(tableEnv.from("users")).map(User::fromRow); tableEnv.executeSql("CREATE TABLE domains (" + "`before` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," + "`op` STRING," + "`after` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," + ") WITH (" + "'connector' = 'kafka'," + "'topic' = 'domains_topic'," + "'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки "'properties.group.id' = 'domains_consumer_group'," + "'scan.startup.mode' = 'earliest'"); DataStream<Domain> domains = tableEnv.toDataStream(tableEnv.from("domains")).map(Domain::fromRow);
После того, как убедились, что
DataStream
users
иdomains
получают данные, для реализации `INNER JOIN` операции на уровне SQL нам понадобится методconnect
.users .connect(domains) .keyBy( user -> user.id, domain -> domain.user_id ) .process(new InnerJoinFunction()) .print();
import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.*; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; import java.io.Serializable; public class Join1 extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> { private MapState<Integer, User> usersState; private ValueState<Domain> domainsState; @Override public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception { usersState.put(user.id, user); final Domain domain = domainsState.value(); if (domain != null) { out.collect(new InnerJoinFunction.Output( user.id, user.firstname, user.lastname, domain.domain_name )); } } @Override public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception { domainsState.update(domain); final boolean innerJoinCondition = !user.metadata.__is_deleted; for (User user : usersState.values()) { out.collect(new InnerJoinFunction.Output( user.id, user.firstname, user.lastname, domain.domain_name )); } } @Override public void open(OpenContext openContext) throws Exception { var usersStateDescriptor = new MapStateDescriptor<>( "users", Integer.class, User.class ); var domainsStateDescriptor = new ValueStateDescriptor<>( "domains", Domain.class ); usersState = getRuntimeContext().getMapState(usersStateDescriptor); domainsState = getRuntimeContext().getState(domainsStateDescriptor); super.open(openContext); } public static class Output implements Serializable { public Integer user_id; public String firstname; public String lastname; public String domain_name; // getters and setters omitted } }
MapState используется так как имеем реляцию один ко многим и достижения скорости обновления и доступа к данным в
state
На выходе мы будем видеть обновляемые в реальном времени в консоли актуальные данные формирующие данный запрос.
Разумеется, данный пример простой и имеет нюансы в деталях реализации, но возможность реализации наглядно видна и все выполняемые операции выполняются заO(1)
в процессе обновленияstate
и отправки данных дальше по потоку. Если предположить, что СП справляется со всемиCRUD
операциями не более относительно быстро, то результат запроса будет актуален всегда и выполняться на аналитическом хранилище будет быстро.
Дальнейшие шаги
Для доработки данного примера разверните OLAP СП и грузите данные каким либо
sink
коннектором Flink. Вы будете видеть актуальную витрину даже когда данных очень много при правильной настройке и выдаче достаточных ресурсов на уровне СП даже при терабайтных нагрузках на СИ - данная теория была проверена мною на практике.Учитывайте при обработке данных в
InnerJoinFunction
удаления и апдейты и обрабатывайте их правильно.Перекосы тоже бывают и с ними бороться будет непросто.
В прод конфигурации подумайте о внедрении RocksDB.