Как стать автором
Поиск
Написать публикацию
Обновить

VRL — просто, подробно и понятно

Уровень сложностиСредний
Время на прочтение20 мин
Количество просмотров302

Vector является очень гибким агрегатором сообщений и поддерживает собственный язык для обработки событий - VRL.

Поэтому предлагаю без лишних слов перейти к изучению его возможностей. Давайте напишем простейшую конфигурацию с использованием VRL, которая будет удалять поле из JSON файла

На входе будут такие данные:

{
  "field1": "foo",
  "field2": "bar"
}

Наша первая конфигурация на VRL будет выглядеть так:

sources:
  file_input:
    type: file
    include:
      - /opt/habr/vector/intro.json
    ignore_checkpoints: true

transforms:
  parse_file:
    type: remap
    inputs:
      - file_input
    source: |

        .message = parse_json!(.message)
        .field1 = .message.field1

        del(.host)
        del(.file)
        del(.message)
        del(.timestamp)
        del(.source_type)

sinks:
  test_output:
    type: file
    inputs:
      - parse_file
    path: /opt/habr/vector/intro_out.json
    encoding:
      codec: json

И на выходе мы получим такую данные:

{
  "field1": "foo"
}

Зачем в нашем трансформе такая часть с удалением других полей?

        del(.host)
        del(.file)
        del(.message)
        del(.timestamp)
        del(.source_type)

Дело в том, что на источник данных в нашем примере - file, который содержит помимо самой строки, ещё и другие метаданные в событии.
Вот как бы выглядел наш вывод, если оставить эти поля:

{
  "field1": "foo",
  "file": "/opt/habr/vector/intro.json",
  "host": "test-mon",
  "message": {
    "field1": "foo",
    "field2": "bar"
  },
  "source_type": "file",
  "timestamp": "2025-03-17T13:09:20.653470883Z"
}

Здесь стоит обратить внимание, что наше поле field1 встречается дважды. Правда в первом случае - это отдельный объект, поскольку мы объявили его внутри нашего трансформа, а во втором случае - это часть message, то есть содержимое нашей строки.

Полезной практикой при написании VRL будет создание отдельного файла под VRL. В нашем примере VRL получился совсем короткий, но бывают случаи, когда один VRL растягивается на 100, 200 и 300 строк. Поэтому хранить всё в одном файле будет неудобно, есть риск допустить ошибки в конфигурации, а также теряется читабельность.

Чтобы вынести VRL в отдельный файл, достаточно указать параметр "file" в transform

sources:
  file_input:
    type: file
    include:
      - /opt/habr/vector/intro.json
    ignore_checkpoints: true

transforms:
  parse_file:
    type: remap
    inputs:
      - file_input
    file: /etc/vector/habr/intro.vrl

sinks:
  test_output:
    type: file
    inputs:
      - parse_file
    path: /opt/habr/vector/intro_out.json
    encoding:
      codec: json

Теперь наша конфигурация выглядит компактнее, а с самим VRL можно работать в отдельном файле.

2. Синтаксис VRL

Теперь изучив базовую работу с VRL, можно изучить его синтаксис, чтобы начать работать с Vector более гибко.

Переменные

Переменные в VRL могут содержать числовые, строковые и логические значения. Их можно присвоить напрямую:

$ a=5                                                                
5

$ b = "some text"
"some text"

$ pi = 3.14
3.14

$ k = true
true

Либо же можно присвоить значение как результат выполнения функции:

$ c = snakecase(b)
"some_text"

Также с числовыми переменными можно выполнять базовые математические операции:

$ a = 5 + 5
10

$ a = 5 - 2
3

$ a = 5 * 5
25

$ a = 5 / 5
1

$ a = 7 / 5
1.4

$ a = mod(5, 3)
2

А также мы можем объединять вместе строковые переменные:

$ str1 = "some text"
"some text"

$ str2 = "other text"
"other text"

$ str1 + " and " + str2
"some text and other text"

Также стоит отметить, что переменные, начинающиеся с точки - это JSON поля, которые пойдут дальше в sink, а без неё - это обычные переменные, которые существуют в рамках трансформа.

Массивы

VRL поддерживает работу с массивами. Массивы могут хранить в себе любые типы данных, такие как переменные, числа, строки или JSON объекты:

$ a = 5
5

$ arr = ["text", 5, 3.14, true, a, {"foo": "bar"}]
["text", 5, 3.14, true, 5, { "foo": "bar" }]

Операторы и циклы

Полноценно VRL поддерживает только оператор if, как таковых других циклов по типу for или while здесь нет, привычных нам по большинству языков программирования, но в случае с for есть функция, которая выполняет схожий механизм, о котором поговорим чуть позже. А вот при попытке использовать цикл while, мы увидим лишь, что это ключевое слово зарезервировано для будущих нужд:

$ while

error[E205]: reserved keyword
  ┌─ :1:1
  │
1 │ while
  │ ^^^^^
  │ │
  │ this identifier name is reserved for future use in the language
  │ use a different name instead
  │
  = see language documentation at https://vrl.dev
  = try your code in the VRL REPL, learn more at https://vrl.dev/examples

В целом, полезно ознакомиться со списком зарезервированных слов, чтобы избежать конфликтов - https://vector.dev/docs/reference/vrl/expressions/#keywords

Итак, начнём с простого оператора проверки условия if - else.
Здесь мы имеем вполне знакомый синтаксис:

if *условие* {
	*дейстиве*
} else if *условие* {
	*дейтсвие*
} else {
	*действие*
}

И попробуем это выполнить на реальном примере:

$ a = 3
if (a == 10) {
  "first block"
} else if (a <= 10) {
  "second block"
} else {
  "third block"
}
"second block"

Теперь перейдём к перебору массивов. Как я уже писал, на данный момент VRL не поддерживает цикл for и есть лишь функция, которая выполняет схожий механизм.

Рассмотрим функцию for_each. Для примера возьмём такой массив:

	.arr = ["my", "first", "array", "loop"]

И теперь попробуем применить функцию upcase для каждого элемента массива с помощью for_each:

$ .new_arr = []
$
$ for_each(.arr) -> |_index, value|{
    .new_arr = push(.new_arr, upcase(value))
}
$
$ .new_arr
["MY", "FIRST", "ARRAY", "LOOP"]
$ .arr = del(.new_arr)
["MY", "FIRST", "ARRAY", "LOOP"]

Как видим, здесь всё не так просто. Для начала нам надо создать новый массив, который будет хранить обработанные значения. Затем мы пробегаемся по массиву .arr, используя for_each. Выражение |_index, value|, означает, что мы будет работать только со значениями массива, и индексы нас не интересуют, поэтому будет обработан каждый элемент массива. Далее мы добавляем новое обработанное значение в массив .new_arr с помощью функции push. После чего мы проверяем содержимое временного массива, присваиваем его нашему исходному массиву и удаляем временный.

У вас мог возникнуть вопрос, почему мы не можем просто использовать исходный массив, и применить функцию upscale к каждому элементу? Но, к сожалению, VRL не позволяет нам работать с массивом таким образом:

$ for_each(.arr) -> |index, value| {
    .arr[index] = upcase(value)
}

error[E203]: syntax error
  ┌─ :2:10
  │
2 │     .arr[index] = upcase(value)
  │          ^^^^^
  │          │
  │          unexpected syntax token: "Identifier"
  │          expected one of: "integer literal"
  │
  = see language documentation at https://vrl.dev
  = try your code in the VRL REPL, learn more at https://vrl.dev/examples

Но на самом деле мы можем обработать этот массив иначе, не прибегая к созданию временного массива. Для этого мы можем воспользоваться функцией map_value, которая позволяет обрабатывать значения:

$ map_values(.arr) -> |value| { upcase!(value) }
["MY", "FIRST", "ARRAY", "LOOP"]

Теперь давайте рассмотрим аналогичное действие, но на это раз обработаем только чётные элементы массива .arr:

$ .new_arr = []
$
$ for_each(.arr) -> |index, value| {
    if mod(index, 2) == 0 {
        .new_arr = push(.new_arr, upcase(value))
    } else{
        .new_arr = push(.new_arr, value)
    }
}
$
$ .new_arr
["MY", "first", "ARRAY", "loop"]
$ .arr = del(.new_arr)
["MY", "first", "ARRAY", "loop"]

Здесь мы вновь создаём наш временный массив, который будет хранить обработанные значения. На это раз мы указываем index без нижнего подчёркивания и в теле for_each проверяем значение индекса. Если его остаток от деления на 2 равен 0, тогда этот элемент массива .arr чётный и мы применяем к нему функцию upscale. В противном случаем элемент массива .arr будет добавлен массиву .new_arr без изменений.

Обратите внимание, на блок else. Его обязательно надо добавить в наш цикл, поскольку без него нечётные элементы просто будут отброшены:

$ .new_arr
["MY", ARRAY"]

Обработка ошибок

Немало важной частью является обработка ошибок внутри вашего трансформа. При написании пайплайна могут возникать неоднозначные ситуации, которые требуют отдельного внимания.

Представим ситуацию, у нас есть файл с таким содержимым:

1,,3

И мы хотим разбить каждое число в отдельное поле и умножить его на 5, разделив их запятой. Для этого напишем такой VRL:

arr = split(.message, ",")

.var1 = to_int(arr[0]) * 5
.var2 = to_int(arr[1]) * 5
.var3 = to_int(arr[2]) * 5

А сам файл конфигурации будет выглядеть так:

sources:
  file_input:
    type: file
    include:
      - /opt/habr/vector/error.txt
    ignore_checkpoints: true

transforms:
  parse_file:
    type: remap
    inputs:
      - file_input
    file: /etc/vector/habr/errors.vrl

sinks:
  test_output:
    type: file
    inputs:
      - parse_file
    path: /opt/habr/vector/error.json
    encoding:
      codec: json

Теперь попробуем запустить vector с такой конфигурацией:

root@test-mon:/etc/vector/habr# vector --config errors.yaml 
2025-07-16T14:30:36.222710Z  INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:30:36.223878Z  INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:30:36.226896Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file": 
error[E103]: unhandled fallible assignment
  ┌─ :1:7
  │
1 │ arr = split(.message, ",")
  │ ----- ^^^^^^^^^^^^^^^^^^^^
  │ │     │
  │ │     this expression is fallible because at least one argument's type cannot be verified to be valid
  │ │     update the expression to be infallible by adding a `!`: `split!(.message, ",")`
  │ │     `.message` argument type is `any` and this function expected a parameter `value` of type `string`

Во-первых, мы видим, что vector сразу ругается на функцию split, поскольку такая запись может привести к ошибкам. В нашем случае мы полностью уверены в нашем выборе, поэтому обозначим функцию split с символом «!», чтобы вектор проигнорировал потенциально опасную запись и продолжил работу:

arr = split!(.message, ",")

И попробуем запустить vector:

root@test-mon:/etc/vector/habr# vector --config errors.yaml 
2025-07-16T14:35:09.037986Z  INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:35:09.039181Z  INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:35:09.042971Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file": 
error[E103]: unhandled fallible assignment
  ┌─ :3:9
  │
3 │ .var1 = to_int(arr[0]) * 5
  │ ------- ^^^^^^^^^^^^^^^^^^ this expression is fallible because at least one argument's type cannot be verified to be valid

И вновь непорядок, вновь vector падает из-за ошибки. На это раз ему не нравится функция to_int.
Представим, что мы очень уверенные в себе люди, и так же добавим «!» к функции

.var1 = to_int!(arr[0]) * 5
.var2 = to_int!(arr[1]) * 5
.var3 = to_int!(arr[2]) * 5

Наверное теперь всё должно пройти хорошо и vector не свалится в ошибку:

root@test-mon:/etc/vector/habr# vector --config errors.yaml 
2025-07-16T14:36:22.005848Z  INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:36:22.006867Z  INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:36:22.010056Z  INFO vector::topology::running: Running healthchecks.
2025-07-16T14:36:22.010141Z  INFO vector::topology::builder: Healthcheck passed.
2025-07-16T14:36:22.010142Z  INFO vector: Vector has started. debug="false" version="0.45.0" arch="x86_64" revision="063cabb 2025-02-24 14:52:02.810034614"
2025-07-16T14:36:22.010167Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2025-07-16T14:36:22.010197Z  INFO source{component_kind="source" component_id=file_input component_type=file}: vector::sources::file: Starting file server. include=["/opt/habr/vector/error.txt"] exclude=[]
2025-07-16T14:36:22.010491Z  INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data.
2025-07-16T14:36:22.010623Z  INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/opt/habr/vector/error.txt
2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

Вроде бы всё хорошо, в этот раз vector не упал. Но в логе можно увидеть одно сообщение:

2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

В этой записи, мы видим, что на вход функции to_int поступило пустое число. Что же тогда vector записал в файл вывода?

{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T14:36:22.010766422Z"}

А в выходном файле у нас записано событие целиком, поскольку оно обработалось с ошибкой.

Этот пример отчётливо показывает, что не стоит злоупотреблять знаками «!», а нужно их использовать только в том случае, если вы уверены, что ваша форма записи отработает корректно.

Но как же нам тогда поступить в этой ситуации? Для начала стоит отметить, зачем мы используем функцию to_int. Дело в том, что после обработки функцией split, мы получаем на выходе все элементы массива arr в string. Поэтому мы не можем просто умножить на 5, нам нужно привести данные к int. И в нашем исходном файле есть пропущенное число, поэтому такой случай надо отдельно обозначить:

.var1, err = to_int(arr[0]) * 5
.var2, err = to_int(arr[1]) * 5
.var3, err = to_int(arr[2]) * 5

Посмотрим, как теперь выглядят строки в выходном файле:

{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T17:29:05.314777030Z","var1":5,"var2":0,"var3":15}

Видно, что теперь у нас обработались наши поля, а поле с пустым значением приравнялось у нулю. Всё потому что arr[1] у нас null и при преобразовании через `to_int, мы получаем 0

Применяем VRL на практике

Теперь давайте попробуем закрепить все полученные знания на реальном примере.
Представим такой кейс, что у нас есть банковское приложение для перевода средств между клиентами. И это приложение генерирует данные по каждой транзакции.

Возьмём за основу такие данные с такой структурой полей:

"account_receiver": "234567891012",
"account_sender": "750",
"auth_channel_end": "PS",
"auth_channel_start": "PS",
"auth_context_end": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00",
"auth_context_start": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00",
"authorization_time": "2025-01-21 08:10:35",
"branch_code": "123456789101",
"channel_type": "3",
"compliance_rules": "345678912101,91,[],[],,AC",
"counterparty_id_a": "",
"counterparty_id_b": "urn:86661307-056805-0",
"customer_external_id": "436046005035947",
"customer_id": "234567891012",
"customer_internal_id": "",
"device_type": "1",
"entry_time_end": "2025-02-14 10:10:28",
"entry_time_start": "2025-02-14 10:10:28",
"entrypoint_node_end": "43604000B000DA02",
"entrypoint_node_start": "43604000B000DA02",
"forwarding": "",
"forwarding_reason": "",
"operation_code": "234567891012",
"processing_code": "900",
"processing_time_sec": "55",
"security_flag": "",
"session_token_a": "jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4",
"session_token_b": "678F728F12B9D00030BA8@2.3.4.5:1234",
"session_token_c": "",
"status_http": "",
"transaction_end_time": "2025-02-14 10:11:17",
"transaction_id": "51ab65e0-0805-498f-8508-3cc4cd459ffd",
"transaction_start_time": "2025-01-21 08:10:22",
"transaction_status_code": "16",
"transaction_type": "522",
"used_services_raw": "OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+03:00,,,,[]",
"user_interaction_time_sec": "43"

И сами строки в сыром виде, будут выглядеть так:

2025-02-14T10:11:17+03:00;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21T10:10:22+05:00;2025-01-21T10:10:35+05:00;234567891012;750;234567

891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14T10:10:2

8+03:00;PS,43604000B000DA02,2025-02-14T10:10:28+03:00;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+

03:00,,,,[]";16;;55;43;

Теперь представим себе такую задачу: наша банковская система генерирует данные в таком формате, а партнёру надо преобразовать данные в удобный для него формат. Нам необходимо будет поменять формат меток времени из RFC 3339 в простой DateTime формат, а также нам надо заменить разделители в compliance_rules и used_services_raw с «;» на «&»

Для этого набросаем такой VRL. Для начала нам надо разбить строку на части, поскольку количество элементов не фиксировано, и просто разбить через «;» не получится, то мы поделим сообщение на части с помощью регулярного выражения по ключевым моментам в каждом сообщении:

.message = string!(.message)
.message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_service
s_raw>[^"]*)";(?P<log_part2>.*)$')
.part1 = .message.log_part1
.security_flag = .message.log_security_flag
.processing_code = .message.log_processing_code
.compliance_rules = .message.log_compliance_rules
.used_services_raw = .message.log_used_services_raw
.part2 = .message.log_part2

Здесь мы разбиваем сообщение на несколько частей, чтобы в дальнейшем их обработать отдельно. Также мы сразу создаём уже вычлененные поля в отдельные поля.

Теперь перейдём к обработке первой части сообщения:

.part1 = string(.part1)
arr = split(.part1, ";")
.transaction_end_time, err = if arr[0] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.transaction_type = to_string(arr[1])
.branch_code = to_string(arr[2])
.transaction_id = to_string(arr[3])
.customer_id = to_string(arr[4])
.transaction_start_time, err = if arr[5] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.authorization_time, err = if arr[6] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.operation_code = to_string(arr[7])
.account_sender = to_string(arr[8])
.account_receiver = to_string(arr[9])
.forwarding = to_string(arr[10])
.forwarding_reason = to_string(arr[11])
.channel_type = to_string(arr[12])
.device_type = to_string(arr[13])
.session_token_a = to_string(arr[14])
.session_token_b = to_string(arr[15])
.session_token_c = to_string(arr[16])
.customer_internal_id = to_string(arr[17])
.customer_external_id = to_string(arr[18])
.counterparty_id_a = to_string(arr[19])
.counterparty_id_b = to_string(arr[20])
.auth_context_start = to_string(arr[21])
.auth_context_end = to_string(arr[22])

Разберём это детальнее. Для начала мы разбиваем первую часть с помощью «;» и записываем это в массив arr:

.part1 = string(.part1)
arr = split(.part1, ";")

После чего мы получаем мы преобразуем наш первый timestamp transaction_end_time в нужный формат.

.transaction_end_time, err = if arr[0] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}

Для начала мы проверяем, что первый элемент массива arr не пустой, в противном случае он останется пустым, после чего мы преобразуем нашу метку времени в обычный unix timestamp, преобразуем его в int и прибавляем к нему 10800 (3 часа). Это необходимо для того, чтобы не сбить наше время от таймзоны, поскольку на выходе у нас получится unix timestamp в GMT +0. И после этого мы преобразуем наш unix timestamp в нужный формат времени. Далее мы будем применять аналогичный способ для всех меток времени.
А также создаём отдельные поля:

.transaction_type = to_string(arr[1])
...
.auth_context_end = to_string(arr[22])

Аналогичным образом мы поступим и для auth_context_start и auth_context_end

.auth_context_start = string(.auth_context_start)
arr1 = split(.auth_context_start, ",")

.auth_channel_start = to_string(arr1[0])
.entrypoint_node_start = to_string(arr1[1])
.entry_time_start, err = if arr1[2] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}


.auth_context_end = string(.auth_context_end)
arr2 = split(.auth_context_end, ",")

.auth_channel_end = to_string(arr2[0])
.entrypoint_node_end = to_string(arr2[1])
.entry_time_end, err = if arr2[2] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
     ""
}

Дальше мы поступим очень интересно. Внутри compliance_rules может идти список из нескольких правил, но в наем случае всегда одно. Мы будем менять разделитель «;» между правилами на «&»:

.compliance_rules = replace(.compliance_rules, ";", "&")

А также нам надо заменить запятые внутри квадратный скобок. Для этого мы воспользуемся replace_with, чтобы найти внутри compliance_rules блок с фигурными скобками и заменить внутри запятые:

.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {
    "[" + replace(m.captures[0], ",", "&") + "]"
}

Здесь передаём функции compliance_rules и задаём регулярное выражение для поиска блока с квадратными скобками. Полученное совпадение записывается в переменную m и уже внутри неё меняются запятые на «&».

Далее нам надо преобразовать метку внутри used_services_raw:

.used_services_raw = string(.used_services_raw)
.services_info = split(.used_services_raw, ";")
.parsed_services_info = []

for_each(.services_info) -> |_index, value| {
    next_service = split(value, ",")

    service_call_time, err = if next_service[5] != "" {
        unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))
        unix_timestamp = to_int(unix_timestamp) + 10800
        format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
    } else {
        ""
    }

    obj = {
        "service_name": to_string(next_service[0]),
        "service_mode": to_string(next_service[1]),
        "number_of_diversions": to_string(next_service[2]),
        "assoc_party_id": to_string(next_service[3]),
        "service_id": to_string(next_service[4]),
        "service_call_time": service_call_time,
        "number_of_participants": to_string(next_service[6]),
        "action_type": to_string(next_service[7]),
        "customer_group_tag": to_string(next_service[8]),
        "service_cost_info": to_string(next_service[9])
    }

    .parsed_services_info = push(.parsed_services_info, obj)
}

Внутри used_services_raw у нас может идти сколько угодно сервисов, поэтому мы разбиваем его с помощью «;» и записываем в массив service_info.
Далее мы парсим каждый сервис из массива, пробежавшись по нему функцией for_each. Внутри мы по знакомой схеме меняем timestamp, записываем это в словарь obj и записываем его в отдельное поле.

Затем мы идёт простая часть с вычленением полей из второй части сообщения:

.part2 = string(.part2)
arr3 = split(.part2, ";")
.transaction_status_code = to_string(arr3[0])
.status_http = to_string(arr3[1])
.processing_time_sec = to_string(arr3[2])
.user_interaction_time_sec = to_string(arr3[3])

И теперь нам остаётся собрать воедино все разбитые поля в .message, чтобы записать обработанную строку в выходной файл.

.message =      .transaction_end_time + ";" +
        .transaction_type + ";" +
        .branch_code + ";" +
        .transaction_id + ";" +
        .customer_id + ";" +
        .transaction_start_time + ";" +
        .authorization_time + ";" +
        .operation_code + ";" +
        .account_sender + ";" +
        .account_receiver + ";" +
        .forwarding + ";" +
        .forwarding_reason + ";" +
        .channel_type + ";" +
        .device_type + ";" +
        .session_token_a + ";" +
        .session_token_b + ";" +
        .session_token_c + ";" +
        .customer_internal_id + ";" +
        .customer_external_id + ";" +
        .counterparty_id_a + ";" +
        .counterparty_id_b + ";" +
        .auth_channel_start + "," +
        .entrypoint_node_start + "," +
        .entry_time_start + ";" +
        .auth_channel_end + "," +
        .entrypoint_node_end + "," +
        .entry_time_end + ";\"" + 
        .security_flag + "\";" +
        .processing_code + ";\"" +
        .compliance_rules + "\";\""

for_each(.parsed_services_info) -> |index, obj| {
    if index != 0 {
        .message = .message + "&"
    }
    .message = .message + 
               obj.service_name + "," +
               obj.service_mode + "," +
               obj.number_of_diversions + "," +
               obj.assoc_party_id + "," +
               obj.service_id + "," +
               obj.service_call_time + "," +
               obj.number_of_participants + "," +
               obj.action_type + "," +
               obj.customer_group_tag + "," +
               obj.service_cost_info
        }


.message = .message + "\";" +
                .transaction_status_code + ";" +
                .status_http + ";" +
                .processing_time_sec + ";" +
                .user_interaction_time_sec + ";"

Здесь мы поочерёдно прописываем в нужном порядке наши поля, добавляя к ним разделители.
Отдельно можно выделить секцию с parsed_services_info. Нам также надо заменить разделитель между сервисами на «&». Поэтому мы поочерёдно записываем содержимое словарей в message, а также добавляем перед ним «&». Но здесь, стоит заметить, что у нас добавлена проверка для первого элемента, чтобы перед ним не ставился разделитель, поскольку перед ним нет другого сервиса, иначе первый сервис тоже бы начинался с разделителя, что неверно.

Конечный VRL будет выглядеть так:

.message = string!(.message)
.message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_services_raw>[^"]*)";(?P<log_part2>.*)$')
.part1 = .message.log_part1
.security_flag = .message.log_security_flag
.processing_code = .message.log_processing_code
.compliance_rules = .message.log_compliance_rules
.used_services_raw = .message.log_used_services_raw
.part2 = .message.log_part2

.part1 = string(.part1)
arr = split(.part1, ";")
.transaction_end_time, err = if arr[0] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.transaction_type = to_string(arr[1])
.branch_code = to_string(arr[2])
.transaction_id = to_string(arr[3])
.customer_id = to_string(arr[4])
.transaction_start_time, err = if arr[5] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.authorization_time, err = if arr[6] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}
.operation_code = to_string(arr[7])
.account_sender = to_string(arr[8])
.account_receiver = to_string(arr[9])
.forwarding = to_string(arr[10])
.forwarding_reason = to_string(arr[11])
.channel_type = to_string(arr[12])
.device_type = to_string(arr[13])
.session_token_a = to_string(arr[14])
.session_token_b = to_string(arr[15])
.session_token_c = to_string(arr[16])
.customer_internal_id = to_string(arr[17])
.customer_external_id = to_string(arr[18])
.counterparty_id_a = to_string(arr[19])
.counterparty_id_b = to_string(arr[20])
.auth_context_start = to_string(arr[21])
.auth_context_end = to_string(arr[22])


.auth_context_start = string(.auth_context_start)
arr1 = split(.auth_context_start, ",")

.auth_channel_start = to_string(arr1[0])
.entrypoint_node_start = to_string(arr1[1])
.entry_time_start, err = if arr1[2] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
    ""
}


.auth_context_end = string(.auth_context_end)
arr2 = split(.auth_context_end, ",")

.auth_channel_end = to_string(arr2[0])
.entrypoint_node_end = to_string(arr2[1])
.entry_time_end, err = if arr2[2] != "" {
    unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))
    unix_timestamp = to_int(unix_timestamp) + 10800
    format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
     ""
}


.compliance_rules = replace(.compliance_rules, ";", "&")

.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {
    "[" + replace(m.captures[0], ",", "&") + "]"
}

.used_services_raw = string(.used_services_raw)
.services_info = split(.used_services_raw, ";")
.parsed_services_info = []

for_each(.services_info) -> |_index, value| {
    next_service = split(value, ",")
        
    service_call_time, err = if next_service[5] != "" {
        unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))
        unix_timestamp = to_int(unix_timestamp) + 10800
        format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
    } else {
        ""
    }
            
    obj = {
        "service_name": to_string(next_service[0]),
        "service_mode": to_string(next_service[1]),
        "number_of_diversions": to_string(next_service[2]),
        "assoc_party_id": to_string(next_service[3]),
        "service_id": to_string(next_service[4]),
        "service_call_time": service_call_time,
        "number_of_participants": to_string(next_service[6]),
        "action_type": to_string(next_service[7]),
        "customer_group_tag": to_string(next_service[8]),
        "service_cost_info": to_string(next_service[9])
    }
            
    .parsed_services_info = push(.parsed_services_info, obj)
}


.part2 = string(.part2)
arr3 = split(.part2, ";")
.transaction_status_code = to_string(arr3[0])
.status_http = to_string(arr3[1])
.processing_time_sec = to_string(arr3[2])
.user_interaction_time_sec = to_string(arr3[3])


.message =      .transaction_end_time + ";" +
        .transaction_type + ";" +
        .branch_code + ";" +
        .transaction_id + ";" +
        .customer_id + ";" +
        .transaction_start_time + ";" +
        .authorization_time + ";" +
        .operation_code + ";" +
        .account_sender + ";" +
        .account_receiver + ";" +
        .forwarding + ";" +
        .forwarding_reason + ";" +
        .channel_type + ";" +
        .device_type + ";" +
        .session_token_a + ";" +
        .session_token_b + ";" +
        .session_token_c + ";" +
        .customer_internal_id + ";" +
        .customer_external_id + ";" +
        .counterparty_id_a + ";" +
        .counterparty_id_b + ";" +
        .auth_channel_start + "," +
        .entrypoint_node_start + "," +
        .entry_time_start + ";" +
        .auth_channel_end + "," +
        .entrypoint_node_end + "," +
        .entry_time_end + ";\"" + 
        .security_flag + "\";" +
        .processing_code + ";\"" +
        .compliance_rules + "\";\""

for_each(.parsed_services_info) -> |index, obj| {
    if index != 0 {
        .message = .message + "&"
    }
    .message = .message + 
               obj.service_name + "," +
               obj.service_mode + "," +
               obj.number_of_diversions + "," +
               obj.assoc_party_id + "," +
               obj.service_id + "," +
               obj.service_call_time + "," +
               obj.number_of_participants + "," +
               obj.action_type + "," +
               obj.customer_group_tag + "," +
               obj.service_cost_info
        }


.message = .message + "\";" +
                .transaction_status_code + ";" +
                .status_http + ";" +
                .processing_time_sec + ";" +
                .user_interaction_time_sec + ";"


del(.host)
del(.file)
del(.timestamp)
del(.source_type)

И на выходе мы получаем такую обработанную строку:

2025-02-14 10:11:17;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21 08:10:22;2025-01-21 08:10:35;234567891012;750;234567891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14 10:10:28;PS,43604000B000DA02,2025-02-14 10:10:28;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14 10:10:23,,,,[]&CAT,,0,,,2025-02-14 10:10:31,,,,[]";16;;55;43;

Заключение

Надеюсь, мне удалось вам донести принцип работы с VRL. Мы рассмотрели синтаксис и базовые функции для работы с данными. А также попробовали применить полученные знания на практике. Не стоит сильно заострят внимание на требованиях к обработке данных из примера, поскольку главной задачей было показать базовый подход к обработке данных, а также зацепить нестандартные методы.
Для будет приятна, если эта статья может вам освоить такой замечательный инструмент как vector и поможем вам составлять пайплайны для обработки своих данных.

Теги:
Хабы:
0
Комментарии0

Публикации

Ближайшие события