Когда строишь потоки обработки в Apache Nifi в основном требуется результат записать в базу данных. Бывают случаи, когда запись в целевую таблицу не проходит по причине несоответствия данных в записи и полей в таблице. Это возникает в случае, если на источнике размер поля увеличился, либо изменился порядок знаков после занятой, либо вы ожидаете "uniqueidentifier" а приходит пустая строка.
Ситуаций много, а решение обычно одно - локализовать ошибку, найти поле, которое не соответствует S2T, внести корректировки в таблицы, либо дополнить трасформации. В случае, если полей в записи две три, понять причину не сложно. Когда же их много приходится тратить время на разбор записи по отдельным полям и проверку каждого потенциального источника ошибки.
Для этого предлагаю скрипт для ScriptedTransformRecord, позволяющий разобрать запись на массив записей, каждая из которых содержит только одно значение, остальные проставляются "null".
results = []
fields = record.getRawFieldNames()
schema = record.getSchema()
fields.each{key -> {
log.debug("Got key: ${key}")
nMap = new java.util.HashMap<String, Object>()
log.debug("create map")
nMap.put(key,record.getValue(key))
log.debug("Put to map value")
newrecord = new org.apache.nifi.serialization.record.MapRecord(schema, nMap)
log.debug("Сreate new record with field ${key}")
results.add(newrecord)
}}
results
Полезные ссылки: