Vector является очень гибким агрегатором сообщений и поддерживает собственный язык для обработки событий - VRL.
Поэтому предлагаю без лишних слов перейти к изучению его возможностей. Давайте напишем простейшую конфигурацию с использованием VRL, которая будет удалять поле из JSON файла
На входе будут такие данные:
{
"field1": "foo",
"field2": "bar"
}
Наша первая конфигурация на VRL будет выглядеть так:
sources:
file_input:
type: file
include:
- /opt/habr/vector/intro.json
ignore_checkpoints: true
transforms:
parse_file:
type: remap
inputs:
- file_input
source: |
.message = parse_json!(.message)
.field1 = .message.field1
del(.host)
del(.file)
del(.message)
del(.timestamp)
del(.source_type)
sinks:
test_output:
type: file
inputs:
- parse_file
path: /opt/habr/vector/intro_out.json
encoding:
codec: json
И на выходе мы получим такую данные:
{
"field1": "foo"
}
Зачем в нашем трансформе такая часть с удалением других полей?
del(.host)
del(.file)
del(.message)
del(.timestamp)
del(.source_type)
Дело в том, что на источник данных в нашем примере - file, который содержит помимо самой строки, ещё и другие метаданные в событии.
Вот как бы выглядел наш вывод, если оставить эти поля:
{
"field1": "foo",
"file": "/opt/habr/vector/intro.json",
"host": "test-mon",
"message": {
"field1": "foo",
"field2": "bar"
},
"source_type": "file",
"timestamp": "2025-03-17T13:09:20.653470883Z"
}
Здесь стоит обратить внимание, что наше поле field1 встречается дважды. Правда в первом случае - это отдельный объект, поскольку мы объявили его внутри нашего трансформа, а во втором случае - это часть message, то есть содержимое нашей строки.
Полезной практикой при написании VRL будет создание отдельного файла под VRL. В нашем примере VRL получился совсем короткий, но бывают случаи, когда один VRL растягивается на 100, 200 и 300 строк. Поэтому хранить всё в одном файле будет неудобно, есть риск допустить ошибки в конфигурации, а также теряется читабельность.
Чтобы вынести VRL в отдельный файл, достаточно указать параметр "file" в transform
sources:
file_input:
type: file
include:
- /opt/habr/vector/intro.json
ignore_checkpoints: true
transforms:
parse_file:
type: remap
inputs:
- file_input
file: /etc/vector/habr/intro.vrl
sinks:
test_output:
type: file
inputs:
- parse_file
path: /opt/habr/vector/intro_out.json
encoding:
codec: json
Теперь наша конфигурация выглядит компактнее, а с самим VRL можно работать в отдельном файле.
2. Синтаксис VRL
Теперь изучив базовую работу с VRL, можно изучить его синтаксис, чтобы начать работать с Vector более гибко.
Переменные
Переменные в VRL могут содержать числовые, строковые и логические значения. Их можно присвоить напрямую:
$ a=5
5
$ b = "some text"
"some text"
$ pi = 3.14
3.14
$ k = true
true
Либо же можно присвоить значение как результат выполнения функции:
$ c = snakecase(b)
"some_text"
Также с числовыми переменными можно выполнять базовые математические операции:
$ a = 5 + 5
10
$ a = 5 - 2
3
$ a = 5 * 5
25
$ a = 5 / 5
1
$ a = 7 / 5
1.4
$ a = mod(5, 3)
2
А также мы можем объединять вместе строковые переменные:
$ str1 = "some text"
"some text"
$ str2 = "other text"
"other text"
$ str1 + " and " + str2
"some text and other text"
Также стоит отметить, что переменные, начинающиеся с точки - это JSON поля, которые пойдут дальше в sink, а без неё - это обычные переменные, которые существуют в рамках трансформа.
Массивы
VRL поддерживает работу с массивами. Массивы могут хранить в себе любые типы данных, такие как переменные, числа, строки или JSON объекты:
$ a = 5
5
$ arr = ["text", 5, 3.14, true, a, {"foo": "bar"}]
["text", 5, 3.14, true, 5, { "foo": "bar" }]
Операторы и циклы
Полноценно VRL поддерживает только оператор if, как таковых других циклов по типу for или while здесь нет, привычных нам по большинству языков программирования, но в случае с for есть функция, которая выполняет схожий механизм, о котором поговорим чуть позже. А вот при попытке использовать цикл while, мы увидим лишь, что это ключевое слово зарезервировано для будущих нужд:
$ while
error[E205]: reserved keyword
┌─ :1:1
│
1 │ while
│ ^^^^^
│ │
│ this identifier name is reserved for future use in the language
│ use a different name instead
│
= see language documentation at https://vrl.dev
= try your code in the VRL REPL, learn more at https://vrl.dev/examples
В целом, полезно ознакомиться со списком зарезервированных слов, чтобы избежать конфликтов - https://vector.dev/docs/reference/vrl/expressions/#keywords
Итак, начнём с простого оператора проверки условия if - else.
Здесь мы имеем вполне знакомый синтаксис:
if *условие* {
*дейстиве*
} else if *условие* {
*дейтсвие*
} else {
*действие*
}
И попробуем это выполнить на реальном примере:
$ a = 3
if (a == 10) {
"first block"
} else if (a <= 10) {
"second block"
} else {
"third block"
}
"second block"
Теперь перейдём к перебору массивов. Как я уже писал, на данный момент VRL не поддерживает цикл for и есть лишь функция, которая выполняет схожий механизм.
Рассмотрим функцию for_each
. Для примера возьмём такой массив:
.arr = ["my", "first", "array", "loop"]
И теперь попробуем применить функцию upcase
для каждого элемента массива с помощью for_each
:
$ .new_arr = []
$
$ for_each(.arr) -> |_index, value|{
.new_arr = push(.new_arr, upcase(value))
}
$
$ .new_arr
["MY", "FIRST", "ARRAY", "LOOP"]
$ .arr = del(.new_arr)
["MY", "FIRST", "ARRAY", "LOOP"]
Как видим, здесь всё не так просто. Для начала нам надо создать новый массив, который будет хранить обработанные значения. Затем мы пробегаемся по массиву .arr
, используя for_each
. Выражение |_index, value|
, означает, что мы будет работать только со значениями массива, и индексы нас не интересуют, поэтому будет обработан каждый элемент массива. Далее мы добавляем новое обработанное значение в массив .new_arr
с помощью функции push
. После чего мы проверяем содержимое временного массива, присваиваем его нашему исходному массиву и удаляем временный.
У вас мог возникнуть вопрос, почему мы не можем просто использовать исходный массив, и применить функцию upscale
к каждому элементу? Но, к сожалению, VRL не позволяет нам работать с массивом таким образом:
$ for_each(.arr) -> |index, value| {
.arr[index] = upcase(value)
}
error[E203]: syntax error
┌─ :2:10
│
2 │ .arr[index] = upcase(value)
│ ^^^^^
│ │
│ unexpected syntax token: "Identifier"
│ expected one of: "integer literal"
│
= see language documentation at https://vrl.dev
= try your code in the VRL REPL, learn more at https://vrl.dev/examples
Но на самом деле мы можем обработать этот массив иначе, не прибегая к созданию временного массива. Для этого мы можем воспользоваться функцией map_value
, которая позволяет обрабатывать значения:
$ map_values(.arr) -> |value| { upcase!(value) }
["MY", "FIRST", "ARRAY", "LOOP"]
Теперь давайте рассмотрим аналогичное действие, но на это раз обработаем только чётные элементы массива .arr
:
$ .new_arr = []
$
$ for_each(.arr) -> |index, value| {
if mod(index, 2) == 0 {
.new_arr = push(.new_arr, upcase(value))
} else{
.new_arr = push(.new_arr, value)
}
}
$
$ .new_arr
["MY", "first", "ARRAY", "loop"]
$ .arr = del(.new_arr)
["MY", "first", "ARRAY", "loop"]
Здесь мы вновь создаём наш временный массив, который будет хранить обработанные значения. На это раз мы указываем index
без нижнего подчёркивания и в теле for_each
проверяем значение индекса. Если его остаток от деления на 2 равен 0, тогда этот элемент массива .arr
чётный и мы применяем к нему функцию upscale
. В противном случаем элемент массива .arr
будет добавлен массиву .new_arr
без изменений.
Обратите внимание, на блок else
. Его обязательно надо добавить в наш цикл, поскольку без него нечётные элементы просто будут отброшены:
$ .new_arr
["MY", ARRAY"]
Обработка ошибок
Немало важной частью является обработка ошибок внутри вашего трансформа. При написании пайплайна могут возникать неоднозначные ситуации, которые требуют отдельного внимания.
Представим ситуацию, у нас есть файл с таким содержимым:
1,,3
И мы хотим разбить каждое число в отдельное поле и умножить его на 5, разделив их запятой. Для этого напишем такой VRL:
arr = split(.message, ",")
.var1 = to_int(arr[0]) * 5
.var2 = to_int(arr[1]) * 5
.var3 = to_int(arr[2]) * 5
А сам файл конфигурации будет выглядеть так:
sources:
file_input:
type: file
include:
- /opt/habr/vector/error.txt
ignore_checkpoints: true
transforms:
parse_file:
type: remap
inputs:
- file_input
file: /etc/vector/habr/errors.vrl
sinks:
test_output:
type: file
inputs:
- parse_file
path: /opt/habr/vector/error.json
encoding:
codec: json
Теперь попробуем запустить vector с такой конфигурацией:
root@test-mon:/etc/vector/habr# vector --config errors.yaml
2025-07-16T14:30:36.222710Z INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:30:36.223878Z INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:30:36.226896Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file":
error[E103]: unhandled fallible assignment
┌─ :1:7
│
1 │ arr = split(.message, ",")
│ ----- ^^^^^^^^^^^^^^^^^^^^
│ │ │
│ │ this expression is fallible because at least one argument's type cannot be verified to be valid
│ │ update the expression to be infallible by adding a `!`: `split!(.message, ",")`
│ │ `.message` argument type is `any` and this function expected a parameter `value` of type `string`
Во-первых, мы видим, что vector сразу ругается на функцию split
, поскольку такая запись может привести к ошибкам. В нашем случае мы полностью уверены в нашем выборе, поэтому обозначим функцию split
с символом «!», чтобы вектор проигнорировал потенциально опасную запись и продолжил работу:
arr = split!(.message, ",")
И попробуем запустить vector:
root@test-mon:/etc/vector/habr# vector --config errors.yaml
2025-07-16T14:35:09.037986Z INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:35:09.039181Z INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:35:09.042971Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file":
error[E103]: unhandled fallible assignment
┌─ :3:9
│
3 │ .var1 = to_int(arr[0]) * 5
│ ------- ^^^^^^^^^^^^^^^^^^ this expression is fallible because at least one argument's type cannot be verified to be valid
И вновь непорядок, вновь vector падает из-за ошибки. На это раз ему не нравится функция to_int
.
Представим, что мы очень уверенные в себе люди, и так же добавим «!» к функции
.var1 = to_int!(arr[0]) * 5
.var2 = to_int!(arr[1]) * 5
.var3 = to_int!(arr[2]) * 5
Наверное теперь всё должно пройти хорошо и vector не свалится в ошибку:
root@test-mon:/etc/vector/habr# vector --config errors.yaml
2025-07-16T14:36:22.005848Z INFO vector::app: Log level is enabled. level="info"
2025-07-16T14:36:22.006867Z INFO vector::app: Loading configs. paths=["errors.yaml"]
2025-07-16T14:36:22.010056Z INFO vector::topology::running: Running healthchecks.
2025-07-16T14:36:22.010141Z INFO vector::topology::builder: Healthcheck passed.
2025-07-16T14:36:22.010142Z INFO vector: Vector has started. debug="false" version="0.45.0" arch="x86_64" revision="063cabb 2025-02-24 14:52:02.810034614"
2025-07-16T14:36:22.010167Z INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2025-07-16T14:36:22.010197Z INFO source{component_kind="source" component_id=file_input component_type=file}: vector::sources::file: Starting file server. include=["/opt/habr/vector/error.txt"] exclude=[]
2025-07-16T14:36:22.010491Z INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data.
2025-07-16T14:36:22.010623Z INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/opt/habr/vector/error.txt
2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true
Вроде бы всё хорошо, в этот раз vector не упал. Но в логе можно увидеть одно сообщение:
2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true
В этой записи, мы видим, что на вход функции to_int
поступило пустое число. Что же тогда vector записал в файл вывода?
{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T14:36:22.010766422Z"}
А в выходном файле у нас записано событие целиком, поскольку оно обработалось с ошибкой.
Этот пример отчётливо показывает, что не стоит злоупотреблять знаками «!», а нужно их использовать только в том случае, если вы уверены, что ваша форма записи отработает корректно.
Но как же нам тогда поступить в этой ситуации? Для начала стоит отметить, зачем мы используем функцию to_int
. Дело в том, что после обработки функцией split
, мы получаем на выходе все элементы массива arr в string. Поэтому мы не можем просто умножить на 5, нам нужно привести данные к int. И в нашем исходном файле есть пропущенное число, поэтому такой случай надо отдельно обозначить:
.var1, err = to_int(arr[0]) * 5
.var2, err = to_int(arr[1]) * 5
.var3, err = to_int(arr[2]) * 5
Посмотрим, как теперь выглядят строки в выходном файле:
{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T17:29:05.314777030Z","var1":5,"var2":0,"var3":15}
Видно, что теперь у нас обработались наши поля, а поле с пустым значением приравнялось у нулю. Всё потому что arr[1]
у нас null и при преобразовании через `to_int, мы получаем 0
Применяем VRL на практике
Теперь давайте попробуем закрепить все полученные знания на реальном примере.
Представим такой кейс, что у нас есть банковское приложение для перевода средств между клиентами. И это приложение генерирует данные по каждой транзакции.
Возьмём за основу такие данные с такой структурой полей:
"account_receiver": "234567891012",
"account_sender": "750",
"auth_channel_end": "PS",
"auth_channel_start": "PS",
"auth_context_end": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00",
"auth_context_start": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00",
"authorization_time": "2025-01-21 08:10:35",
"branch_code": "123456789101",
"channel_type": "3",
"compliance_rules": "345678912101,91,[],[],,AC",
"counterparty_id_a": "",
"counterparty_id_b": "urn:86661307-056805-0",
"customer_external_id": "436046005035947",
"customer_id": "234567891012",
"customer_internal_id": "",
"device_type": "1",
"entry_time_end": "2025-02-14 10:10:28",
"entry_time_start": "2025-02-14 10:10:28",
"entrypoint_node_end": "43604000B000DA02",
"entrypoint_node_start": "43604000B000DA02",
"forwarding": "",
"forwarding_reason": "",
"operation_code": "234567891012",
"processing_code": "900",
"processing_time_sec": "55",
"security_flag": "",
"session_token_a": "jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4",
"session_token_b": "678F728F12B9D00030BA8@2.3.4.5:1234",
"session_token_c": "",
"status_http": "",
"transaction_end_time": "2025-02-14 10:11:17",
"transaction_id": "51ab65e0-0805-498f-8508-3cc4cd459ffd",
"transaction_start_time": "2025-01-21 08:10:22",
"transaction_status_code": "16",
"transaction_type": "522",
"used_services_raw": "OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+03:00,,,,[]",
"user_interaction_time_sec": "43"
И сами строки в сыром виде, будут выглядеть так:
2025-02-14T10:11:17+03:00;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21T10:10:22+05:00;2025-01-21T10:10:35+05:00;234567891012;750;234567
891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14T10:10:2
8+03:00;PS,43604000B000DA02,2025-02-14T10:10:28+03:00;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+
03:00,,,,[]";16;;55;43;
Теперь представим себе такую задачу: наша банковская система генерирует данные в таком формате, а партнёру надо преобразовать данные в удобный для него формат. Нам необходимо будет поменять формат меток времени из RFC 3339 в простой DateTime формат, а также нам надо заменить разделители в compliance_rules и used_services_raw с «;» на «&»
Для этого набросаем такой VRL. Для начала нам надо разбить строку на части, поскольку количество элементов не фиксировано, и просто разбить через «;» не получится, то мы поделим сообщение на части с помощью регулярного выражения по ключевым моментам в каждом сообщении:
.message = string!(.message)
.message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_service
s_raw>[^"]*)";(?P<log_part2>.*)$')
.part1 = .message.log_part1
.security_flag = .message.log_security_flag
.processing_code = .message.log_processing_code
.compliance_rules = .message.log_compliance_rules
.used_services_raw = .message.log_used_services_raw
.part2 = .message.log_part2
Здесь мы разбиваем сообщение на несколько частей, чтобы в дальнейшем их обработать отдельно. Также мы сразу создаём уже вычлененные поля в отдельные поля.
Теперь перейдём к обработке первой части сообщения:
.part1 = string(.part1)
arr = split(.part1, ";")
.transaction_end_time, err = if arr[0] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.transaction_type = to_string(arr[1])
.branch_code = to_string(arr[2])
.transaction_id = to_string(arr[3])
.customer_id = to_string(arr[4])
.transaction_start_time, err = if arr[5] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.authorization_time, err = if arr[6] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.operation_code = to_string(arr[7])
.account_sender = to_string(arr[8])
.account_receiver = to_string(arr[9])
.forwarding = to_string(arr[10])
.forwarding_reason = to_string(arr[11])
.channel_type = to_string(arr[12])
.device_type = to_string(arr[13])
.session_token_a = to_string(arr[14])
.session_token_b = to_string(arr[15])
.session_token_c = to_string(arr[16])
.customer_internal_id = to_string(arr[17])
.customer_external_id = to_string(arr[18])
.counterparty_id_a = to_string(arr[19])
.counterparty_id_b = to_string(arr[20])
.auth_context_start = to_string(arr[21])
.auth_context_end = to_string(arr[22])
Разберём это детальнее. Для начала мы разбиваем первую часть с помощью «;» и записываем это в массив arr
:
.part1 = string(.part1)
arr = split(.part1, ";")
После чего мы получаем мы преобразуем наш первый timestamp transaction_end_time
в нужный формат.
.transaction_end_time, err = if arr[0] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
Для начала мы проверяем, что первый элемент массива arr
не пустой, в противном случае он останется пустым, после чего мы преобразуем нашу метку времени в обычный unix timestamp, преобразуем его в int
и прибавляем к нему 10800 (3 часа). Это необходимо для того, чтобы не сбить наше время от таймзоны, поскольку на выходе у нас получится unix timestamp в GMT +0. И после этого мы преобразуем наш unix timestamp в нужный формат времени. Далее мы будем применять аналогичный способ для всех меток времени.
А также создаём отдельные поля:
.transaction_type = to_string(arr[1])
...
.auth_context_end = to_string(arr[22])
Аналогичным образом мы поступим и для auth_context_start
и auth_context_end
.auth_context_start = string(.auth_context_start)
arr1 = split(.auth_context_start, ",")
.auth_channel_start = to_string(arr1[0])
.entrypoint_node_start = to_string(arr1[1])
.entry_time_start, err = if arr1[2] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.auth_context_end = string(.auth_context_end)
arr2 = split(.auth_context_end, ",")
.auth_channel_end = to_string(arr2[0])
.entrypoint_node_end = to_string(arr2[1])
.entry_time_end, err = if arr2[2] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
Дальше мы поступим очень интересно. Внутри compliance_rules
может идти список из нескольких правил, но в наем случае всегда одно. Мы будем менять разделитель «;» между правилами на «&»:
.compliance_rules = replace(.compliance_rules, ";", "&")
А также нам надо заменить запятые внутри квадратный скобок. Для этого мы воспользуемся replace_with
, чтобы найти внутри compliance_rules
блок с фигурными скобками и заменить внутри запятые:
.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {
"[" + replace(m.captures[0], ",", "&") + "]"
}
Здесь передаём функции compliance_rules
и задаём регулярное выражение для поиска блока с квадратными скобками. Полученное совпадение записывается в переменную m и уже внутри неё меняются запятые на «&».
Далее нам надо преобразовать метку внутри used_services_raw
:
.used_services_raw = string(.used_services_raw)
.services_info = split(.used_services_raw, ";")
.parsed_services_info = []
for_each(.services_info) -> |_index, value| {
next_service = split(value, ",")
service_call_time, err = if next_service[5] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
obj = {
"service_name": to_string(next_service[0]),
"service_mode": to_string(next_service[1]),
"number_of_diversions": to_string(next_service[2]),
"assoc_party_id": to_string(next_service[3]),
"service_id": to_string(next_service[4]),
"service_call_time": service_call_time,
"number_of_participants": to_string(next_service[6]),
"action_type": to_string(next_service[7]),
"customer_group_tag": to_string(next_service[8]),
"service_cost_info": to_string(next_service[9])
}
.parsed_services_info = push(.parsed_services_info, obj)
}
Внутри used_services_raw
у нас может идти сколько угодно сервисов, поэтому мы разбиваем его с помощью «;» и записываем в массив service_info
.
Далее мы парсим каждый сервис из массива, пробежавшись по нему функцией for_each
. Внутри мы по знакомой схеме меняем timestamp, записываем это в словарь obj
и записываем его в отдельное поле.
Затем мы идёт простая часть с вычленением полей из второй части сообщения:
.part2 = string(.part2)
arr3 = split(.part2, ";")
.transaction_status_code = to_string(arr3[0])
.status_http = to_string(arr3[1])
.processing_time_sec = to_string(arr3[2])
.user_interaction_time_sec = to_string(arr3[3])
И теперь нам остаётся собрать воедино все разбитые поля в .message
, чтобы записать обработанную строку в выходной файл.
.message = .transaction_end_time + ";" +
.transaction_type + ";" +
.branch_code + ";" +
.transaction_id + ";" +
.customer_id + ";" +
.transaction_start_time + ";" +
.authorization_time + ";" +
.operation_code + ";" +
.account_sender + ";" +
.account_receiver + ";" +
.forwarding + ";" +
.forwarding_reason + ";" +
.channel_type + ";" +
.device_type + ";" +
.session_token_a + ";" +
.session_token_b + ";" +
.session_token_c + ";" +
.customer_internal_id + ";" +
.customer_external_id + ";" +
.counterparty_id_a + ";" +
.counterparty_id_b + ";" +
.auth_channel_start + "," +
.entrypoint_node_start + "," +
.entry_time_start + ";" +
.auth_channel_end + "," +
.entrypoint_node_end + "," +
.entry_time_end + ";\"" +
.security_flag + "\";" +
.processing_code + ";\"" +
.compliance_rules + "\";\""
for_each(.parsed_services_info) -> |index, obj| {
if index != 0 {
.message = .message + "&"
}
.message = .message +
obj.service_name + "," +
obj.service_mode + "," +
obj.number_of_diversions + "," +
obj.assoc_party_id + "," +
obj.service_id + "," +
obj.service_call_time + "," +
obj.number_of_participants + "," +
obj.action_type + "," +
obj.customer_group_tag + "," +
obj.service_cost_info
}
.message = .message + "\";" +
.transaction_status_code + ";" +
.status_http + ";" +
.processing_time_sec + ";" +
.user_interaction_time_sec + ";"
Здесь мы поочерёдно прописываем в нужном порядке наши поля, добавляя к ним разделители.
Отдельно можно выделить секцию с parsed_services_info
. Нам также надо заменить разделитель между сервисами на «&». Поэтому мы поочерёдно записываем содержимое словарей в message
, а также добавляем перед ним «&». Но здесь, стоит заметить, что у нас добавлена проверка для первого элемента, чтобы перед ним не ставился разделитель, поскольку перед ним нет другого сервиса, иначе первый сервис тоже бы начинался с разделителя, что неверно.
Конечный VRL будет выглядеть так:
.message = string!(.message)
.message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_services_raw>[^"]*)";(?P<log_part2>.*)$')
.part1 = .message.log_part1
.security_flag = .message.log_security_flag
.processing_code = .message.log_processing_code
.compliance_rules = .message.log_compliance_rules
.used_services_raw = .message.log_used_services_raw
.part2 = .message.log_part2
.part1 = string(.part1)
arr = split(.part1, ";")
.transaction_end_time, err = if arr[0] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.transaction_type = to_string(arr[1])
.branch_code = to_string(arr[2])
.transaction_id = to_string(arr[3])
.customer_id = to_string(arr[4])
.transaction_start_time, err = if arr[5] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.authorization_time, err = if arr[6] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.operation_code = to_string(arr[7])
.account_sender = to_string(arr[8])
.account_receiver = to_string(arr[9])
.forwarding = to_string(arr[10])
.forwarding_reason = to_string(arr[11])
.channel_type = to_string(arr[12])
.device_type = to_string(arr[13])
.session_token_a = to_string(arr[14])
.session_token_b = to_string(arr[15])
.session_token_c = to_string(arr[16])
.customer_internal_id = to_string(arr[17])
.customer_external_id = to_string(arr[18])
.counterparty_id_a = to_string(arr[19])
.counterparty_id_b = to_string(arr[20])
.auth_context_start = to_string(arr[21])
.auth_context_end = to_string(arr[22])
.auth_context_start = string(.auth_context_start)
arr1 = split(.auth_context_start, ",")
.auth_channel_start = to_string(arr1[0])
.entrypoint_node_start = to_string(arr1[1])
.entry_time_start, err = if arr1[2] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.auth_context_end = string(.auth_context_end)
arr2 = split(.auth_context_end, ",")
.auth_channel_end = to_string(arr2[0])
.entrypoint_node_end = to_string(arr2[1])
.entry_time_end, err = if arr2[2] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
.compliance_rules = replace(.compliance_rules, ";", "&")
.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {
"[" + replace(m.captures[0], ",", "&") + "]"
}
.used_services_raw = string(.used_services_raw)
.services_info = split(.used_services_raw, ";")
.parsed_services_info = []
for_each(.services_info) -> |_index, value| {
next_service = split(value, ",")
service_call_time, err = if next_service[5] != "" {
unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))
unix_timestamp = to_int(unix_timestamp) + 10800
format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")
} else {
""
}
obj = {
"service_name": to_string(next_service[0]),
"service_mode": to_string(next_service[1]),
"number_of_diversions": to_string(next_service[2]),
"assoc_party_id": to_string(next_service[3]),
"service_id": to_string(next_service[4]),
"service_call_time": service_call_time,
"number_of_participants": to_string(next_service[6]),
"action_type": to_string(next_service[7]),
"customer_group_tag": to_string(next_service[8]),
"service_cost_info": to_string(next_service[9])
}
.parsed_services_info = push(.parsed_services_info, obj)
}
.part2 = string(.part2)
arr3 = split(.part2, ";")
.transaction_status_code = to_string(arr3[0])
.status_http = to_string(arr3[1])
.processing_time_sec = to_string(arr3[2])
.user_interaction_time_sec = to_string(arr3[3])
.message = .transaction_end_time + ";" +
.transaction_type + ";" +
.branch_code + ";" +
.transaction_id + ";" +
.customer_id + ";" +
.transaction_start_time + ";" +
.authorization_time + ";" +
.operation_code + ";" +
.account_sender + ";" +
.account_receiver + ";" +
.forwarding + ";" +
.forwarding_reason + ";" +
.channel_type + ";" +
.device_type + ";" +
.session_token_a + ";" +
.session_token_b + ";" +
.session_token_c + ";" +
.customer_internal_id + ";" +
.customer_external_id + ";" +
.counterparty_id_a + ";" +
.counterparty_id_b + ";" +
.auth_channel_start + "," +
.entrypoint_node_start + "," +
.entry_time_start + ";" +
.auth_channel_end + "," +
.entrypoint_node_end + "," +
.entry_time_end + ";\"" +
.security_flag + "\";" +
.processing_code + ";\"" +
.compliance_rules + "\";\""
for_each(.parsed_services_info) -> |index, obj| {
if index != 0 {
.message = .message + "&"
}
.message = .message +
obj.service_name + "," +
obj.service_mode + "," +
obj.number_of_diversions + "," +
obj.assoc_party_id + "," +
obj.service_id + "," +
obj.service_call_time + "," +
obj.number_of_participants + "," +
obj.action_type + "," +
obj.customer_group_tag + "," +
obj.service_cost_info
}
.message = .message + "\";" +
.transaction_status_code + ";" +
.status_http + ";" +
.processing_time_sec + ";" +
.user_interaction_time_sec + ";"
del(.host)
del(.file)
del(.timestamp)
del(.source_type)
И на выходе мы получаем такую обработанную строку:
2025-02-14 10:11:17;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21 08:10:22;2025-01-21 08:10:35;234567891012;750;234567891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14 10:10:28;PS,43604000B000DA02,2025-02-14 10:10:28;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14 10:10:23,,,,[]&CAT,,0,,,2025-02-14 10:10:31,,,,[]";16;;55;43;
Заключение
Надеюсь, мне удалось вам донести принцип работы с VRL. Мы рассмотрели синтаксис и базовые функции для работы с данными. А также попробовали применить полученные знания на практике. Не стоит сильно заострят внимание на требованиях к обработке данных из примера, поскольку главной задачей было показать базовый подход к обработке данных, а также зацепить нестандартные методы.
Для будет приятна, если эта статья может вам освоить такой замечательный инструмент как vector и поможем вам составлять пайплайны для обработки своих данных.