
Недавно меня попросили помочь в определении источников утечки трафика в одной из организаций. Задачу усугубляло большое количество устройств в одном широковещательном домене, множество неуправляемых коммутаторов, отсутствие любой карты сети, а также старенький роутер на входе. В общем, это были настоящие "Авгиевы конюшни", но в итоге задача была решена, и данная статья посвящена методам, которые я использовал. Кто оказался виновником, я раскрою в конце статьи, чтобы не портить интригу.
В текущей задаче я придерживался следующих принципов:

Для определения источника аномального трафика я разбил задачу на несколько подзадач:
Круглосуточный мониторинг скорости доступа к интернету в различных частях организации.
Непрерывное сканирование всего трафика с целью выявления хостов, потребляющих его в наибольшем объеме.
Анализ хостов, генерирующих наибольший трафик, с последующим устранением выявленных проблем.

Задача 1: Круглосуточный мониторинг скорости доступа к интернету в разных уголках организации

На узлы я установил пользовательские экспортеры на Python, которые запускают Speedtest CLI в заданные промежутки времени и отправляют метрики в Prometheus.

Экспортер выдает для Prometheus такие метрики, как:
internet_download_speed_mbps # Скорость загрузки из интернета
internet_upload_speed_mbps # Скорость выгрузки в интернет
internet_ping_ms # Задержка пинга
Код экспортера представлен ниже - speedtest_exporter.py:
Скрытый текст
# metrics prometheus
# Скорость загрузки из интернета
# avg(internet_download_speed_mbps) by (instance)
# Скорость выгрузки в интернет
# avg(internet_upload_speed_mbps) by (instance)
# Задержка пинга
# avg(internet_ping_ms) by (instance)
from flask import Flask, Response
import speedtest
import logging
import time
import threading
app = Flask(__name__)
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Переменные для кеша
metrics_data = {
"download_speed": 0.0,
"upload_speed": 0.0,
"ping": 0.0
}
last_test_time = 0
data_lock = threading.Lock()
cache_duration = 60 # Измеряем раз в минуту
def run_speedtest():
global metrics_data, last_test_time
while True:
try:
logging.info("Запуск измерений интернет-соединения.")
st = speedtest.Speedtest()
st.get_best_server()
download_speed = st.download() / 1e6 # Конвертируем в Мбит/с
upload_speed = st.upload() / 1e6 # Конвертируем в Мбит/с
ping = st.results.ping # Пинг в миллисекундах
with data_lock:
metrics_data["download_speed"] = download_speed
metrics_data["upload_speed"] = upload_speed
metrics_data["ping"] = ping
last_test_time = time.time()
logging.info(f"Измерения завершены: Загрузка: {download_speed:.2f} Mbps, "
f"Выгрузка: {upload_speed:.2f} Mbps, "
f"Пинг: {ping:.2f} ms.")
except Exception as e:
logging.error(f"Ошибка при измерениях: {str(e)}")
time.sleep(cache_duration)
@app.route('/metrics')
def metrics():
with data_lock:
response = f"""
# HELP internet_download_speed_mbps Download speed in Mbps
# TYPE internet_download_speed_mbps gauge
internet_download_speed_mbps {metrics_data["download_speed"]}
# HELP internet_upload_speed_mbps Upload speed in Mbps
# TYPE internet_upload_speed_mbps gauge
internet_upload_speed_mbps {metrics_data["upload_speed"]}
# HELP internet_ping_ms Ping latency in milliseconds
# TYPE internet_ping_ms gauge
internet_ping_ms {metrics_data["ping"]}
"""
return Response(response, mimetype="text/plain")
if __name__ == '__main__':
# Запускаем тест скорости в отдельном потоке
speedtest_thread = threading.Thread(target=run_speedtest, daemon=True)
speedtest_thread.start()
# Запускаем Flask
app.run(host='0.0.0.0', port=9101)
Задача 2: Круглосуточное сканирование всего трафика и определение адресов хостов, наиболее сильно этот трафик расходующих

Мы проводим мониторинг трафика в местах его схождения. Удобнее всего это делать на шлюзе, а в нашем случае роутере, либо настраивая зеркалирование трафика на порт, к которому подключаем анализирующее устройство. Для этих целей я использовал Orange Pi, так как эти дешевые и многофункциональные одноплатные компьютеры просты в использовании и удобны в работе. О применении Orange Pi я писал в предыдущих статьях тут и тут.

Код анализатора трафика представлен ниже - traffic_monitor.py:
Скрытый текст
import time
import logging
from logging.handlers import RotatingFileHandler
from collections import defaultdict
from scapy.all import sniff, IP
from threading import Thread
import datetime
import ipaddress
# =======================
# Статичные переменные
# =======================
# Задержка между логированиями в секундах (по умолчанию 5 минут)
DELAY_SECONDS = 60 # 300 секунд = 5 минут
# Список префиксов сетевых адресов для широковещательного трафика
NETWORKS = ['192.168.2.0/24', '192.168.10.0/24'] # Добавьте необходимые префиксы
# Вычисляем широковещательные адреса для каждой сети
NETWORK_BROADCAST_ADDRS = [str(ipaddress.IPv4Network(net).broadcast_address) for net in NETWORKS]
# Широковещательный адрес по умолчанию
BROADCAST_IP = '255.255.255.255'
# Хранение статистики трафика
traffic_stats = defaultdict(int)
broadcast_traffic_stats = defaultdict(int)
# =======================
# Настройка логирования с ротацией
# =======================
LOG_FILENAME = 'top_ip_traffic.log'
# Создаем обработчик ротации логов
rotating_handler = RotatingFileHandler(
LOG_FILENAME, # Имя файла
maxBytes=5 * 1024 * 1024, # Максимальный размер файла (5 MB)
backupCount=3 # Максимальное количество резервных копий (3 файла)
)
# Настраиваем формат логов
rotating_handler.setFormatter(logging.Formatter('%(message)s'))
# Получаем корневой логгер и устанавливаем ему обработчик ротации
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(rotating_handler)
def packet_callback(packet):
"""Callback функция для обработки пакетов."""
if IP in packet:
src_ip = packet[IP].src
dst_ip = packet[IP].dst
packet_length = len(packet)
# Проверяем, является ли адрес назначения широковещательным
is_broadcast = False
if dst_ip == BROADCAST_IP or dst_ip in NETWORK_BROADCAST_ADDRS:
is_broadcast = True
# else:
# for prefix in NETWORK_PREFIXES:
# if dst_ip.startswith(prefix):
# is_broadcast = True
# break
if is_broadcast:
# Увеличиваем трафик для источника и назначения для широковещательного трафика
broadcast_traffic_stats[src_ip] += packet_length
broadcast_traffic_stats[dst_ip] += packet_length
else:
# Увеличиваем трафик для обычного трафика
traffic_stats[src_ip] += packet_length
traffic_stats[dst_ip] += packet_length
def log_top_ips():
"""Логирует IP-адреса с наибольшим трафиком."""
while True:
time.sleep(DELAY_SECONDS)
try:
# Получаем текущую метку времени
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Определяем топ IP для обычного трафика
top_ips = sorted(traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10]
# Определяем топ IP для широковещательного трафика
top_broadcast_ips = sorted(broadcast_traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10]
# Логируем информацию
logging.info("." * 37)
logging.info(f" Топ 10 IP на {now}")
logging.info(" Общий трафик ")
# Добавляем заголовки таблицы
logging.info(f"{'IP-адрес':<20} {'Байты':>15}")
logging.info("-" * 35)
for ip, bytes_used in top_ips:
logging.info(f"{ip:<20} {bytes_used:>15}")
logging.info(" Широковещательный трафик ")
logging.info(f"{'IP-адрес':<20} {'Байты':>15}")
logging.info("-" * 35)
for ip, bytes_used in top_broadcast_ips:
logging.info(f"{ip:<20} {bytes_used:>15}")
logging.info("." * 37)
# Очищаем статистику обоих типов трафика
traffic_stats.clear()
broadcast_traffic_stats.clear()
except Exception as e:
logging.error(f"Произошла ошибка при логировании: {e}")
def main():
"""Основная функция для захвата пакетов и логирования трафика."""
try:
# Запускаем поток для логирования
logger_thread = Thread(target=log_top_ips, daemon=True)
logger_thread.start()
# Запускаем захват пакетов с оптимизированным фильтром
# Фильтр захватывает только IP-пакеты для повышения эффективности
sniff(iface=None, prn=packet_callback, store=0, filter="ip")
except KeyboardInterrupt:
logging.info("Завершение работы программы пользователем...")
except Exception as e:
logging.error(f"Произошла ошибка в основной функции: {e}")
# Запуск программы
if __name__ == "__main__":
main()
Созданием виртуального окружения, установкой зависимостей и настройкой служб для указанных выше скриптов займется Ansible. Плейбук, который я применял, приведен ниже: ansible_speedtest.yml.
Скрытый текст
---
- name: Установка Python и настройка Speedtest Exporter
hosts: localhost
tasks:
- name: Обновление списка пакетов
apt:
update_cache: true
become: true
- name: Установка необходимых пакетов
apt:
name:
- curl
- python3-venv
- python3.11-dev
- libffi-dev
- libssl-dev
- nmap
state: present
become: true
- name: Проверка существования виртуального окружения
stat:
path: "/etc/apt/sources.list.d/ookla_speedtest-cli.list"
register: speedtest_repo
- name: Добавление репозитория speedtest-cli
shell: curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | bash
args:
executable: /bin/bash
become: true
when: not speedtest_repo.stat.exists
- name: Установка speedtest
apt:
name: speedtest
state: present
become: true
- name: Проверка существования виртуального окружения
stat:
path: "{{ playbook_dir }}/.venv"
register: venv_directory
- name: Создание виртуального окружения
command: python3 -m venv .venv
args:
chdir: "{{ playbook_dir }}"
when: not venv_directory.stat.exists
- name: Обновить pip и setuptools в виртуальном окружении
pip:
name:
- pip
- setuptools
state: latest
virtualenv: "{{ playbook_dir }}/.venv"
- name: Установка зависимостей из requirements.txt
pip:
requirements: "{{ playbook_dir }}/requirements.txt"
virtualenv: "{{ playbook_dir }}/.venv"
# службы
# speedtest exporter
- name: Создание файла службы для speedtest exporter
become: true
copy:
dest: /etc/systemd/system/speedtest_exporter.service
content: |
[Unit]
Description=Служба Speedtest Exporter
After=network.target
[Service]
User={{ ansible_env.USER }}
WorkingDirectory={{ playbook_dir }}
ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/speedtest_exporter.py
Restart=always
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
- name: Перезагрузка системных служб
become: true
command: systemctl daemon-reload
- name: Запуск службы speedtest_exporter
become: true
systemd:
name: speedtest_exporter
state: started
enabled: yes
## traffic monitor
- name: Создание файла службы для traffic monitor (привилегированный режим)
become: true
copy:
dest: /etc/systemd/system/traffic_monitor.service
content: |
[Unit]
Description=Служба Traffic Monitor
After=network.target
[Service]
User=root
WorkingDirectory={{ playbook_dir }}
ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/traffic_monitor.py
Restart=always
RestartSec=5s
TimeoutSec=30
StandardOutput=journal
StandardError=journal
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
- name: Перезагрузка системных служб
become: true
command: systemctl daemon-reload
- name: Запуск службы traffic_monitor
become: true
systemd:
name: traffic_monitor
state: restarted
enabled: yes
Задача 3: Изучение хостов, генерирующего наибольших трафик, устранение проблем
Далее подключаем Grafana к Prometheus и изучаем периоды падения трафика на примере тестовой среды.

