Когда мы разрабатываем веб-сервисы на Python, мы почти всегда работаем с готовыми абстракциями: фреймворками (fastapi, flask, django) и веб-серверами (uvicorn, gunicorn). Фреймворк предоставляет удобную модель работы с HTTP, сервер принимает TCP-соединения, парсит HTTP и передаёт данные в приложения через интерфейсы вроде WSGI или ASGI.
Эти абстракции начинают восприниматься как нечто само собой разумеющееся. Но по мере накопления опыта к ним возникает всё больше вопросов:
Где проходит граница ответственности между веб-сервером и веб-приложением?
Что именно делают
WSGIиASGI?В какой момент происходит TCP three-way handshake?
Как достигается graceful shutdown?
Как достигается параллельная обработка соединений?
Что делает ядро Linux, пока процесс «ждёт клиента»?
Обычно та��ие детали скрыты за слоями абстракций. Фреймворки скрывают детали работы HTTP, веб-серверы - детали работы с TCP, а стандартная библиотека Python - системные вызовы. В результате сетевое взаимодействие начинает ощущаться как «магия».
В этой статье мы разберём, какие механизмы предоставляет Linux для работы с сетью и как на их основе строятся фреймворки и библиотеки, которыми мы пользуемся каждый день. К концу статьи мы напишем минимальный WSGI-веб-сервер, с помощью которого можно запускать произвольные WSGI-приложения. Рассматривать будем исключительно Linux и его сетевой стек, так как именно на нём работает большинство production веб-приложений. Весь код из статьи находится в репозитории.
Содержание статьи:
Статья получилась довольно объемной, поэтому если вы уже знакомы с сокетами и системными вызовами Linux, можете сразу перейти к написанию WSGI-сервера или другой интересующей вас части.
Сетевой стек Linux изнутри
Разбираемся в базе
В Linux действует принцип «всё есть файл». Сетевые соединения физически файлами не являются, но представлены файловыми дескрипторами. А объектами ядра, на которые ссылаются такие дескрипторы, являются сокеты.
Сокет - это объект ядра, представляющий абстракцию для межпроцессного взаимодействия, включая сетевое. Сокеты классифицируются по семейству адресов (Domain). Основные семейства:
AF_UNIX- для локального межпроцессного взаимодействия;AF_INET- для IPv4;AF_INET6- для IPv6.
Также сокеты различаются по типу передачи данных (Type), основные типы:
SOCK_STREAM- двусторонний, надёжный поток байтов, используется, например, в TCP;SOCK_DGRAM- ненадёжный способ передачи отдельных сообщений, ограниченного размера, используется, например, для протокола UDP.
Каждый сокет может быть привязан к адресу. Формат адреса зависит от семейства: для AF_UNIX это путь к файлу в файловой системе, для AF_INET — пара (IP-адрес, порт). Кроме того, сокет определяется конкретным протоколом взаимодействия (Protocol): TCP, UDP, ICMP, RAW (для сырых IP-пакетов) и т.д.
Хотя сокеты предоставляют интерфейс для обмена данными, программы в пользовательском пространстве не могут работать с ними напрямую. Все операции с сокетами выполняются через системные вызовы.
Системный вызов - это интерфейс между пользовательскими программами и ядром. Через системные вызовы программы запрашивают ресурсы и действия ядра. Обычно они не вызываются напрямую: почти все системные вызовы имеют обёрточные функции в библиотеке C, которые выполняют подготовку к работе в режиме ядра. Благодаря этому работа с системными вызовами выглядит как вызов обычной функции. Именно через библиотеку C Python-код общается с ядром.
С сокетами можно взаимодействовать через системные вызовы, стандартные для любых файловых дескрипторов, такие как read(), close(). А также есть системные вызовы, специфичные для файловых дескрипторов сокетов. Рассмотрим, как представлены основные из них в библиотеке C.
Системный вызов | Описание |
|---|---|
| Создаёт сокет и возвращает номер файлового дескриптора, который на него ссылается. Если передать |
| Привязывает сокет к адресу. Принимает файловый дескриптор сокета, адрес и размер (в байтах) структуры, на которую указывает |
| Переводит сокет в режим ожидания входящих соединений. |
| Используется с сокетами, ориентированными на установление соединения (для которых вызван |
| Создаёт соединение между сокетом с файловым дескриптором |
| Получает данные из сокета. В качестве аргументов передаются дескриптор сокета, буфер для записи, его размер и флаги. Возвращает число реально полученных байт. |
| Отправляет данные в сокет. Сокет должен быть связан с удалённой стороной - через |
Используя эти вызовы, можно выстроить следующую схему межпроцессного взаимодействия:
Серверный процесс создаёт сокет;
Привязывает к нему адрес;
Переводит сокет в режим прослушивания;
Клиентский процесс создаёт сокет;
Подключается к серверному сокету, вставая в очередь ожидания;
Серверный сокет принимает соединение, создавая новый сокет для общения с клиентом;
Клиентский и серверный процессы обмениваются данными, пока кто-то из них не закроет соединение;
Один из участников закрывает соединение;
Второй из участников закрывает соединение;
Серверный сокет остаётся открытым для принятия запросов от других клиентских процессов.

Пробуем сокеты на практике
UNIX-сокеты
В Python использовать api сокетов можно через встроенную библиотеку socket. Почти все методы класса сокета из этой библиотеки однозначно соответствуют вызовам библиотеки C, которые мы рассмотрели.
Попробуем воспроизвести описанную нами схему межпроцессного взаимодействия, используя эту библиотеку. Напишем простую серверную программу, которая будет ждать подключения от клиентского процесса, затем прочитает 1024 байт, отправит прочитанные данные в stdout и обратно в клиентский сокет, после чего закроет серверный и клиентский сокеты.
Использовать будем сокет семейства AF_UNIX для локального взаимодействия процессов, адресом в таких сокетах является путь к файлу сокета. Важно отметить, что AF_UNIX не использует стек TCP/IP. Обмен данными происходит внутри ядра через механизм локальных сокетов, без формирования IP-пакетов и прохождения через сетевой стек.
Серверное приложение:
import os import socket ADDR = "/tmp/demo.sock" # Если файл остался от прошлого запуска if os.path.exists(ADDR): os.remove(ADDR) # Создаем сокет server_socket = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) server_socket.bind(ADDR) server_socket.listen() print("Python: сервер слушает: ", ADDR) client_socket, client_address = server_socket.accept() print(f"Python: клиент подключился, его адрес: {client_address}") data = client_socket.recv(1024) print("Python: клиент отправил:", data.decode()) client_socket.send(b"Your data processed") client_socket.close() server_socket.close()
Клиентское:
import socket ADDR = "/tmp/demo.sock" client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) client_socket.connect(ADDR) data = "Hello from UNIX socket client" client_socket.send(data.encode()) reply = client_socket.recv(1024) print("Python: ответ сервера:", reply.decode()) client_socket.close()
Запустим серверное приложение и посмотрим на системные вызовы, которые делает процесс, с помощью утилиты strace. С помощью параметра -e trace=network оставим только системные вызовы, связанные с сетью.
strace -e trace=network python3 server.py socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3 bind(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0 listen(3, 128) = 0 Python: сервер слушает: /tmp/demo.sock accept4(3
Примечание: флаг
SOCK_CLOEXECв вызовеsocket()отвечает за автоматическое закрытие сокета, если процесс вызоветexecve()(замену другим исполняемым файлом). Это предотвращает утечку дескрипторов. Библиотека Python этот флаг устанавливает автоматически.
А также посмотрим список файловых дескрипторов нашего процесса:
ps aux | grep python3 omelche+ 428648 0.0 0.0 16820 11552 pts/4 S+ 09:57 0:00 python3 server.py ls -l /proc/428648/fd total 0 lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 0 -> /dev/pts/4 lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 1 -> /dev/pts/4 lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 2 -> /dev/pts/4 lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 3 -> 'socket:[814233]'
Важно понимать, что сокет в пользовательском пространстве - это не отдельная сущность, а объект ядра, на который указывает файловый дескриптор. Все системные вызовы (bind(), listen(), accept(), recv(), send()) фактически работают с числовым идентификатором файлового дескриптора, а не напрямую с сокетом как самостоятельным объектом. Именно поэтому сокеты подчиняются тем же правилам, что и обычные файлы: их можно закрывать через close(), наследовать при fork(), передавать между процессами и т.д.
Как видим, вызов socket() создал новый сокет, на который ссылается файловый дескриптор под номером 3. Номер 3 выбран, потому что под номерами 0, 1 и 2 находятся стандартные потоки stdin, stdout, stderr, которые создаются для каждого процесса автоматически. Все дальнейшие системные вызовы работают с сокетом через номер созданного файлового дескриптора.
Теперь, не выключая сервер, запустим клиентское приложение.
strace -e trace=network python3 client.py socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3 connect(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0 sendto(3, "Hello from UNIX socket client", 29, 0, NULL, 0) = 29 recvfrom(3, "Your data processed", 1024, 0, NULL, NULL) = 19 Python: ответ сервера: Your data processed +++ exited with 0 +++
Появилось продолжение вывода серверной программы:
strace -e trace=network python3 server.py socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3 bind(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0 listen(3, 128) = 0 Python: сервер слушает: /tmp/demo.sock accept4(3, {sa_family=AF_UNIX}, [110 => 2], SOCK_CLOEXEC) = 4 getsockname(4, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, [128 => 17]) = 0 Python: клиент подключился, его адрес: recvfrom(4, "Hello from UNIX socket client", 1024, 0, NULL, NULL) = 29 Python: клиент отправил: Hello from UNIX socket client sendto(4, "Your data processed", 19, 0, NULL, 0) = 19 +++ exited with 0 +++
Можно заметить, что некоторые вызовы, которые делает библиотека Python, немного отличаются от тех, что мы разобрали. В серверной программе вместо вызова accept() используется вызов accept4(), он позволяет сразу же задать дополнительные параметры (такие как SOCK_CLOEXEC) для создаваемых клиентских сокетов. Также, вместо вызовов send() и recv() используются sendto() и recvfrom(), это просто более общие вызовы, которые могут использоваться для несвязанных сокетов. Последние два аргумента - это адрес и размер адреса удалённого сокета. Если сокеты уже связаны, передаётся NULL, как в нашем случае.
После вызова accept4() идёт вызов getsockname(), его делает библиотека Python для получения адреса клиентского сокета, который вернул вызов accept4().
В остальном наблюдаемые нами системные вызовы соответствуют схеме межпроцессного взаимодействия, которую мы рассмотрели выше и которую воспроизводили в коде. Но, кажется, одного вызова не хватает, где же закрытие сокета вызовом close()? Всё правильно, системный вызов close() является общим для всех файловых дескрипторов и не классифицируется strace как сетевой. Чтобы его увидеть, можете выполнить strace с флагом -e trace=network,close.
TCP-сокеты
Перейдём наконец к сетевому межпроцессному взаимодействию. Единственное, что для этого необходимо изменить - это семейство и адрес сокета.
Сервер:
import socket ADDR = ("127.0.0.1", 9999) # Создаем сокет server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # Нужно для повторного использования адреса при перезапуске программы # Иначе он будет на некоторое время зарезервирован ядром server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(ADDR) server_socket.listen() print("Python: сервер слушает: ", ADDR) client_socket, client_address = server_socket.accept() print(f"Python: клиент подключился, его адрес: {client_address}") data = client_socket.recv(1024) print("Python: клиент отправил:", data.decode()) client_socket.sendall(b"Your data received from TCP socket") client_socket.close() server_socket.close()
Клиент:
import socket ADDR = ("127.0.0.1", 9999) client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.connect(ADDR) data = "Hello from TCP socket client" client_socket.send(data.encode()) reply = client_socket.recv(1024) print("Python: ответ сервера:", reply.decode()) client_socket.close()
Запустим наш сервер:
strace -e trace=network python3 server.py socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3 setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 listen(3, 128) = 0 Python: сервер слушает: ('127.0.0.1', 9999) accept4(3
На уровне системных вызовов тоже поменялись только семейство, адрес и протокол взаимодействия. Тут появляется незнакомый нам ещё системный вызов setsockopt(). Он позволяет менять параметры сокета после его создания. Мы применяем его, чтобы установить параметр SO_REUSEADDR в 1. Он позволяет повторно привязаться к адресу и порту, которые могут находиться в состоянии TIME_WAIT после завершения предыдущего соединения.
Проверим, что наш сокет открыт с помощью утилиты netstat или любой другой, показывающей сетевые соединения:
netstat -tlnp Proto Local Address Foreign Address State PID/Program tcp 127.0.0.1:9999 0.0.0.0:* LISTEN 566502/python3
Действительно, наше приложение открыло TCP-сокет и готово принимать входящие соединения.
Перед тем, как запускать клиентскую программу, запустим перехватчик сетевого трафика. Можно использовать утилиту tcpdump или любую другую. Я буду использовать wireshark. Выбираем loopback интерфейс, так как клиент и сервер будут находиться на одном хосте, и трафик будет идти через loopback. И ставим фильтр по порту - у нас это 9999. Теперь запускаем клиентскую программу.
strace -e trace=network python3 client.py socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3 connect(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 sendto(3, "Hello from TCP socket client", 28, 0, NULL, 0) = 28 recvfrom(3, "Your data received from TCP sock"..., 1024, 0, NULL, NULL) = 34 Python: ответ сервера: Your data received from TCP socket +++ exited with 0 +++
И видим продолжение вывода серверной программы:
strace -e trace=network python3 server.py socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3 setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 listen(3, 128) = 0 Python: сервер слушает: ('127.0.0.1', 9999) accept4(3, {sa_family=AF_INET, sin_port=htons(51906), sin_addr=inet_addr("127.0.0.1")}, [16], SOCK_CLOEXEC) = 4 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, [128 => 16]) = 0 Python: клиент подключился, его адрес: ('127.0.0.1', 51906) recvfrom(4, "Hello from TCP socket client", 1024, 0, NULL, NULL) = 28 Python: клиент отправил: Hello from TCP socket client sendto(4, "Your data received from TCP sock"..., 34, 0, NULL, 0) = 34 +++ exited with 0 +++
А вот вывод wireshark:

Первые три сегмента - это three-way handshake. Three-way handshake инициируется вызовом connect() на стороне клиента. Ядро сервера обрабатывает SYN/SYN-ACK/ACK, и помещает соединение в очередь accept queue. Вызов accept() извлекает установленное соединение из очереди. Затем клиент и сервер обмениваются данными. Получающая сторона при этом в соответствии с протоколом TCP отправляет подтверждение получения (сегмент с флагом ACK). После обмена данными по инициативе сервера происходит корректное завершение TCP-соединения - последние три сегмента.
Таким образом, реализация TCP полностью находится в сетевом стеке ядра. Пользовательская программа работает лишь с сокетом семейства AF_INET через системные вызовы, не взаимодействуя напрямую с сегментами, флагами и состояниями протокола TCP.
Python code ↓ libc ↓ syscall ↓ Kernel TCP/IP stack ↓ Network Interface Card
Обратите внимание: переход от локального взаимодействия к сетевому потребовал изменить только семейство адресов и формат адреса. Вся последовательность вызовов - socket(), bind(), listen(), accept(), recv(), send() - осталась идентичной. В этом и состоит ценность абстракции сокетов: один и тот же интерфейс работает независимо от транспорта.
Сокеты как файловые дескрипторы
Хочу продемонстрировать, что к файловым дескрипторам сокетов можно применять обычные для файловых дескрипторов системные вызовы, такие как read() и write(). Для примера сделаем HTTP-запрос на example.com, используя именно эти системные вызовы.
import os import socket client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # DNS в этой статье не рассматривается, поэтому используем ip-адрес # Актуальный адрес домена host example.com можно посмотреть командой # host example.com # Скорее всего это будет ip cloudflare # Поэтому обязательно указываем доменное имя в заголовке запроса client_socket.connect(("8.6.112.0", 80)) # Получаем номер файлового дескриптора fd = client_socket.fileno() data = b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n" # Пишем в сокет, через общий для файловых дескрипторов системный вызов write os.write(fd, data) # Читаем из сокета через системный вызов read response = os.read(fd, 4096) print(response.decode()) client_socket.close()
Запустим:
strace -e trace=network,read,write,close python3 socket_as_fd.py ... Системные вызовы, связанные с запуском интерпретатора python ... socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3 connect(3, {sa_family=AF_INET, sin_port=htons(80), sin_addr=inet_addr("8.6.112.0")}, 16) = 0 write(3, "GET / HTTP/1.0\r\nHost: example.co"..., 37) = 37 read(3, "HTTP/1.1 200 OK\r\nDate: Sat, 14 F"..., 4096) = 798 write(1, "HTTP/1.1 200 OK\r\nDate: Sat, 14 F"..., 798HTTP/1.1 200 OK Date: Sat, 14 Feb 2026 10:25:19 GMT Content-Type: text/html Connection: close CF-RAY: 9cdbdc5e9e66ec39-DME Last-Modified: Thu, 12 Feb 2026 14:00:39 GMT Allow: GET, HEAD Accept-Ranges: bytes Age: 11273 cf-cache-status: HIT Server: cloudflare <!doctype html><html lang="en"><head><title>Example Domain</title><meta name="viewport" content="width=device-width, initial-scale=1"><style>body{background:#eee;width:60vw;margin:15vh auto;font-family:system-ui,sans-serif}h1{font-size:1.5em}div{opacity:0.8}a:link,a:visited{color:#348}</style></head><body><div><h1>Example Domain</h1><p>This domain is for use in documentation examples without needing permission. Avoid use in operations.</p><p><a href="https://iana.org/domains/example">Learn more</a></p></div></body></html> ) = 798 write(1, "\n", 1) = 1 close(3) = 0 +++ exited with 0 +++
Как видим, всё работает точно так же, как с системными вызовами recv(), send(). Но для сетевых приложений предпочтительно использовать recv() и send(), так как они позволяют передавать дополнительные флаги и правильно обрабатывать особенности сокетов (например, частичные отправки или неблокирующий режим).
Что происходит с процессом во время ожидания входящих подключений?
После того как процесс сделал вызов accept(), но входящих подключений ещё нет, потребляет ли он процессорное время? Ответ - нет.
Системный вызов accept() проверяет очередь ожидающих соединений на сокете. Если очередь пуста, процесс переводится ядром в статус sleeping и не участвует в распределении процессорного времени.
Как только к сокету приходит новое соединение:
ядро добавляет процесс обратно в run queue планировщика;
планировщик видит, что проце��с готов, и выделяет ему CPU;
accept()возвращает новый файловый дескриптор для соединения.
Чтобы увидеть это на практике, немного модифицируем пример с серверным TCP-сокетом. После ответа клиенту и закрытия клиентского соединения добавим бесконечный цикл с CPU-bound операциями. Клиентский сокет оставим без изменений.
import socket ADDR = ("127.0.0.1", 9999) # Создаем сокет server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # Нужно для повторного использования адреса при перезапуске программы # Иначе он будет на некоторое время зарезервирован ядром server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(ADDR) server_socket.listen() print("Python: сервер слушает: ", ADDR) client_socket, client_address = server_socket.accept() print(f"Python: клиент подключился, его адрес: {client_address}") data = client_socket.recv(1024) print("Python: клиент отправил:", data.decode()) client_socket.send(b"Your data received from TCP socket") client_socket.close() counter = 0 try: while True: counter += 1 finally: server_socket.close()
Включаем сервер:
strace -e trace=network python3 server.py socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3 setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 listen(3, 128) = 0 Python: сервер слушает: ('127.0.0.1', 9999) accept4(3
Смотрим статус процесса:
ps aux | grep python3 omelche+ 1289282 0.3 0.0 16828 11692 pts/5 S+ 16:31 0:00 python3 server.py
Видим, что наш процесс находится в статусе S+, что соответствует sleeping. Теперь запускаем клиентский процесс:
python3 client.py
И проверяем статус нашего процесса, после чего убиваем его:
ps aux | grep python3 omelche+ 1289282 46.7 0.0 16828 11756 pts/5 R+ 16:31 0:33 python3 server.py kill 1289282
Как видим, процесс перешёл в статус R+, что соответствует статусу running. Процесс активно потребляет CPU или стоит в очереди планировщика.
Пишем WSGI-сервер
Начинаем писать TCP-сервер
Наш сервер будет состоять из двух компонентов: непосредственно сам сервер и обработчик. Сервер будет принимать входящие клиентские соединения, читать поток байт, который посылает клиент, и передавать его в обработчик. Обработчик будет реализовывать некоторую логику обработки и отдавать выходной поток байт обратно серверу, после чего сервер будет отправлять выходной поток клиенту. После обработки клиента сервер будет переходить к следующему клиенту.

Вот каким будет интерфейс:
from collections.abc import Iterator from typing import Protocol class TCPHandlerI(Protocol): def handle(self, data: Iterator[bytes]) -> Iterator[bytes]: raise NotImplementedError() class TCPServerI(Protocol): def __init__( self, host: str, port: int, handler: TCPHandlerI, ): raise NotImplementedError() def serve_forever(self): """Запуск сервера.""" raise NotImplementedError()
Реализуем TCP-сервер, а также напишем простой хендлер с эхо-логикой.
import socket import traceback from collections.abc import Iterator from .interface import TCPHandlerI, TCPServerI class TCPServer(TCPServerI): def __init__( self, host: str, port: int, handler: TCPHandlerI, ): self.address = (host, port) self.server_socket = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM, ) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(self.address) self.handler = handler def serve_forever(self): """ Основной цикл сервера: опрашиваем сокет, принимаем и обрабатываем клиентов. """ self.server_socket.listen() print( "Python: сервер запущен на прослушивание " f"{self.address[0]}:{self.address[1]}" ) with self.server_socket as server_socket: while True: client_socket, addr = server_socket.accept() self._handle_request(client_socket, addr) def _handle_request(self, client_socket: socket.socket, addr): """ Обработчик клиентского соединения. """ print(f"Python: обрабатываю запрос клиента {addr}") try: self._process_request(client_socket) except Exception: self.handle_error(client_socket, addr) finally: self.shutdown_request(client_socket) def _process_request(self, client_socket: socket.socket): """ Читает данные клиента и вызывает бизнес-логику из self.handler. """ client_data = self.read_client_data(client_socket) processed_data = self.handler.handle(client_data) self.send_to_client(client_socket, processed_data) def shutdown_request(self, client_socket: socket.socket): """Закрытие соединения с клиентом, отправка FIN.""" try: client_socket.shutdown(socket.SHUT_WR) print("Python: отправил (FIN) клиенту") except OSError: # Если клиент уже закрыл соединение pass client_socket.close() print("Python: закрыл соединение с клиентом") def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]: """Чтение данных клиента.""" while True: chunk = client_socket.recv(1024) if chunk: yield chunk else: print("Python: клиент прислал (FIN).") break def send_to_client( self, client_socket: socket.socket, data: Iterator[bytes] ): """Отправляет данные клиенту.""" print("Python: отправляю ответ клиенту") for chunk in data: try: client_socket.sendall(chunk) except OSError: print("Python: клиент закрыл соединение до получения данных") def handle_error(self, client_socket, addr): """Обработка ошибок во время выполнения запроса.""" print( f"Python: произошла ошибка во время обработки запроса клиента {addr}" ) traceback.print_exc() class TCPEchoHandler(TCPHandlerI): def handle(self, data: Iterator[bytes]) -> Iterator[bytes]: return data if __name__ == "__main__": server = TCPServer("0.0.0.0", 9999, TCPEchoHandler()) server.serve_forever()
В коде появляются новые для нас методы:
sendall()этот метод гарантирует отправку всех передаваемых данных в сокет. В отличие от методаsend(), который из-за переполнения буфера ядра может отправить лишь часть данных. Методsendall()мог быть реализован так:
def sendall(sock, data): total_sent = 0 length = len(data) while total_sent < length: sent = sock.send(data[total_sent:]) if sent == 0: raise RuntimeError("socket connection broken") total_sent += sent
shutdown()соответствует одноименному системному вызову, он позволяет частично закрыть сокет, не освобождая файловый дескриптор. Сокет можно частично закрыть в одном направлении. В качестве аргумента передаётся:SHUT_RD- закроет сокет для чтения.SHUT_WR- закроет сокет для записи и в случае TCP-сокета, отправит принимающей стороне сегмент с флагомFIN.
Теперь реализуем простого клиента:
import socket import sys SOCK_ADDRESS = ("127.0.0.1", 9999) client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: client_socket.connect(SOCK_ADDRESS) except Exception: sys.exit("Сервер недоступен") with client_socket: while True: user_input = input("Python: клиентский ввод - ") if user_input == "exit": break client_socket.send(user_input.encode()) server_response = client_socket.recv(1024) if not server_response: print("Python: получен (FIN) от сервера") break print(f"Python: ответ сервера - {server_response.decode()}")
Запускаем наш сервер:
python3 -m simple_tcp_server.server Python: сервер запущен на прослушивание 0.0.0.0:9999
Запускаем клиента:
python3 -m simple_tcp_server.client Python: клиентский ввод - hi Python: ответ сервера - hi Python: клиентский ввод - who are you? Python: ответ сервера - who are you? Python: клиентский ввод - im think you are echo server Python: ответ сервера - im think you are echo server Python: клиентский ввод - exit
Мы уже научились читать системные вызовы, поэтому настоятельно рекомендую изучить их самостоятельно.
Итак, всё работает, но какие есть недостатки у нашей реализации?
Первое и самое очевидное - это синхронная работа сервера. Пока не будет обслужен первый клиент, все остальные клиенты, которые хотят подключиться, будут ждать.
Время, которое клиент может занимать соединение, никак не ограничено. Клиент может просто подключиться, удерживать соединение и даже не отправлять никаких данных.
Отсутствие graceful shutdown, при прекращении работы сервера все активные соединения немедленно прервутся, из-за чего возможна потеря данных или неконсистентное состояние системы.
С первой проблемой мы будем разбираться уже во второй части статьи. А последними двумя прямо сейчас и займёмся.
Добавляем idle-timeout и graceful shutdown
Начнём с таймаута на бездействие клиента. Если клиент n-ое время не отправляет данные, мы хотим обрывать соединение. Но как же это сделать, если системный вызов recv() является блокирующим и возвращает результат только после того, как удалённая сторона отправила данные. Пришло время познакомиться с новым системным вызовом - встречайте вызов select(). У вызова есть сильные ограничения, но о них в другой раз. Вызов позволяет проверять сразу множество дескрипторов на готовность к чтению/записи и имеет таймаут, по истечении которого, если ни один дескриптор не готов, вызов возвращает пустое множество. Этим мы и воспользуемся, чтобы задать клиенту таймаут бездействия.
Модифицируем метод read_client_data() добавив туда проверку готовности сокета через select():
import select class TCPServer(TCPServerI): def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]: """Чтение данных клиента.""" while True: ready, _, _ = select.select( [client_socket], [], [], self.client_idle_timeout, ) if ready: chunk = client_socket.recv(1024) else: print("Python: timeout ожидания клиента") break if chunk: yield chunk else: print("Python: клиент прислал (FIN).") break
Включаем сервер и подключаемся клиентом, спустя 5 секунд бездействия клиент будет отключен и сервер сможет принять соединение от нового клиента:
strace -e trace=network,pselect6 python3 -m tcp_server_with_idle_timeout.server socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 9 setsockopt(9, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 bind(9, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 listen(9, 128) = 0 Python: сервер запущен на прослушивание 0.0.0.0:9999 accept4(9, {sa_family=AF_INET, sin_port=htons(44420), sin_addr=inet_addr("127.0.0.1")}, [16], SOCK_CLOEXEC) = 10 getsockname(10, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, [128 => 16]) = 0 Python: обрабатываю запрос клиента ('127.0.0.1', 44420) Python: отправляю ответ клиенту pselect6(11, [10], NULL, NULL, {tv_sec=5, tv_nsec=0}, NULL) = 0 (Timeout) Python: timeout ожидания клиента. shutdown(10, SHUT_WR) = 0 Python: отправил (FIN) клиенту Python: закрыл соединение с клиентом accept4(9
Библиотека Python делает вызов pselect() вместо select(). Он немного отличается способом задания таймаута и обработкой сигналов.
Теперь займёмся graceful shutdown. Вместо того чтобы сразу же обрывать все соединения при завершении работы сервера, необходимо корректно обработать сигналы завершения.
При получении сигнала завершения необходимо:
Прекратить принимать входящие соединения;
Дать время на корректное завершение всех существующих соединений.
Обработка сигналов в Python происходит следующим образом:
Сигнал приходит от ядра в процесс;
Ядро помечает процесс как имеющий pending signal (сигнал ожидает обработки);
Если Python находится в ожидании системного вызова, то системный вызов прерывается на время обработки сигнала. Если Python выполняет свой байт-код, выполнение байт-кода будет прервано, т.к. в цикле интерпретатора есть специальная инструкция, проверяющая наличие сигналов;
В главном потоке выполняется обработчик сигнала;
Выполнение байт-кода или системного вызова возобновляются.
Посмотрим, как это работает:
import signal import socket def signal_handler(signum, frame): print(f"Python: получен сигнал; {signum}") print(f"Python: frame: {frame}") signal.signal(signal.SIGTERM, signal_handler) ADDR = ("127.0.0.1", 9999) server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) server_socket.bind(ADDR) server_socket.listen() client_socket, _ = server_socket.accept() server_socket.close() client_socket.close()
В аргумент frame обработчика сигнала, приходит стек вызова на момент получения сигнала. Запускаем и пробуем убить процесс через kill:
strace -e trace=network python3 signals.py socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 9 bind(9, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 listen(9, 128) = 0 accept4(9, 0x7ffe95e6fd30, [16], SOCK_CLOEXEC) = ? ERESTARTSYS (To be restarted if SA_RESTART is set) --- SIGTERM {si_signo=SIGTERM, si_code=SI_USER, si_pid=1275635, si_uid=1000} --- Python: получен сигнал; 15 Python: frame: <frame at 0x7f03fab4f540, file '/usr/lib/python3.14/socket.py', line 298, code accept> accept4(9
Как видим, выполнение системного вызова и правда приостанавливается на время обработки сигнала, после чего успешно возобновляется. Теперь убиваем процесс через kill -9.
Применим обработку сигналов на практике и напишем graceful shutdown для нашего сервера:
import select import signal import socket import time from collections.abc import Iterator from .interface import TCPHandlerI, TCPServerI class TCPServer(TCPServerI): def __init__( self, host: str, port: int, handler: TCPHandlerI, poll_interval: float = 0.5, client_idle_timeout: float = 5, shutdown_timeout: float = 10, ): self.address = (host, port) self.server_socket = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM, ) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(self.address) self.poll_interval = poll_interval self.client_idle_timeout = client_idle_timeout self.shutdown_timeout = shutdown_timeout self.handler = handler self.is_shutdown = False signal.signal(signal.SIGTERM, self._on_shutdown) signal.signal(signal.SIGINT, self._on_shutdown) def _on_shutdown(self, signum, _): """Сигнальный обработчик - выставляет флаг завершения.""" print("Python: получен сигнал завершения") print( "Python: после обработки всех активных запросов сервер будет остановлен" ) self.is_shutdown = True def serve_forever(self): """ Основной цикл сервера: опрашиваем сокет, принимаем и обрабатываем клиентов. """ self.server_socket.listen() print( "Python: сервер запущен на прослушивание " f"{self.address[0]}:{self.address[1]}" ) self.is_shutdown = False with self.server_socket as server_socket: while not self.is_shutdown: ready, _, _ = select.select( [server_socket], [], [], self.poll_interval, ) if ready: client_socket, addr = server_socket.accept() self._handle_request(client_socket, addr) def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]: """Чтение данных клиента.""" deadline = time.perf_counter() + self.client_idle_timeout shutdown_deadline_set = False while True: if self.is_shutdown and not shutdown_deadline_set: shutdown_deadline_set = True deadline = time.perf_counter() + self.shutdown_timeout if time.perf_counter() > deadline: print("Python: timeout ожидания клиента") break ready, _, _ = select.select( [client_socket], [], [], self.poll_interval, ) if ready: if not self.is_shutdown: deadline = time.perf_counter() + self.client_idle_timeout chunk = client_socket.recv(1024) if chunk: yield chunk else: print("Python: клиент прислал (FIN).") break
Я добавил следующие атрибуты сервера: is_shutdown - флаг начала завершения работы сервера, poll_interval - время опроса сокетов, shutdown_timeout - время, которое сервер даёт клиенту на завершение передачи данных после начала shutdown.
В serve_forever() теперь опрашивается сокет через select(), чтобы между вызовами делать проверку на начало shutdown. Если сервер начал завершать свою работу, serve_forever() завершается и новые соединения больше не принимаются. В методе read_client_data() добавилось вычисление deadline, по истечении которого, данные от клиента больше не принимаются. В остальных методах изменений нет.
Можете проверить, теперь после получения SIGTERM сервер будет работать с клиентом лишь 10 секунд, после чего остановит свою работу.
Реализуем файловый интерфейс над сокетом
Перед тем, как перейти к реализации обработчиков прикладных протоколов поверх нашего TCP-сервера, необходимо ещё кое-что сделать. Сейчас в наш обработчик передаётся поток байтов Iterator[bytes]. Количество байт, вычитываемое из сокета, сейчас захардкожено. Это допустимо, когда мы хотим считать все данные, которые посылает нам клиент. Но представим, что клиент отправил два HTTP-запроса подряд в одном TCP-соединении. С Iterator[bytes] мы вычитываем данные чанками по 1024 байта и не можем остановиться ровно на границе первого запроса - часть второго запроса попадёт в тот же чанк. Нам нужен интерфейс, который позволяет прочитать ровно N байт и сохранить остаток для следующего вызова.
В Python таким интерфейсом для потока байт является RawIOBase из модуля io стандартной библиотеки. Вот его основные методы:
class RawIOBase(IOBase): def read(self, size=-1) -> bytes: """Читает до size байт. При size < 0 читает до конца потока.""" raise NotImplementedError() def readinto(self, b: bytearray) -> int: """ Читает данные в переданный буфер b. Возвращает количество прочитанных байт. """ raise NotImplementedError() def readline(self, size=-1) -> bytes: """Читает одну строку (до символа \\n включительно).""" raise NotImplementedError() def readlines(self, hint=-1) -> list[bytes]: """Читает строки. hint задаёт примерный лимит прочитанных байт.""" raise NotImplementedError()
Конечно же, в Python уже есть реализация RawIOBase для сокета - её возвращает метод сокета makefile(). Но наша цель - реализовать веб-сервер с минимальным использованием библиотек, поэтому реализуем этот интерфейс для сокета самостоятельно.
Ключевой момент реализации - внутренний буфер. Системный вызов recv() может вернуть как меньше данных, чем запрошено (если столько ещё не пришло), так и больше, чем нужно вызывающему коду. Например, мы вызвали read(50), а recv(1024) вернул 200 байт - 150 байт нужно где-то сохранить до следующего вызова read(). Для этого SocketIO хранит внутренний bytearray-буфер: при каждом чтении сначала проверяется буфер, и только если данных не хватает - делается recv() из сокета.
Вот и сама реализация:
import io import logging import select import socket import time from threading import Event class SocketIO(io.RawIOBase): def __init__( self, socket: socket.socket, shutdown_event: Event, poll_interval: float = 0.5, idle_timeout: float = 5, shutdown_timeout: float = 10, recv_chunk_size: int = 1024, ): self.socket = socket self.poll_interval = poll_interval self.idle_timeout = idle_timeout self.shutdown_timeout = shutdown_timeout self.recv_chunk_size = recv_chunk_size # Внутренний буфер: recv() может вернуть больше данных, чем запросил read(size). # Остаток сохраняется здесь до следующего вызова read(). self.buffer = bytearray() self.deadline = time.perf_counter() + self.idle_timeout self.shutdown_event = shutdown_event self.shutdown_deadline_set = False self.is_socket_end = False def _recv(self, chunk_size: int) -> bytes: """ Низкоуровневое чтение из сокета с поддержкой таймаутов. Возвращает до chunk_size байт или b"" если соединение закрыто. """ if self.is_socket_end: return b"" while True: # При получении сигнала завершения переключаемся на shutdown_timeout if self.shutdown_event.is_set() and not self.shutdown_deadline_set: self.deadline = time.perf_counter() + self.shutdown_timeout self.shutdown_deadline_set = True # Ждём готовности сокета не дольше poll_interval, # чтобы периодически проверять таймауты и флаг завершения ready, _, _ = select.select([self.socket], [], [], self.poll_interval) if ready: # Клиент прислал данные - сбрасываем idle-таймаут if not self.shutdown_deadline_set: self.deadline = time.perf_counter() + self.idle_timeout chunk = self.socket.recv(chunk_size) if chunk: return chunk else: logging.debug("Python: клиент прислал (FIN).") self.is_socket_end = True return b"" if time.perf_counter() > self.deadline: logging.warning("Python: timeout ожидания клиента") raise TimeoutError("Клиент слишком долго отправлял данные") def read(self, size=-1) -> bytes: """ Возвращает ровно size байт из сокета (или меньше, если соединение закрыто). При size < 0 читает всё до конца соединения. """ if size < 0: # Режим "читаем всё": забираем буфер и дочитываем сокет до конца chunks = [bytes(self.buffer)] self.buffer = bytearray() while chunk := self._recv(self.recv_chunk_size): chunks.append(chunk) return b"".join(chunks) else: # Режим "читаем ровно size байт": дочитываем в буфер, # Пока не наберём нужное количество while len(self.buffer) < size: chunk = self._recv(self.recv_chunk_size) if not chunk: break self.buffer.extend(chunk) to_return = self.buffer[:size] self.buffer = self.buffer[size:] return bytes(to_return) def readinto(self, b: bytearray) -> int: """Записывает данные из сокета в байтовый буфер b.""" data = self.read(len(b)) n = len(data) b[:n] = data return n def readline(self, size=-1) -> bytes: """ Читает одну строку (до символа \\n включительно). Сначала ищем \\n в буфере, если не нашли - дочитываем из сокета чанками. """ while True: # Ищем конец строки в том, что уже есть в буфере newline_pos = self.buffer.find(b"\n") if newline_pos != -1: # Нашли \n - отдаём строку включая \n, остаток остаётся в буфере end = newline_pos + 1 if size >= 0: end = min(end, size) line = bytes(self.buffer[:end]) self.buffer = self.buffer[end:] return line # Если достигли лимита size - отдаём что есть if size >= 0 and len(self.buffer) >= size: line = bytes(self.buffer[:size]) self.buffer = self.buffer[size:] return line # \n не найден - дочитываем из сокета chunk = self._recv(self.recv_chunk_size) if not chunk: # Соединение закрыто - отдаём остаток буфера line = bytes(self.buffer) self.buffer = bytearray() return line self.buffer.extend(chunk) def readlines(self, hint=-1) -> list[bytes]: """ Возвращает строки, лимит количества прочитанных байт задаётся через hint. """ lines = [] total = 0 while True: line = self.readline() if not line: break lines.append(line) total += len(line) if 0 < hint <= total: break return lines def __iter__(self): return self def __next__(self): line = self.readline() if not line: raise StopIteration return line
В коде самого сервера остаётся возвращать вместо Iterator[bytes] нашу новую структуру SocketIO. С полным кодом сервера можете ознакомиться в моём репозитории. Я также убрал всё логирование через print. Теперь наш сервер готов и можно переходить к реализации серверного интерфейса WSGI в хендлере.
Реализуем WSGI
Важно отметить, что переходя к WSGI, мы говорим об интерфейсе между сервером и приложением в рамках конкретного прикладного протокола - HTTP/S. Сейчас в нашем сервере обрабатывается только TCP и сервер абстрагирован от конкретного прикладного протокола. А для парсинга прикладных протоколов и обработки запросов у нас есть интерфейс TCPHandlerI.
Зачем вообще нужен WSGI? Фреймворки, такие как flask и fastapi, не возятся сами с низкоуровневыми деталями HTTP, их задача обрабатывать HTTP-запросы и передавать ответ серверу, который отправит его клиентам. Чтобы сервера и приложения были совместимы, был разработан интерфейс WSGI, который каждая сторона (сервер и приложение) должны реализовать.
WSGI для Python3 описан в PEP3333. Приложение в нём должно предоставлять функцию:
HELLO_WORLD = b"Hello world!\n" def app(environ, start_response): """Simplest possible application object""" status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
environ- словарь переменных окружения, таких как: метод запроса, заголовки запроса, тело запроса, информация о сервере и версии протоколов;start_response- обратный вызов, через который приложение записывает статус ответа и заголовки.Функция возвращает тело ответа в виде байтовых строк; Такую функцию предоставляют все
WSGI-совместимые фреймворки.
Схема взаимодействия WSGI-сервера и WSGI-приложения выглядит следующим образом:

WSGI-сервера и WSGI-приложения Клиент отправляет запрос серверу;
Сервер парсит HTTP;
Сервер передаёт запрос приложению через интерфейс
WSGI, т.е. через функциюapp(environ, start_response);Приложение реализует некоторую бизнес-логику;
Через
start_response()приложение передаёт статус и заголовки, а тело - через возвращаемое значение;На основании ответа от приложения сервер формирует ответ для клиента;
Сервер отправляет ответ клиенту.
Если мы хотим реализовать протокол со стороны сервера, нам нужно реализовать парсинг HTTP-запроса из входного потока байт, вызвать приложение и сформировать HTTP-ответ. Звучит несложно, делаем:
import io import sys from collections.abc import Iterator from dataclasses import dataclass from final_tcp_server.interface import TCPHandlerI from final_tcp_server.socket_io import SocketIO @dataclass class HTTPRequest: method: str path: str protocol: str headers: dict[str, str] body: io.BytesIO class WSGIHandler(TCPHandlerI): def __init__(self, app, host: str, port: int): self.app = app self.port = port self.host = host def handle(self, data: SocketIO) -> Iterator[bytes]: while True: try: http_request = self._parse_http_request(data) except ConnectionError: break except Exception: status, headers_list = ( "400 Bad Request", [("Content-Type", "text/plain")], ) body_iterator = [b"400 Bad Request"] response = self._generate_http_response( status, headers_list, body_iterator, connection_close=True, ) yield from response break environ = self._generate_environ(http_request) response_headers = [] def start_response(status, headers_list): response_headers.extend([status, headers_list]) # Вызываем WSGI-приложение try: body_iterator = self.app(environ, start_response) status, headers_list = response_headers except Exception: status, headers_list = ( "500 Internal Server Error", [("Content-Type", "text/plain")], ) body_iterator = [b"Internal Server Error"] response = self._generate_http_response( status, headers_list, body_iterator, ) yield from response def _parse_http_request(self, request: SocketIO) -> HTTPRequest: """Упрощенный парсинг http-запроса.""" # Парсим request line request_line = request.readline() if not request_line: raise ConnectionError("Клиент закрыл соединение") try: method, path, protocol = request_line.decode("ascii").strip().split() except ValueError: raise ValueError("Invalid request line") # Парсим headers headers: dict[str, str] = {} while True: line = request.readline() if not line or line in (b"\r\n", b"\n"): break try: name, value = line.decode("ascii").split(":", 1) except ValueError: raise ValueError(f"Invalid header line: {line!r}") headers[name.strip()] = value.strip() return HTTPRequest( method=method, path=path, protocol=protocol, headers=headers, # Оставшуюся часть запроса передаем как body body=request, ) def _generate_environ(self, http_request: HTTPRequest) -> dict: """Формируем environ для wsgi-приложения.""" path, _, query_string = http_request.path.partition("?") environ = { "REQUEST_METHOD": http_request.method, "SCRIPT_NAME": "", "PATH_INFO": path, "QUERY_STRING": query_string, "CONTENT_TYPE": http_request.headers.get("Content-Type", ""), "CONTENT_LENGTH": http_request.headers.get("Content-Length", ""), "SERVER_NAME": self.host, "SERVER_PORT": str(self.port), "SERVER_PROTOCOL": http_request.protocol, "wsgi.version": (1, 0), "wsgi.url_scheme": "http", "wsgi.input": http_request.body, "wsgi.errors": sys.stderr, "wsgi.multithread": False, "wsgi.multiprocess": False, "wsgi.run_once": False, } # Добавляем http-заголовки for key, value in http_request.headers.items(): environ[f"HTTP_{key.upper().replace('-', '_')}"] = value return environ def _generate_http_response( self, status: str, headers_list: list[tuple[str, str]], body_iterator: Iterator[bytes], connection_close: bool = False, ) -> Iterator[bytes]: """Формируем HTTP-ответ.""" # Формируем заголовки content_length_is_set = False response_lines = [f"HTTP/1.1 {status}"] for k, v in headers_list: response_lines.append(f"{k}: {v}") if k.lower() == "content-length": content_length_is_set = True if not content_length_is_set: body_iterator = list(body_iterator) content_length = sum(len(chunk) for chunk in body_iterator) response_lines.append(f"Content-Length: {content_length}") if connection_close: response_lines.append("Connection: close") # "\r\n\r\n" перед телом response_lines.append("") response_lines.append("") # yield заголовков yield ("\r\n".join(response_lines).encode("iso-8859-1")) # yield тела yield from body_iterator
160 строк на чистом Python и наш WSGI-обработчик готов. В нём у нас цикл для поддержки keep-alive соединений, когда в рамках одного TCP-соединения обрабатывается сразу несколько HTTP-запросов. Цикл заканчивается, когда происходит ошибка парсинга HTTP-запроса. Это может произойти в двух случаях, когда запрос действительно некорректный или когда клиент закрыл соединение и запрос данных из сокета возвращает пустую байтовую строку.
Обратите внимание, что тело запроса сервер не вычитывает из сокета, а передаёт приложению наш file-like интерфейс над сокетом. Ответственным за вычитку ровно Content-length байт из сокета является уже приложение.
Конечно, это не production-ready решение, как минимум не хватает:
Ограничений на размер входящих данных. Сейчас строку запроса и заголовки можно вычитывать бесконечно долго.
Поддержки SSL
Поддержки HTTP2
Поддержки chunked encoding
А вот код WSGI-сервера, который наследуется от TCPServer и принимает на вход WSGI-приложение:
from final_tcp_server.server import TCPServer from .handler import WSGIHandler class WSGIServer(TCPServer): def __init__( self, host, port, app, poll_interval=0.5, client_idle_timeout=5, shutdown_timeout=10, ): handler = WSGIHandler(app, host, port) super().__init__( host, port, handler, poll_interval, client_idle_timeout, shutdown_timeout, )
Но сервер вполне функционален и мы уже можем запустить на нём любое WSGI-приложение. Для примера возьмём flask.
import logging import time from flask import Flask, jsonify, request from .server import WSGIServer app = Flask("app") @app.route("/ping") def ping(): return "pong" @app.route("/sleep") def sleep(): time.sleep(0.5) return "pong" @app.route("/post", methods=["POST"]) def submit(): data = request.json name = data.get("name", "Unknown") return jsonify({"message": f"Hello, {name}!"}) @app.route("/page") def page(): return """ <!DOCTYPE html> <html> <head> <title>Simple Page</title> </head> <body> <h1>Hello, world!</h1> <p>This is a simple HTTP page from Flask.</p> </body> </html> """ if __name__ == "__main__": server = WSGIServer("0.0.0.0", 9999, app) logger = logging.getLogger() logger.setLevel(logging.INFO) server.serve_forever()
Тестируем наш сервер
Итоговый код находится в моём репозитории в модуле wsgi
Запустим наш сервер и проверим его в действии. Я использую менеджер зависимостей uv, и запускаю через него.
uv run -m wsgi.app INFO:root:Python: сервер запущен на прослушивание 0.0.0.0:9999 uv run -m wsgi.app INFO:root:Python: сервер запущен на прослушивание 0.0.0.0:9999
curl localhost:9999/ping pong curl -X POST http://127.0.0.1:9999/post \ -H "Content-Type: application/json" \ -d '{"name": "Максим"}' {"message":"Hello, Максим!"} curl localhost:9999/page <!DOCTYPE html> <html> <head> <title>Simple Page</title> </head> <body> <h1>Hello, world!</h1> <p>This is a simple HTTP page from Flask.</p> </body> </html>
Всё работает ожидаемо. Теперь проведём небольшое нагрузочное тестирование через утилиту wrk. Запускаем тестирование на 10 секунд в один поток и одно подключение:
wrk -t1 -c1 -d10s http://127.0.0.1:9999/ping Running 10s test @ http://127.0.0.1:9999/ping 1 threads and 1 connections Thread Stats Avg Stdev Max +/- Stdev Latency 125.36us 88.38us 5.91ms 99.31% Req/Sec 8.04k 250.56 8.79k 71.29% 80811 requests in 10.10s, 6.32MB read Requests/sec: 8001.52 Transfer/sec: 640.75KB
Получаем ~8k rps и 125 мкс latency. Неплохой результат для однопоточного сервера на Python. Важно учитывать, что это результат для одного keep-alive соединения — wrk переиспользует TCP-соединение между запросами, что исключает накладные расходы на handshake.
Но что если для обработки запроса необходимо ходить во внешние api в течение 500 мс. Такая ситуация имитируется на эндпоинте /sleep.
wrk -t1 -c1 -d10s http://127.0.0.1:9999/sleep Running 10s test @ http://127.0.0.1:9999/sleep 1 threads and 1 connections Thread Stats Avg Stdev Max +/- Stdev Latency 501.66ms 178.15us 501.90ms 84.21% Req/Sec 1.79 0.42 2.00 78.95% 19 requests in 10.02s, 1.52KB read Requests/sec: 1.90 Transfer/sec: 155.56B
Получаем ожидаемые 2 rps и latency в 500 мс. При этом если проследить за процессом через htop он почти не использует cpu и почти всё время находится в статусе sleeping. Это подводка к следующей части статьи, которую я когда-нибудь обязательно напишу. В ней мы разберём способы параллельной обработки запросов от нескольких клиентов и кратно увеличим rps сервера и, конечно же, не забудем про системные вызовы и базу Linux, на которой это всё реализуется.
Выводы
Мы прошли путь от системных вызовов socket() и accept() до работающего WSGI-сервера, на котором полноценно запускаются любые WSGI-приложение. По дороге мы:
Разобрались, как Linux предоставляет сетевой интерфейс через сокеты и файловые дескрипторы;
Посмотрели через
strace, что на самом деле делает Python, когда мы вызываемsocket.recv()илиsocket.send();Написали TCP-сервер с idle-timeout и graceful shutdown;
Реализовали
WSGI-обработчик, который парсит HTTP и вызывает произвольноеWSGI-приложение.
Весь код - это чистый Python и системные вызовы Linux. Никаких uvicorn, gunicorn или сторонних библиотек для работы с сетью. Конечно, наш сервер далёк от production: нет SSL, нет ограничений на размер запроса, нет HTTP/2. Но главная цель была другой - убрать «магию» и увидеть, что стоит за абстракциями, которыми мы пользуем��я каждый день.
Главное ограничение нашего сервера - он обрабатывает клиентов строго по одному. Пока один клиент ждёт ответ от базы или внешнего api, все остальные стоят в очереди. В следующей части мы это исправим: разберём потоки, процессы и мультиплексирование ввода-вывода, и посмотрим, какие системные вызовы за ними стоят.
Полный код в репозитории.
