Как стать автором
Обновить

Создание data lineage в Apache Atlas из логических планов Spark (не без «костылей»)

Уровень сложностиСредний
Время на прочтение16 мин
Количество просмотров221

Ремарка

Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark в Apache Atlas. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.

В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas.

Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие ...

Цель работы

Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas , подобно тому как это происходит в данной статье с Apache NIFI .

Тестовая задача для иллюстрации и парсинг плана в AST

Определим небольшой файл cars.csv со следующим содержанием:

model,manufacturer
Model S,Tesla
Model 3,Tesla
Mustang,Ford
Civic,Honda

И напишем даг выведем его логический план:

  val spark = SparkSession.builder()
    .appName("Logical Plan Example")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val carsCSV = spark
    .read
    .option("header", "true")
    .csv("src/main/resources/cars.csv")

  val carsSeq = List(
    ("i8", "BMW"),
    ("A4", "Audi"),
    ("911", "Porsche"),
    ("Corolla", "Toyota")
  ).toDF("model", "manufacturer")

  val unioncars = carsCSV.union(carsSeq)

  val resDF = unioncars
    .where(col("manufacturer") =!= "Audi")
    .select("model", "manufacturer")
    .withColumn("processedDDTM", lit(LocalDateTime.now()))

  val logicalPlan = resDF.queryExecution.logical

  println(logicalPlan)
/* вывод
    Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]
      +- Project [model#17, manufacturer#18]
         +- Filter NOT (manufacturer#18 = Audi)
            +- Union false, false
               :- Relation [model#17,manufacturer#18] csv
               +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
                  +- LocalRelation [_1#23, _2#24]
   */
}

Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST).

Для этого мы определим класс AST, который будет отражать структуру плана в формате, удобном для последующей обработки.

// Определение корневого класса или типа для всех узлов дерева
sealed trait Node {
  // Метод для получения имени узла на основе его типа
  def getName: String = this.getClass.toString
}

// Узел типа "Проект", содержащий последовательность столбцов
case class ProjectNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Project"
}

// Узел типа "Фильтр", содержащий условие фильтрации
case class FilterNode(condition: String) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Filter"
}

// Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку
case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Union"
}

// Узел типа "Логическое отношение", содержащий последовательность столбцов
case class LogicalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LogicalRelation"
}

case class LocalRelationNode(columns: Seq[String]) extends Node {
  override def getName: String = "LocalRelation"
}

// Узел типа "Локальное отношение", содержащий последовательность столбцов
case class LocalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LocalRelation"
}

// Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node,
// список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне)
case class AST(node: Node,
               children: Seq[AST],
               level_num: Int,
               levelExpr: String)

И напишем парсер из логического плана в AST

// Объект для парсинга логических планов в AST
object ParserAST {

  // Функция для преобразования логического плана в AST
  // Возвращает Option[AST], где None означает, что план не может быть преобразован
  private def parseAST(plan: LogicalPlan): Option[AST] = {

    // Рекурсивная функция для обхода логического плана и создания узлов AST
    // Параметры:
    // - logicalPlan: текущий логический план для обработки
    // - levelnum: уровень в дереве AST
    // - levelExpr: строковое представление уровня и индекса
    // Возвращает Option[AST], где None означает, что логический план не может быть преобразован
    def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {

      // Определение узла на основе типа логического плана
      val node: Option[Node] = logicalPlan match {
        case p: Project =>
          // Обработка узла типа Project и создание узла AST с именем "Project"
          val columns = p.projectList.map(_.sql)
          Some(ProjectNode(columns))
          
        case f: Filter =>
          // Обработка узла типа Filter и создание узла AST с именем "Filter"
          val condition = f.condition.sql
          Some(FilterNode(condition))
          
        case u: Union =>
          // Обработка узла типа Union и создание узла AST с именем "Union"
          val isAll = u.allowMissingCol
          val byName = u.byName
          Some(UnionNode(isAll, byName))
          
        case lr: LocalRelation =>
          // Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"
          val columns = lr.output.map(_.sql)
          Some(LocalRelationNode(columns))
          
        case lr: LogicalRelation =>
          // Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"
          val columns = lr.output.map(_.sql)
          Some(LogicalRelationNode(columns))
          
        case _ =>
          // Если логический план не совпадает ни с одним из известных типов, возвращаем None
          None
      }

      // Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей
      node.map { n =>
        // Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план
        val children = logicalPlan.children.zipWithIndex.flatMap {
          case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")
        }.toList
        // Создание узла AST с текущим узлом и его дочерними узлами
        AST(n, children, levelnum, levelExpr)
      }
    }

    // Запуск рекурсивного обхода с начальным уровнем и строковым представлением
    loop(plan, 1, "1_0")
  }

  // Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST
  implicit class parser(lp: LogicalPlan) {
    def AST(): Option[AST] = {
      parseAST(lp)
    }
  }
}

теперь можно получать AST следующим образом logicalPlan.AST().get

Определим сущности в Атласе для построения Lianage

таблица наследовательности в Apche Atlas
таблица наследовательности в Apche Atlas

Подобно тому, как в языках программирования на базе Java все классы наследуются от Object, в Apache Atlas все сущности наследуются от Referenceable. Однако построение lineage (линейности данных) происходит только для типов Process и DataSet. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset), то кнопка "Lineage" попросту не появится.

Кроме того, сам lineage строится на основе полей inputs и outputs для Process, аналогично и для DataSet. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.

Изначально моей целью было отразить преобразования, происходящие в Apache Spark, но структура Apache Atlas вынуждает окружать мои Process сущностями DataSet в полях inputs и outputs. Хотя меня изначально интересовали только Process, эти DataSet-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet пустым.

В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.

Сначала определим тип для DataSet.

  {
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_data_type", 
        "description": "A type inheriting from assets for Pico DataSet", 
        "superTypes": ["DataSet"],
        "attributeDefs": [],
        "relationshipDefs": [] 
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются.

  2. entityDefs:

    • Определяет сущности в системе.

    • name: Имя сущности, которая представляет тип данных.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых наследуется данная сущность.

    • attributeDefs: Пустой массив, так как атрибуты не указаны.

    • relationshipDefs: Пустой массив, так как связи не определены.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

{
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_process_type",
        "description": "A type inheriting from assets for Pico Spark abstraction",
        "superTypes": ["Process"],
        "attributeDefs": [
          {
            "name": "inputs",
            "description": "List of inputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          },
          {
            "name": "outputs",
            "description": "List of outputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          }
        ],
        "relationshipDefs": []
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.

  2. entityDefs:

    • Содержит определения сущностей.

    • name: Имя сущности, определяющей тип данных в контексте Pico Spark.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых сущность наследуется.

    • attributeDefs: Пустой массив, так как атрибуты не добавлены.

    • relationshipDefs: Пустой массив, так как связи не указаны.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

Для типа pico_spark_process_type я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.

В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.

Взаимодействие с Apache Atlas по REST

Простого описания сущностей недостаточно — их нужно передать в Apache Atlas. У Atlas есть обширное REST API для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:

curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \
     -H "Content-Type: application/json" \
     -H "Accept: application/json" \
     -d '{
           "enumDefs": [],
           "structDefs": [],
           "classificationDefs": [],
           "entityDefs": [
             {
               "name": "pico_spark_data_type",
               "description": "A type inheriting from assets for Pico DataSet",
               "superTypes": ["DataSet"],
               "attributeDefs": [],
               "relationshipDefs": []
             }
           ],
           "relationshipDefs": [],
           "businessMetadataDefs": []
         }'

создаю JSON файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType

  val atlasServerUrl = "http://localhost:21000/api/atlas/v2"
  val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)

def generatePicoSparkTypes(): Unit = {

  // Функция для чтения содержимого файла из ресурсов
  def readFileFromResources(fileName: String): String = {
    val source = Source.fromResource(fileName)
    try source.mkString
    finally source.close()
  }

  // Чтение JSON из файла ресурсов
  val jsonString = readFileFromResources("EntityTypes.json")

  // Попытка разобрать строку JSON в структуру данных
  val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)

  // Преобразование разобранного JSON в список объектов JSON
  val jsonObjects: Option[List[Json]] = parsedJson match {
    case Right(json) =>
      json.as[List[Json]] match {
        case Right(jsonArray) =>
          Some(jsonArray)
        case Left(error) =>
          // Обработка ошибки разбора массива JSON
          println(s"Error parsing JSON array: $error")
          None
      }
    case Left(error) =>
      // Обработка ошибки разбора JSON
      println(s"Error parsing JSON: $error")
      None
  }

  // Отправка каждого объекта JSON на сервер Atlas
  jsonObjects match {
    case Some(jsonArray) =>
      jsonArray.foreach { jsonBody =>
        // Создание POST-запроса для создания типа в Apache Atlas
        val createTypeRequest = basicRequest
          .method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса
          .header("Authorization", authHeader) // Заголовок авторизации
          .header("Content-Type", "application/json") // Заголовок типа содержимого
          .header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON
          .body(jsonBody.noSpaces) // Тело запроса с JSON-данными
          .response(asString) // Ожидание ответа в формате строки

        // Отправка запроса и вывод результата
        val response = createTypeRequest.send(backend)
        println(response.body) // Печать тела ответа
        println(response.code) // Печать кода ответа
      }
    case None =>
      // Сообщение, если JSON-объекты не были найдены
      println("No JSON objects found.")
  }

}

комментарии:

  1. readFileFromResources: Функция для чтения содержимого файла JSON из ресурсов.

  2. jsonString: Получение строки JSON из файла.

  3. parsedJson: Попытка разобрать строку JSON в структуру данных Json.

  4. jsonObjects: Преобразование разобранного JSON в список объектов JSON.

  5. jsonArray.foreach: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas.

  6. createTypeRequest: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas.

  7. response: Отправка запроса и вывод результата, включая тело ответа и код ответа.

теперь для создания всех энтити в Apache Atlas достаточно вызвать метод
generatePicoSparkTypes()


Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs и outputs. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:

EntityTypes в Apache Atlas
EntityTypes в Apache Atlas

как видим все EntityType созданы

Создаем DataSet Entity

Перед тем как создавать сущности процессов нужно создать сущности DataSet-тов, поскольку первые ссылаются на вторые

На данном уже определен pico_spark_data_type который отвечает за входные / выходные схемы данных.

Для начала определимся с двумя вспомогательными методами

/**
 * Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.
 *
 * @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.
 * @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.
 */
def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {

  jsonBody => {
    // Создание HTTP POST запроса для отправки JSON данных на сервер
    val createTypeRequest = basicRequest
      .method(Method.POST, uri"$atlasServerUrl/${postfix}")
      .header("Authorization", authHeader)
      .header("Content-Type", "application/json")
      .header("Accept", "application/json")
      .body(jsonBody)
      .response(asString)

    // Отправка запроса и получение ответа
    val response = createTypeRequest.send(backend)
    
    // Вывод тела ответа и кода статуса
    println(response.body)
    println(response.code)
  }
}

/**
 * Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.
 *
 * @param domain Домен, который будет использоваться в атрибутах сущностей.
 * @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.
 * @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.
 */
def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

  // Локальная функция для генерации и отправки сущностей данных Spark
  def generateEntities(ast: AST): Unit = {
    ast.children.foreach { inast =>
      // Формирование JSON тела для сущности данных Spark
      val jsonBody =
        f"""
           |{
           |  "entity": {
           |    "typeName": "pico_spark_data_type",
           |    "attributes": {
           |      "domain": "${domain}",
           |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "description": "A description for the spark_data"
           |    }
           |  }
           |}
           |""".stripMargin

      // Отправка сформированного JSON тела на сервер
      execJsonAtlas(jsonBody)
      
      // Рекурсивный вызов для обработки дочерних узлов
      generateEntities(inast)
    }
  }

  // Возвращаем функцию для генерации сущностей
  generateEntities
}

Пояснения

  • senderJsonToAtlasEndpoint: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа.

  • generateSparkDataEntities: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.

Напишем еще 2 метода для запуска формирования Linage В Atlas

/**
 * Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.
 *
 * @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 * @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.
 */
def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {

  // Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas
  val entitySender = senderJsonToAtlasEndpoint("entity")
  
  // Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas
  val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)

  // Создание базовых сущностей вывода и отправка их на сервер
  //ее реализацию опущу
  createBaseOutput(domain, entitySender)
  
  // Создание базовых сущностей ввода и отправка их на сервер
  //ее реализацию опущу
  createBaseInput(domain, entitySender)
  
  // Генерация и отправка сущностей данных Spark на основе AST
  sparkDataEntityGenerator(ast)
}

/**
 * Имплементация расширения для преобразования AST в сущности Apache Atlas.
 *
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 */
implicit class converter(ast: AST) {

  /**
   * Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.
   *
   * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
   */
  def EntityToAtlas(domain: String): Unit = {
    ASTToAtlasEntity(ast, domain, "")
  }
}

Пояснения:

  • ASTToAtlasEntity: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функцию entitySender.

  • EntityToAtlas: Это метод расширения (implicit class) для типа AST, который упрощает вызов метода ASTToAtlasEntity с дефолтным значением для topLevelExpr. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.