Также Alertmanager позволяет настроить предупреждения по пороговым значениям метрик. В моем случае, при падении трафика ниже допустимого, приходит оповещение, например, в телеграмм.
Теперь в моменты падения графика мы изучаем лог нашего анализатора трафика и определяем ip адреса узлов, наиболее активно использующих трафик:

После того, как подозрительный ip найден, мы должны определиться, что мы хотим узнать о нем:

Из инструментария, мы используем утилиту nmap, с помощью которой определим местоположение, выполним обычное, а затем агрессивное сканирование. Напишем нужный скрипт на python:
Скрытый текст
import sys
import socket
import requests
from pythonping import ping
import re
import nmap
import datetime
import time
import subprocess
def validate_ip(ip):
pattern = re.compile(r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$")
return pattern.match(ip) is not None
def reverse_dns(ip):
try:
return socket.gethostbyaddr(ip)[0]
except socket.herror:
return "Не удалось найти доменное имя (reverse DNS lookup)."
def check_ping(ip):
"""
Проверяет доступность хоста по IP адресу с помощью команды ping.
:param ip: IP-адрес хоста
:return: Среднее время отклика в миллисекундах (float) если доступен, иначе False
"""
try:
# Выполняем команду ping с 1 пакетом
output = subprocess.check_output(
["ping", "-c", "1", "-W", "1", ip],
stderr=subprocess.STDOUT,
universal_newlines=True
)
# Парсим вывод ping для получения среднего времени отклика
for line in output.splitlines():
if "time=" in line:
# Пример строки: "64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=14.2 ms"
time_ms = float(line.split("time=")[1].split(" ")[0])
return time_ms
return False
except subprocess.CalledProcessError:
return False
def wait_for_host(ip, timeout=3600, interval=60):
"""
Ждет, пока хост станет доступен или истечет таймаут.
:param ip: IP-адрес хоста
:param timeout: Таймаут в секундах (по умолчанию 1 час)
:param interval: Интервал между проверками в секундах (по умолчанию 60 секунд)
:return: True если хост стал доступен, False если таймаут истек
"""
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(seconds=timeout)
while datetime.datetime.now() < end_time:
ping_result = check_ping(ip)
if isinstance(ping_result, float):
print(f"Хост {ip} доступен. Среднее время отклика: {ping_result} ms")
return True
else:
remaining = end_time - datetime.datetime.now()
remaining_seconds = int(remaining.total_seconds())
hours, remainder = divmod(remaining_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
print(f"Хост {ip} недоступен. Ожидание... Осталось времени: {hours}ч {minutes}мин {seconds}с")
time.sleep(interval)
print(f"Таймаут ожидания хоста {ip} истек после {timeout / 3600} часа(ов).")
return False
def geolocation(ip):
try:
response = requests.get(
f"http://ip-api.com/json/{ip}?fields=status,country,regionName,city,zip,lat,lon,timezone,isp,org,as",
timeout=10
)
data = response.json()
if data.get('status') == 'success':
return data
else:
return {"error": "Не удалось получить геолокацию."}
except Exception as e:
return {"error": str(e)}
def print_geolocation(geo):
if 'error' in geo:
print(f"Геолокация: {geo['error']}")
else:
print("Геолокация:")
print(f" Страна: {geo.get('country')}")
print(f" Регион: {geo.get('regionName')}")
print(f" Город: {geo.get('city')}")
print(f" Почтовый индекс: {geo.get('zip')}")
print(f" Широта: {geo.get('lat')}")
print(f" Долгота: {geo.get('lon')}")
print(f" Часовой пояс: {geo.get('timezone')}")
print(f" Провайдер: {geo.get('isp')}")
print(f" Организация: {geo.get('org')}")
print(f" Автономная система: {geo.get('as')}")
def scan_ip(ip):
"""
Сканирует указанный IP-адрес и выводит результаты сканирования.
Параметры:
ip (str): IP-адрес для сканирования.
"""
scanner = nmap.PortScanner()
try:
# Проведение сканирования с опциями:
# -sS : TCP SYN scan
# -sV : Определение версий сервисов
# -O : Определение операционной системы
scanner.scan(ip, arguments='-sS -sV -O')
result = {}
host_info = scanner[ip]
# Инициализация
result['hostname'] = "Не определено"
result['mac_address'] = "Не доступен"
result['open_ports'] = []
result['services'] = {}
result['os'] = "Не определена"
# Получение имени хоста
if 'hostnames' in host_info and host_info['hostnames']:
hostname = host_info['hostnames'][0]['name']
result['hostname'] = hostname if hostname else "Не определено"
# Получение MAC-адреса
if 'addresses' in host_info:
if 'mac' in host_info['addresses']:
result['mac_address'] = host_info['addresses']['mac']
# Обработка TCP-портов
if 'tcp' in host_info:
for port in host_info['tcp']:
state = host_info['tcp'][port]['state']
if state == 'open':
result['open_ports'].append(port)
service_name = host_info['tcp'][port]['name']
service_product = host_info['tcp'][port].get('product', 'Неизвестно')
service_version = host_info['tcp'][port].get('version', 'Неизвестно')
result['services'][port] = f"{service_name} ({service_product} {service_version})"
# Обработка UDP-портов (опционально)
if 'udp' in host_info:
for port in host_info['udp']:
state = host_info['udp'][port]['state']
if state == 'open':
result['open_ports'].append(port)
service_name = host_info['udp'][port]['name']
service_product = host_info['udp'][port].get('product', 'Неизвестно')
service_version = host_info['udp'][port].get('version', 'Неизвестно')
result['services'][port] = f"{service_name} ({service_product} {service_version})"
# Определение операционной системы
if 'osmatch' in host_info and host_info['osmatch']:
result['os'] = host_info['osmatch'][0]['name']
# Вывод результатов
print(f"Результаты сканирования для {ip}:")
print(f" Имя хоста: {result['hostname']}")
print(f" MAC-адрес: {result['mac_address']}")
print(f" Операционная система: {result['os']}")
if result["open_ports"]:
# Сортировка портов для удобства
open_ports_sorted = sorted(result["open_ports"])
print(" Открытые порты:", ", ".join(map(str, open_ports_sorted)))
print(" Детали сервисов:")
for port in open_ports_sorted:
service_info = result["services"].get(port, "Информация недоступна")
print(f" Порт {port}: {service_info}")
else:
print(" Открытых портов не обнаружено.")
except Exception as e:
print(f"Результаты сканирования для {ip}:")
print(f" Ошибка: {str(e)}")
def aggressive_scan(ip):
"""
Выполняет "агрессивное" сканирование ip с помощью Nmap.
:param ip: IP-адрес (или диапазон), который необходимо просканировать.
:return: Словарь с информацией об узле:
{
'hostname': str (имя хоста или None),
'mac_address': str (MAC-адрес или None),
'open_ports': список открытых портов,
'services': словарь {порт: {протокол, сервис, продукт, версия}},
'os': список возможных ОС (с их баллами совпадения) или None
}
"""
scanner = nmap.PortScanner()
# Аргументы:
# -A — агрессивное сканирование (включает проверку версий и ОС)
# -T4 — скорость сканирования (T4 обычно достаточно быстрая)
# 1-65535 — диапазон всех портов (можно сузить при необходимости)
scanner.scan(hosts=ip, ports="1-65535", arguments="-A -T4")
results = {
'hostname': None,
'mac_address': None,
'open_ports': [],
'services': {},
'os': None
}
if ip not in scanner.all_hosts():
return results # Хост не найден или не ответил
host_info = scanner[ip]
# Получаем имя хоста (если удалось определить)
hostname = host_info.hostname()
if hostname:
results['hostname'] = hostname
# MAC-адрес (если удалось определить)
if 'addresses' in host_info and 'mac' in host_info['addresses']:
results['mac_address'] = host_info['addresses']['mac']
# Определение ОС
if 'osmatch' in host_info:
results['os'] = [
{
'name': os_item['name'],
'accuracy': os_item['accuracy'],
'os_family': os_item.get('osclass', [{}])[0].get('osfamily')
}
for os_item in host_info['osmatch']
]
# Список протоколов (tcp, udp и т.д.)
for proto in host_info.all_protocols():
ports = host_info[proto].keys()
for port in ports:
port_state = host_info[proto][port]['state']
if port_state == 'open':
results['open_ports'].append(port)
# Информация о сервисе
service_name = host_info[proto][port].get('name', '')
service_product = host_info[proto][port].get('product', '')
service_version = host_info[proto][port].get('version', '')
results['services'][port] = {
'protocol': proto,
'service': service_name,
'product': service_product,
'version': service_version
}
return results
def print_scan_results(scan_data):
"""
Печатает результаты агрессивного сканирования в консоль.
"""
print("\nРезультаты агрессивного сканирования:")
if not scan_data['hostname'] and not scan_data['open_ports']:
print("Хост не ответил или не найден.")
return
print(f"Имя хоста: {scan_data['hostname']}")
print(f"MAC-адрес: {scan_data['mac_address']}")
if scan_data['os']:
print("Возможные ОС:")
for os_info in scan_data['os']:
print(f" - {os_info['name']} (точность: {os_info['accuracy']}%), семейство: {os_info['os_family']}")
else:
print("Не удалось определить ОС.")
if scan_data['open_ports']:
print("\nОткрытые порты и сервисы:")
for port in sorted(scan_data['open_ports']):
service_info = scan_data['services'][port]
print(f" Порт {port}/{service_info['protocol']}: {service_info['service']} "
f"(продукт: {service_info['product']}, версия: {service_info['version']})")
else:
print("Открытых портов не найдено.")
def main():
if len(sys.argv) != 2:
print("Использование: python py_ip_monitor.py <IP-адрес>")
sys.exit(1)
ip = sys.argv[1]
if not validate_ip(ip):
print("Некорректный IP-адрес.")
sys.exit(1)
print(f"\nИнформация для IP: {ip}\n{'='*40}")
# Обратный DNS
dns = reverse_dns(ip)
print(f"Обратный DNS: {dns}")
# Геолокация
geo = geolocation(ip)
print_geolocation(geo)
# Ping
ping_result = check_ping(ip)
if isinstance(ping_result, float):
print(f"Среднее время отклика (ping): {ping_result} ms")
else:
print(f"Ping: {ping_result}")
# Сканирование портов
scan_ip(ip)
# 4. Расширенное сканирование портов (aggressive_scan + вывод)
scan_result = aggressive_scan(ip)
print_scan_results(scan_result)
if __name__ == "__main__":
main()
Запустим сканирование и получим следующие данные:
mac-адрес
тип операционной системы
открытые порты
Все это позволит нам понять какие сервисы запущены на узле и определить его предназначение.

Если доступ к аномальным узлам есть, и мы можем на них авторизоваться, то изучаем запущенные процессы и ищем те, что активно используют сетевое соединение. Например c помощью утилиты nethogs.
Итог
Что теперь с этим делать? Зависит от ваших задач.
Если узел расходует трафик который не должен расходовать, то проще всего заблокировать MAC-адрес на роутере, добавив соответствующее правило, или внести необходимые изменения в файрвол (при его наличии).
Также изучая arp таблицы можно найти местоположение устройства.
В моем же случае виновником был смарт-телевизор, применение которого весьма чувствительно для сети, поставляемой через VPN. Хозяин устройства был найден, а телевизор изъят.
В данной статье мы рассмотрели простые способы создания пользовательских экспортеров для prometheus, способы мониторинга трафика в сети, а также методы быстрого развертывания наших скриптов на узлах с применением системы управления конфигурациями Ansible.
Всем спасибо за внимание!