
Привет, Хабр!
Меня зовут Ксения Змичеровская, я системный аналитик в компании R-Vision. Совсем недавно мы выпустили собственный плагин R-Object для работы с R-Vision SIEM. Наш плагин – это полноценная среда разработки с преднастроенными шаблонами объектов, подсветкой синтаксиса, подсказками и валидацией VRL-кода. Плагин позволяет выполнять автоматическое тестирование правил, проводить полнофункциональную пошаговую отладку, а также запускать тесты на производительность.
Объекты экспертизы в R-Vision SIEM составляются с помощью специального языка R-Object. Каждый объект описывается в отдельном yaml-файле с расширением .ro и может содержать блоки на языке выражений VRL. В этой статье мы подробно изучим работу с плагином от его установки до запуска тестов и написания правил.
Чтобы вам было легче меня понять, давайте ознакомимся с основными терминами:
Syslog – это стандарт отправки и регистрации сообщений о происходящих в системе событиях.
VRL (Vector Remap Language) – это язык на основе выражений, разработанный для безопасной и эффективной работы с данными наблюдения, такими как журналы и метрики.
Корреляционное событие – событие, которое создается при соблюдении условий, заданных в правилах корреляции.
Raw (сырое событие) – событие до применения правила нормализации.
Базовое событие (Base event) – событие, которое пришло на вход и может привести к срабатыванию правила корреляц��и.
Парсинг – это извлечение структурированной информации из неструктурированных или полуструктурированных данных.
Маппинг – это процесс создания сопоставлений элементов данных между различными моделями данных
Начнем с самого простого – нам потребуется редактор кода Visual Studio Code, в который мы сможем загрузить расширение и начать работу. Загрузочный файл можно найти на официальном маркетплейсе Microsoft Visual Studio или во вкладке «Расширения» в VS Code. Для загрузки плагина в VS Code:
Откройте Visual Studio Code.
Выберите значок расширений в левой области.
В поиске введите RObject LSP extension.
Нажмите кнопку Install или Установить.
Перезагрузите приложение VS Code.

Знакомство с плагином

В карточке расширения можно найти актуальную информацию о:
параметрах плагина;
новых версиях (release notes).
поддерживаемых командах:
создание файла (New robject file);
проверка (Check);
запуск тестов (Run a test);
запуск теста производительности (Run a bench).
поддерживаемых языках разработки:
R-Object;
VRL.
Основные функции
Для работы с функциями плагина R-Object необходимо ввести команду в строку VS Code. Откройте палитру команд кода (Command Palette), нажав Command+Shift+P на macOS или Control+Shift+P в Windows и Linux. Эта комбинация клавиш откроет меню команд в верхней части окна.

Создание файла
Для создания нового файла введите команду RObject: New robject file.

Далее будет предложен список объектов, которые можно создать в плагине:
Правило нормализации (Normalization rule)
Обрабатывает и преобразовывает поступающие события согласно условиям и коду нормализации.Правило корреляции (Correlation rule)
Отвечает за обнаружение событий, которые соответствуют заданным критериям, а также может инициировать создание корреляционных событий и оповещений.Правило агрегации (Aggregation rule)
Описывает логику объединения нескольких событий безопасности в одно агрегированное.Правило сегментации (Segmentation rule)
Определяет логику группировки корреляционных событий в оповещения.Активный список (Active list)
Двумерный массив данных для накопления информации. Его можно использовать как буфер при корреляции событий.
При выборе любого из объектов, пользователю будет представлен готовый шаблон структуры с заполненными полями. С другими примерами можно ознакомиться, вызвав команду RObject: Examples. Таким образом, количество доступных примеров будет расширяться.
Структура файла в плагине R-Object
Если посмотреть на созданный файл, можно разделить синтаксис написания объектов в R-Object на три логических блока: общая информация об объекте, описание логики работы и тесты для автоматического тестирования.
В общей информации указываются метаданные нашего объекта – параметры id, name, type, version, author, description и т. д. Пользователь на свое усмотрение может добавлять любые кастомные поля, которые захочет.
Функции подсказок и валидации синтаксиса в плагине помогут заполнить все необходимые для выбранного объекта экспертизы поля, а автоподстановка ускорит процесс написания кода. Для добавления кода нужно нажать Control+Пробел (Windows/Linux) или Command+Пробел (macOS) и выбрать нужный параметр.

