
Привет!
В этом посте разберем, как обрабатывать объекты JVM, сгенерированные из схем Avro, в датасетах Spark. Вместе с этим рассмотрим, как организовать код при помощи шаблона функционального программирования "класс типов" (type class) на языке Scala.
Датафрейм в Spark - абстракция таблицы со столбцами и строками, покрывает большинство кейсов обработки данных. Тем не менее, бывает так, что данные имеют настолько специфичную структуру, что привести их к общему знаменателю в датафрейме достаточно сложно. В этом случае можно прибегнуть к программному интерфейсу Spark Dataset. Он позволяет манипулировать не абстрактными строками и столбцами, а конкретными объектами виртуальной машины Java.
Типичный кейс использования датасетов - чтение сообщений Kafka в формате Avro.
Процесс состоит из трёх этапов:
Генерация классов-образцов (case class) исходя из схемы Avro-сообщения.
Чтение Avro-сообщений в Dataset Spark, типированный классом-образцом.
Реализация операции обработки сообщения. В частности, напишем функцию, которая приведет сообщение к плоской структуре.
С кодом можно ознакомиться здесь.
Генерация классов-образцов Scala
Представим, что нам необходимо обрабатывать данные клиента со следующей структурой:

Клиент (Customer) может иметь несколько аккаунтов. Аккаунт, в свою очередь, имеет несколько групп взаимодействия (InteractionGroups). Группа взаимодействия содержит в себе взаимодействия, совершенные клиентом в определенную дату. Взаимодействие имеет один из типов, обозначающий канал взаимодействия: сайт, твиттер, инстаграм или звонок.
Отступление
Наверняка, у вас возникли претензии к этой модели :) Это справедливо. Данные специально структурированы "криво", чтобы смоделировать реальный кейс. Зачастую необходимо обрабатывать данные из устаревших систем, структуру которых мы изменить не можем. Приходится работать с тем, что есть.
Схема сообщения Avro, соответствующая данной модели, будет выглядеть так:
{ "type": "record", "name": "Customer", "namespace": "com.github.aleksandrachasch.avro", "fields": [ { "name": "Id", "type": "string" }, { "name": "Name", "type": "string" }, { "name": "Surname", "type": "string" }, { "name": "Accounts", "type": { "type": "array", "items": { "type": "record", "name": "Account", "namespace": "com.github.aleksandrachasch.avro", "fields": [ { "name": "AccountId", "type": "int" }, { "name": "AccountType", "type": "string" }, { "name": "InteractionGroups", "type": { "type": "array", "items": { "type": "record", "name": "InteractionGroup", "namespace": "com.github.aleksandrachasch.avro", "fields": [ { "name": "Id", "type": "string" }, { "name": "date", "type": "string" }, { "name": "interactions", "type": { "type": "array", "items": { "type": "record", "name": "Interaction", "namespace": "com.github.aleksandrachasch.avro", "fields": [ { "name": "Id", "type": "string" }, { "name": "type", "type": { "type": "enum", "name": "InteractionType", "namespace": "com.github.aleksandrachasch.avro", "symbols": [ "WEBSITE", "TWITTER", "CALL", "INSTAGRAM" ] } } ] } } } ] } } } ] } } } ] }
Теперь из этой схемы можно сгенерировать классы-образцы Scala. Затем мы будем оперировать экземплярами этих классов.
В реальной жизни процесс генерации классов-образцов выглядит примерно так:
Чтение схемы Avro. Например, из
Confluent Schema Registry- регистра cхем топиков кластера Kafka, к которому можно обращаться через API.Генерация классов-образцов в фазе компиляции
MavenилиSBT. Сгенерированные классы при этом копируются прямо вsrc/проекта. В случае обновления схемы в регистре мы увидим изменения в нашем проекте. Будет понятно уже в фазе компиляции, насколько совместимы эти изменения с существующим кодом.
В этом проекте будем использовать Maven и avrohugger-maven-plugin. Плагин использует библиотеку AvroHugger для генерации классов Scala.
В pom.xml добавляем плагин:
<plugin> <groupId>at.makubi.maven.plugin</groupId> <artifactId>avrohugger-maven-plugin</artifactId> <version>1.6</version> <configuration> <sourceDirectory>${basedir}/src/main/resources</sourceDirectory> <outputDirectory>${basedir}/src/main/scala</outputDirectory> <sourceGenerationFormat>STANDARD</sourceGenerationFormat> </configuration> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>generate-scala-sources</goal> </goals> </execution> </executions> </plugin>
В конфигурации необходимо указать путь к файлу с схемой Avro (по умолчанию src/main/avro), а таже путь к сгенерированным классам (по умолчанию target/generated-sources/avro):
<sourceDirectory>${basedir}/src/main/resources</sourceDirectory> <outputDirectory>${basedir}/src/main/scala</outputDirectory>
Внимание! сгенерированные файлы перепишут уже существующие классы-образцы в папке src/main/scala.
Также указываем тип записи Avro (SpecificRecord или GenericRecord).
По умолчанию плагин генерирует специфический тип записи (SpecificRecord). В этом случае все классы-образцы будут наследовать абстрактный класс org.apache.avro.specific.SpecificRecordBase. Помимо самого класса-образца и аргументов создадутся методы чтения и записи новых значений атрибутов. Рекомендуется использовать этот тип записи, если планируется обрабатывать данные специфичными Avro-функциями, использующими Avro Specific API.
В нашем случае мы сами будем писать функцию обработки, поэтому подойдет GenericRecord:
<sourceGenerationFormat>STANDARD</sourceGenerationFormat>
Этот тип записи генерирует обычные классы-образцы, без дополнительных методов и абстракций.
Выполняем команду Maven:
mvn avrohugger:generate-scala-sources
В папке src/main/scala появятся пять сгенерированных классов-образцов, соответствующих пяти объектам данных.

