Ремарка:
В данной статье представлено, как автор решил свою проблему и не утверждает, что нельзя было сделать это лучше или красивее. Также автор не утверждает, что данную проблему или задачу следует решать именно таким методом и не настаивает на повторении данного подхода. Это лишь один из возможных способов решения, предложенный автором, и он представлен здесь исключительно для ознакомления.
Автор отмечает, что не смог найти информацию по данной проблеме ни на русскоязычных, ни на англоязычных ресурсах, включая Хабр и другие источники. Статья представляет собой дискуссионный материал, и автор будет рад увидеть альтернативные решения в комментариях.
Что я пытаюсь сделать:
Рассмотрим создание UDF blue, который возвращает строку "0000FF" на Scala API."
object Example1 extends App with Configure { spark.udf.register("blue", udf(() => "0000FF")) // создаем и регестрируем udf spark.sql("select blue()").show() /* вывод: +------+ |blue()| +------+ |0000FF| +------+ */ }
Рассмотрим ситуацию, когда сама функция UDF поступает в программу в виде строки. Например, у нас есть переменная code, которая содержит "() => "0000FF"". В рантайме необходимо получить объект типа Function0[String] из этой строки и использовать его для создания UDF. Таким образом, моя цель заключается в написании программы, способной обрабатывать сложные лямбды с произвольным количеством аргументов и произвольными выходными данными в виде строк.
например:
"(num: Int) => num * num"
"(str: String) => str.reverse"
"(num1: Double, str: String) => Math.pow(num1, str,size)"
и тд
как не получилось делать и какая возникала ошибка:
расмотрим следующий код
import org.apache.spark.sql.functions.udf import scala.reflect.runtime.universe import scala.tools.reflect.ToolBox object Example2 extends App with Configure { def udfRegister(labdaCode: String): Unit = { import universe._ val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox() val tree = toolbox.parse(labdaCode) val compiledCode = toolbox.compile(tree) val function = compiledCode().asInstanceOf[Function0[String]] //сторока успешно конвертируеться в Function0(String) println(function()) // вывод: 0000FF val udfBlue = udf(() => function()) // успешное создание udf spark.udf.register("blue", udfBlue) } udfRegister("() => \"0000FF\"") spark.sql("select blue()").show() // возникает ошибка }
При запуске задачи возникает следующая ошибка: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
Ошибка указывает на то, что в коде или при работе с Spark возникла попытка использовать SerializedLambda вместо ожидаемого типа scala.Function1 (функция с одним аргументом).
В Spark функции должны быть сериализуемыми, чтобы их можно было передать через кластер. SerializedLambda может возникать при попытке передачи некорректно сериализуемой функции.
В свою очередь SerializedLambda есмь класс, используемый в Java для сериализации лямбда-выражений и методов ссылок. Этот интерфейс генерируется компилятором Java при сериализации лямбда-выражений. (судя по сему данный класс не предоставляет необходимого спарку инструмента для сериализации)
Особенно запутывает то, что данная ошибка не возникает при конвертации полученного объекта в Function0[String], который Spark полностью поддерживает, а также не возникает при создании объекта UDF. Ошибка проявляется уже в рантайме при запуске задачи на кластере.
Однако следующий код снова работает:
import org.apache.spark.sql.functions.udf import scala.reflect.runtime.universe import scala.tools.reflect.ToolBox object Example3 extends App with Configure { val labdaCode = "() => \"0000FF\"" import universe._ val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox() val tree = toolbox.parse(labdaCode) val compiledCode = toolbox.compile(tree) val function = compiledCode().asInstanceOf[Function0[String]] //сторока успешно конвертируеться в Function0(String) println(function()) // вывод: 0000FF val udfBlue = udf(() => function()) // успешное создание udf spark.udf.register("blue", udfBlue) // успешное регестрирование udf spark.sql("select blue()").show() /*вывод: +------+ |blue()| +------+ |0000FF| +------+ */ }
В сущности в Example3 происходит все тоже самое что и в Example2, но первое работает, а второе нет что вводит еще в большее замешательство
import org.apache.spark.sql.functions.udf import scala.reflect.runtime.universe import scala.tools.reflect.ToolBox object Example4 extends App with Configure { if (true) { val labdaCode = "() => \"0000FF\"" import universe._ val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox() val tree = toolbox.parse(labdaCode) val compiledCode = toolbox.compile(tree) val function = compiledCode().asInstanceOf[Function0[String]] //сторока успешно конвертируеться в Function0(String) println(function()) // вывод: 0000FF val udfBlue = udf(() => function()) // успешное создание udf spark.udf.register("blue", udfBlue) // успешное регестрирование udf } spark.sql("select blue()").show() // возникает ошибка }
как и Example3 Example4 неделает тоже самое, но почему то блок if снова введет к ошибке
так же к ошибкам приводит и использование циклов и монад, Try и тд (но тут я это демонстрировать не буду)
таким образом все сводиться к тому чтобы правильным образом создать function которая бы не являлась бы SerializedLambda и есть следующие ограничения:
можно создавть лямбды/функции только в мкетоде main
нельзя и�� создовать в блоках if, try, for, монадах и тд.
хоть примеры и демонстрируют работу с совсем простой функцией blue, моя задача сделать универсальное приложение, которое будет принимать любую лямбду (на самом деле от 0 до 10 поскольку максимальный UDF10) в формате строки
Определимся с тем как программа получает информацию о функциях:
Я хочу чтобы информацию получало в формате JSON и путсь у него будет следующая структура:
[ //массив поскольку функций может прийти много или неприйти вообще { "name": "blue", //имя бля udf "targetType": "String", //тип данных для возращаемый "fun": "() => \"00ff00\"", //собственно сама лямбда "imports": ["...", "..."] //опциональное, на случай если нужны дополнительные импорты }, // остальные имеют такую же структуру { "name": "reverse", "targetType": "String", "fun": "(str:String) => str.reverse" }, { "name": "plus", "targetType": "Int", "fun": "(i1:Int, i2:Int) => i1 + i2" }, { "name": "sum3", "types": "Int,Int,Int", "targetType": "Int", "fun": "(i1:Int, i2:Int, i3:Int) => i1 + i2 + i3" } ]
Определимся с классом который будет отражать данную структуру Json:
// думаю тут не стоит расписывать какое поле за что отвечает case class UDFConfig(name: String, targetType: String, fun: String, imports: List[String] = List.empty)
Для парсинга json файла я буду использовать библиотеку json4s и следующий класс
package Util import org.json4s.Formats import org.json4s.native.JsonMethods.parse //небольшой обьект для ковертации строки в формате json в список обьектов типа JsonType object JsonHelper { private def jsonToListConvert[JsonType](json: String)(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]] = { parse(json) .extract[List[JsonType]] .map(Right(_)) } implicit class JsonConverter(json: String) { def jsonToList[JsonType]()(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]]= { jsonToListConvert[JsonType](json) } } }
И создадим трейт Configure в котором и определи переменые которые понадобяться для дальнейшей работы
Иimport config.UDFConfig import org.apache.spark.sql.SparkSession import org.json4s.DefaultFormats import scala.io.Source trait Configure { lazy val spark = SparkSession .builder() .appName("Example UDF runtime compile") .master("local[*]") .getOrCreate() lazy val sc = spark.sparkContext // создали спрак сессию и контекст lazy val pathUdfsJson = "./UDFs.json" //путь до json c функциями implicit lazy val formats = DefaultFormats // список форматов для json4s, в моем случае хватеает дефолтных форматов import Util.JsonHelper._ lazy val udfJson = getJson(pathUdfsJson) // получаем сам json в виде строки lazy val udfsConfigs: List[UDFConfig] = udfJson .jsonToList[UDFConfig] .filter(_.isRight) .map { case Right(udfconfig) => udfconfig } // получаем все корректные udfconfig // этот обект мне и понадобиться //метод для чтения файлов private def getJson(path: String): String = { val file = Source.fromFile(path) try { file.getLines().mkString("\n") } finally { file.close() } } }
Определим последний обьект который будет udfconfig переводить в строку которую будет динамически в ратайме превращаться в FunctionN в последствии
package Util import config.UDFConfig object UDFHelper { private def configToStringConvert(udfConfig: UDFConfig): String = { //создадим все импорты для итоговой строки val imports = udfConfig .imports // достаем импорты из конфига .map(_.trim.stripPrefix("import ").trim) // удаляем с лева ключевое слово если оно есть .distinct // оставляем уникальные .map(imp => f"import ${imp}") // конкатенируем слева ключевое слово .mkString("\n") // все соеденяем через отступ val Array(params, functionBody) = udfConfig.fun.split("=>", 2).map(_.trim) // Отделяем функциональную часть от переменных val paramsTypes: Seq[(String, String)] = params .stripPrefix("(") // Убираем с лева скобку .stripSuffix(")") // Убираем с права скобку .split(",") // Разделяем параметры .map(_.trim) // Убираем лишние пробелы .map { case "" => null case param => val Array(valueName, valueType) = param.split(":").map(_.trim) (valueName, valueType) } // разделяем имя переменой от типа данных этой переменной .filter(_ != null) // отвильтровываем null-ы val funcTypes: String = paramsTypes.size match { case 0 => udfConfig.targetType case _ => f"${List .fill(paramsTypes.size)("Any") .mkString("", ", ", ", ")}${udfConfig.targetType}" } //здесь получаем перечисление через запятую типов данных Function val anyParams = paramsTypes.map(_._1.trim + "_any").mkString(", ") //парметры лямда вырожения val instances = paramsTypes.map { case (valueName, valueType) => f" val ${valueName}: ${valueType} = ${valueType}_any.asInstanceOf[${valueType}]" }.mkString("\n") // тут определяем конвертации парметров люмбды в необхадимые типы // собираем все вместе в итоговый стринг f""" |${imports} | |val func: Function${paramsTypes.size}[${funcTypes}] = (${anyParams}) => { | |${instances} | | ( |${functionBody} | ).asInstanceOf[${udfConfig.targetType}] | |} | |func |""".stripMargin } implicit class Converter(udfConfig: UDFConfig) { def configToString(): String = { configToStringConvert(udfConfig) } } }
По поводу использования типа Any: как я уже упоминала, моя задача заключается в создании универсальной логики, где передаваемая через JSON лямбда или функция может иметь любое количество параметров (на практике от 0 до 10). В условиях, где невозможно динамически преобразовать строку в FunctionN, использование типа Any является единственным выходом (в дальнейшем будет ясно, почему). Иначе пришлось бы создавать множество объектов для каждого возможного числа параметров.
что весьма много.
9 поскольку я решила ограничеться 9-тью типами данных: String, Int, Boolean, Byte, Short, Long, Float, Double, Date, Timestamp
Создаем обьекты регистраторы:
Поскольку мне не разрешено использовать конструкции типа if или match, я не могу динамически определять количество параметров в одном объекте UDFRegN и соответственно настраивать его. В результате мне пришлось создавать несколько отдельных объектов UDFRegN, каждый из которых поддерживает определённое количество параметров. Например, для случая с нулевым количеством параметров я создал соответствующий объект UDFRegN.
Это подход необходим из-за ограничений, которые не позволяют динамически адаптировать логику UDF в зависимости от передаваемого числа параметров. Таким образом, в данном случае было бы необходимо создать 11 отдельных объектов UDFRegN для обработки всех возможных комбинаций параметров.
import org.apache.spark.sql.functions.udf import java.sql.{Date, Timestamp} import scala.reflect.runtime.universe import scala.tools.reflect.ToolBox object UDFReg0 extends App with Configure { // как говорилось можно только функцию main использовать val indexcode = Integer.parseInt(args(0)) // это единственная возможность получить парметр извне и предполагается что это тндекс в списке UDFsConfig import Util.UDFHelper._ val udfConfig = udfsConfigs(indexcode) //получаем udfConfig (из Configure) val functionCode = udfConfig.configToString() // преобразовываем его в код val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox() val tree = toolbox.parse(functionCode) val compiledCode = toolbox.compile(tree) val function = compiledCode().asInstanceOf[Function0[Any]] // получаем функцию из кода val myUDF = udfConfig.targetType match { // выбираем instanceOF в зависимости от типа данных который должна возращать функция case "String" => udf(() => function().asInstanceOf[String]) case "Int" => udf(() => function().asInstanceOf[Int]) case "Boolean" => udf(() => function().asInstanceOf[Boolean]) case "Byte" => udf(() => function().asInstanceOf[Byte]) case "Short" => udf(() => function().asInstanceOf[Short]) case "Long" => udf(() => function().asInstanceOf[Long]) case "Float" => udf(() => function().asInstanceOf[Float]) case "Double" => udf(() => function().asInstanceOf[Double]) case "Date" => udf(() => function().asInstanceOf[Date]) case "Timestamp" => udf(() => function().asInstanceOf[Timestamp]) case _ => throw new IllegalArgumentException(f"Неизвестный тип") } spark.udf.register(udfConfig.name, myUDF) // регистраця UDF }
И осталось написать еще 10 однотипных почти точно таких же "регестраторов"
и для этого напишем последний скрипт, чтоб не делать жто в ручную)
import java.io.{File, PrintWriter} object GenerateRegestrators extends App { (0 to 10).toList.map { num => val types = (0 to num).toList.map(_ => "Any").mkString(", ") val lambdaVaues = (0 until num).toList.map(n => f"value${n}: Any").mkString(", ") val functionValues = (0 until num).toList.map(n => f"value${n}").mkString(", ") f""" |import org.apache.spark.sql.functions.udf | |import java.sql.{Date, Timestamp} |import scala.reflect.runtime.universe |import scala.tools.reflect.ToolBox | |object UDFReg${num} extends App with Configure { | | val indexcode = Integer.parseInt(args(0)) | | import Util.UDFHelper._ | val udfConfig = udfsConfigs(indexcode) | val functionCode = udfConfig.configToString() | | val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox() | | val tree = toolbox.parse(functionCode) | val compiledCode = toolbox.compile(tree) | val function = compiledCode().asInstanceOf[Function${num}[${types}]] | | val myUDF = udfConfig.targetType match { | case "String" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[String]) | case "Int" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Int]) | case "Boolean" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Boolean]) | case "Byte" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Byte]) | case "Short" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Short]) | case "Long" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Long]) | case "Float" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Float]) | case "Double" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Double]) | case "Date" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Date]) | case "Timestamp" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Timestamp]) | case _ => throw new IllegalArgumentException(f"Неизвестный тип") | } | | spark.udf.register(udfConfig.name, myUDF) | |} |""".stripMargin } .zipWithIndex .foreach { case (str, index) => val file = new File(f"./src/main/scala/UDFReg${index}.scala") val writer = new PrintWriter(file) try { writer.write(str) } finally { writer.close() } } }
тут пояснять нечего, оно просто генерит остальные обьекты регистраторы по аналогии
после запуска скрипта в проекте появляются недомтоющие классы UDFRegN.
собираем все вместе:
Итого остается лишишь написать последний пример где наконец динамическая компиляция UDF из строки заработает:
object Example5 extends App with Configure { import Util.UDFHelper._ udfsConfigs.zipWithIndex.foreach { //индекс нужен чтоб передать в args case (udfConfig, index) => udfConfig.fun.split("=>", 2)(0).count(_ == ':') match { //тут и происходит выбор какому регестратору передать case 0 => UDFReg0.main(Array( index.toString )) case 1 => UDFReg1.main(Array( index.toString )) case 2 => UDFReg2.main(Array( index.toString )) case 3 => UDFReg3.main(Array( index.toString )) case 4 => UDFReg4.main(Array( index.toString )) case 5 => UDFReg5.main(Array( index.toString )) case 6 => UDFReg6.main(Array( index.toString )) case 7 => UDFReg7.main(Array( index.toString )) case 8 => UDFReg8.main(Array( index.toString )) case 9 => UDFReg9.main(Array( index.toString )) case 10 => UDFReg10.main(Array( index.toString )) case _ => Unit } } spark.sql("select blue(), reverse('namoW tobor ociP ociP') AS rev, plus(1, 2), sum3(1, 1, 1)").show /* вывод +------+--------------------+----------+-------------+ |blue()| rev|plus(1, 2)|sum3(1, 1, 1)| +------+--------------------+----------+-------------+ |00ff00|Pico Pico robot W...| 3| 3| +------+--------------------+----------+-------------+ */ }
и таки получилось.
P.S. код можно глянуть тут
