Всем привет!
В этой статье мы посмотрим на два варианта решения "типичной" задачи потоковой обработки с учетом состояния при помощи фреймворка Apache Flink.
Если вы только начинаете знакомство с Flink, здесь можно найти обильное количество материалов.
Постановка задачи
Итак, давайте представим базу данных (пусть это будет PostgreSQL) состоящую из трех таблиц:
Clients - общая информация по клиенту:
Имя поля | Тип поля | Описание |
---|---|---|
clientId | int | уникальный идентификатор клиента |
name | string | имя клиента |
surname | string | фамилия клиента |
patronymic | optional[string] | отчество клиента (необязательное поле) |
sex | string | пол |
ClientCompany - информация о компании клиента:
Имя поля | Тип поля | Описание |
---|---|---|
clientId | int | уникальный идентификатор клиента |
companyId | int | уникальный идентификатор компании |
companyName | string | название компании |
Payment - информация о платежах клиента:
Имя поля | Тип поля | Описание |
---|---|---|
clientId | int | уникальный идентификатор клиента |
amount | int | сумма платежа |
tmMs | long | время свершения платежа |
Будем получать обновления по всем таблицам через CDC-канал (пусть это будет Debezium) в топик Kafka в формате Json. Задачей нашей Job-ы будет слушать топики и выдавать обновления по клиенту (например: клиент сменил компанию) во внешнюю систему (в рамках статьи в качестве внешней системы будем использовать консоль вывода).
Начинаем программировать
Наш технологический стек:
Мы разберем 2 варианта решения:
При помощи Table API
При помощи классического State Processor API
Прежде чем писать основную логику нам потребуется реализовать несколько общих шагов, которыми мы будем пользоваться в обеих реализациях.
Первое что нам потребуется - это научиться читать топики Kafka, в этом нам поможет уже готовый Flink Kafka Connector:
sealed trait KafkaConsumerSource {
def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = {
KafkaSource
.builder()
.setBootstrapServers(bootstrapServers)
.setGroupId(s"$topicName-group")
.setTopics(topicName)
.setDeserializer(new KafkaJsonRecordDeserializer[A])
.setStartingOffsets(OffsetsInitializer.earliest())
.build()
}
}
object KafkaConsumerSource extends KafkaConsumerSource {
def configureKafkaDataSource[A: Decoder: ClassTag](
topicName: String
)(implicit env: StreamExecutionEnvironment,
typeInformation: TypeInformation[A]): DataStream[A] = {
val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName)
env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s"$topicName-flow")
}
}
class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] {
override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = {
val value = new String(record.value())
val obj = Serde.toObj(value).toOption // в случае неудачи при десериализации, ошибка просто игнорируется и событие отбрасывается (в реальных проектах так делать не стоит)
obj.foreach(out.collect)
}
override def getProducedType: TypeInformation[A] = {
TypeExtractor
.getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])
.asInstanceOf[TypeInformation[A]]
}
}
В сериализации/десериализации сообщений нам поможет Circe:
object Serde {
def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json)
def toJson[A: Encoder](obj: A): String = obj.asJson.toString()
}
Мы немного упростим себе задачу, сымитируем поведение CDC-канала, отправляя сообщения требуемого формата напрямую в топик Kafka:
case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String)
object Client {
case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String)
}
case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String)
object ClientCompany {
case class Value(clientId: Int, companyId: Int, companyName: String)
}
case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String)
object Payment {
case class Value(clientId: Int, amount: Int, tmMs: Long)
}
Теперь мы готовы приступить к основной части.
Использование Flink Table API
Итак, прежде всего нам нужно получить необходимое окружение, поскольку мы будем работать в потоковом режиме, заручимся таковым:
implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
Далее получим окружение для доступа к Table API :
implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
Формируем Kafka Source при помощи кода который мы подготовили выше:
val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")
Теперь нам остался всего один шаг до получения представления с которым нам предстоит работать - Dynamic Tables. Чтобы сделать этот шаг максимально простым, воспользуемся возможностями Scala и расширим DataStream методом toStreamTable:
implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) {
def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = {
val transformer = RowTransformer[A]
stream
.map(transformer.toRow(_))
.flatMap(_.toSeq)(transformer.typeInformation)
.toChangelogTable(
env,
Schema
.newBuilder()
.primaryKey(primaryKey: _*)
.build(),
ChangelogMode.upsert()
)
}
}
На вход функции подается список имен полей, которые составят первичный ключ нашей таблицы. Как и в большинстве фреймворков наименьшей единицей для работы с таблицами является строка (Row). Нужен способ преобразования всех наших сущностей в тип Row, продолжим пользоваться возможностями Scala, на этот раз тем что Scala позволяет писать код в функциональном стиле, а конкретно мы воспользуемся Type Class-ми. Наш Type Class будет иметь следующий вид:
sealed trait RowTransformer[A] {
def toRow(entity: A): Option[Row]
protected val fieldsName: Array[String]
implicit def typeInformation: TypeInformation[Row]
}
Представим реализацию для одной из таблиц (для остальных алгоритм будет аналогичен):
implicit case object ClientRow extends RowTransformer[Client] {
override def toRow(entity: Client): Option[Row] = {
val kind = kindOf(entity.operation)
val record = entity.before.orElse(entity.after)
record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex))
}
override implicit def typeInformation: TypeInformation[Row] = {
Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
}
override protected val fieldsName: Array[String] = Array("clientId", "name", "surname", "patronymic", "sex")
}
Теперь мы можем преобразовывать DataStream
в Table
одним действием:
val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")
.toStreamTable("clientId")
Это далеко не единственный способ создания таблиц, если мы изначально хотим работать с данными как с таблицей, необязательно сперва представлять данные в виде DataStream:
tableEnv.createTable(
"companies",
TableDescriptor
.forConnector("kafka")
.schema(
Schema
.newBuilder()
.column("clientId", DataTypes.INT().notNull())
.column("companyId", DataTypes.INT().notNull())
.column("companyName", DataTypes.STRING().notNull())
.primaryKey("clientId")
.build()
)
.option(KafkaConnectorOptions.TOPIC, Seq("companies").asJava)
.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers)
.option(KafkaConnectorOptions.PROPS_GROUP_ID, "companies-group")
.option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET)
.option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
.build()
)
val companies: Table = tableEnv.sqlQuery("SELECT * FROM companies")
companies.print("Companies")
И конечно же куда без SQL:
tableEnv.executeSql(
"""
|CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'payments',
| 'properties.bootstrap.servers' = 'localhost:9092',
| 'properties.group.id' = 'payments-group',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'debezium-json'
|)
|""".stripMargin)
val payments: Table = tableEnv.sqlQuery("SELECT * FROM payments")
payments.print("Payments")
Мы окончательно подготовились к написанию основной логики. Для нашей задачи будем использовать inner join, т.е. при получении нового события мы будем выдавать результат во внешнюю систему только в случае наличия данных по ключу во всех таблицах.
clients
.join(companies, $"client.clientId" === $"company.clientId")
.join(payments, $"client.clientId" === $"payment.clientId")
.select("client.clientId", "name", "surname", "companyId", "companyName", "amount", "timestamp")
.toChangelogStream(Schema.derived(), ChangelogMode.upsert())
.print("Portfolio")
Использование State Processor API
Снова начнем с окружения, в этом варианте нам будет достаточно только окружения для работы в потоковом режиме:
implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
В прошлом варианте мы приводили все наши сущности к типу Table, сейчас нам нужно сделать нечто похожее:
case class PortfolioState(
id: Int,
client: Option[Client.Value],
company: Option[ClientCompany.Value],
payment: Option[Payment.Value]) {
def complete: Option[Portfolio] = {
for {
client <- client
company <- company
payment <- payment
} yield {
Portfolio(
client.clientId,
client.name,
client.surname,
company.companyId,
company.companyName,
payment.amount,
payment.timestamp
)
}
}
}
Мы описали свой собственный аналог таблицы, добавив в него метод complete формирующий результат только в случае наличия данных по всем трем сущностям (аналог inner join). Опишем способ трансформации исходной сущности в наш тип (для остальных сущностей будет аналогично):
object PortfolioState {
def apply(client: Client): PortfolioState = {
val id = client.before.map(_.clientId).getOrElse(client.after.map(_.clientId).get)
PortfolioState(id, client.before.orElse(client.after), None, None)
}
}
Итоговый результат:
val clients: DataStream[PortfolioState] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients").map(PortfolioState(_))
Последнее чего нам не хватает - это обработчика событий, который будет поддерживать актуальное состояние и выдавать результаты:
class StateProcessor extends KeyedProcessFunction[Int, PortfolioState, Portfolio] {
private var state: ValueState[PortfolioState] = _
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getState(
new ValueStateDescriptor[PortfolioState]("state", createTypeInformation[PortfolioState])
)
}
override def processElement(
value: PortfolioState,
ctx: KeyedProcessFunction[Int, PortfolioState, Portfolio]#Context,
ut: Collector[Portfolio]
): Unit = {
Option(state.value()).fold {
state.update(value) // если state == null, инициализируем его входным значением
} { currentState =>
val updatedState = currentState.copy(
client = value.client.orElse(currentState.client),
company = value.company.orElse(currentState.company),
payment = value.payment.orElse(currentState.payment)
)
updatedState.complete.foreach(out.collect) // если данных хватает, то выдаем результат
state.update(updatedState)
}
}
}
Соберем все вместе:
clients
.union(companies) // объединяем все три потока в один
.union(payments)
.keyBy(_.id) // шардиурем данные по полю id (clientId)
.process(new PortfolioStateProcessor())
.print("Portfolio")
Итоги
Мы рассмотрели 2 разных подхода. Даже несмотря на всю простоту постановки, подход с применением Table API для задач отслеживания изменений и моментальной выдачи результата выглядит привлекательнее. Мы получаем возможность описывать желаемый результат на привычном SQL-подобном синтаксисе, находясь в бесконечном потоке событий.
Полный код проекта можно найти здесь