Собираем логи фаервола Mikrotik в базу данных

Добрый день.

Хочу рассказать как легко и непринужденно можно настроить сервер сбора метаданных сетевого трафика для маршрутизаторов Микротик.

Цель: Целью будет, хранение «пережеванных» логов фаервола в базе данных, для последующего анализа.

Средства: Для реализации подойдет любой свежий дистрибутив Linux с rsyslogd v8 и выше, возможно предложенный синтаксис будет работать и на v7. Так же нам потребуется СУБД, я выбрал mariadb. Прирост БД будет варьироваться от количества журналируемых правил, потому размер накопителя на ваше усмотрение, в моем случае журналируются 30-40 правил, что в день составляет примерно 1200 тысяч строк. За месяц использования БД включая индексы выросла до 3,8Гб.

Механика: Маршрутизатор отправляет лог на удаленный сервер по UDP. Сервер rsyslog с помощью регулярных выражений проводит очистку строк от лишней информации, формирует SQL вставку и отправляет её в СУБД. СУБД, с помощью триггера до вставки, проводит дополнительную очистку и разделение полей, которые не удалось разобрать в rsyslog.

Настраиваем RSYSLOG


Редактируем файл /etc/rsyslog.conf
Добавляем туда следующие строки:

module(load="ommysql")
module(load="imudp")
input(type="imudp" port="514")

Тем самым подгружаем необходимые модули и открываем 514 порт UDP.

Строка лога от Микротика выглядит так:

20180927155341  BLOCKSMKNETS forward: in:ether6 - LocalTORF out:VLAN55 - RT_INET, src-mac 00:15:17:31:b8:d7, proto TCP (SYN), 192.168.0.234:2457->192.168.6.14:65535, len 60

Как видим, много лишнего для хранения в БД и внятная выборка будет затруднена.
По идее, мне надо складывать такие данные:

20180927155341 ether6 VLAN5 192.168.0.234 2457 192.168.6.14 65535 00:15:17:31:b8:d7 TCP SYN forward BLOCKSMKNETS 60

Получить такую строку средствами только одного rsyslog у меня не получилось. Регулярки rsyslog используют POSIX ERE/BRE, потому нет возможности применить такие фичи как, lookahead или lookbehind.

Тут есть инструмент который позволяет проводить отладку регулярок, попробуйте может у вас получится отделить порт от адреса, а так же название интерфейса от in: и out:. Только учтите что у некоторых протоколов sport и dport отсутствуют.

В общем у меня на выходе получилось так:

20180927155341 in:ether6 out:VLAN5 192.168.0.234:2457 192.168.6.14:65535 00:15:17:31:b8:d7 TCP (SYN) forward BLOCKSMKNETS 60

Тут есть документация как готовить регулярки rsyslog.

В конечном виде файл конфигурации приема лога от Микротика /etc/rsyslog.d/20-remote.conf будет выглядеть так:

$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL

if ($fromhost-ip == '192.168.0.230') and ($syslogtag contains "firewall") then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="rsyslogger" template="tpl_traflog") stop}

