Всех приветствую!
Послушал обзор курса Мониторинг высоконагруженных систем от OTUS, в котором упоминалось, что используется асинхронный подход в мониторинге и решил реализовать его в установленном по гайду WG. Готового легковесного в интернете ранее не нашел, а вопрос назрел в ввиду оперативного понимания, кто забивает канал, да и логи хотелось бы увидеть о событиях. Если читатель надеется, что тут я опишу изменение исходника, не тратьте время, будем обрабатывать вывод команды wg в консоли Python-ом, который уже есть в ubuntu 20.04 и содержит asincio. Можете по
читать как писался код
Напишем первую функция которая будет создавать корутины с блоками клиентов из вывода `STDOUT` в бесконечном цикле и то ради чего мы это делаем - писать в лог и мониторить. Добавим обработку исключений в процессе создание корутин для остановки по ctrl+c и по ошибкам asyncio плюс паузу 5 минут в цикле по сбору вывода wg. Для мониторинга создадим http сервер, который при задержках можно мгновенно открыть и определить кто забил канал и эту страницу можно опрашивать в панели grafan-ы на будущее.
async def main_loop() -> None: '''Основной цикл''' await log_event("Wireguard Logging Service Started") asyncio.create_task(start_http_server()) while True: try: wg_output: str = await read_wg_output() peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip()) tasks = [process_peer(block) for block in peer_blocks] if tasks: await asyncio.gather(*tasks) await asyncio.sleep(300) # Ожидание перед итерацией 5 минут except (KeyboardInterrupt, asyncio.CancelledError): await stop() break
Здесь определился с названиями функции, отправил первую запись в лог, определил, что захваченное делится на куски начинающиеся с peer и убрал с вывода пустые символы. Собранные куски переберем с помощью list comprehension и обработаем функцией в которой будет вся логика. Результат отправим исполняться стандартной asyncio.gather(), которая соберет корутины из блоков и отправит в цикл asincio исполняться конкурентно.---
Пишем все те функции название которые придумали.
Первая логирование, получаем текст и через контекстный менеджер добавляем в файл.
import aiofiles import os from datetime import datetime async def log_event(message) -> None: '''Функция логирование события в файл с временной меткой''' async with aiofiles.open(LOG_FILE, mode='a') as f: await f.write(f"{datetime.now()} - {message}\n")
Во второй опишем веб сервер, добавив в лог событие запуска
from aiohttp import web async def start_http_server() -> None: '''Запуск HTTP сервера для отдачи состояния подключений''' app = web.Application() app.router.add_get(handle_uniq_record) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, http_host, http_port) await site.start() await log_event(f"HTTP сервер запущен на http://{http_host}:{http_port}")
Плюс обработчик с сортировкой ставить активных клиентов в начало.
async def handle_uniq_record(request) -> web.Response: '''Обработка HTTP запроса и возврат текущего состояния подключений''' sorted_items = sorted(uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True) uniq_record = dict(sorted_items) return web.json_response(uniq_record)
Следующей заберем вывод записав ошибки в лог с исключением если вывода не будет.
from asyncio.subprocess import Process async def read_wg_output() -> str: '''Чтение вывода команды wg''' proc: Process = await asyncio.create_subprocess_shell( "wg", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,) stdout, stderr = await proc.communicate() if stderr or not stdout.decode().strip(): await log_event(f"Error running, check wg: {stderr.decode()}") await stop() raise RuntimeError("Stopping service due to empty or error output from wg") return stdout.decode()
В следующей функции основная логика, в которой я изрядно поломал голову исключив пустые события возникающие при каждой итерации. Придумал писать состояния клиентов в словарь и вести объем трафика по ним по принципу лоад оверейдж, 3 значения разделенные временем итерации, 5 минут. Пришлось скрупулезно с оглядкой на статью, что дали мне при обучении в skillbox обработать приходящий вывод и добавить условие проверки в tasks = [process_peer(block) for block in peer_blocks if block.strip()] функции main_loop().
async def process_peer(peer_block) -> None: '''Функция основной логики: запись событий подключения/отключения, обновление состояния в uniq_record''' THRESHOLD = 5 # временной порог за который обсчитывать активность или неактивность key_map = ['transfer', 'transfer 5m', 'transfer 15m'] pattern = re.compile( r'peer:\s*(?P<peer>\S+)' r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))? r'(?:.*?latest handshake:\s*' r'(?:(?P<days>\d{1,2})\s*days?,\s*)?' r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?' r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?' r'(?P<seconds>\d{1,2})\s*seconds ago)?' r'(?:.*?transfer:\s*(?P<transfer>.*))?', re.IGNORECASE | re.DOTALL ) match = pattern.search(peer_block) if match: peer = match.group('peer') endpoint = match.group('endpoint') or '' days = int(match.group('days') or 0) hours = int(match.group('hours') or 0) minutes = int(match.group('minutes') or 0) seconds = int(match.group('seconds') or 0) transfer = match.group('transfer').strip() if match.group('transfer') else '' user = await get_user_from_peer(peer) uniq_record.setdefault(user, { 'connected': False, 'transfer': '', 'transfer 5m': '', 'transfer 15m': '', 'current_index': 0 }) if hours == 0 and seconds and minutes <= THRESHOLD and transfer and uniq_record[user]['connected'] != True: await log_event(f"User {user} connected from {endpoint}") uniq_record[user]['connected'] = True elif (hours > 0 or minutes > THRESHOLD) and uniq_record[user]['connected'] != False: await log_event(f"User {user} {endpoint} disconnected: {transfer}") uniq_record[user]['connected'] = False idx = uniq_record[user]['current_index'] uniq_record[user][key_map[idx]] = transfer #записывает текущее значение объема потока в состояние. uniq_record[user]['current_index'] = (idx + 1) % 3 #циклично перезаписывает индекс 0->1->2->0 и т.д.
В следующей функции get_user_from_peer(peer) привязываем найденного значения peer к реальному имени клиента, сначала подумал взять имена файлов ключей которые создавал именую именем клиента за основу, но решил, что лучше брать имя папки.
async def get_user_from_peer(peer) -> str: '''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или unknown''' CLIENTS_PATH = путь до папки с клиентами for folder, _, files in os.walk(CLIENTS_PATH): for file in files: file_path = os.path.join(folder, file) try: async with aiofiles.open(file_path, mode='r') as f: content = await f.read() if peer in content: user_folder = os.path.basename(folder) return user_folder except Exception: continue return "unknown"
Вложенным циклом перебираем файлы в папке ища peer, при наличии возвращаем имя папки в которой он нашелся.
Так, осталось написать функцию для корректного завершения корутин, кстати отличная статья здесь на хабре про асинхронность в пайтоне, я там нашел команду, asyncio.all_tasks(), которая собирает все корутины в переменную и в цикле уже завершаю каждую.
async def stop() -> None: '''Остановка всех задач и логирование остановки сервиса''' await log_event("Wireguard Logging Service Stopped") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True)
Вроде все, осталось обернуть все в класс используя на полную ООП и создать условия запуска.
if __name__ == "__main__": monitor = WireguardMonitor() try: asyncio.run(monitor.main_loop()) except KeyboardInterrupt: print("Программа остановлена пользователем.")
в которой создаём объект класса и запускаем цикл asincio.run в которой и будет асинхронно выполняться задачи.---
---
или перейти к результату в котором получились приемлемые логи по событию не связанного со служебным циклом и вывод на http страницу объема проходящего трафика разделенного временем с состоянием активности клиентов.
"wg-external": {
"connected": true,
"transfer": "15.55 GiB received, 3.05 GiB sent",
"transfer 5m": "15.52 GiB received, 3.2 GiB sent",
"transfer 15m": "15.50 GiB received, 3.0 GiB sent",
"current_index": 2
},
"guest": {
"connected": false,
"transfer": "",
"transfer 5m": "",
"transfer 15m": "",
"current_index": 2
},
Весь код находится в репе или
в этом блоке
import asyncio
from asyncio.subprocess import Process
import aiofiles
import re
import os
from datetime import datetime
from aiohttp import web
class WireguardMonitor:
def init(self, threshold=30,
log_file="/var/log/wireguard/wireguard.log",
clients_path="/etc/wireguard/clients") -> None:
'''Инициализация параметров и создание необходимых директорий'''
os.makedirs("/var/log/wireguard", exist_ok=True) self.log_file: str=log_file self.clients_path: str=clients_path self.THRESHOLD: int = threshold self.LOG_FILE: str = log_file self.CLIENTS_PATH: str = clients_path self.uniq_record = dict() self.http_host = '0.0.0.0' self.http_port = 58105 self.key_map = ['transfer', 'transfer 5m', 'transfer 15m'] self.pattern = re.compile( r'peer:\s*(?P<peer>\S+)' r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))?' r'(?:.*?latest handshake:\s*' r'(?:(?P<days>\d{1,2})\s*days?,\s*)?' r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?' r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?' r'(?P<seconds>\d{1,2})\s*seconds ago)?' r'(?:.*?transfer:\s*(?P<transfer>.*))?', re.IGNORECASE | re.DOTALL ) async def stop(self) -> None: '''Остановка всех задач и логирование остановки сервиса''' await self.log_event("Wireguard Logging Service stopped") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) async def read_wg_output(self) -> str: '''Чтение вывода команды wg''' proc: Process = await asyncio.create_subprocess_shell( "wg", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if stderr or not stdout.decode().strip(): await self.log_event(f"Error running, check wg: {stderr.decode()}") await self.stop() raise RuntimeError("Stopping service due to empty or error output from wg") return stdout.decode() async def get_user_from_peer(self, peer) -> str: '''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или "unknown" peer - публичный ключ клиента взятый из вывода wg ''' for folder, _, files in os.walk(self.CLIENTS_PATH): for file in files: file_path = os.path.join(folder, file) try: async with aiofiles.open(file_path, mode='r') as f: content = await f.read() if peer in content: user_folder = os.path.basename(folder) return user_folder except Exception: continue return "unknown" async def log_event(self, message) -> None: '''Функция логирование события в файл с временной меткой message - текс для записи в файл лога ''' async with aiofiles.open(self.LOG_FILE, mode='a') as f: await f.write(f"{datetime.now()} - {message}\n") async def process_peer(self, peer_block) -> None: '''Функция основной логики: запись событий подключения/отключения, обновление состояния клиентов в словаре peer_block - текстовый блок вывода команды wg ''' match = self.pattern.search(peer_block) if match: peer = match.group('peer') endpoint = match.group('endpoint') or '' days = int(match.group('days') or 0) hours = int(match.group('hours') or 0) minutes = int(match.group('minutes') or 0) seconds = int(match.group('seconds') or 0) transfer = match.group('transfer').strip() if match.group('transfer') else '' user = await self.get_user_from_peer(peer) self.uniq_record.setdefault(user, { 'connected': False, 'transfer': '', 'transfer 5m': '', 'transfer 15m': '', 'current_index': 0 }) if hours == 0 and seconds and minutes <= self.THRESHOLD and transfer and self.uniq_record[user]['connected'] != True: await self.log_event(f"User {user} connected from {endpoint}") self.uniq_record[user]['connected'] = True elif (hours > 0 or minutes > self.THRESHOLD) and self.uniq_record[user]['connected'] != False: await self.log_event(f"User {user} {endpoint} disconnected: {transfer}") self.uniq_record[user]['connected'] = False idx = self.uniq_record[user]['current_index'] self.uniq_record[user][self.key_map[idx]] = transfer self.uniq_record[user]['current_index'] = (idx + 1) % 3 async def handle_uniq_record(self, request) -> web.Response: '''Обработка HTTP запроса и возврат текущего состояния подключений клиентов с сортировкой активных подключений, ставит на первые позициии. request - запрос формируемый при открытии или обновления веб страницы на HTTP сервер. ''' sorted_items = sorted(self.uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True) self.uniq_record = dict(sorted_items) return web.json_response(self.uniq_record) async def start_http_server(self) -> None: '''Запуск HTTP сервера для отдачи состояния подключений''' app = web.Application() app.router.add_get('/uniq_record', self.handle_uniq_record) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, self.http_host, self.http_port) await site.start() await self.log_event(f"HTTP сервер запущен на http://{self.http_host}:{self.http_port}") async def main_loop(self) -> None: '''Основной цикл''' await self.log_event("Wireguard Logging Service Started") asyncio.create_task(self.start_http_server()) while True: try: wg_output: str = await self.read_wg_output() peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip()) tasks = [self.process_peer(block) for block in peer_blocks if block.strip()] if tasks: await asyncio.gather(*tasks) await asyncio.sleep(300) # Ожидание перед итерацией 5 минут except (KeyboardInterrupt, asyncio.CancelledError): await self.stop() break
if name == "main":
monitor = WireguardMonitor()
try:
asyncio.run(monitor.main_loop())
except KeyboardInterrupt:
print("Программа остановлена пользователем.")
