Search
Write a publication
Pull to refresh
21
0
Иван Клименко @KlimenkoIv

Архитектор интеграционных решений в BigData

Send message

Когда строишь потоки обработки в Apache Nifi в основном требуется результат записать в базу данных. Бывают случаи, когда запись в целевую таблицу не проходит по причине несоответствия данных в записи и полей в таблице. Это возникает в случае, если на источнике размер поля увеличился, либо изменился порядок знаков после занятой, либо вы ожидаете "uniqueidentifier" а приходит пустая строка.

Ситуаций много, а решение обычно одно - локализовать ошибку, найти поле, которое не соответствует S2T, внести корректировки в таблицы, либо дополнить трасформации. В случае, если полей в записи две три, понять причину не сложно. Когда же их много приходится тратить время на разбор записи по отдельным полям и проверку каждого потенциального источника ошибки.

Для этого предлагаю скрипт для ScriptedTransformRecord, позволяющий разобрать запись на массив записей, каждая из которых содержит только одно значение, остальные проставляются "null".

results = []

fields = record.getRawFieldNames()
schema = record.getSchema()
fields.each{key -> {
    log.debug("Got key: ${key}")
    nMap = new java.util.HashMap<String, Object>()
    log.debug("create map")
    nMap.put(key,record.getValue(key))
    log.debug("Put to map value")
    newrecord = new org.apache.nifi.serialization.record.MapRecord(schema, nMap)
    log.debug("Сreate new record with field ${key}")
    results.add(newrecord)
}}

results

Полезные ссылки:

Tags:
Total votes 1: ↑1 and ↓0+1
Comments0

Information

Rating
4,090-th
Location
Санкт-Петербург, Санкт-Петербург и область, Россия
Date of birth
Registered
Activity

Specialization

Data Engineer, Data Engineer
Lead
From 550,000 ₽
Git
Java
Docker
Database
ETL
Apache Airflow
Apache Kafka
Python
High-loaded systems
PostgreSQL