Зачем вообще «самописный» сканер
Nmap и masscan великолепны. Но в реальной жизни часто хочется мини-инструмент, который можно:
быстро встроить в свои пайплайны (CI, ночные проверки, health-чек задач),
тонко настроить под конкретную сеть/ограничения,
расширить под свои кейсы (например, сразу отправлять результат в Telegram/Prometheus/ELK).
В статье — практический разбор двух подходов: многопоточность на ThreadPoolExecutor и асинхронщина на asyncio. Плюс: баннер-граббинг, HTTP-проверка, простая TLS-детекция (версия протокола, ALPN), CLI, JSON/CSV-вывод и базовые приёмы «бережного» сканирования.
⚠️ Сканируйте только свои сети или с письменного разрешения владельца. Нагрузка на чужие хосты без согласования — плохая идея и, возможно, нарушение закона/политик.
Архитектура и выбор подхода
TCP connect-scan — самый «джентльменский» метод: мы делаем обычный connect(). В отличие от SYN-скана (нужны сырые сокеты/права root и лучше выбирать Scapy/nmap), connect-скан на чистом Python прозрачен и переносим.
Threads (ThreadPoolExecutor) — просто, предсказуемо, хорошо для LAN/небольших диапазонов.
asyncio — меньше накладных расходов при массовых коннектах, тонкий контроль параллелизма, удобные таймауты и бэкпрешер.
Я покажу оба. Для «продакшен-миникомбайна» внизу — полный asyncio-скрипт с CLI, форматами вывода и TLS-детекцией.
Быстрый вариант №1: ThreadPool + баннер-граббинг
Кейс: маленькая подсеть, хочется максимально «в лоб», без зависимостей.
import socket import ipaddress from concurrent.futures import ThreadPoolExecutor, as_completed NETWORKS = ["10.16.14.5/32", "10.16.25.32/27"] PORTS = range(1, 1025) # пример: только well-known CONNECT_TIMEOUT = 0.5 READ_TIMEOUT = 0.5 THREADS = 800 SERVICE_BANNERS = [ (b"SSH-", "ssh"), (b"FTP", "ftp"), (b"SMTP", "smtp"), (b"POP3", "pop3"), (b"IMAP", "imap"), (b"HTTP/", "http"), (b"HTTP", "http"), (b"Redis", "redis"), (b"MySQL", "mysql"), (b"PostgreSQL", "postgresql"), (b"RTSP", "rtsp"), (b"Telnet", "telnet"), (b"LDAP", "ldap"), (b"RFB", "vnc"), (b"AMQP", "amqp"), (b"SMB", "smb"), ] def expand_targets(networks): targets = [] for item in networks: net = ipaddress.ip_network(item, strict=False) targets.extend(str(h) for h in net.hosts()) return targets def detect_banner(sock: socket.socket) -> bytes: try: sock.settimeout(READ_TIMEOUT) return sock.recv(512) except (socket.timeout, OSError): return b"" def detect_protocol(sock: socket.socket) -> str: b = detect_banner(sock) for sig, name in SERVICE_BANNERS: if sig in b: return name # HTTP HEAD как «активная» проба try: sock.sendall(b"HEAD / HTTP/1.0\r\n\r\n") sock.settimeout(READ_TIMEOUT) data = sock.recv(128) if data.startswith(b"HTTP/"): return "http" except (socket.timeout, OSError): pass return "unknown" def scan_port(ip: str, port: int): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(CONNECT_TIMEOUT) if s.connect_ex((ip, port)) != 0: return None try: service = socket.getservbyport(port, "tcp") except OSError: service = "unknown" if service == "unknown": service = detect_protocol(s) return {"ip": ip, "port": port, "service": service} def main(): ips = expand_targets(NETWORKS) results = [] with ThreadPoolExecutor(max_workers=THREADS) as pool: futures = (pool.submit(scan_port, ip, p) for ip in ips for p in PORTS) for f in as_completed(futures): r = f.result() if r: results.append(r) print(f'{r["ip"]}:{r["port"]} -> {r["service"]}') if __name__ == "__main__": main()
Плюсы: мини-код, легко читать и отлаживать.
Минусы: при тысячах одновременных коннектов треды прожорливы, выше накладные.
Производственный вариант №2: asyncio + семафоры + TLS-детекция
Что добавим по сравнению с «быстрым» вариантом:
CLI (
argparse): сети, порты (диапазоны80,443,8000-8100), таймауты, параллелизм.Параллелизм через семафор: честный контроль нагрузки.
Баннер-граббинг и HTTP-HEAD.
TLS-детекция: пытаемся выполнить TLS-рукопожатие, собираем версию протокола и ALPN.
Форматы вывода:
plain(текст),json,csv.Мягкие таймауты и бэкпрешер, чтобы не «класть» сеть.
import argparse import asyncio import ipaddress import json import re import ssl import sys from typing import Iterable, List, Tuple, Dict, Any SERVICE_BANNERS: List[Tuple[bytes, str]] = [ (b"SSH-", "ssh"), (b"FTP", "ftp"), (b"SMTP", "smtp"), (b"POP3", "pop3"), (b"IMAP", "imap"), (b"HTTP/", "http"), (b"HTTP", "http"), (b"Redis", "redis"), (b"MySQL", "mysql"), (b"PostgreSQL", "postgresql"), (b"RTSP", "rtsp"), (b"Telnet", "telnet"), (b"LDAP", "ldap"), (b"RFB", "vnc"), (b"AMQP", "amqp"), (b"SMB", "smb"), ] TLS_PORTS_DEFAULT = {443, 8443, 9443, 7443, 10443} def parse_ports(spec: str) -> List[int]: """ Пример: "22,80,443,8000-8100" """ ports = set() for part in spec.split(","): part = part.strip() if not part: continue if "-" in part: a, b = part.split("-", 1) ports.update(range(int(a), int(b) + 1)) else: ports.add(int(part)) return sorted(p for p in ports if 1 <= p <= 65535) def expand_targets(net_specs: Iterable[str]) -> List[str]: ips: List[str] = [] for it in net_specs: it = it.strip() if not it: continue # поддерживаем и одиночные ip if re.match(r"^\d+\.\d+\.\d+\.\d+$", it): ips.append(it) continue net = ipaddress.ip_network(it, strict=False) ips.extend(str(h) for h in net.hosts()) return ips async def try_read_banner(reader: asyncio.StreamReader, n: int, timeout: float) -> bytes: try: return await asyncio.wait_for(reader.read(n), timeout=timeout) except asyncio.TimeoutError: return b"" except Exception: return b"" def detect_from_banner(b: bytes) -> str: for sig, name in SERVICE_BANNERS: if sig in b: return name return "unknown" async def try_http_probe(reader, writer, timeout: float) -> bool: try: writer.write(b"HEAD / HTTP/1.0\r\nHost: localhost\r\n\r\n") await writer.drain() data = await asyncio.wait_for(reader.read(128), timeout=timeout) return data.startswith(b"HTTP/") except Exception: return False async def try_tls(host: str, port: int, timeout: float) -> Dict[str, Any] | None: """ Пытаемся установить TLS (SNI=host). Если успешно — вернём версию TLS и ALPN. """ ctx = ssl.create_default_context() try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host=host, port=port, ssl=ctx, server_hostname=host), timeout=timeout ) sslobj = writer.get_extra_info("ssl_object") info = { "tls": getattr(sslobj, "version", lambda: None)(), "alpn": getattr(sslobj, "selected_alpn_protocol", lambda: None)(), } writer.close() try: await writer.wait_closed() except Exception: pass return info except Exception: return None async def scan_one(host: str, port: int, conn_timeout: float, read_timeout: float, tls_guess: bool, sem: asyncio.Semaphore) -> Dict[str, Any] | None: async with sem: try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host=host, port=port), timeout=conn_timeout ) except Exception: return None result: Dict[str, Any] = {"ip": host, "port": port, "service": "unknown"} # 1) Пассивный баннер b = await try_read_banner(reader, 512, read_timeout) srv = detect_from_banner(b) if srv != "unknown": result["service"] = srv else: # 2) HTTP-проба ok = await try_http_probe(reader, writer, read_timeout) if ok: result["service"] = "http" writer.close() try: await writer.wait_closed() except Exception: pass # 3) TLS-детекция (по порту или по факту неизвестного сервиса) if tls_guess and (port in TLS_PORTS_DEFAULT or result["service"] == "unknown"): tls_info = await try_tls(host, port, read_timeout) if tls_info: result["service"] = "https" result.update(tls_info) # 4) getservbyport как подсказка (последним слоем) if result["service"] == "unknown": try: import socket result["service"] = socket.getservbyport(port, "tcp") except Exception: pass return result async def run(ips: List[str], ports: List[int], conn_timeout: float, read_timeout: float, concurrency: int, tls_guess: bool, fmt: str) -> None: sem = asyncio.Semaphore(concurrency) tasks = [ scan_one(ip, p, conn_timeout, read_timeout, tls_guess, sem) for ip in ips for p in ports ] if fmt == "json": out = [] for coro in asyncio.as_completed(tasks): r = await coro if r: out.append(r) print(json.dumps(out, ensure_ascii=False, indent=2)) return if fmt == "csv": print("ip,port,service,tls,alpn") for coro in asyncio.as_completed(tasks): r = await coro if r: print(f'{r["ip"]},{r["port"]},{r.get("service","")},{r.get("tls","")},{r.get("alpn","")}') return # plain for coro in asyncio.as_completed(tasks): r = await coro if r: extra = [] if "tls" in r and r["tls"]: extra.append(f'TLS={r["tls"]}') if "alpn" in r and r["alpn"]: extra.append(f'ALPN={r["alpn"]}') suffix = f' ({", ".join(extra)})' if extra else "" print(f'{r["ip"]}:{r["port"]} -> {r["service"]}{suffix}') def build_cli(): ap = argparse.ArgumentParser(description="Async TCP port scanner with banner & TLS detection") ap.add_argument("-n", "--net", action="append", required=True, help="Сеть/адрес: пример 10.0.0.0/24 или 10.0.0.5. Можно несколько флагов.") ap.add_argument("-p", "--ports", default="1-1024,3389,5432,6379,8080-8090,8443", help="Порты: 22,80,443,8000-8100 (по умолчанию: популярные)") ap.add_argument("--conn-timeout", type=float, default=0.5, help="Таймаут установления TCP (сек)") ap.add_argument("--read-timeout", type=float, default=0.5, help="Таймаут чтения баннера/HTTP/TLS (сек)") ap.add_argument("-c", "--concurrency", type=int, default=1000, help="Параллелизм (кол-во одновременных коннектов)") ap.add_argument("--no-tls-guess", action="store_true", help="Отключить попытки TLS-детекции") ap.add_argument("-f", "--format", choices=["plain","json","csv"], default="plain", help="Формат вывода") return ap def main(): ap = build_cli() args = ap.parse_args() ips = expand_targets(args.net) ports = parse_ports(args.ports) try: asyncio.run(run( ips=ips, ports=ports, conn_timeout=args.conn_timeout, read_timeout=args.read_timeout, concurrency=args.concurrency, tls_guess=not args.no_tls_guess, fmt=args.format )) except KeyboardInterrupt: print("\nInterrupted.", file=sys.stderr) if __name__ == "__main__": main()
Что получает читатель «поверх кода»
Управление нагрузкой:
--concurrencyзадаёт одновременные соединения, семафор гарантирует верхнюю границу.Баннер-граббинг: читаем до 512 байт — хватает, чтобы поймать SSH/SMTP/Redis/… .
HTTP-проба: если сервер молчит — мягко «тычем»
HEAD /и распознаёмHTTP/….TLS-детекция: пытаемся установить TLS (SNI=хост/IP). Если ок — получаем
TLSv1.2/1.3и ALPN (http/1.1,h2).Форматы:
plainдля консоли,jsonиcsv— для пайплайнов/импорта.
Как сканировать «бережно»
Подбирайте
--concurrencyпод RTT и мощность хостов: для LAN 500–2000, для WAN/внешки 100–400.Увеличивайте таймауты на «дальних» сегментах:
--conn-timeout 1.0 --read-timeout 1.0.Сканы по ночам/окнах обслуживания — меньше ложных тревог и нагрузка на SLA ниже.
Логируйте свои действия (внутренний change-record) — это помогает безопасникам и вам самим.
Тюнинг системы (коротко)
Linux: увеличьте лимиты дескрипторов (
ulimit -n 65535),fs.file-max.Windows: следите за
TIME_WAITи ephemeral port range; уменьшайте параллелизм, если видите «исчерпание портов».Firewall: учитывайте rate-limits, IDS/IPS — могут резать агрессивные сканы.
Тестирование на столе
Поднимите локальные цели, чтобы увидеть, как детектятся сервисы:
# HTTP python -m http.server 8080 # Redis (если установлен) redis-server --port 6379 # SSH (на Linux обычно уже запущен на 22)
Проверьте:
python scanner.py -n 127.0.0.1/32 -p 22,6379,8080 --format plain
Ожидаемо увидите что-то вроде:
127.0.0.1:22 -> ssh 127.0.0.1:6379 -> redis 127.0.0.1:8080 -> http
Что ещё можно добавить (и почему я не впихивал в основу)
SYN-скан через Scapy/raw-сокеты — быстрее, но потребует прав и другой логики обработки.
TLS-сертификат (CN/SAN) — можно достать из
ssl_object.getpeercert()и включить в вывод.Фингерпринтинг: расширить сигнатуры, добавить SMB/NTLM-negotiate, RDP Cookie, MQTT CONNECT и т.д.
Параллельный UDP — отдельная история (и боль), но
asyncioотлично масштабируется и туда.
Выводы
Самописный сканер — это не «замена nmap», а настройка под себя: быстрые проверки, интеграция в пайплайны, тонкий контроль нагрузки.
Два подхода (threads/asyncio) закрывают 95% «живых» кейсов в LAN и внутри периметра.
Готов обсудить расширения: TLS-метаданные, RDP/SMB-фингерпринтинг, экспорт в Prometheus/Elastic.
Приложение: команды для запуска
# Простой прогон по /27 и популярным портам, plain-вывод python scanner.py -n 10.16.25.32/27 # JSON-вывод — удобно для пайплайна python scanner.py -n 10.16.25.32/27 -f json > result.json # «Мягкий» скан внешних хостов python scanner.py -n 203.0.113.10/32 -p 22,80,443,8080-8090 \ --conn-timeout 1.0 --read-timeout 1.0 -c 300
