Как стать автором
Обновить

Join таблиц в реальном времени на Apache Flink ( Часть 2 )

Уровень сложностиСложный
Время на прочтение3 мин
Количество просмотров700

Доработка условия Inner Join

Ранее в примере в InnerJoinFunction мы отправляли данные дальше по потоку независимо от CRUD операции, которая была применена на нашу запись - мы просто при приходе новой записи из какой либо таблицы всегда отправляли новую запись в СП, но ведь если придет сообщение, уведомляющее об удалении записи, то необходимо будет запись удалить на СП и если взаимосвязь таблиц один ко многим, то придется удалить много записей.

Структура Debezium сообщения

Мы работаем с кафкой и принимаем сообщения, генерируемые Debezium в данном примере. Его структура приблизительно такая при простейшей конфигурации ( бОльшей нам и не надо ).

{
	"op": "(c|r|u|d)",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : [Data, null],
	"after" : [Data, null]
}

Где Data это модель данных.
В зависимости от операции "op" поля before и after принимают значение, описанное в модели данных либо null . Если вкратце, то при:

op = 'c' -> before = null, after = Data -> insert
op = 'r' -> before = null, after = Data -> initial snapshot if set
op = 'u' -> before = Data, after = Data -> update
op = 'd' -> before = Data, after = null -> delete

Доработка маппера класса модели данных

public class Domain implements Serializable {
  public Integer id;
  public Integer user_id;
  public String domain_name;
  public boolean delete;

  // getters and setters omitted

  public static Domain fromRow(Row row) {
        Character op = (Character) row.getField(1);

        if (op == null) {
            throw new IllegalStateException("Never should happen, if Debezium feels fine");
        }

        Row domain = (Row) row.getField(op == 'd' ? 0 : 2);

        Integer id = (Integer) domain.getField(0);
        Integer user_id = (Integer) domain.getField(1);
        Integer domain_name = (Integer) domain.getField(2);

        return new Domain(id, user_id, domain_name);
    }
}

При:
1. op = c или r берем то, что в after.
2. op = d берем всегда before.
3. op = u берем всегда тоже after, однако есть нюанс, о котором расскажу в следующей статье.

С моделью User по аналогии с Domain.

Доработка InnerJoinFunction

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;

public class InnerJoinFunction extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> {
    private MapState<Integer, Domain> domainsState;
    private ValueState<User> usersState;

    @Override
    public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
        usersState.update(user);
      
        for (final Domain domain : domainsState.values()) {
            out.collect(new InnerJoinFunction.Output(
                    user.id,
                    user.firstname,
                    user.lastname,
                    domain.domain_name,
                    user.delete || domain.delete
            ));
        }
    }

    @Override
    public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
        domainsState.put(domain.id, domain);

        final User user = usersState.value();

        if (user != null) {
            out.collect(new InnerJoinFunction.Output(
                    user.id,
                    user.firstname,
                    user.lastname,
                    domain.domain_name,
                    user.delete || domain.delete
            ));
        }
    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        var usersStateDescriptor = new ValueStateDescriptor<>(
                "users",
                User.class
        );
        var domainsStateDescriptor = new MapStateDescriptor<>(
                "domains",
                Integer.class,
                Domain.class
        );
        usersState = getRuntimeContext().getState(usersStateDescriptor);
        domainsState = getRuntimeContext().getMapState(domainsStateDescriptor);

        super.open(openContext);
    }

    public static class Output implements Serializable {
        public Integer user_id;
        public String firstname;
        public String lastname;
        public String domain_name;
        public boolean delete;
        
        // getters and setters omitted
    }
}

Такой подход будет успешно работать со всеми CRUD операциями и выполнять inner join условие.

Нюанс с update

Если ключ join изменяется ( любая из колонок одной из любой таблиц ), то данный подход работать не будет из за локальности хранения state движком Flink, в нашем примере ключом join является поле user_id в обеих таблицах и если в одной из них оно изменяется, то этот подход в при операциях update над user_id будет ломать логику.

В 3 части будет подробный разбор решения это проблемы.


Теги:
Хабы:
0
Комментарии0

Публикации

Работа

Java разработчик
193 вакансии
Data Scientist
43 вакансии

Ближайшие события