Pull to refresh

Join таблиц в реальном времени на Apache Flink

Level of difficultyHard
Reading time5 min
Views887

Допустим есть 2 таблицы в любой реляционной базе данных.
Таблица users весом 4TB

id

firstname

lastname

1

Egor

Myasnik

2

Pavel

Hvastun

3

Mitya

Volk

Таблица domains 2TB

id

user_id

domain_name

1

1

Approval

2

1

Rejection

3

1

Stoppage

4

3

Cancellation

Задача

В один летний день к вам приходит бизнес и требует результат выполнения запроса в реальном времени.

SELECT d.user_id, u.firstname, u.lastname, d.domain_name
FROM users u
INNER JOIN domains d
ON u.user_id = d.user_id

Первые решения, приходящие в голову

  1. Первое, что придет на ум - это просто создать обычный view с содержимым запроса и попробовать отдать на проверку, но это не будет работать быстро и будет нагружать систему источника ( далее СИ ) данных.

  2. Что обычно делают и мне довелось увидеть в разных компаниях - это приземление CDC данных в кафку и последующей дедупликацией на основе служебных метаданных CDC сообщения в различной системе приемника ( далее СП ) на уровне DWH или витрином слое.
    Например делаем CDC потоки в кафку для таблиц users domains и далее какой либо стриминг джобой вычитываем CDC поток из кафки и дедуплицируя кладем в СП в две разные таблицы для users и domains. Далее создаем view с запросом бизнеса и отдаем на проверку. Работать по скорости будет примерно так же, как и в 1 случае с небольшой погрешностью в зависимости от выбранного СП, однако таким образом удалось избавиться от нагрузки СИ. Из минусов - усложнили систему добавив еще 2 слоя.

Эффективное решение на Apache Flink

Для достижения реального времени выполнения запроса с уменьшенной нагрузкой нам также нужен будет слой кафки с CDC сообщениями и витриный слой, но в данной статье я опущу выбор и стратегию подхода на слое СП. У нас также возникнет слой обработки данных с внедрением Apache Flink, в качестве sink я применю для простоты вывод в консоль.

  • Создайте maven проект с Java 11 и добавьте туда следующие Flink зависимости:

    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka</artifactId>
       <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-sql-client</artifactId>
       <version>1.19</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-sql-client</artifactId>
       <version>1.19</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-java</artifactId>
       <version>${flink.version}</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-planner-loader</artifactId>
       <version>${flink.version}</version>
    </dependency>
  • Вам также понадобится настроенный паттерн CDC, а именно CDC сообщения для наших двух таблиц в кафке.

  • Далее с помощью Table API или DataStream API подпишитесь на кафка топики.
    Пример метода.

    public class User implements Serializable {
      public Integer id;
      public String firstname;
      public String lastname;
    
      // getters and setters omitted
    
      public static User fromRow(Row row) {
       // создайте маппер
      }
    }
    
    public class Domain implements Serializable {
      public Integer id;
      public Integer user_id;
      public String domain_name;
    
      // getters and setters omitted
    
      public static Domain fromRow(Row row) {
       // создайте маппер
      }
    }
     tableEnv.executeSql("CREATE TABLE users (" +
                    "`before` ROW<id: INT, firstname: STRING, lastname: STRING>," +
                    "`op` STRING," +
                    "`after` ROW<id: INT, firstname: STRING, lastname: STRING>," +
                    ") WITH (" +
                    "'connector' = 'kafka'," +
                    "'topic' = 'users_topic'," +
                    "'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки
                    "'properties.group.id' = 'users_consumer_group'," +
                    "'scan.startup.mode' = 'earliest'");
    
            DataStream<User> users = tableEnv.toDataStream(tableEnv.from("users")).map(User::fromRow);
    
    tableEnv.executeSql("CREATE TABLE domains (" +
                    "`before` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," +
                    "`op` STRING," +
                    "`after` ROW<id: INTEGER, user_id: INTEGER, domain_name: STRING>," +
                    ") WITH (" +
                    "'connector' = 'kafka'," +
                    "'topic' = 'domains_topic'," +
                    "'properties.bootstrap.servers' = 'kafka-brokers'," + // адреса брокеров кафки
                    "'properties.group.id' = 'domains_consumer_group'," +
                    "'scan.startup.mode' = 'earliest'");
    
            DataStream<Domain> domains = tableEnv.toDataStream(tableEnv.from("domains")).map(Domain::fromRow);
      
  • После того, как убедились, что DataStream users и domains получают данные, для реализации `INNER JOIN` операции на уровне SQL нам понадобится метод connect .

    users
      .connect(domains)
      .keyBy(
        user -> user.id,
        domain -> domain.user_id
      )
      .process(new InnerJoinFunction())
      .print();
    import org.apache.flink.api.common.functions.OpenContext;
    import org.apache.flink.api.common.state.*;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.io.Serializable;
    
    public class Join1 extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> {
        private MapState<Integer, User> usersState;
        private ValueState<Domain> domainsState;
    
        @Override
        public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
            usersState.put(user.id, user);
            final Domain domain = domainsState.value();
    
            if (domain != null) {
                out.collect(new InnerJoinFunction.Output(
                        user.id,
                        user.firstname,
                        user.lastname,
                        domain.domain_name
                ));
            }
        }
    
        @Override
        public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
            domainsState.update(domain);
    
            final boolean innerJoinCondition = !user.metadata.__is_deleted;
    
            for (User user : usersState.values()) {
                out.collect(new InnerJoinFunction.Output(
                        user.id,
                        user.firstname,
                        user.lastname,
                        domain.domain_name
                ));
            }
        }
    
        @Override
        public void open(OpenContext openContext) throws Exception {
            var usersStateDescriptor = new MapStateDescriptor<>(
                    "users",
                    Integer.class,
                    User.class
            );
            var domainsStateDescriptor = new ValueStateDescriptor<>(
                    "domains",
                    Domain.class
            );
            usersState = getRuntimeContext().getMapState(usersStateDescriptor);
            domainsState = getRuntimeContext().getState(domainsStateDescriptor);
    
            super.open(openContext);
        }
    
        public static class Output implements Serializable {
            public Integer user_id;
            public String firstname;
            public String lastname;
            public String domain_name;
            
            // getters and setters omitted
        }
    }
  • MapState используется так как имеем реляцию один ко многим и достижения скорости обновления и доступа к данным в state

  • На выходе мы будем видеть обновляемые в реальном времени в консоли актуальные данные формирующие данный запрос.
    Разумеется, данный пример простой и имеет нюансы в деталях реализации, но возможность реализации наглядно видна и все выполняемые операции выполняются за O(1) в процессе обновления state и отправки данных дальше по потоку. Если предположить, что СП справляется со всеми CRUD операциями не более относительно быстро, то результат запроса будет актуален всегда и выполняться на аналитическом хранилище будет быстро.

Дальнейшие шаги

  • Для доработки данного примера разверните OLAP СП и грузите данные каким либо sink коннектором Flink. Вы будете видеть актуальную витрину даже когда данных очень много при правильной настройке и выдаче достаточных ресурсов на уровне СП даже при терабайтных нагрузках на СИ - данная теория была проверена мною на практике.

  • Учитывайте при обработке данных в InnerJoinFunction удаления и апдейты и обрабатывайте их правильно.

  • Перекосы тоже бывают и с ними бороться будет непросто.

  • В прод конфигурации подумайте о внедрении RocksDB.

Tags:
Hubs:
0
Comments1

Articles