Классы принадлежат пакету com.github.aleksandrachasch.avro, который указан в поле namespace в схеме Avro.
Вот как выглядит, например, класс Customer:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */ package com.github.aleksandrachasch.avro case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account])
Чтение Avro в Spark Dataset
Теперь создадим датасет Spark, типированный сгенерированным классом Customer.
В качестве примера создадим клиента с несколькими акканутами и взаимодействиями:
{ "Id": "1", "Name": "John", "Surname": "Doe", "Accounts": [ { "AccountId": 111, "AccountType": "private", "InteractionGroups": [ { "Id": 222, "date": "20220102", "interactions": [ { "Id": "1", "type": "WEBSITE" }, { "Id": "2", "type": "INSTAGRAM" } ] }, { "Id": 223, "date": "20220103", "interactions": [ { "Id": "4", "type": "TWITTER" } ] } ] }, { "AccountId": 112, "AccountType": "public", "InteractionGroups": [ { "Id": 333, "date": "20220105", "interactions": [ { "Id": "6", "type": "TWITTER" }, { "Id": "7", "type": "INSTAGRAM" } ] } ] } ] }
Avro - это бинарный формат данных. Именно так будет выглядеть сообщение, если его десереализовать, применив соответствующую cхему.
Создадим датасет:
import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.types.StructType import com.github.aleksandrachasch.avro.Customer import spark.implicits._ val ds = spark.read.format("json") .schema(ScalaReflection.schemaFor[Customer].dataType.asInstanceOf[StructType]) .option("mode", "FAILFAST") .option("multiline", true) .load("src/main/resources/customer-data.json") .as[Customer]
Типом переменной ds будет Dataset[Customer].
Если входные данные не соответствуют классам-образцам, то появится ошибка во время выполнения (runtime). Например, если ввести неcуществующий тип взаимодействия InteractionType :
"interactions": [ { "Id": "6", "type": "TWITTER" }, { "Id": "7", "type": "VK" } ]
Caused by: java.util.NoSuchElementException: No value found for 'VK' at scala.Enumeration.withName$2(Enumeration.scala:125) at scala.Enumeration.withName(Enumeration.scala:125) at com.github.aleksandrachasch.avro.InteractionType.withName(InteractionType.scala)
Создание функции обработки класса-образца с помощью шаблона программирования type class
Допустим, нам необходимо привести информацию о взаимодействиях клиента к плоской структуре:

Создадим класс-образец, соответствующий этой структуре:
case class FlattenedCustomer(CustomerId: String, Name: String, Surname: String, AccountId: Int, AccountType: String, InteractionGroupId: String, InteractionGroupDate: String, InteractionId: String, InteractionType : String)
Задача сводится к тому, чтобы преобразовать Dataset[Customer] в Dataset[FlattenedCustomer]. Тогда соответствующая функция будет брать в качестве аргумента объект Customer и возвращать список элементов FlattenedCustomer- по одному элементу на каждое взаимодействие:
def customerFlatten(c : Customer): Seq[FlattenedCustomer] = ???
Возникает вопрос, где эту функцию реализовать. Можно было бы добавить ее в качестве метода к классу Customer:
case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account]) { def customerFlatten: Seq[FlattenedCustomer] = ??? }
Но, в связи с тем, что классы-образцы генерируются автоматически, каждая новая генерация удалит созданный вручную метод. Вообще, не рекомендуется изменять классы и файлы, созданные автоматически.
С другой стороны, можно реализовать функцию в отдельном объекте и импортировать ее при обработке датасета.
Однако, есть более лаконичный и функциональный способ - класс типов (type class).
Type class — это шаблон функционального программирования, который позволяет создавать "ad hoc" полиморфизм на объектах, реализацию которых модифицировать нельзя. Например, если вы имеете дело со сторонней библиотекой, которую необходимо расширить вашим собственным функционалом.
В нашем случае "сторонняя библиотека" — это автоматически сгенерированные классы-образцы, которые мы не можем изменять.
Шаблон type class обеспечит четкое раделение кода, зависящего от внешних источников и программ (схемы Avro), и собственного ключевого функционала нашей программы.
К тому же шаблон позволяет легко добавлять методы обработки других объектов.
Шаблон type class состоит из нескольких элементов:
Трейт с методом
customerFlattenи параметрами входа (T) и выхода (C):trait DataProcessor[T, C] { def customerFlatten(t: T): Seq[C] }
Объект-компаньон со вспомогательными функциями:
object DataProcessor { def apply[T, C](implicit processor: DataProcessor[T, C]): DataProcessor[T, C] = processor implicit class DataProcessorOps[T, C](val t : T) extends AnyVal { def customerFlatten(implicit processor: DataProcessor[T, C]): Seq[C] = { processor.customerFlatten(t) } } }Функция
apply, в зависимости от типовTиС, возвращает соответствующий экземплярDataProcessor. Это происходит за счет неявного (implicit) параметраprocessor. Например, при вызове функцииapply[Customer, FlattenedCustomer]компилятор Scala автоматически найдет и вернет экземпляр классаDataProcessor[Customer, FlattenedCustomer].За счёт того же механизма неявных параметров неявный класс
DataProcessorOpsпозволяeт вызывать функциюcustomerFlattenна экземплярах классаT, как если бы он был собственным методом классаT(например,t.customerFlatten).Наконец, реализация
DataProcessorдля конкретных объектовCustomerиFlattenedCustomer:package com.github.aleksandrachasch.avro.ops object CustomerDataProcessor { implicit val customerDataProcessor: DataProcessor[Customer, FlattenedCustomer] = (t: Customer) => t.Accounts.flatMap(acc => acc.InteractionGroups.flatMap( group => group.interactions.map( interaction => FlattenedCustomer.apply(t.Id, t.Name, t.Surname, acc.AccountId, acc.AccountType, group.Id, group.date, interaction.Id, interaction.`type`.toString) ) ) ) }Реализованная функция автоматически соотносится с функцией
customerFlatten,т.к. типы параметра (Customer) и выхода (Seq[FlattenedCustomer]) совпадают.Как можно заметить, в самой функции оперируем атрибутами классов-образцов. В датафрейме это были бы названия столбцов и вложенных полей. В отличие от датафрейма, здесь ошибки будут видны уже в ходе компиляции.
Использование функции на датасете получается кратким и лаконичным:
import com.github.aleksandrachasch.avro.ops.CustomerDataProcessor._ val flattenedDs = ds.flatMap(_.customerFlatten) flattenedDs.show(false)

Как можно заметить, в коде нет декларации новых экземпляров классов (new). Механизм неявных параметров и классов (implicit) позволяет минимизировать количество экземпляров в JVM, а также различных п��ременных в коде. Нет необходимости собственноручно инициализировать классы - за нас это делает Scala.
Для обработки другого объекта достаточно реализовать DataProcessor[T, C] с соответствующими типами входа T и выхода C.
Надеюсь, этот пост был вам полезен.
Кстати, уже в конце февраля в OTUS стартует новый поток курса DataOps. В преддверии старта курса приглашаю всех желающих на бесплатный демоурок. Регистрация доступна по ссылке.
До встречи!