Логическая составляющая у каждого объекта экспертизы своя. Для правил нормализации это параметры filter, в которых мы можем отфильтровать поток входящих событий, и normalizer, в котором определяется схема нормализации «сырого» события.
Создание тестов
Блок с тестами необходим для проверки работы описанной логики на массиве тестовых событий и просмотра полученного результата. Для создания тестов нам необходимо назвать блок (указав значения в параметре name), заполнить тестовые данные и указать ожидаемый результат.
tests: - name: Test 1 events: - { "id": "64a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm1", "externalId": "4720", "Account_name": "Admin", "newuser": "Aleg", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "36a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm3", "externalId": "4732", "Account_name": "Admin", "newuser": "Oleg", "targetSid": "S-1-5-32-545", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "67a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm4", "externalId": "4720", "Account_name": "MikeDA", "newuser": "Leo", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "68a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:47:08", "dvendor": "Microsoft", "shost": "arm5", "externalId": "4720", "Account_name": "Kali", "newuser": "Paul", "OldUacValue": "0x15", "PrimaryGroupID": "513" } - { "id": "11a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm16", "externalId": "4720", "Account_name": "adminadmin", "newuser": "Oleg1", "OldUacValue": "0x15", "PrimaryGroupID": "513" } assertion: !vrl | assert_eq!(.message, "На хосте arm1 был создан пользователь Admin")
Кроме проверки работоспособности логики, можно написать тесты производительности:

В результате мы получим информацию о затраченном времени работы правила. Это позволит отследить изменения в производительности правила и исключить деградацию скорости.
benches: - name: Test repetitions: 10000 events: - { "id": "64a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm1", "externalId": "4720", "Account_name": "Admin", "newuser": "Aleg", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "36a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm3", "externalId": "4732", "Account_name": "Admin", "newuser": "Oleg", "targetSid": "S-1-5-32-545", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "67a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm4", "externalId": "4720", "Account_name": "MikeDA", "newuser": "Leo", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "68a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:47:08", "dvendor": "Microsoft", "shost": "arm5", "externalId": "4720", "Account_name": "Kali", "newuser": "Paul", "OldUacValue": "0x15", "PrimaryGroupID": "513" } - { "id": "11a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm16", "externalId": "4720", "Account_name": "adminadmin", "newuser": "Oleg1", "OldUacValue": "0x15", "PrimaryGroupID": "513" }
Разбор примера написания правила нормализации
Давайте попробуем написать простое правило нормализации. Допустим, у нас есть поток данных из некоторого источника в формате syslog.
raw: "2023-12-11T12:32:41.000Z 5.86.210.12 - user=user2 act=file op=del host=pc1 name=pomodoro.txt"
Мы хотим преобразовать эти события к нашей модели данных. Для этого необходимо:
Заполнить блок filter
Мы хотим, чтобы нормализовался весь поток, поэтому нам необходимо указать true, чтобы через фильтр прошли все события.
filter: !vrl | # добавьте свой VRL код, например true # Обратите внимание, что входные события не могут быть изменены в этом блоке
Если нам нужно отфильтровать входящий поток, можно указать необходимое условие. Например, мы хотим работать с событиями веб-сервера Nginx, в которых есть «HTTP». Используем функцию contains, которая определяет, содержит ли строка (наше сырое событие .raw) значение, указанное в подстроке (в нашем случае это будет «HTTP»). Важно отметить, что на языке VRL сырое событие хранится в точке (.), через которую осуществляется обращение к переменным. Чтобы функция работала корректно, преобразуем сырое событие в строку, используя функцию to_string, и добавим обработку ошибок на случай, если придёт пустое событие и мы не сможем его преобразовать. Итоговое выражение будет выглядеть так:
contains(to_string(.raw) ?? "", "HTTP")
Заполнить блок normalizer
Преобразуем наше сырое (.raw) событие с помощью функции parse_syslog (функция парсит значение в формате syslog) и запишем получившееся значение в переменную parsed_audit_event. Эта функция преобразует наше сырое событие в два блока: .timestamp и .message;
Временную метку можно записать в наше поле без дальнейшего преобразования. Чтобы создать поле исходного нормализированного события, необходимо задать его имя с точкой в начале и присвоить этому атрибуту значение .Timestamp = parsed_audit_event.timestamp;
Мы можем разбить вторую часть событий, которая содержится в .message по ключам с помощью функции parse_key_value и результаты сохранить в переменной parsed_message, не забывая при этом про обработку ошибок;
Когда мы провели обработку данных, можно приступать к маппингу остальных полей нашей модели и полученных событий. Проделаем аналогичное действие, как с временной меткой: значения из поля user положим в поле duser, поле act будет в objType, поле op в fileType, а host в dhost.
В результате мы получаем следующий код:
normalizer: !vrl | # добавьте свой VRL код, например parsed_audit_event = parse_syslog!(.raw) .Timestamp = parsed_audit_event.timestamp parsed_message = parse_key_value(parsed_audit_event.message) ?? "" .duser = parsed_message.user .objType = parsed_message.act .fileType = parsed_message.op .dhost = parsed_message.host
Осталось добавить блок с тестами, в котором мы укажем примеры событий и ожидаемый результат (например, в первом событии отчет будет с нуля [0], а поле duser будет равно user2). Желательно добавить проверку всех полей, чтобы найти возможные ошибки при нормализации.
Чтобы увидеть все получившиеся события, выведем в блок с ожидаемым результатом точку (в точке хранится наше финальное событие).
tests: # Наименование теста. Лучше делать уникальным для каждого теста в рамках правила - name: Test event # Список тестовых событий events: - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user2 act=file op=del host=pc1 name=pomodoro.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user3 act=file op=add host=pc2 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user4 act=file op=mod host=pc3 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user5 act=file op=add host=pc4 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user6 act=file op=create host=pc5 name=example.txt" assertion: !vrl | assert_eq!(.[0].duser, "user2") assert_eq!(.[0].dhost, "pc1") assert_eq!(.[0].fileType, "del") assert_eq!(.[0].objType, "file") .
Для проверки файла на корректность введите команду RObject: Check. Мы видим, что в блоке с выводом результатов наше правило прошло валидацию.

Для запуска теста введите команду RObject: Run a test. В результате мы получаем сообщение об успешном выполнении теста и список нормализованных событий. Нормализация действительно прошла как мы ожидали.

При написании правила нормализации мы использовали функцию parse_syslog, которая преобразовала наше сырое событие в два блока: .timestamp и .message. Чтобы проверить эту гипотезу и посмотреть на работу «под капотом», предлагаю ввести тестовые поля и записать в них предварительные результаты. Добавляем в код три тестовые переменные:
.test = .
В первой переменной мы будем выводить наше первоначальное событие. Запускаем тесты и видим, что наша переменная вернула следующее значение:

Разберем наше событие как матрешку: сделаем еще одно поле и в него запишем значение .raw, что на языке VRL будет показывать содержимое raw из сырого события.
.test_raw = .raw
Опять запускаем тесты и смотрим результат:

Преобразуем .raw, используя функцию parse_syslog, и смотрим результат. Создаем еще одно тестовое поле, чтобы вывести в него получившееся значение.
parsed_audit_event = parse_syslog!(.raw) .test_syslog = parsed_audit_event
После запуска тестов мы видим, что поле test_syslog действительно разбилось на два блока: .timestamp и .message.

Мы можем обращаться к ним и разбирать нашу «матрешку» на новые ключи. Добавим еще пару тестовых полей:
parsed_message = parse_key_value(parsed_audit_event.message) ?? "" .test_message = parsed_audit_event.message .test_key_value = parsed_message
В очередной раз запускаем тесты и смотрим на вывод:

Теперь можно обращаться к каждому ключу самостоятельно и брать из них значения. Благодаря этому, маппинг полей будет очень простым и изящным. Использование тестовых полей помогло нам понять, как выглядит событие на промежуточных результатах. Это отличный способ отладить правило нормализации и увидеть работу функций.
Тестирование на производительность
Уделим внимание тестированию нашего правила. Для этого добавим блок benches с такими же тестовыми событиями:
benches: - name: Test bench repetitions: 10000 events: - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user2 act=file op=del host=pc1 name=pomodoro.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user3 act=file op=add host=pc2 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user4 act=file op=mod host=pc3 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user5 act=file op=add host=pc4 name=example.txt" - raw: "2022-08-11T12:32:41.000Z 5.86.210.12 - user=user6 act=file op=create host=pc5 name=example.txt"
Запускаем тесты на производительность командой RObject: Run a bench:

Теперь представим, что мы не можем (или не хотим) использовать встроенные функции VRL. Тогда маппинг нашего правила необходимо делать с помощью регулярных выражений. Для этого разобьем наше сырое сообщение на ключи и присвоим каждому из них переменную. Далее, берем значение и складываем его в наши поля. Новый код будет выглядеть так:
message1 = parse_regex_all!(.raw, r'(?P<key_value>(\S*))') .timestamp = message1[0].key_value duser = parse_key_value(message1[3].key_value) ?? "" .duser = duser.user objType = parse_key_value(message1[4].key_value) ?? "" .objType = objType.act fileType = parse_key_value(message1[5].key_value) ?? "" .fileType = fileType.op dhost = parse_key_value(message1[6].key_value) ?? "" .dhost = dhost.host fname = parse_key_value(message1[7].key_value) ?? "" .fname = fname.name
Давайте посмотрим, сколько времени займет работа текущей версии маппинга полей. Вводим команду для запуска теста на производительность:

Как мы видим, время работы увеличилось почти в два раза. Действительно, использование регулярных выражений – это довольно тяжелая операция и лучше не проводить ее без необходимости. Таким образом, мы можем смотреть не только на результат, но и на затраты для выполнения кода.
Разбор примера написания правила корреляции
Корреляции на основе последовательности из двух событий
Предположим, мы хотим зафиксировать два события: создание пользователя и добавление его в группу администраторов. Время между двумя событиями не будет превышать 5 минут. Для этого нам потребуется определить два псевдонима (алиаса) и объединить их по названию хоста и аккаунту. Алиасы позволяют группировать события в отдельные потоки и связывать их в логические цепочки. Время жизни корреляционного окна будет 300 секунд (5 минут). Обогатим корреляционное событие сообщением, в котором укажем, на каком хосте и какой пользователь стал администратором. Наш итоговый код будет выглядеть следующим образом:
# Уникальный идентификатор правила. Можно сгенерировать UUID средствами VS Code id: r-vision.ru/create_new_local_user # Тип правила type: correlation_rule # Человекочитаемый идентификатор правила name: create_new_local_user # Версия редакции этого (с таким id) правила (для системы контроля версий) version: 0.0.1 # Уровень угрозы корреляционного события, испускаемого правилом severity: low # Время жизни события в алиасе ttl: 300 # Создаем два алиаса по разным событиям: создание пользователя и добавление его в локальную группу (администраторы) aliases: event4720: filter: !vrl | .externalId == "4720" && .OldUacValue == "0x0" && .PrimaryGroupID == "513" event4732/4750: filter: !vrl | .externalId == "4732" || .externalId == "4750" # Объединяем события по названию акаунта и хосту select: alias: event4720 join: alias: event4732/4750 on: - eq: { event4720: .Account_name, event4732/4750: .Account_name} - eq: { event4720: .dhost, event4732/4750: .dhost} #Обогатим наше событие сообщением on_correlate: !vrl | duser = to_string(%event4720.Account_name) ?? "" dhost = to_string(%event4720.shost) ?? "" .message = "На хосте {{dhost}} был создан пользователь {{duser}} c правами админа"
Добавим тестовые данные, чтобы посмотреть работоспособность написанной логики правила:
# Список тестов правила tests: - name: Test 1 events: - { "id": "64a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm1", "externalId": "4720", "Account_name": "Admin", "newuser": "Aleg", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "36a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm3", "externalId": "4732", "Account_name": "Admin", "newuser": "Oleg", "targetSid": "S-1-5-32-545", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "67a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:46:08", "dvendor": "Microsoft", "shost": "arm4", "externalId": "4720", "Account_name": "MikeDA", "newuser": "Leo", "OldUacValue": "0x0", "PrimaryGroupID": "513" } - { "id": "68a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:47:08", "dvendor": "Microsoft", "shost": "arm5", "externalId": "4720", "Account_name": "Kali", "newuser": "Paul", "OldUacValue": "0x15", "PrimaryGroupID": "513" } - { "id": "11a438cc-650a-4332-9085-864072c6cfec", "timestamp": "28.08.2023 14:45:08", "dvendor": "Microsoft", "shost": "arm16", "externalId": "4720", "Account_name": "adminadmin", "newuser": "Oleg1", "OldUacValue": "0x15", "PrimaryGroupID": "513" } assertion: !vrl | assert_eq!(.message, "На хосте arm1 был создан пользователь Admin")
Для запуска теста по шагам введите команду RObject: Run a test with trace.

Мы видим работу правила на каждом этапе:
Этап вхождения базового события (Base event).
На этом этапе мы видим событие, которое пришло на вход и может в дальнейшем привести к сработке правила корреляции.Этап проверки фильтра.
В нашем правиле мы не использовали фильтр, но можно было добавить разграничение входного потока, чтобы выделить события, поступающие с Windows:filter: !vrl | .dvendor == "Microsoft"
Этап проверки соответствия нашего события фильтру первого алиаса.
- Открытие корреляционного окна (если фильтр удовлетворяет условию).Этап проверки соответствия нашего события фильтру второго алиаса.
- Соединение двух событий (если фильтр удовлетворяет условию).Этап обогащения корреляционного события (если случилась корреляция).
Вывод результата в виде получившихся корреляционных событий.
Таким образом можно посмотреть на работу правила на каждом этапе и увидеть, что пошло не так.
Запуск теста в интерактивном режиме
Для запуска теста в интерактивном режиме введите команду RObject: Run a test in interactive mode.

Текущий функционал похож на запуск тестов по шагам. Отличие заключается в том, что весь результат выводится нам не сразу, а мы проходим все шаги в отладчике.
При желании можно добавлять тестовые переменные (как мы делали в правиле нормализации) и отладочные команды, чтобы отлеживать изменения при реализации более сложной логики.
Правило корреляции с использованием группировки
Давайте разберем еще один пример написания правила корреляции, в котором нам необходимо группировать несколько однотипных событий. Предположим, мы хотим за 10 минут отследить запуск большого количества легитимных команд. Наш код будет выглядеть следующим образом:
id: test_rule type: correlation_rule name: test version: 0.0.1 # Общее время для корреляционного окна 10 минут ttl: 600 severity: critical filter: !vrl | # из всего потока выделяем события винды .dvendor == "Microsoft" # Фильтруем поток событий по EventID (создание процесса) и самому процессу, создавая алиас aliases: processes: filter: !vrl | .externalId == "4726" && ( .dproc == "ipconfig.exe" || .dproc == "nltest.exe" || .dproc == "ping.exe") # У нас один алиас, поэтому его не надо объединять, но т.к. блок обязательный, то заполняем его select: alias: processes #Группируем события по пользователю и хосту, как только таких событий будет 3, то испускаем корр. событие group: - alias: processes by: - duser - dhost count: 3 # Обогащаем получившиеся корр. событие on_correlate: !vrl | dhost = to_string(%processes[0].dhost) ?? "" duser = to_string(%processes[0].duser) ?? "" .dprocs = [to_string(%processes[0].dproc), to_string(%processes[1].dproc), to_string(%processes[2].dproc) ] ?? "" .message = "На хосте {{dhost}} под пользователем {{duser}} были запущены программы " + (join(.dprocs, ",") ?? "") .groupedBy = ["duser","dhost"] tests: - name: Test 1 events: - { "id": "1","timestamp": "26.02.2024 14:46:08", "dvendor": "Microsoft","externalId": "4726", "dhost": "arm", "dproc": "ipconfig.exe", "duser": "user"} - { "id": "2","timestamp": "26.02.2024 14:47:08", "dvendor": "Microsoft","externalId": "4726", "dhost": "arm", "dproc": "ping.exe", "duser": "user"} - { "id": "3","timestamp": "26.02.2024 14:45:08", "dvendor": "Microsoft","externalId": "4726", "dhost": "arm", "dproc": "nltest.exe", "duser": "user"} assertion: !vrl | assert_eq!(.[0].dhost, "arm0") assert_eq!(.[1].duser, "user")
Давайте запустим тесты и посмотрим на получившийся результат:

Заключение
В этой статье мы подробно изучили функционал плагина R-Object и его возможности. Надеюсь, этот туториал поможет вам добавить в процесс разработки новые функции, улучшить существующие и сделать работу с R-Vision SIEM более удобной и персонализированной. Если у вас остались любые вопросы про наш плагин, я с радостью отвечу на них в комментариях!