Теперь при запуске ast.EntityToAtlas("picoDomain")В атласе появляется data entity

скриншот с web UI
скриншот с web UI

так как DataSet Entity уже созданы, то можно создавать Process Entity сразу с заролнеными inputs и outputs, это важно поскольку сколько я не тыкалась в Api для обновления Entuty ничего не работало.

начнем с того что определим пачку методов:

  // Создает функцию для отправки сущностей в Apache Atlas
  // Использует функцию преобразования AST в JSON и функцию отправки JSON
  def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {
    // Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas
    (ast: AST, topLevelExpr: String) => {
      val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)
      execJsonAtlas(jsonBody)
    }
  }

  // Генерирует JSON для сущностей в Atlas на основе AST и уровня
  // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
  def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {
    (ast: AST, topLevelExpr: String) => {
      val node = ast.node

      // Создает список входных сущностей, если есть дочерние элементы
      val inputs = if (ast.children.nonEmpty) {
        ast.children.map(_.levelExpr).map { expr =>
          f"""
             |
             |{
             |  "typeName": "pico_spark_data_type",
             |  "uniqueAttributes": {
             |    "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"
             |  }
             |}
             |
             |""".stripMargin
        }.mkString(", ")
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_input@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Создает JSON для выходных сущностей, если задан topLevelExpr
      val output = if (topLevelExpr.nonEmpty) {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |      "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"
           |   }
           | }
           |""".stripMargin
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_output@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
      node match {
        case p: ProjectNode =>
          f"""
             |{
             |"entity": {
             |      "typeName": "pico_spark_project_type",
             |      "attributes": {
             |        "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",
             |        "name": "pico_project_${ast.levelExpr}",
             |        "description": "This is an project for the pico_spark_project_type",
             |        "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],
             |        "inputs":[ ${inputs} ],
             |        "outputs":[ ${output} ]
             |      }
             |    }
             |}
             |""".stripMargin
        case ...

      }
    }
  }

  // Создает функцию для генерации и отправки сущностей в Apache Atlas
  // Использует предоставленные функции для создания JSON и отправки его в Atlas
  def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

    def sparkDataEntitys(ast: AST): Unit = {
      ast.children.foreach { inast =>
        val jsonBody =
          f"""
             |{
             |  "entity": {
             |    "typeName": "pico_spark_data_type",
             |    "attributes": {
             |      "domain": "${domain}",
             |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "description": "A description for the spark_data"
             |    }
             |  }
             |}
             |""".stripMargin

        execJsonAtlas(jsonBody)
        sparkDataEntitys(inast)
      }
    }

    // Возвращает функцию, которая генерирует и отправляет сущности данных для Spark
    sparkDataEntitys
  }

Пояснения:

  • senderEntity: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки.

  • generatotrProcessEntity: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas.

  • generatorDataEntities: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.

И обновляем методы для работы с AST

   // Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
  def ASTToAtlasEntity(ast: AST, domain: String): Unit = {

    // Создает функцию отправки JSON-данных для сущностей в Apache Atlas
    val entitySender = senderJsonToAtlasEndpoint("entity")

    // Создает функцию для генерации квалифицированного имени
    val qualifiedName = generatorQualifiedName(domain)

    // Создает функцию для генерации JSON-сущностей для процессов
    val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)

    // Создает функцию для отправки JSON-данных сущностей в Atlas
    val sendEntity = senderEntity(generatorProcessEntity, entitySender)

    // Создает функцию для генерации данных сущностей и отправки их в Atlas
    val generateDataEntity = generatorDataEntities(domain, entitySender)

    // Обрабатывает один узел AST, отправляя его как сущность в Atlas
    def processNode(ast: AST, intopLevelExpr: String): Unit = {
      sendEntity(ast, intopLevelExpr)
    }

    // Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел
    def traverseAST(ast: AST, intopLevelExpr: String): Unit = {
      processNode(ast, intopLevelExpr)
      ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))
    }

    // Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas
    createBaseOutput(domain, entitySender)
    createBaseInput(domain, entitySender)

    // Генерирует данные сущностей для AST и отправляет их в Atlas
    generateDataEntity(ast)

    // Запускает рекурсивное прохождение AST
    traverseAST(ast, "")
  }

  // Обогащает класс AST функцией для преобразования его в сущности Apache Atlas
  implicit class converter(ast: AST) {

    // Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
    def EntityToAtlas(domain: String): Unit = {
      ASTToAtlasEntity(ast, domain)
    }

  }

Пояснения:

  • ASTToAtlasEntity: Основной метод, который:

    • Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.

    • Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.

    • Создает и отправляет базовые сущности (входные и выходные) в Atlas.

    • Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.

  • implicit class converter(ast: AST): Обогащает класс AST, добавляя метод для преобразования AST в сущности Apache Atlas.

    • EntityToAtlas: Использует метод ASTToAtlasEntity для преобразования текущего узла AST в сущности Atlas и отправки их в указанный домен.

Теперь после запуска В Apache Atlas таки появиться Linage

Что ж, на изначальный logical план вроде похоже

Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
   +- Filter NOT (manufacturer#18 = Audi)
      +- Union false, false
         :- Relation [model#17,manufacturer#18] csv
         +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
            +- LocalRelation [_1#23, _2#24]

P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas можно взять тут

Теги:
Хабы:
+2
Комментарии1

Публикации

Истории

Работа

Scala разработчик
11 вакансий
Data Scientist
78 вакансий

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
14 сентября
Конференция Practical ML Conf
МоскваОнлайн
19 сентября
CDI Conf 2024
Москва
20 – 22 сентября
BCI Hack Moscow
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн
25 сентября
Конференция Yandex Scale 2024
МоскваОнлайн
28 – 29 сентября
Конференция E-CODE
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
30 сентября – 1 октября
Конференция фронтенд-разработчиков FrontendConf 2024
МоскваОнлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн