
В прошлой статье я рассказал про общую структуру проекта, про работу Kafka с CDC для получения данных из базы. Теперь пришло время поговорить про саму реализацию триггеров на Python. Как говорилось в предыдущей статье, мы будем реализовывать только триггеры Before (Instead Of останутся в базе без изменений). Итак, что же нам необходимо предусмотреть при разработке?
Каждый триггер будет запускаться как отдельный Deployment в K8s, то есть нужно предусмотреть удобный запуск триггеров.
Один триггер может обрабатывать только один топик из Kafka.
В каждом триггере должна быть возможность точечно настраивать фильтры по получаемым из Kafka данным.
Чтение Kafka
Для начала необходимо реализовать чтение топиков Kafka. На этом этапе мы знаем, что один топик — это данные из одной таблицы, и один триггер может обрабатывать данные только из одного топика. Таким образом, мы пришли к реализации ServiceLocator. Только мы будем реализовывать этот паттерн через декораторы.
@SubscribeKafkaTopik('Sales') class TrSalesUpdate(ABCTrigger):
Таким образом, при запуске нашего сервиса мы будем сразу получать топик, который нужно будет слушать.
class MetaTriggers(type): def __getitem__(cls, trigger_name): return cls.__triggets__[trigger_name] class SubscribeKafkaTopik(metaclass=MetaTriggers): __triggets__ = {} topok_name = None def __new__(cls, topik): if not hasattr(cls, 'instance'): cls.instance = super(SubscribeKafkaTopik, cls).__new__(cls) cls.instance.topok_name = topik return cls.instance def __call__(self, cls): if cls.__name__ not in self.__triggets__: self.__triggets__[cls.__name__] = cls(self.topok_name) return cls def __init__(self, topik): self.topok_name = topik @classmethod def print(cls): print(cls.__triggets__) @classmethod def get(cls, trigger_name): if trigger_name not in cls.__triggets__: return None return cls.__triggets__.get(trigger_name)
Теперь, как же запускать нужный нам топик? Учитывая, что мы реализовали регистрацию каждого триггера и прикрепили сопоставление триггера и топика, нам достаточно реализовать получение названия класса триггера, который мы хотим запустить через аргументы. Мы сделали это через ArgumentParser.
parser.add_argument('--trigger', help='Запускаемый триггер', default=os.getenv('TRIGGER_CLS'))
Пример запуска:
python main.py --trigger TrSalesUpdate
Далее мы получаем название триггера и, по названию триггера, начинаем слушать топик Kafka, который указан в декораторе SubscribeKafkaTopic.
SubscribeKafkaTopik[args.trigger].listen()
Что здесь происходит?
Во первых, мы получаем из SubscribeKafkaTopic класс, который был зарегистрирован как триггер с использованием декоратора @SubscribeKafkaTopic. Так как при запуске мы указали параметр --trigger TrSalesUpdate, то и на этапе SubscribeKafkaTopic[args.trigger] нам вернется класс TrSalesUpdate. Но откуда метод listen() и что он делает? Тут тоже все достаточно просто. Наш класс TrSalesUpdate, да и все другие классы, которые являются триггерами, унаследованы от базового класса ABCTrigger.
class ABCTrigger(ABC): def __init__(self, topik_name = None): if topik_name: self.consumer = KafkaConsumer( topik_name, group_id=self.__class__.__name__, api_version=(0,10), bootstrap_servers=",".join(credentials['kafka']['bootstrap_servers']), auto_offset_reset='latest', value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None, ) self.consumer.poll(timeout_ms=10000) @abstractmethod def call(self, message, key = None): ... def listen(self): print(f"Start Listen kafka {self.__class__.__name__}") for message in self.consumer: if message is None: continue Thread(target=self.call, args=(message.value, message.key)).start()
В этом классе есть метод listen(), который начинает слушать топик Kafka. Таким образом, конструкция SubscribeKafkaTopic[args.trigger].listen() запускает получение сообщений из указанного топика.
Во-вторых, после получения сообщений из указанного топика Kafka, сообщение передаётся в метод call.
Thread(target=self.call, args=(message.value, message.key)).start()
Таким образом, в каждом триггере должна быть реализация метода call. Для того, чтобы реализовать логику обработки сообщений для каждого отдельного триггера
def call(self, message, key = None): ...
Теперь нужно рассмотреть, как обрабатывать различные типы событий, чтобы наш триггер срабатывал только при необходимых нам действиях.
Фильтры событий
Как говорилось ранее, нам доступны следующее события:
r - read - операция чтения данных из таблицы. Возникает в момент подключения Kafka-Connect к таблице.
c - create - операция создания записи, аналог insert
u - update - операция обновления записи
d - delete - операция удаления записи
Зная эти типы событий, нам необходимо сделать так, чтобы в дальнейшем наш триггер выполнялся при одном или нескольких событиях.
Например, у нас есть триггер обновления цены счета после того, как товар в счете был добавлен, изменен или удален. Соответственно, такой триггер должен срабатывать при следующих событиях: c, u, d.
Или другой пример: нам необходимо в момент добавления товара в счет добавлять себестоимость этого товара в таблицу с товарами в счете. Такой триггер уже должен работать только при событии создания (insert), то есть с типом c.
Реализовать такой фильтр по событиям мы решили через декораторы.
@FilterActionType('u', 'c') def call(self, message, key = None): ...
Таким образом, если тип события не совпадает с типом, который был передан в декоратор, то метод call не сработает.
Скрытый текст
class FilterActionType: def __init__(self, *actions): self.actions = actions def __call__(self, fn): def call_func(*args, **kwargs): if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs) return False return call_func
Однако это не всё. Что если триггер должен срабатывать только в том случае, если изменилось какое-то определённое поле либо список определённых полей? И только в том случае, если необходимые нам поля затронуты при изменении, выполнять триггер. Возьмём тот же пример с обновлением цены счета при изменении товара в счёте. Цену счёта нам нужно менять только если изменилось количество товара или изменилась цена товара. Но нам не нужно, чтобы триггер срабатывал, когда, например, изменено описание у этого товара в счёте.
Это мы также сделали через декоратор.
@FilterActionType('u') @FilterUpdatedRow('Price', 'Quantity') def call(self, message, key = None): ...
Таким образом, если условие не подойдет хотя бы по одному из декораторов, то метод выполнен не будет.
Скрытый текст
class FilterUpdatedRow: def __init__(self, *columns): self.columns = columns def __call__(self, fn): def call_func(*args, **kwargs): for column in self.columns: if args[1]['payload']['before'][column] != args[1]['payload']['after'][column]: return fn(*args, **kwargs) return False return call_func
А теперь представим такую ситуацию. У нас есть отгрузочный документ, который закрывается, переводится на последнюю стадию, например, с типом "Закрыли". В таком случае нам необходимо проставить дату, когда был закрыт этот документ. При этом важно учитывать, что дату нужно проставлять только у тех документов, которые были закрыты только на складе A. Таким образом, нам нужно реализовать триггер, который сработает при следующих условиях:
Триггер т��лько на событие обновления
Триггер только на изменение поле Status
Триггер только на измене поля Status на значение равное Закрыто и Склад должен быть равен значению A
Что мы имеем сейчас? У нас есть два фильтра, которые помогут нам реализовать два первых пункта. Но что делать с третьим? Правильно, реализовать!
Что мы видим из ТЗ? Нам нужно реализовать фильтр по значению строк. Это достаточно просто, так как у нас из Kafka приходят значения after и before, и нам достаточно проверить, равно ли значение указанного поля (в нашем случае Status = 'Закрыто' и Store = 'A') значению в блоке after. И тут мы можем пойти двумя путями.
Реализовать декоратор отдельно для каждого поля. То есть в каждый отдельный декоратор мы будем передавать проверку на значение каждого поля. И это будет работать, так как если хоть один из декораторов не сработает, метод call не выполнится. Это подходит под логическое "И". Однако стоит предусмотреть тот вариант, что в ТЗ может стоять "ИЛИ", и тогда нам этот вариант не подойдет.
Реализовать декоратор, который принимает тип фильтрации (or или and) и в зависимости от этого фильтрует данные. И мы решили выбрать именно этот путь. Не понятно? Сейчас посмотрим на реализацию, и всё станет понятнее.
Для начала посмотрим, как эта фильтрация будет выглядеть в триггере:
@FilterActionType('u') @FilterUpdatedRow('Status') @FilterRowData( and_( [ lambda record: record['after']['Status'] == 'Закрыто', lambda record: record['after']['Store'] == 'A', ] ) ) def call(self, message, key = None): ...
В @FilterActionType('u') проверяем, что тип события это обновление
В @FilterUpdatedRow('Status') проверяем, что во время обновления было изменено именно значение поля Status
В @FilterRowData реализуем проверку наших значений. Так как в задаче нужно проверить, что Status = 'Закрыто' и Store = 'A', мы реализовали проверку этих условий через and_, в который передаем лямбда-методы по проверке данных. Таким образом, мы можем реализовать любую логику проверки данных, которые мы получаем из Kafka.
Скрытый текст
class BaseFunc: def __init__(self, filters = []): self.filters = filters
Скрытый текст
class and_(BaseFunc): def __call__(self, data): if not self.filters: return True result_filters = [] for idx, filter_ in enumerate(self.filters): if (idx >= 2) & (sum(result_filters) != idx): return False result_filters.append(int(filter_(data))) return sum(result_filters) == len(self.filters) class or_(BaseFunc): def __call__(self, data): if not self.filters: return True for filter_ in self.filters: if filter_(data): return True return False
Запуск проверки происходит в декораторе FilterRowData. И если фильтр, который мы передали в конструктор (в данном случае and_), вернёт False, то метод call не отработает.
class FilterRowData: def __init__(self, filter_func: BaseFunc = None): self.filter_func = filter_func def __call__(self, fn): def call_func(*args, **kwargs): if self.filter_func is None: return fn(*args, **kwargs) if self.filter_func(args[1]['payload']): return fn(*args, **kwargs) return False return call_func
Итоги
Таким образом, мы реализовали основные фильтры для данных, которые получаем из топиков Kafka, и в каждом из запущенных триггеров можно точечно настраивать фильтры под те данные, которые мы ожидаем получить и с которыми будем работать в данном классе. При этом мы реализовали достаточно простую регистрацию триггеров, что позволяет легко запускать нужный обработчик.
Однако остаются и задачи в бэклоге. Например, нужно сделать так, чтобы одна запись обрабатывалась триггером лишь однажды. То есть, если запись с одним и тем же идентификатором обновлялась несколько раз, то триггер для этой записи должен сработать только один раз. Или же реализовать чтение из базы слейва, а запись в базу мастера, чтобы разгрузить рабочую базу, с которой мы будем работать больше всего.
