Доработка условия Inner Join
Ранее в примере в InnerJoinFunction
мы отправляли данные дальше по потоку независимо от CRUD
операции, которая была применена на нашу запись - мы просто при приходе новой записи из какой либо таблицы всегда отправляли новую запись в СП, но ведь если придет сообщение, уведомляющее об удалении записи, то необходимо будет запись удалить на СП и если взаимосвязь таблиц один ко многим, то придется удалить много записей.
Структура Debezium сообщения
Мы работаем с кафкой и принимаем сообщения, генерируемые Debezium в данном примере. Его структура приблизительно такая при простейшей конфигурации ( бОльшей нам и не надо ).
{
"op": "(c|r|u|d)",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : [Data, null],
"after" : [Data, null]
}
Где Data
это модель данных.
В зависимости от операции "op"
поля before
и after
принимают значение, описанное в модели данных либо null
. Если вкратце, то при:
op = 'c' -> before = null, after = Data -> insert
op = 'r' -> before = null, after = Data -> initial snapshot if set
op = 'u' -> before = Data, after = Data -> update
op = 'd' -> before = Data, after = null -> delete
Доработка маппера класса модели данных
public class Domain implements Serializable {
public Integer id;
public Integer user_id;
public String domain_name;
public boolean delete;
// getters and setters omitted
public static Domain fromRow(Row row) {
Character op = (Character) row.getField(1);
if (op == null) {
throw new IllegalStateException("Never should happen, if Debezium feels fine");
}
Row domain = (Row) row.getField(op == 'd' ? 0 : 2);
Integer id = (Integer) domain.getField(0);
Integer user_id = (Integer) domain.getField(1);
Integer domain_name = (Integer) domain.getField(2);
return new Domain(id, user_id, domain_name);
}
}
При:
1. op
= c
или r
берем то, что в after.
2. op
= d
берем всегда before.
3. op
= u
берем всегда тоже after, однако есть нюанс, о котором расскажу в следующей статье.
С моделью User
по аналогии с Domain
.
Доработка InnerJoinFunction
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.io.Serializable;
public class InnerJoinFunction extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> {
private MapState<Integer, Domain> domainsState;
private ValueState<User> usersState;
@Override
public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
usersState.update(user);
for (final Domain domain : domainsState.values()) {
out.collect(new InnerJoinFunction.Output(
user.id,
user.firstname,
user.lastname,
domain.domain_name,
user.delete || domain.delete
));
}
}
@Override
public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {
domainsState.put(domain.id, domain);
final User user = usersState.value();
if (user != null) {
out.collect(new InnerJoinFunction.Output(
user.id,
user.firstname,
user.lastname,
domain.domain_name,
user.delete || domain.delete
));
}
}
@Override
public void open(OpenContext openContext) throws Exception {
var usersStateDescriptor = new ValueStateDescriptor<>(
"users",
User.class
);
var domainsStateDescriptor = new MapStateDescriptor<>(
"domains",
Integer.class,
Domain.class
);
usersState = getRuntimeContext().getState(usersStateDescriptor);
domainsState = getRuntimeContext().getMapState(domainsStateDescriptor);
super.open(openContext);
}
public static class Output implements Serializable {
public Integer user_id;
public String firstname;
public String lastname;
public String domain_name;
public boolean delete;
// getters and setters omitted
}
}
Такой подход будет успешно работать со всеми CRUD
операциями и выполнять inner join условие.
Нюанс с update
Если ключ join изменяется ( любая из колонок одной из любой таблиц ), то данный подход работать не будет из за локальности хранения state движком Flink, в нашем примере ключом join является поле user_id
в обеих таблицах и если в одной из них оно изменяется, то этот подход в при операциях update
над user_id
будет ломать логику.
В 3 части будет подробный разбор решения это проблемы.