В первой строке описание шаблона(template) — строка SQL кода, для передачи её в СУБД.
Вторая строка это условие когда будет происходить действие, то есть запись в СУБД.
Условие выглядит так: если источник лога = 192.168.0.230 (if ($fromhost-ip == '192.168.0.230')) И если строка msg содержит «firewall»(and ($syslogtag contains «firewall»)), то используя модуль ommysql с параметрами подключения (then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." ) вызываем шаблон tpl_traflog (template="tpl_traflog")), а после этого прекращаем дальнейшую обработку строки (stop}).

Возможно что в вашем случае что-либо пойдет не так, это может быть связано с названиями интерфейсов или префиксами логов, возможно что-то еще. Для отладки сделаем следующее, вторую строку комментируем, добавляем новый шаблон и два новых условия:

$template tpl_traflog_test,"%timereported:::date-mysql% %msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end% %msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end% %msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end% %msg:R,ERE,0,DFLT:[a-x]+--end% %msg:F,32:2% %msg:R,ERE,0,DFLT:[0-9]+$--end%\n"

if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" )}

if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" template="tpl_traflog_test" ) stop}

Перезапустим логгер.

Шаблон tpl_traflog_test аналогичен tpl_traflog, но без SQL INSERT.

Первое условие добавляет не обработанную строку %msg% в файл /var/log/remote/192.168.0.230.log, ибо шаблон не указан.

Второе условие добавляет обработанную строку в тот же файл.
Так будет удобнее сравнивать.
Далее подготовим базу данных.

Готовим БД


Настройку СУБД опустим, тут все стандартно.

Запускаем консоль mysql и выполняем следующий код:

--добавляем базу данных
create database traflog character set utf8 collate utf8_bin;
use traflog;

--добавляем таблицу
create table traffic (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
datetime DATETIME,
inif VARCHAR(20),
outif VARCHAR(20),
src VARCHAR(21),
sport INT(5),
dst VARCHAR(21),
dport INT(5),
smac VARCHAR(17),
proto VARCHAR(4),
flags VARCHAR(11),
chain VARCHAR(8),
logpref VARCHAR(24),
len INT(5)) ENGINE=MYISAM;

--добавляем пользователя
create user rsyslogger@localhost identified by '...';
grant all privileges on traflog.* to rsyslogger@localhost;

Таблица готова, пользователь есть.

Теперь добавим триггер, он сделает то, что не удалось логгеру, отделить адрес от порта, вычистить названия интерфейсов, убрать скобки с флага:

--добавляем триггер для traffic
DELIMITER //
create TRIGGER delim_ip_port_flags BEFORE insert ON traffic
FOR EACH ROW
begin
set NEW.inif = REGEXP_REPLACE ((NEW.inif), 'in:', '' );
set NEW.outif = REGEXP_REPLACE ((NEW.outif), 'out:', '' );
set NEW.sport = REGEXP_REPLACE ((NEW.src), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.src = REGEXP_REPLACE ((NEW.src), ':[0-9]+', '' );
set NEW.dport = REGEXP_REPLACE ((NEW.dst), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' );
set NEW.dst = REGEXP_REPLACE ((NEW.dst), ':[0-9]+', '' );
set NEW.flags = REGEXP_REPLACE ((NEW.flags), '\\(|\\)', '' );
end //
delimiter ;

REGEXP_REPLACE ищет второй после запятой параметр(регулярку) и заменяет на третий параметр, в нашем случае в кавычках ничего нет поэтому просто уберет то что нашел.

Сделаем тестовую вставку, аналогично тому как будет делать логгер:

--вставка тестовой строки
insert into traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref)
values (20180730075437, 'in:ether6', 'out:VLAN55', '192.168.0.234:4997', '192.168.6.18:65535', '00:15:17:31:b8:d7', 'TCP', '(SYN)', 'forward', 'BLOCKSMKNETS');

Посмотрим что получилось:

select * from tarffic;

Если все верно, тогда идем дальше. Если нет ищем в чем ошибка.

Добавим хотя бы один индекс. Я не мастер создавать индексы, но как понял, в mysql для разных запросов правильнее использовать индексы с разными стыкующими полями, так как один запрос может использовать только один индекс(или я не прав?). Если понимаете, сделайте на свое усмотрение.
Мне часто приходится делать запросы с конкретным префиксом, поэтому этого я добавил такой индекс:
--добавляем индекс
create index traffic_index on traffic (datetime,logpref,src);

Готово.

Теперь нужно запустить отправку на маршрутизаторе, добавьте настройку удаленного сервера логов и действие к нему, добавьте опцию log в одно из правил фаервола, добавьте префикс не более 24 символов.

В консоли микротика это выглядит примерно так:

/system logging action
set 3 remote=192.168.0.94 src-address=192.168.0.230
add name=remote2 remote=192.168.0.19 syslog-facility=local6 target=remote
/system logging
add action=remote topics=error,account,critical,event,info
add action=remote2 topics=firewall

/ip firewall filter
...
add action=drop chain=input comment="drop ssh brute forcers" dst-port=22,8291 log=yes log-prefix=DROP_SSH_BRUTE protocol=tcp src-address-list=ssh_blacklist
...

Где 192.168.0.230 адрес маршрутизатора, 192.168.0.19 адрес лог сервера для логов фаервол, а 192.168.0.94 это другой лог сервер туда у меня валятся системные логи микротика, он нам сейчас не нужен. Наша настройка это remote2.

Далее смотрите что повалится в файл:

tail -f /var/log/remote/192.168.0.230.log

В файл должны посыпаться строки от маршрутизатора, если конечно ваше правило срабатывает достаточно часто.

Если каких-то полей не хватает, то есть не соблюдается последовательность datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len, то можно попробовать поменять параметр в отладочных шаблонах логгера, заменить BLANK на DLFT. Тогда вместо пустоты какого либо поля появятся какие-то буквы, непомню уже какие. Если такое произошло, значит с регуляркой что-то не так и надо исправить.

Если все пошло как надо, то отключаем тестовые условия и шаблон.

Еще надо дефолтный конфиг в /etc/rsyslog.d/ спустить ниже, я переименовал его в 50-default.conf, дабы remote логи не сыпались в системный журнал /var/log/message
Перезапустим логгер.

Подождем немного, пока наша БД наполнится. Далее можем начинать выборку.

Несколько запросов для примера:

Что бы посмотреть размер БД и количество строк:
MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb", TABLE_ROWS as "count rows" from information_schema.tables group by table_schema;        +--------------------+---------+------------+
| database           | size Mb | count rows |
+--------------------+---------+------------+
| information_schema |    0.17 |       NULL |
| traflog            | 3793.39 |   21839553 |
+--------------------+---------+------------+
2 rows in set (0.48 sec)

За месяц выросла почти 4Гб, но это зависит от количества и свойств логгируемых правил фаервола

Количество логгируемых префиксов
Количество логгируемых префиксов не равно количеству правил, некоторые правила работают с одним префиксом, но все же сколько всего префиксов? и сколько по ним отработано правил?:

MariaDB [traflog]> select logpref,count(logpref) from traffic group by logpref order by count(logpref) desc;
+----------------------+----------------+
| logpref              | count(logpref) |
+----------------------+----------------+
| ACCEPT_TORF_INET     |       14582602 |
| ACCEPT_SMK_PPP       |        1085791 |
| DROP_FORWARD_INVALID |         982374 |
| REJECT_BNK01         |         961503 |
| ACCEPT_MMAX_TORF     |         802455 |
| ACCEPT_TORF_PPP      |         736803 |
| SMTP_DNAT            |         689533 |
| ACCEPT_SMK_INET      |         451411 |
| ACCEPT_INET_TORF     |         389857 |
| BLOCK_SMKNETS        |         335424 |
| DROP_SMTP_BRUTE      |         285850 |
| ACCEPT_ROZN_TORF     |         154811 |
| ACCEPT_TORF_MMAX     |         148393 |
| DROP_ETHALL_ETHALL   |          80679 |
| ACCEPT_SMTP          |          48921 |
| DROP_SMTP_DDOS       |          32190 |
| RDP_DNAT             |          28757 |
| ACCEPT_TORF_ROZN     |          18456 |
| SIP_DNAT             |          15494 |
| 1CWEB_DNAT           |           6406 |
| BLOCKSMKNETS         |           5789 |
| DROP_SSH_BRUTE       |           3162 |
| POP_DNAT             |           1997 |
| DROP_RDP_BRUTE       |            442 |
| DROP_BNK01           |            291 |
| DROPALL              |            138 |
| ACCEPT_RTP_FORWARD   |             90 |
| REJECT_SMTP_BRUTE    |             72 |
| L2TP_INPUT_ACCEPT    |             33 |
+----------------------+----------------+
29 rows in set (2 min 51.03 sec)

ACCEPT_TORF_INET в лидерах, по этому префиксу можно найти всех кто сходил в интернет из нашей локальной сети, протоколы и порты записаны, настанет время и доступ кое-кому будет закрыт. Тут есть опорные данные для будущей работы над ошибками.

Лидер smtp тыка
Посмотрим кто сегодня пытался подобраться к smtp серверу:

MariaDB [traflog]> select src,count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018101600000000' group by src order by count(dport) desc limit 10;
+----------------+--------------+
| src            | count(dport) |
+----------------+--------------+
| 191.96.249.92  |        12440 |
| 191.96.249.24  |         4556 |
| 191.96.249.61  |         4537 |
| 185.255.31.122 |         3119 |
| 178.57.79.250  |          226 |
| 185.36.81.174  |          216 |
| 185.234.219.32 |          211 |
| 89.248.162.145 |           40 |
| 45.125.66.157  |           32 |
| 188.165.124.31 |           21 |
+----------------+--------------+
10 rows in set, 1 warning (21.36 sec)

Понятно, узел 191.96.249.92 сегодня победитель. Посмотрим в каких логгируемых правилах он еще фигурировал:

MariaDB [traflog]> select src,dport,count(dport),logpref from traffic where src='191.96.249.92' group by logpref order by count(dport) desc;
+---------------+-------+--------------+-----------------+
| src           | dport | count(dport) | logpref         |
+---------------+-------+--------------+-----------------+
| 191.96.249.92 |    25 |       226989 | SMTP_DNAT       |
| 191.96.249.92 |    25 |       170714 | DROP_SMTP_BRUTE |
| 191.96.249.92 |    25 |         2907 | DROP_SMTP_DDOS  |
| 191.96.249.92 |    25 |         2061 | ACCEPT_SMTP     |
+---------------+-------+--------------+-----------------+
4 rows in set (10 min 44.21 sec)

Этот специализируется только на smtp, ~1% попаданий для попытки подбора пароля или попытки отправки какой-нибудь фигни, остальное ушло в баню.

Запрос формировался 10 минут это много, текущие индексы не подходят для него, либо можно переформулировать запрос, но сейчас не будем об этом.

В будущем планируется прикручивание веб интерфейса с типовыми запросами и формами.
Вектор задан, надеюсь что данная статья будет полезной.

Всем спасибо!

Список литературы:

Документация по rsyslog
Документация по mysql
Документация по Mikrotik logging

Спасибо сообществу LOR за подсказки

UPD.1
Добавил в базу данных поле flags, теперь можно отслеживать длительность подключения отлавливая SYN, FIN.
Исправлены некоторые ошибки в регулярках rsyslog, а так же триггеры mysql.

Что любопытно, дефолтное правило defconf: drop invalid дропает все финальные пакеты TCP соединений, в итоге все узлы которые пытаются по науке закрыть соединение терпят неудачу, отправляя несколько FINов. Правильно ли это?

Я добавил правило разрешающее прохождение TCP с флагами ACK,FIN.

Под спойлером SQL процедура, которая покажет время TCP соединений за последние пять минут
connections_list()

DROP PROCEDURE IF EXISTS connections_list;
DELIMITER //
CREATE PROCEDURE connections_list()
BEGIN
DECLARE logid BIGINT UNSIGNED;
DECLARE done INT DEFAULT FALSE;
DECLARE datefin DATETIME;
DECLARE datesyn DATETIME;
DECLARE conntime TIME;
DECLARE connsport INT;
DECLARE conndport INT;
DECLARE connsrc VARCHAR(21);
DECLARE conndst VARCHAR(21);
DECLARE cur CURSOR FOR SELECT id,datetime,src,sport,dst,dport FROM conn_syn_fin WHERE flags='SYN';
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=TRUE;
DROP TABLE IF EXISTS conn_syn_fin;
DROP TABLE IF EXISTS connless;
CREATE temporary TABLE connless(datestart DATETIME,dateend DATETIME,duration TIME,src VARCHAR(21),sport INT,dst VARCHAR(21),dport INT);
CREATE temporary TABLE conn_syn_fin (SELECT * from traffic WHERE datetime > now() - interval 5 minute and
 src in (select src from traffic where datetime > now() - interval 5 minute and logpref='TCP_FIN' and flags like '%FIN%') and (flags like '%SYN%' or flags like '%FIN%') order by id);
OPEN cur;
read_loop: LOOP
        FETCH cur INTO logid,datesyn,connsrc,connsport,conndst,conndport;
                IF done THEN
                LEAVE read_loop;
        END IF;
        set datefin=(SELECT datetime FROM conn_syn_fin WHERE id>logid and src=connsrc and sport=connsport and flags like '%FIN%' and dst=conndst and dport=conndport limit 1);
        set conntime=(SELECT timediff(datefin,datesyn));
INSERT INTO connless (datestart,dateend,duration,src,sport,dst,dport) value (datesyn,datefin,conntime,connsrc,connsport,conndst,conndport);
END LOOP;
CLOSE cur;
select * from connless;
END;
//
DELIMITER ;


В результате выполнения процедуры будет создано две временных таблицы.
Таблица conn_syn_fin содержит записи логов с флагами SYN и FIN, далее с помощью курсора в этой таблице производится поиск.
Таблица connless содержит список соединений, открытых и завершенных, у завершенных есть длительность у открытых соответственно нет.
Обратите внимание, время выборки минус пять минут от текущего времени. У меня запрос выполняется медленно. Медленно проходит поиск по курсору, обрабатывает примерно 10 записей в секунду, пытался всячески ускорить, но время выполнения всегда примерно одинаковое.
Так же обратите внимание, данная процедура предназначена лишь для демонстрации. Если потребуется делать выборку по конкретному src/sport/dst/dport лучше сделать отдельную процедуру аналогично этой. Если вы мастер sql, то можете написать свой запрос лучше.

call connections_list();

MariaDB [traflog]> call connections_list();
+---------------------+---------------------+----------+---------------+-------+-----------------+-------+
| datestart           | dateend             | duration | src           | sport | dst             | dport |
+---------------------+---------------------+----------+---------------+-------+-----------------+-------+
| 2019-03-20 14:12:19 | 2019-03-20 14:13:14 | 00:00:55 | 192.168.0.81  | 41868 | 87.250.250.207  |   443 |
| 2019-03-20 14:12:25 | NULL                | NULL     | 192.168.0.65  | 49311 | 52.5.23.125     |   443 |
| 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | 192.168.0.104 | 54433 | 217.69.139.42   |   443 |
| 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | 192.168.0.104 | 54434 | 217.69.139.42   |   443 |
| 2019-03-20 14:12:32 | NULL                | NULL     | 192.168.0.119 | 37977 | 209.85.233.95   |   443 |
...
| 2019-03-20 14:17:12 | NULL                | NULL     | 192.168.0.119 | 39331 | 91.213.158.131  |   443 |
| 2019-03-20 14:17:13 | NULL                | NULL     | 192.168.0.90  | 63388 | 87.240.185.236  |   443 |
+---------------------+---------------------+----------+---------------+-------+-----------------+-------+
399 rows in set (33.17 sec)

Query OK, 0 rows affected (33.18 sec)



После выполнения процедуры временные таблицы conn_syn_fin и connless остаются, можете их посмотреть более детально, если обнаружите что-то подозрительное или не достоверное. После запуска процедуры старые таблицы удаляться и появятся новые. Напишите если найдете ошибку.
  • +15
  • 11,3k
  • 5
Поделиться публикацией

Комментарии 5

    0
    MySQL это конечно хорошо, но в 2018 году для сбора логов переходят на NoSQL-хранилища, например elasticsearch.
    По началу связка ELK кажется слишком сложной для такой простой задачи, как просто собрать логи в одну базу, но после того, как результат из elasticsearch появляется через несколько секунд, а не минут из mysql, приходит понимание, зачем все это нужно было.
    ссылки
    habr.com/post/277029
    Не нашел сразу более новой инструкции, чем эта www.elastic.co/blog/how-to-centralize-logs-with-rsyslog-logstash-and-elasticsearch-on-ubuntu-14-04
    сейчас нужно ставить 6 версии, и этап с конвертаций в JSON не нужен
    www.itzgeek.com/how-tos/linux/ubuntu-how-tos/how-to-install-elasticsearch-logstash-and-kibana-elk-stack-on-ubuntu-18-04-ubuntu-16-04.html

    Еще попробуйте logstash, у него есть встроенный плагин для syslog
    далее парсер Grok filter plugin
    и вставка в базу через logstash-output-jdbc
      0
      Я бы рассмотрел все-таки graylog как более законченное решение, чем elasticsearch (да, у грейлога под капотом те же эластик и монга — но что поделать )
        0
        graylog хорош, когда много серверов и данные падают активно.

        но а из БД для логов я бы все же выбрал influxdb. Тем более rsyslog с ним умеет работать.
          0
          Influx для логов? Ничего не перепутали? Она больше для метрик подходит. И полнотекстового поиска в ней нет. Может имели в виду clickhouse?!
          0
          имхо elk более универсальное решение, правда напильником больше махать придется

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое