Добрый день.
Хочу рассказать как легко и непринужденно можно настроить сервер сбора метаданных сетевого трафика для маршрутизаторов Микротик.
Цель: Целью будет, хранение «пережеванных» логов фаервола в базе данных, для последующего анализа.
Средства: Для реализации подойдет любой свежий дистрибутив Linux с rsyslogd v8 и выше, возможно предложенный синтаксис будет работать и на v7. Так же нам потребуется СУБД, я выбрал mariadb. Прирост БД будет варьироваться от количества журналируемых правил, потому размер накопителя на ваше усмотрение, в моем случае журналируются 30-40 правил, что в день составляет примерно 1200 тысяч строк. За месяц использования БД включая индексы выросла до 3,8Гб.
Механика: Маршрутизатор отправляет лог на удаленный сервер по UDP. Сервер rsyslog с помощью регулярных выражений проводит очистку строк от лишней информации, формирует SQL вставку и отправляет её в СУБД. СУБД, с помощью триггера до вставки, проводит дополнительную очистку и разделение полей, которые не удалось разобрать в rsyslog.
Редактируем файл /etc/rsyslog.conf
Добавляем туда следующие строки:
Тем самым подгружаем необходимые модули и открываем 514 порт UDP.
Строка лога от Микротика выглядит так:
Как видим, много лишнего для хранения в БД и внятная выборка будет затруднена.
По идее, мне надо складывать такие данные:
Получить такую строку средствами только одного rsyslog у меня не получилось. Регулярки rsyslog используют POSIX ERE/BRE, потому нет возможности применить такие фичи как, lookahead или lookbehind.
Тут есть инструмент который позволяет проводить отладку регулярок, попробуйте может у вас получится отделить порт от адреса, а так же название интерфейса от in: и out:. Только учтите что у некоторых протоколов sport и dport отсутствуют.
В общем у меня на выходе получилось так:
Тут есть документация как готовить регулярки rsyslog.
В конечном виде файл конфигурации приема лога от Микротика /etc/rsyslog.d/20-remote.conf будет выглядеть так:
В первой строке описание шаблона(template) — строка SQL кода, для передачи её в СУБД.
Вторая строка это условие когда будет происходить действие, то есть запись в СУБД.
Условие выглядит так: если источник лога = 192.168.0.230 (
Возможно что в вашем случае что-либо пойдет не так, это может быть связано с названиями интерфейсов или префиксами логов, возможно что-то еще. Для отладки сделаем следующее, вторую строку комментируем, добавляем новый шаблон и два новых условия:
Перезапустим логгер.
Шаблон tpl_traflog_test аналогичен tpl_traflog, но без SQL INSERT.
Первое условие добавляет не обработанную строку %msg% в файл /var/log/remote/192.168.0.230.log, ибо шаблон не указан.
Второе условие добавляет обработанную строку в тот же файл.
Так будет удобнее сравнивать.
Далее подготовим базу данных.
Настройку СУБД опустим, тут все стандартно.
Запускаем консоль mysql и выполняем следующий код:
Таблица готова, пользователь есть.
Теперь добавим триггер, он сделает то, что не удалось логгеру, отделить адрес от порта, вычистить названия интерфейсов, убрать скобки с флага:
REGEXP_REPLACE ищет второй после запятой параметр(регулярку) и заменяет на третий параметр, в нашем случае в кавычках ничего нет поэтому просто уберет то что нашел.
Сделаем тестовую вставку, аналогично тому как будет делать логгер:
Посмотрим что получилось:
Если все верно, тогда идем дальше. Если нет ищем в чем ошибка.
Добавим хотя бы один индекс. Я не мастер создавать индексы, но как понял, в mysql для разных запросов правильнее использовать индексы с разными стыкующими полями, так как один запрос может использовать только один индекс(или я не прав?). Если понимаете, сделайте на свое усмотрение.
Мне часто приходится делать запросы с конкретным префиксом, поэтому этого я добавил такой индекс:
Готово.
Теперь нужно запустить отправку на маршрутизаторе, добавьте настройку удаленного сервера логов и действие к нему, добавьте опцию log в одно из правил фаервола, добавьте префикс не более 24 символов.
В консоли микротика это выглядит примерно так:
Где 192.168.0.230 адрес маршрутизатора, 192.168.0.19 адрес лог сервера для логов фаервол, а 192.168.0.94 это другой лог сервер туда у меня валятся системные логи микротика, он нам сейчас не нужен. Наша настройка это remote2.
Далее смотрите что повалится в файл:
В файл должны посыпаться строки от маршрутизатора, если конечно ваше правило срабатывает достаточно часто.
Если каких-то полей не хватает, то есть не соблюдается последовательность datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len, то можно попробовать поменять параметр в отладочных шаблонах логгера, заменить BLANK на DLFT. Тогда вместо пустоты какого либо поля появятся какие-то буквы, непомню уже какие. Если такое произошло, значит с регуляркой что-то не так и надо исправить.
Если все пошло как надо, то отключаем тестовые условия и шаблон.
Еще надо дефолтный конфиг в /etc/rsyslog.d/ спустить ниже, я переименовал его в 50-default.conf, дабы remote логи не сыпались в системный журнал /var/log/message
Перезапустим логгер.
Подождем немного, пока наша БД наполнится. Далее можем начинать выборку.
Несколько запросов для примера:
В будущем планируется прикручивание веб интерфейса с типовыми запросами и формами.
Вектор задан, надеюсь что данная статья будет полезной.
Всем спасибо!
Список литературы:
Документация по rsyslog
Документация по mysql
Документация по Mikrotik logging
Спасибо сообществу LOR за подсказки
UPD.1
Добавил в базу данных поле flags, теперь можно отслеживать длительность подключения отлавливая SYN, FIN.
Исправлены некоторые ошибки в регулярках rsyslog, а так же триггеры mysql.
Что любопытно, дефолтное правило defconf: drop invalid дропает все финальные пакеты TCP соединений, в итоге все узлы которые пытаются по науке закрыть соединение терпят неудачу, отправляя несколько FINов. Правильно ли это?
Я добавил правило разрешающее прохождение TCP с флагами ACK,FIN.
Под спойлером SQL процедура, которая покажет время TCP соединений за последние пять минут
В результате выполнения процедуры будет создано две временных таблицы.
Таблица conn_syn_fin содержит записи логов с флагами SYN и FIN, далее с помощью курсора в этой таблице производится поиск.
Таблица connless содержит список соединений, открытых и завершенных, у завершенных есть длительность у открытых соответственно нет.
Обратите внимание, время выборки минус пять минут от текущего времени. У меня запрос выполняется медленно. Медленно проходит поиск по курсору, обрабатывает примерно 10 записей в секунду, пытался всячески ускорить, но время выполнения всегда примерно одинаковое.
Так же обратите внимание, данная процедура предназначена лишь для демонстрации. Если потребуется делать выборку по конкретному src/sport/dst/dport лучше сделать отдельную процедуру аналогично этой. Если вы мастер sql, то можете написать свой запрос лучше.
После выполнения процедуры временные таблицы conn_syn_fin и connless остаются, можете их посмотреть более детально, если обнаружите что-то подозрительное или не достоверное. После запуска процедуры старые таблицы удаляться и появятся новые. Напишите если найдете ошибку.
Хочу рассказать как легко и непринужденно можно настроить сервер сбора метаданных сетевого трафика для маршрутизаторов Микротик.
Цель: Целью будет, хранение «пережеванных» логов фаервола в базе данных, для последующего анализа.
Средства: Для реализации подойдет любой свежий дистрибутив Linux с rsyslogd v8 и выше, возможно предложенный синтаксис будет работать и на v7. Так же нам потребуется СУБД, я выбрал mariadb. Прирост БД будет варьироваться от количества журналируемых правил, потому размер накопителя на ваше усмотрение, в моем случае журналируются 30-40 правил, что в день составляет примерно 1200 тысяч строк. За месяц использования БД включая индексы выросла до 3,8Гб.
Механика: Маршрутизатор отправляет лог на удаленный сервер по UDP. Сервер rsyslog с помощью регулярных выражений проводит очистку строк от лишней информации, формирует SQL вставку и отправляет её в СУБД. СУБД, с помощью триггера до вставки, проводит дополнительную очистку и разделение полей, которые не удалось разобрать в rsyslog.
Настраиваем RSYSLOG
Редактируем файл /etc/rsyslog.conf
Добавляем туда следующие строки:
module(load="ommysql") module(load="imudp") input(type="imudp" port="514")
Тем самым подгружаем необходимые модули и открываем 514 порт UDP.
Строка лога от Микротика выглядит так:
20180927155341 BLOCKSMKNETS forward: in:ether6 - LocalTORF out:VLAN55 - RT_INET, src-mac 00:15:17:31:b8:d7, proto TCP (SYN), 192.168.0.234:2457->192.168.6.14:65535, len 60
Как видим, много лишнего для хранения в БД и внятная выборка будет затруднена.
По идее, мне надо складывать такие данные:
20180927155341 ether6 VLAN5 192.168.0.234 2457 192.168.6.14 65535 00:15:17:31:b8:d7 TCP SYN forward BLOCKSMKNETS 60
Получить такую строку средствами только одного rsyslog у меня не получилось. Регулярки rsyslog используют POSIX ERE/BRE, потому нет возможности применить такие фичи как, lookahead или lookbehind.
Тут есть инструмент который позволяет проводить отладку регулярок, попробуйте может у вас получится отделить порт от адреса, а так же название интерфейса от in: и out:. Только учтите что у некоторых протоколов sport и dport отсутствуют.
В общем у меня на выходе получилось так:
20180927155341 in:ether6 out:VLAN5 192.168.0.234:2457 192.168.6.14:65535 00:15:17:31:b8:d7 TCP (SYN) forward BLOCKSMKNETS 60
Тут есть документация как готовить регулярки rsyslog.
В конечном виде файл конфигурации приема лога от Микротика /etc/rsyslog.d/20-remote.conf будет выглядеть так:
$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL if ($fromhost-ip == '192.168.0.230') and ($syslogtag contains "firewall") then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="rsyslogger" template="tpl_traflog") stop}
В первой строке описание шаблона(template) — строка SQL кода, для передачи её в СУБД.
Вторая строка это условие когда будет происходить действие, то есть запись в СУБД.
Условие выглядит так: если источник лога = 192.168.0.230 (
if ($fromhost-ip == '192.168.0.230')) И если строка msg содержит «firewall»(and ($syslogtag contains «firewall»)), то используя модуль ommysql с параметрами подключения (then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." ) вызываем шаблон tpl_traflog (template="tpl_traflog")), а после этого прекращаем дальнейшую обработку строки (stop}).Возможно что в вашем случае что-либо пойдет не так, это может быть связано с названиями интерфейсов или префиксами логов, возможно что-то еще. Для отладки сделаем следующее, вторую строку комментируем, добавляем новый шаблон и два новых условия:
$template tpl_traflog_test,"%timereported:::date-mysql% %msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end% %msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end% %msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end% %msg:R,ERE,0,DFLT:[a-x]+--end% %msg:F,32:2% %msg:R,ERE,0,DFLT:[0-9]+$--end%\n" if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" )} if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" template="tpl_traflog_test" ) stop}
Перезапустим логгер.
Шаблон tpl_traflog_test аналогичен tpl_traflog, но без SQL INSERT.
Первое условие добавляет не обработанную строку %msg% в файл /var/log/remote/192.168.0.230.log, ибо шаблон не указан.
Второе условие добавляет обработанную строку в тот же файл.
Так будет удобнее сравнивать.
Далее подготовим базу данных.
Готовим БД
Настройку СУБД опустим, тут все стандартно.
Запускаем консоль mysql и выполняем следующий код:
--добавляем базу данных create database traflog character set utf8 collate utf8_bin; use traflog; --добавляем таблицу create table traffic (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, datetime DATETIME, inif VARCHAR(20), outif VARCHAR(20), src VARCHAR(21), sport INT(5), dst VARCHAR(21), dport INT(5), smac VARCHAR(17), proto VARCHAR(4), flags VARCHAR(11), chain VARCHAR(8), logpref VARCHAR(24), len INT(5)) ENGINE=MYISAM; --добавляем пользователя create user rsyslogger@localhost identified by '...'; grant all privileges on traflog.* to rsyslogger@localhost;
Таблица готова, пользователь есть.
Теперь добавим триггер, он сделает то, что не удалось логгеру, отделить адрес от порта, вычистить названия интерфейсов, убрать скобки с флага:
--добавляем триггер для traffic DELIMITER // create TRIGGER delim_ip_port_flags BEFORE insert ON traffic FOR EACH ROW begin set NEW.inif = REGEXP_REPLACE ((NEW.inif), 'in:', '' ); set NEW.outif = REGEXP_REPLACE ((NEW.outif), 'out:', '' ); set NEW.sport = REGEXP_REPLACE ((NEW.src), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' ); set NEW.src = REGEXP_REPLACE ((NEW.src), ':[0-9]+', '' ); set NEW.dport = REGEXP_REPLACE ((NEW.dst), '([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+', '' ); set NEW.dst = REGEXP_REPLACE ((NEW.dst), ':[0-9]+', '' ); set NEW.flags = REGEXP_REPLACE ((NEW.flags), '\\(|\\)', '' ); end // delimiter ;
REGEXP_REPLACE ищет второй после запятой параметр(регулярку) и заменяет на третий параметр, в нашем случае в кавычках ничего нет поэтому просто уберет то что нашел.
Сделаем тестовую вставку, аналогично тому как будет делать логгер:
--вставка тестовой строки insert into traffic (datetime, inif, outif, src, dst, smac, proto, chain, logpref) values (20180730075437, 'in:ether6', 'out:VLAN55', '192.168.0.234:4997', '192.168.6.18:65535', '00:15:17:31:b8:d7', 'TCP', '(SYN)', 'forward', 'BLOCKSMKNETS');
Посмотрим что получилось:
select * from tarffic;
Если все верно, тогда идем дальше. Если нет ищем в чем ошибка.
Добавим хотя бы один индекс. Я не мастер создавать индексы, но как понял, в mysql для разных запросов правильнее использовать индексы с разными стыкующими полями, так как один запрос может использовать только один индекс(или я не прав?). Если понимаете, сделайте на свое усмотрение.
Мне часто приходится делать запросы с конкретным префиксом, поэтому этого я добавил такой индекс:
--добавляем индекс create index traffic_index on traffic (datetime,logpref,src);
Готово.
Теперь нужно запустить отправку на маршрутизаторе, добавьте настройку удаленного сервера логов и действие к нему, добавьте опцию log в одно из правил фаервола, добавьте префикс не более 24 символов.
В консоли микротика это выглядит примерно так:
/system logging action set 3 remote=192.168.0.94 src-address=192.168.0.230 add name=remote2 remote=192.168.0.19 syslog-facility=local6 target=remote /system logging add action=remote topics=error,account,critical,event,info add action=remote2 topics=firewall /ip firewall filter ... add action=drop chain=input comment="drop ssh brute forcers" dst-port=22,8291 log=yes log-prefix=DROP_SSH_BRUTE protocol=tcp src-address-list=ssh_blacklist ...
Где 192.168.0.230 адрес маршрутизатора, 192.168.0.19 адрес лог сервера для логов фаервол, а 192.168.0.94 это другой лог сервер туда у меня валятся системные логи микротика, он нам сейчас не нужен. Наша настройка это remote2.
Далее смотрите что повалится в файл:
tail -f /var/log/remote/192.168.0.230.log
В файл должны посыпаться строки от маршрутизатора, если конечно ваше правило срабатывает достаточно часто.
Если каких-то полей не хватает, то есть не соблюдается последовательность datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len, то можно попробовать поменять параметр в отладочных шаблонах логгера, заменить BLANK на DLFT. Тогда вместо пустоты какого либо поля появятся какие-то буквы, непомню уже какие. Если такое произошло, значит с регуляркой что-то не так и надо исправить.
Если все пошло как надо, то отключаем тестовые условия и шаблон.
Еще надо дефолтный конфиг в /etc/rsyslog.d/ спустить ниже, я переименовал его в 50-default.conf, дабы remote логи не сыпались в системный журнал /var/log/message
Перезапустим логгер.
Подождем немного, пока наша БД наполнится. Далее можем начинать выборку.
Несколько запросов для примера:
Что бы посмотреть размер БД и количество строк:
За месяц выросла почти 4Гб, но это зависит от количества и свойств логгируемых правил фаервола
MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb", TABLE_ROWS as "count rows" from information_schema.tables group by table_schema; +--------------------+---------+------------+ | database | size Mb | count rows | +--------------------+---------+------------+ | information_schema | 0.17 | NULL | | traflog | 3793.39 | 21839553 | +--------------------+---------+------------+ 2 rows in set (0.48 sec)
За месяц выросла почти 4Гб, но это зависит от количества и свойств логгируемых правил фаервола
Количество логгируемых префиксов
Количество логгируемых префиксов не равно количеству правил, некоторые правила работают с одним префиксом, но все же сколько всего префиксов? и сколько по ним отработано правил?:
ACCEPT_TORF_INET в лидерах, по этому префиксу можно найти всех кто сходил в интернет из нашей локальной сети, протоколы и порты записаны, настанет время и доступ кое-кому будет закрыт. Тут есть опорные данные для будущей работы над ошибками.
MariaDB [traflog]> select logpref,count(logpref) from traffic group by logpref order by count(logpref) desc; +----------------------+----------------+ | logpref | count(logpref) | +----------------------+----------------+ | ACCEPT_TORF_INET | 14582602 | | ACCEPT_SMK_PPP | 1085791 | | DROP_FORWARD_INVALID | 982374 | | REJECT_BNK01 | 961503 | | ACCEPT_MMAX_TORF | 802455 | | ACCEPT_TORF_PPP | 736803 | | SMTP_DNAT | 689533 | | ACCEPT_SMK_INET | 451411 | | ACCEPT_INET_TORF | 389857 | | BLOCK_SMKNETS | 335424 | | DROP_SMTP_BRUTE | 285850 | | ACCEPT_ROZN_TORF | 154811 | | ACCEPT_TORF_MMAX | 148393 | | DROP_ETHALL_ETHALL | 80679 | | ACCEPT_SMTP | 48921 | | DROP_SMTP_DDOS | 32190 | | RDP_DNAT | 28757 | | ACCEPT_TORF_ROZN | 18456 | | SIP_DNAT | 15494 | | 1CWEB_DNAT | 6406 | | BLOCKSMKNETS | 5789 | | DROP_SSH_BRUTE | 3162 | | POP_DNAT | 1997 | | DROP_RDP_BRUTE | 442 | | DROP_BNK01 | 291 | | DROPALL | 138 | | ACCEPT_RTP_FORWARD | 90 | | REJECT_SMTP_BRUTE | 72 | | L2TP_INPUT_ACCEPT | 33 | +----------------------+----------------+ 29 rows in set (2 min 51.03 sec)
ACCEPT_TORF_INET в лидерах, по этому префиксу можно найти всех кто сходил в интернет из нашей локальной сети, протоколы и порты записаны, настанет время и доступ кое-кому будет закрыт. Тут есть опорные данные для будущей работы над ошибками.
Лидер smtp тыка
Посмотрим кто сегодня пытался подобраться к smtp серверу:
Понятно, узел 191.96.249.92 сегодня победитель. Посмотрим в каких логгируемых правилах он еще фигурировал:
Этот специализируется только на smtp, ~1% попаданий для попытки подбора пароля или попытки отправки какой-нибудь фигни, остальное ушло в баню.
Запрос формировался 10 минут это много, текущие индексы не подходят для него, либо можно переформулировать запрос, но сейчас не будем об этом.
MariaDB [traflog]> select src,count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018101600000000' group by src order by count(dport) desc limit 10; +----------------+--------------+ | src | count(dport) | +----------------+--------------+ | 191.96.249.92 | 12440 | | 191.96.249.24 | 4556 | | 191.96.249.61 | 4537 | | 185.255.31.122 | 3119 | | 178.57.79.250 | 226 | | 185.36.81.174 | 216 | | 185.234.219.32 | 211 | | 89.248.162.145 | 40 | | 45.125.66.157 | 32 | | 188.165.124.31 | 21 | +----------------+--------------+ 10 rows in set, 1 warning (21.36 sec)
Понятно, узел 191.96.249.92 сегодня победитель. Посмотрим в каких логгируемых правилах он еще фигурировал:
MariaDB [traflog]> select src,dport,count(dport),logpref from traffic where src='191.96.249.92' group by logpref order by count(dport) desc; +---------------+-------+--------------+-----------------+ | src | dport | count(dport) | logpref | +---------------+-------+--------------+-----------------+ | 191.96.249.92 | 25 | 226989 | SMTP_DNAT | | 191.96.249.92 | 25 | 170714 | DROP_SMTP_BRUTE | | 191.96.249.92 | 25 | 2907 | DROP_SMTP_DDOS | | 191.96.249.92 | 25 | 2061 | ACCEPT_SMTP | +---------------+-------+--------------+-----------------+ 4 rows in set (10 min 44.21 sec)
Этот специализируется только на smtp, ~1% попаданий для попытки подбора пароля или попытки отправки какой-нибудь фигни, остальное ушло в баню.
Запрос формировался 10 минут это много, текущие индексы не подходят для него, либо можно переформулировать запрос, но сейчас не будем об этом.
В будущем планируется прикручивание веб интерфейса с типовыми запросами и формами.
Вектор задан, надеюсь что данная статья будет полезной.
Всем спасибо!
Список литературы:
Документация по rsyslog
Документация по mysql
Документация по Mikrotik logging
Спасибо сообществу LOR за подсказки
UPD.1
Добавил в базу данных поле flags, теперь можно отслеживать длительность подключения отлавливая SYN, FIN.
Исправлены некоторые ошибки в регулярках rsyslog, а так же триггеры mysql.
Что любопытно, дефолтное правило defconf: drop invalid дропает все финальные пакеты TCP соединений, в итоге все узлы которые пытаются по науке закрыть соединение терпят неудачу, отправляя несколько FINов. Правильно ли это?
Я добавил правило разрешающее прохождение TCP с флагами ACK,FIN.
Под спойлером SQL процедура, которая покажет время TCP соединений за последние пять минут
connections_list()
DROP PROCEDURE IF EXISTS connections_list; DELIMITER // CREATE PROCEDURE connections_list() BEGIN DECLARE logid BIGINT UNSIGNED; DECLARE done INT DEFAULT FALSE; DECLARE datefin DATETIME; DECLARE datesyn DATETIME; DECLARE conntime TIME; DECLARE connsport INT; DECLARE conndport INT; DECLARE connsrc VARCHAR(21); DECLARE conndst VARCHAR(21); DECLARE cur CURSOR FOR SELECT id,datetime,src,sport,dst,dport FROM conn_syn_fin WHERE flags='SYN'; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=TRUE; DROP TABLE IF EXISTS conn_syn_fin; DROP TABLE IF EXISTS connless; CREATE temporary TABLE connless(datestart DATETIME,dateend DATETIME,duration TIME,src VARCHAR(21),sport INT,dst VARCHAR(21),dport INT); CREATE temporary TABLE conn_syn_fin (SELECT * from traffic WHERE datetime > now() - interval 5 minute and src in (select src from traffic where datetime > now() - interval 5 minute and logpref='TCP_FIN' and flags like '%FIN%') and (flags like '%SYN%' or flags like '%FIN%') order by id); OPEN cur; read_loop: LOOP FETCH cur INTO logid,datesyn,connsrc,connsport,conndst,conndport; IF done THEN LEAVE read_loop; END IF; set datefin=(SELECT datetime FROM conn_syn_fin WHERE id>logid and src=connsrc and sport=connsport and flags like '%FIN%' and dst=conndst and dport=conndport limit 1); set conntime=(SELECT timediff(datefin,datesyn)); INSERT INTO connless (datestart,dateend,duration,src,sport,dst,dport) value (datesyn,datefin,conntime,connsrc,connsport,conndst,conndport); END LOOP; CLOSE cur; select * from connless; END; // DELIMITER ;
В результате выполнения процедуры будет создано две временных таблицы.
Таблица conn_syn_fin содержит записи логов с флагами SYN и FIN, далее с помощью курсора в этой таблице производится поиск.
Таблица connless содержит список соединений, открытых и завершенных, у завершенных есть длительность у открытых соответственно нет.
Обратите внимание, время выборки минус пять минут от текущего времени. У меня запрос выполняется медленно. Медленно проходит поиск по курсору, обрабатывает примерно 10 записей в секунду, пытался всячески ускорить, но время выполнения всегда примерно одинаковое.
Так же обратите внимание, данная процедура предназначена лишь для демонстрации. Если потребуется делать выборку по конкретному src/sport/dst/dport лучше сделать отдельную процедуру аналогично этой. Если вы мастер sql, то можете написать свой запрос лучше.
call connections_list();
MariaDB [traflog]> call connections_list(); +---------------------+---------------------+----------+---------------+-------+-----------------+-------+ | datestart | dateend | duration | src | sport | dst | dport | +---------------------+---------------------+----------+---------------+-------+-----------------+-------+ | 2019-03-20 14:12:19 | 2019-03-20 14:13:14 | 00:00:55 | 192.168.0.81 | 41868 | 87.250.250.207 | 443 | | 2019-03-20 14:12:25 | NULL | NULL | 192.168.0.65 | 49311 | 52.5.23.125 | 443 | | 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | 192.168.0.104 | 54433 | 217.69.139.42 | 443 | | 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | 192.168.0.104 | 54434 | 217.69.139.42 | 443 | | 2019-03-20 14:12:32 | NULL | NULL | 192.168.0.119 | 37977 | 209.85.233.95 | 443 | ... | 2019-03-20 14:17:12 | NULL | NULL | 192.168.0.119 | 39331 | 91.213.158.131 | 443 | | 2019-03-20 14:17:13 | NULL | NULL | 192.168.0.90 | 63388 | 87.240.185.236 | 443 | +---------------------+---------------------+----------+---------------+-------+-----------------+-------+ 399 rows in set (33.17 sec) Query OK, 0 rows affected (33.18 sec)
После выполнения процедуры временные таблицы conn_syn_fin и connless остаются, можете их посмотреть более детально, если обнаружите что-то подозрительное или не достоверное. После запуска процедуры старые таблицы удаляться и появятся новые. Напишите если найдете ошибку.
