Интро
Привет Хабр!
Эта статья продолжение первых двух статей по построению real-time системы по реализации Join двух таблиц источников.
Другие части:
1 часть
2 часть
Agenda:
1. Вспомнить архитектуру Apache Flink.
2. Проблемные сценарии, возникающие при update.
3. Решение всех сценариев при update.
Архитектура Apache Flink (бегло)
Скорее всего читатели знают, но контекста ради.
Flink имеет децентрализованный дизайн с распределенной архитектурой, где набор контейнеров ( Task Manager ) несут ответственность за свою локальную зону или не несут???
Эта зона в случае разбиения через keyBy размазывается исходя из хэша заданного ключа, посредством чего, Flink гарантирует попадание всех событий с этим ключом на тот же контейнер и также гарантирует сохранение того порядка событий, в котором они доехали до оператора, и, как следствие этот контейнер является мастером-владельцем ключа - отсюда понятие локальной зоны становится оправданным, ведь все хэш пространство размазывается на контейнеры равномерно. Однако, если не использовать keyBy и просто попробовать вызвать что угодно, то будет round-robin распределение и тогда снимается понятие локальной зоны ответственности.
Проблема нюанса с update
Если в таблице domains поле user_id изменяемое, то давайте представим такой пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 1, domains.user_id = 2
3. Мы получаем в потоке одно событие domains.id = 1, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП.
Но! Событие или уже объект, связанный с domains.id = 1, domains.user_id = 1 остается в state на другом контейнере и хотелось бы, чтобы события могли за собой убирать.
Второй пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 2, domains.user_id = 2 - да вот так вот сразу обновился первичный ключ и пользак сразу!
3. Мы получаем в потоке одно событие domains.id = 2, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП неправильно! СП дедуплицируется по domains.id и новое событие domains.id = 2 до него дойдет, но старое domains.id = 1 не удалится. В добавок мы все также забыли почистить за собой.
Решение нюанса с update
Оказывается, что есть серебрянная пуля решающее все.
Достаточно во время прихода событий update делить их на delete + insert.
В CDC update событии согласно Debezium протоколу сообщение имеет заполненные before и after.
Итоговое решение для Domain
java public class Domain implements Serializable { public Integer id; public Integer user_id; public String domain_name; public boolean delete; // getters and setters omitted public static Domain[] fromRow(Row row) { Character op = (Character) row.getField(1); if (op == null) { throw new IllegalStateException("Never should happen, if Debezium feels fine"); } if (op == 'd') { Row before = (Row) row.getField(0); return new Domain[]{ new Domain(before.getField(0), before.getField(1), before.getField(2), true) }; } else if (op == 'u') { Row before = (Row) row.getField(0); Row after = (Row) row.getField(2); return new Domain[]{ new Domain(before.getField(0), before.getField(1), before.getField(2), true), new Domain(after.getField(0), after.getField(1), after.getField(2), false) }; } else { Row after = (Row) row.getField(2); return new Domain[]{ new Domain(after.getField(0), after.getField(1), after.getField(2), false) }; } } }
Обратите внимание на порядок - сначала delete и потом insert. Далее оно в таком же порядке доходит до Join оператора из-за гарантий keyBy и помечается удаленным в state ( при желании можно удалять ) и отправляется для старого domain.id = 1 в СП с пометкой удаления. В случае insert + delete свежим событием было бы удаление на всех уровнях.
Итого:
Имеем realtime поток, умеющий работать со всеми CRUD операциями.
State не имеющий избыточности.
Умеем работать с изменяемым ключом джоина.
Умеем работать с изменяемым первичным ключом на СИ.
Следующие шаги:
Выстроить хронологию событий - что если сначала придет в кафку второе событие и потом первое?
СП - как правильно выбрать?
Можем ли мы уже потянуть терабайты и держать их в
умеstate.

