Привет, Хабр!
Сегодня мы рассмотрим, как создать собственный DHCP‑сервер на Python. Суть сервера проста — он динамически раздаёт IP‑адреса устройствам в сети, избавляя нас от головной боли статической конфигурации.
Суть сервера будет заключаться в том, чтобы «подставлять» IP‑адреса устройствам, заходящим в сеть. Сервер будет ловить DHCP‑запросы от клиентов, выбирать свободный IP из заранее подготовленного пула и подтверждать выдачу.
Инструменты которые будем использовать:
Python 3 — основной ЯП.
Scapy — для перехвата, анализа и формирования сетевых пакетов.
Logging — для детального логирования работы сервера.
SQLite — для хранения информации об аренде IP (опционально).
Первые шаги
Перейдем сразу к делу. Начнём с минимальной настройки для перехвата DHCP‑пакетов и отправки ответов с помощью scapy.
#!/usr/bin/env python3 import logging from scapy.all import * from datetime import datetime # Настраиваем логирование: всё, что происходит – записываем в файл и выводим в консоль. logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler("dhcp_server.log"), logging.StreamHandler() ] ) # Пул IP-адресов (пример: 192.168.1.100 - 192.168.1.200) IP_POOL = [f"192.168.1.{i}" for i in range(100, 201)] leased_ips = {} # Формат: {client_mac: ip_address} def get_free_ip(): for ip in IP_POOL: if ip not in leased_ips.values(): return ip return None def dhcp_packet_callback(packet): if packet.haslayer(DHCP): dhcp_options = packet[DHCP].options msg_type = None for opt in dhcp_options: if isinstance(opt, tuple) and opt[0] == 'message-type': msg_type = opt[1] break client_mac = packet[Ether].src logging.info(f"Получен пакет от {client_mac}: тип сообщения {msg_type}") if msg_type == 1: # DHCPDISCOVER handle_discover(packet, client_mac) elif msg_type == 3: # DHCPREQUEST handle_request(packet, client_mac) elif msg_type == 7: # DHCPRELEASE handle_release(packet, client_mac) def handle_discover(packet, client_mac): free_ip = get_free_ip() if free_ip is None: logging.error("Нет свободных IP-адресов!") return logging.info(f"Выдаём {free_ip} клиенту {client_mac}") offer_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=free_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "offer"), ("server_id", "192.168.1.1"), "end"]) sendp(offer_pkt, iface=conf.iface) def handle_request(packet, client_mac): requested_ip = None for opt in packet[DHCP].options: if isinstance(opt, tuple) and opt[0] == 'requested_addr': requested_ip = opt[1] break if requested_ip is None: logging.error("Не удалось определить запрошенный IP!") return if requested_ip in leased_ips.values(): logging.warning(f"IP {requested_ip} уже занят!") return leased_ips[client_mac] = requested_ip logging.info(f"Подтверждаем выдачу IP {requested_ip} клиенту {client_mac}") ack_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=requested_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "ack"), ("server_id", "192.168.1.1"), "end"]) sendp(ack_pkt, iface=conf.iface) def handle_release(packet, client_mac): if client_mac in leased_ips: released_ip = leased_ips.pop(client_mac) logging.info(f"Клиент {client_mac} освободил IP {released_ip}") else: logging.warning(f"Получен RELEASE от неизвестного клиента {client_mac}") def mac2bytes(mac): return bytes.fromhex(mac.replace(":", "")) if __name__ == "__main__": logging.info("DHCP-сервер запущен. Ожидаем запросы...") sniff(filter="udp and (port 67 or 68)", prn=dhcp_packet_callback, store=0)
Настраиваем простой DHCP‑сервер, который с помощью Scapy слушает UDP‑пакеты на портах 67 и 68 и обрабатывает DHCP‑сообщения. Сначала инициализируется логирование, чтобы фиксировать все действия сервера как в консоли, так и в файле. Определён пул IP‑адресов, из которого сервер будет выдавать свободные адреса клиентам. Функция get_free_ip() перебирает пул и возвращает первый доступный адрес, если таковой имеется.
Основная логика обработки пакетов реализована в функции dhcp_packet_callback(), которая определяет тип полученного DHCP‑сообщения и делегирует его обработку соответствующим функциям: handle_discover() для DHCPDISCOVER, handle_request() для DHCPREQUEST и handle_release() для DHCPRELEASE. В зависимости от типа запроса, сервер либо предлагает IP‑адрес, либо подтверждает его выдачу, либо освобождает ранее выданный адрес. Дополнительная функция mac2bytes() используется для преобразования MAC‑адреса из строкового формата в байты, необходимые для формирования корректного пакета BOOTP.
Расширяем функциональность: поддержка DHCPv6
DHCPv6 работает иначе. Базовую реализация обработки пакетов:
from scapy.layers.dhcp6 import * from scapy.layers.inet6 import IPv6 def dhcp6_packet_callback(packet): if packet.haslayer(DHCP6_Solicit): client_duid = packet[DHCP6_Solicit].duid logging.info(f"DHCPv6: Получен Solicit от к��иента с DUID {client_duid.hex()}") # Простейший алгоритм генерации IPv6-адреса: используем часть DUID offered_ipv6 = "2001:db8:0:1::" + str(int.from_bytes(client_duid[:2], 'big') % 100 + 10) logging.info(f"DHCPv6: Предлагаем адрес {offered_ipv6} для DUID {client_duid.hex()}") adv = IPv6(src="2001:db8:0:1::1", dst="ff02::1:2") / \ UDP(sport=547, dport=546) / \ DHCP6_Advertise(trid=packet[DHCP6_Solicit].trid) / \ DHCP6Opt_ServerId(duid=b'\x00\x01\x00\x01' + b'\x00'*6) / \ DHCP6Opt_IA_NA(iaid=1, T1=0, T2=0, opts=[DHCP6Opt_IAAddress(addr=offered_ipv6, preferred_lifetime=3600, valid_lifetime=7200)]) send(adv)
Сначала импортируем необходимые модули для работы с DHCPv6 и IPv6 из библиотеки Scapy. Затем в функции dhcp6_packet_callback проверяем, содержит ли полученный пакет слой DHCP6_Solicit, который сигнализирует о том, что клиент отправил запрос на получение IPv6-адреса. Из этого пакета извлекается DUID, и выводим его в лог, чтобы отследить, от какого устройства поступил запрос. После этого используется простейший алгоритм для генерации предлагаемого IPv6-адреса, где из первых байтов DUID вычисляется число, которое затем используется для создания адреса в подсети «2001:db8:0:1::».
После вычисления адреса формируется ответный пакет типа DHCP6_Advertise, который сообщает клиенту, что сервер готов предложить данный IPv6-адрес. Пакет конструируется с использованием нескольких слоёв: сначала формируется IPv6-заголовок с указанием источника и адреса назначения (мультикаст для DHCPv6), затем UDP‑заголовок с соответствующими портами, далее сам DHCPv6-заголовок с идентификатором транзакции, а также опции, такие как идентификатор сервера и параметры аренды (с указанием времени предпочтения и валидности адреса). В завершении пакет отправляется через функцию send, что и завершает процесс предложения адреса клиенту.
Логирование, мониторинг и обработка конфликтов IP
Чтобы DHCP‑сервер работал стабильно и без сюрпризов, важно вести учёт арендованных IP‑адресов и оперативно выявлять возможные конфликты. Если мы раздаём IP, но не следим за их состоянием, то рано или поздно столкнёмся с коллизиями, когда два устройства окажутся с одним и тем же адресом. Поэтому реализуем три механизма:
Логирование всех арендуемых IP, чтобы можно было отследить, кто, когда и какой IP получил.
Сохранение информации в базу SQLite, чтобы сервер мог восстанавливать данные даже после перезапуска.
Обнару��ение IP‑конфликтов с помощью ARP, чтобы проверять, не занят ли уже адрес, который мы собираемся выдать.
Сохранение аренды IP в базу данных
Для хранения информации об аренде IP используем SQLite. Это позволит серверу отслеживать все выданные IP и, в случае чего, избежать повторной раздачи занятого адреса.
import sqlite3 def init_db(): """Инициализирует базу данных для хранения лизинговых записей.""" conn = sqlite3.connect("dhcp_leases.db") cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS leases ( client_mac TEXT PRIMARY KEY, ip_address TEXT, lease_time TIMESTAMP )''') conn.commit() return conn def save_lease(conn, client_mac, ip_address): """Сохраняет информацию о выданном IP-адресе в базу данных.""" cursor = conn.cursor() cursor.execute("REPLACE INTO leases (client_mac, ip_address, lease_time) VALUES (?, ?, ?)", (client_mac, ip_address, datetime.now())) conn.commit() logging.info(f"Лиз сохранён в БД: {client_mac} -> {ip_address}")
Функция init_db() создаёт таблицу, если её ещё нет, и открывает соединение с базой. save_lease() записывает или обновляет запись о выделенном IP, привязывая его к MAC‑адресу клиента.
Теперь при перезапуске сервера можно восстановить список выданных IP.
Привязываем логику к обработке запросов
Теперь интегрируем SQLite в обработку DHCP‑запросов. Когда клиент запрашивает IP (DHCPREQUEST), мы не только выдаём ему адрес, но и записываем эту информацию в базу.
#!/usr/bin/env python3 import logging import sqlite3 from scapy.all import * from datetime import datetime # Настройка расширенного логирования для продакшена logging.basicConfig( level=logging.DEBUG, # Для продакшена можно переключить на INFO format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler("dhcp_server_ext.log"), logging.StreamHandler() ] ) # Пул IP-адресов для сети 192.168.1.0/24 (пример) IP_POOL = [f"192.168.1.{i}" for i in range(100, 201)] leased_ips = {} def init_db(): """Инициализируем базу данных для хранения информации об аренде IP.""" conn = sqlite3.connect("dhcp_leases_ext.db") cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS leases ( client_mac TEXT PRIMARY KEY, ip_address TEXT, lease_time TIMESTAMP )''') conn.commit() return conn db_conn = init_db() def save_lease(conn, client_mac, ip_address): """Сохраняем аренду IP в базу данных.""" cursor = conn.cursor() cursor.execute("REPLACE INTO leases (client_mac, ip_address, lease_time) VALUES (?, ?, ?)", (client_mac, ip_address, datetime.now())) conn.commit() logging.debug(f"Сохранена аренда: {client_mac} -> {ip_address}") def get_free_ip(): """Возвращаем первый свободный IP из пула.""" for ip in IP_POOL: if ip not in leased_ips.values(): return ip return None def check_ip_conflict(ip_address): """Проверяем, используется ли IP, с помощью ARP-запроса.""" ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip_address), timeout=2, verbose=0) if ans: logging.warning(f"Конфликт: {ip_address} отвечает на ARP-запрос") return len(ans) > 0 def dhcp_packet_callback(packet): """Основной callback для обработки входящих DHCP-пакетов.""" if packet.haslayer(DHCP): try: dhcp_options = packet[DHCP].options msg_type = None for opt in dhcp_options: if isinstance(opt, tuple) and opt[0] == 'message-type': msg_type = opt[1] break client_mac = packet[Ether].src logging.info(f"Получен пакет от {client_mac}: тип сообщения {msg_type}") if msg_type == 1: # DHCPDISCOVER handle_discover(packet, client_mac) elif msg_type == 3: # DHCPREQUEST handle_request(packet, client_mac) elif msg_type == 7: # DHCPRELEASE handle_release(packet, client_mac) else: logging.debug(f"Тип сообщения {msg_type} не обрабатывается") except Exception as e: logging.error(f"Ошибка при обработке пакета: {e}") def handle_discover(packet, client_mac): """Обрабатываем DHCPDISCOVER: отправляем клиенту предложение (OFFER).""" free_ip = get_free_ip() if free_ip is None: logging.error("Нет свободных IP!") return logging.info(f"Выдаём IP {free_ip} клиенту {client_mac}") try: offer_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=free_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "offer"), ("server_id", "192.168.1.1"), "end"]) sendp(offer_pkt, iface=conf.iface) except Exception as e: logging.error(f"Ошибка при отправке OFFER: {e}") def handle_request(packet, client_mac): """Обрабатываем DHCPREQUEST: подтверждаем выдачу IP клиенту.""" requested_ip = None for opt in packet[DHCP].options: if isinstance(opt, tuple) and opt[0] == 'requested_addr': requested_ip = opt[1] break if requested_ip is None: logging.error("Запрошенный IP не определён!") return if check_ip_conflict(requested_ip): logging.error(f"IP {requested_ip} уже занят!") return leased_ips[client_mac] = requested_ip save_lease(db_conn, client_mac, requested_ip) logging.info(f"Подтверждаем выдачу IP {requested_ip} клиенту {client_mac}") try: ack_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=requested_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "ack"), ("server_id", "192.168.1.1"), "end"]) sendp(ack_pkt, iface=conf.iface) except Exception as e: logging.error(f"Ошибка при отправке ACK: {e}") def handle_release(packet, client_mac): """Обрабатываем DHCPRELEASE: освобождаем ранее выданный IP.""" if client_mac in leased_ips: released_ip = leased_ips.pop(client_mac) logging.info(f"Клиент {client_mac} освободил IP {released_ip}") else: logging.warning(f"Получен RELEASE от неизвестного клиента {client_mac}") def mac2bytes(mac): """Преобразуем MAC-адрес в байтовую строку без разделителей.""" return bytes.fromhex(mac.replace(":", "")) if __name__ == "__main__": logging.info("Запуск продакшен-версии DHCP-сервера. Ловим пакеты!") try: sniff(filter="udp and (port 67 or 68)", prn=dhcp_packet_callback, store=0) except KeyboardInterrupt: logging.info("Остановка сервера по запросу пользователя.")
Получаем запрашиваемый клиентом IP‑адрес из DHCP‑пакета. Проверяем через check_ip_conflict(), не используется ли он уже в сети. Если адрес свободен — записываем аренду в базу (save_lease()). Отправляем клиенту DHCPACK, подтверждая, что он получил этот IP.
Освоить навыки сетевого инженера с акцентом на траблшутинг можно на специализации "Network Engineer".
Инициализация базы данных при старте сервера
Чтобы сервер сразу мог работать с базой данных, запускаем init_db() в main:
if __name__ == "__main__": db_conn = init_db() logging.info("Запускаем DHCP-сервер с поддержкой базы данных...") sniff(filter="udp and (port 67 or 68)", prn=dhcp_packet_callback, store=0)
Теперь сервер сможет записывать и восстанавливать IP‑адреса при каждом запуске.
Обнаружение IP-конфликтов через ARP
Допустим, что клиент запросил 192.168.1.105, но этот IP уже используется другим устройством в сети (например, был назначен вручную). Чтобы не создать дублирующуюся конфигурацию, перед выдачей IP отправляем ARP‑запрос и проверяем, не отвечает ли кто‑то уже с этим адресом.
def check_ip_conflict(ip_address): """Проверяет, используется ли IP-адрес в сети с помощью ARP-запроса.""" ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip_address), timeout=2, verbose=0) if ans: logging.warning(f"Конфликт обнаружен: {ip_address} уже отвечает на ARP-запрос!") return len(ans) > 0
Функция отправляет ARP‑запрос, запрашивая, есть ли в сети устройство, которое уже использует ip_address. Если есть ответ — значит, IP занят, и выдавать его другому клиенту нельзя. Если ответа нет — адрес свободен, можно смело его выдавать.
Теперь в handle_request() перед тем, как выдать IP, добавили вызов check_ip_conflict(), чтобы не допустить конфликтов.
Комплексный пример
Теперь, когда мы разобрали все основные моменты, соберём всё вместе и реализуем полноценный DHCP‑сервер.
Создадим DHCP‑сервер, который:
Раздаёт IP‑адреса клиентам в сети по DHCPv4.
Поддерживает логирование всех событий.
Сохраняет данные о выданных IP в SQLite, чтобы при перезапуске сервера аренды не терялись.
Перед выдачей IP проверяет, не занят ли он уже в сети (ARP‑запросы).
Реагирует на освобождение IP (DHCPRELEASE) и удаляет их из списка арендованных.
Готов к дальнейшему расширению, например, добавлению DHCPv6.
Код:
#!/usr/bin/env python3 import logging import sqlite3 from scapy.all import * from datetime import datetime # === Настройки логирования === logging.basicConfig( level=logging.DEBUG, # Для продакшена можно изменить на INFO format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler("dhcp_server.log"), logging.StreamHandler() ] ) # === Определение пула IP-адресов === IP_POOL = [f"192.168.1.{i}" for i in range(100, 201)] leased_ips = {} # Храним активные аренды в формате {client_mac: ip_address} # === Инициализация базы данных SQLite === def init_db(): """Создаёт базу данных для хранения аренды IP.""" conn = sqlite3.connect("dhcp_leases.db") cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS leases ( client_mac TEXT PRIMARY KEY, ip_address TEXT, lease_time TIMESTAMP )''') conn.commit() return conn def save_lease(conn, client_mac, ip_address): """Сохраняет информацию об аренде IP в базу данных.""" cursor = conn.cursor() cursor.execute("REPLACE INTO leases (client_mac, ip_address, lease_time) VALUES (?, ?, ?)", (client_mac, ip_address, datetime.now())) conn.commit() logging.info(f"Лиз сохранён в БД: {client_mac} -> {ip_address}") def remove_lease(conn, client_mac): """Удаляет запись об аренде IP при освобождении.""" cursor = conn.cursor() cursor.execute("DELETE FROM leases WHERE client_mac = ?", (client_mac,)) conn.commit() logging.info(f"IP-адрес освобождён для {client_mac}") # === Функции работы с IP === def get_free_ip(): """Возвращает первый свободный IP из пула.""" for ip in IP_POOL: if ip not in leased_ips.values(): return ip return None def check_ip_conflict(ip_address): """Проверяет, используется ли IP-адрес в сети с помощью ARP-запроса.""" ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip_address), timeout=2, verbose=0) if ans: logging.warning(f"Конфликт: {ip_address} уже используется!") return len(ans) > 0 # === Обработчик входящих DHCP-пакетов === def dhcp_packet_callback(packet): """Определяет тип DHCP-пакета и вызывает нужную функцию обработки.""" if packet.haslayer(DHCP): try: dhcp_options = packet[DHCP].options msg_type = None for opt in dhcp_options: if isinstance(opt, tuple) and opt[0] == 'message-type': msg_type = opt[1] break client_mac = packet[Ether].src logging.info(f"DHCP-пакет от {client_mac}, тип: {msg_type}") if msg_type == 1: # DHCPDISCOVER handle_discover(packet, client_mac) elif msg_type == 3: # DHCPREQUEST handle_request(packet, client_mac) elif msg_type == 7: # DHCPRELEASE handle_release(packet, client_mac) except Exception as e: logging.error(f"Ошибка обработки пакета: {e}") # === Обработчики DHCP-пакетов === def handle_discover(packet, client_mac): """Обрабатывает DHCPDISCOVER и отправляет клиенту предложение (OFFER).""" free_ip = get_free_ip() if free_ip is None: logging.error("Нет свободных IP!") return logging.info(f"Выдаём IP {free_ip} клиенту {client_mac}") offer_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=free_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "offer"), ("server_id", "192.168.1.1"), "end"]) sendp(offer_pkt, iface=conf.iface) def handle_request(packet, client_mac): """Обрабатывает DHCPREQUEST и подтверждает выдачу IP.""" requested_ip = None for opt in packet[DHCP].options: if isinstance(opt, tuple) and opt[0] == 'requested_addr': requested_ip = opt[1] break if requested_ip is None: logging.error("Запрошенный IP не определён!") return if check_ip_conflict(requested_ip): logging.error(f"Конфликт: IP {requested_ip} уже занят!") return leased_ips[client_mac] = requested_ip save_lease(db_conn, client_mac, requested_ip) logging.info(f"Подтверждаем выдачу IP {requested_ip} клиенту {client_mac}") ack_pkt = Ether(src=get_if_hwaddr(conf.iface), dst=client_mac) / \ IP(src="192.168.1.1", dst="255.255.255.255") / \ UDP(sport=67, dport=68) / \ BOOTP(op=2, yiaddr=requested_ip, siaddr="192.168.1.1", chaddr=mac2bytes(client_mac)) / \ DHCP(options=[("message-type", "ack"), ("server_id", "192.168.1.1"), "end"]) sendp(ack_pkt, iface=conf.iface) def handle_release(packet, client_mac): """Обрабатывает DHCPRELEASE и освобождает IP-адрес.""" if client_mac in leased_ips: released_ip = leased_ips.pop(client_mac) remove_lease(db_conn, client_mac) logging.info(f"Клиент {client_mac} освободил IP {released_ip}") else: logging.warning(f"Получен RELEASE от неизвестного клиента {client_mac}") def mac2bytes(mac): """Преобразует MAC-адрес в байтовую строку без разделителей.""" return bytes.fromhex(mac.replace(":", "")) # === Запуск сервера === if __name__ == "__main__": db_conn = init_db() logging.info("DHCP-сервер запущен. Ожидаем запросы...") try: sniff(filter="udp and (port 67 or 68)", prn=dhcp_packet_callback, store=0) except KeyboardInterrupt: logging.info("Остановка сервера по запросу пользователя.")
В будущем можно расширить поддержку IPv6, добавить систему аренды IP с TTL, интеграцию с внешними сервисами мониторинга и другие плюшки.
Главное, что стоит вынести — это то, что организация работы DHCP‑сервера не сводится просто к «раздаче IP». Это контроль за сетью, предотвращение коллизий, логирование действий, мониторинг выданных адресов и учёт всех арендуемых IP.
Удачи вам и стабильных релизов!
Что выбрать для маршрутизации VLAN: роутер на палочке или коммутатор третьего уровня? Обсудим это 19 марта на открытом уроке. Присоединяйтесь
