Доработка условия Inner Join
Ранее в примере в InnerJoinFunction мы отправляли данные дальше по потоку независимо от CRUD операции, которая была применена на нашу запись - мы просто при приходе новой записи из какой либо таблицы всегда отправляли новую запись в СП, но ведь если придет сообщение, уведомляющее об удалении записи, то необходимо будет запись удалить на СП и если взаимосвязь таблиц один ко многим, то придется удалить много записей.
Структура Debezium сообщения
Мы работаем с кафкой и принимаем сообщения, генерируемые Debezium в данном примере. Его структура приблизительно такая при простейшей конфигурации ( бОльшей нам и не надо ).
{ "op": "(c|r|u|d)", "source": { ... }, "ts_ms" : "...", "ts_us" : "...", "ts_ns" : "...", "before" : [Data, null], "after" : [Data, null] }
Где Data это модель данных.
В зависимости от операции "op" поля before и after принимают значение, описанное в модели данных либо null . Если вкратце, то при:
op = 'c' -> before = null, after = Data -> insert op = 'r' -> before = null, after = Data -> initial snapshot if set op = 'u' -> before = Data, after = Data -> update op = 'd' -> before = Data, after = null -> delete
Доработка маппера класса модели данных
public class Domain implements Serializable { public Integer id; public Integer user_id; public String domain_name; public boolean delete; // getters and setters omitted public static Domain fromRow(Row row) { Character op = (Character) row.getField(1); if (op == null) { throw new IllegalStateException("Never should happen, if Debezium feels fine"); } Row domain = (Row) row.getField(op == 'd' ? 0 : 2); Integer id = (Integer) domain.getField(0); Integer user_id = (Integer) domain.getField(1); Integer domain_name = (Integer) domain.getField(2); return new Domain(id, user_id, domain_name); } }
При:
1. op = c или r берем то, что в after.
2. op = d берем всегда before.
3. op = u берем всегда тоже after, однако есть нюанс, о котором расскажу в следующей статье.
С моделью User по аналогии с Domain.
Доработка InnerJoinFunction
import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.*; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; import java.io.Serializable; public class InnerJoinFunction extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> { private MapState<Integer, Domain> domainsState; private ValueState<User> usersState; @Override public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception { usersState.update(user); for (final Domain domain : domainsState.values()) { out.collect(new InnerJoinFunction.Output( user.id, user.firstname, user.lastname, domain.domain_name, user.delete || domain.delete )); } } @Override public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception { domainsState.put(domain.id, domain); final User user = usersState.value(); if (user != null) { out.collect(new InnerJoinFunction.Output( user.id, user.firstname, user.lastname, domain.domain_name, user.delete || domain.delete )); } } @Override public void open(OpenContext openContext) throws Exception { var usersStateDescriptor = new ValueStateDescriptor<>( "users", User.class ); var domainsStateDescriptor = new MapStateDescriptor<>( "domains", Integer.class, Domain.class ); usersState = getRuntimeContext().getState(usersStateDescriptor); domainsState = getRuntimeContext().getMapState(domainsStateDescriptor); super.open(openContext); } public static class Output implements Serializable { public Integer user_id; public String firstname; public String lastname; public String domain_name; public boolean delete; // getters and setters omitted } }
Такой подход будет успешно работать со всеми CRUD операциями и выполнять inner join условие.
Нюанс с update
Если ключ join изменяется ( любая из колонок одной из любой таблиц ), то данный подход работать не будет из за локальности хранения state движком Flink, в нашем примере ключом join является поле user_id в обеих таблицах и если в одной из них оно изменяется, то этот подход в при операциях update над user_id будет ломать логику.
В 3 части будет подробный разбор решения это проблемы.
