Когда мы разрабатываем веб-сервисы на Python, мы почти всегда работаем с готовыми абстракциями: фреймворками (fastapi, flask, django) и веб-серверами (uvicorn, gunicorn). Фреймворк предоставляет удобную модель работы с HTTP, сервер принимает TCP-соединения, парсит HTTP и передаёт данные в приложения через интерфейсы вроде WSGI или ASGI.

Эти абстракции начинают восприниматься как нечто само собой разумеющееся. Но по мере накопления опыта к ним возникает всё больше вопросов:

  • Где проходит граница ответственности между веб-сервером и веб-приложением?

  • Что именно делают WSGI и ASGI?

  • В какой момент происходит TCP three-way handshake?

  • Как достигается graceful shutdown?

  • Как достигается параллельная обработка соединений?

  • Что делает ядро Linux, пока процесс «ждёт клиента»?

Обычно та��ие детали скрыты за слоями абстракций. Фреймворки скрывают детали работы HTTP, веб-серверы - детали работы с TCP, а стандартная библиотека Python - системные вызовы. В результате сетевое взаимодействие начинает ощущаться как «магия».

В этой статье мы разберём, какие механизмы предоставляет Linux для работы с сетью и как на их основе строятся фреймворки и библиотеки, которыми мы пользуемся каждый день. К концу статьи мы напишем минимальный WSGI-веб-сервер, с помощью которого можно запускать произвольные WSGI-приложения. Рассматривать будем исключительно Linux и его сетевой стек, так как именно на нём работает большинство production веб-приложений. Весь код из статьи находится в репозитории.

Содержание статьи:

  1. Сетевой стек Linux изнутри

    1. Разбираемся в базе

    2. Пробуем сокеты на практике

      1. UNIX-сокеты

      2. TCP-сокеты

    3. Сокеты как файловые дескрипторы

    4. Что происходит с процессом во время ожидания входящих подключений?

  2. Пишем WSGI-сервер

    1. Начинаем писать TCP-сервер

      1. Добавляем idle-timeout и graceful shutdown

      2. Реализуем файловый интерфейс над сокетом

    2. Реализуем WSGI

    3. Тестируем наш сервер

  3. Выводы

Статья получилась довольно объемной, поэтому если вы уже знакомы с сокетами и системными вызовами Linux, можете сразу перейти к написанию WSGI-сервера или другой интересующей вас части.

Сетевой стек Linux изнутри

Разбираемся в базе

В Linux действует принцип «всё есть файл». Сетевые соединения физически файлами не являются, но представлены файловыми дескрипторами. А объектами ядра, на которые ссылаются такие дескрипторы, являются сокеты.

Сокет - это объект ядра, представляющий абстракцию для межпроцессного взаимодействия, включая сетевое. Сокеты классифицируются по семейству адресов (Domain). Основные семейства:

  • AF_UNIX - для локального межпроцессного взаимодействия;

  • AF_INET - для IPv4;

  • AF_INET6 - для IPv6.

Также сокеты различаются по типу передачи данных (Type), основные типы:

  • SOCK_STREAM - двусторонний, надёжный поток байтов, используется, например, в TCP;

  • SOCK_DGRAM - ненадёжный способ передачи отдельных сообщений, ограниченного размера, используется, например, для протокола UDP.

Каждый сокет может быть привязан к адресу. Формат адреса зависит от семейства: для AF_UNIX это путь к файлу в файловой системе, для AF_INET — пара (IP-адрес, порт). Кроме того, сокет определяется конкретным протоколом взаимодействия (Protocol): TCP, UDP, ICMP, RAW (для сырых IP-пакетов) и т.д.

Хотя сокеты предоставляют интерфейс для обмена данными, программы в пользовательском пространстве не могут работать с ними напрямую. Все операции с сокетами выполняются через системные вызовы.

Системный вызов - это интерфейс между пользовательскими программами и ядром. Через системные вызовы программы запрашивают ресурсы и действия ядра. Обычно они не вызываются напрямую: почти все системные вызовы имеют обёрточные функции в библиотеке C, которые выполняют подготовку к работе в режиме ядра. Благодаря этому работа с системными вызовами выглядит как вызов обычной функции. Именно через библиотеку C Python-код общается с ядром.

С сокетами можно взаимодействовать через системные вызовы, стандартные для любых файловых дескрипторов, такие как read(), close(). А также есть системные вызовы, специфичные для файловых дескрипторов сокетов. Рассмотрим, как представлены основные из них в библиотеке C.

Системный вызов

Описание

int socket(int domain, int type, int protocol)

Создаёт сокет и возвращает номер файлового дескриптора, который на него ссылается. Если передать protocol = 0, будет выбран протокол по умолчанию для выбранного семейства адресов и типа передачи. Например, для AF_INET и SOCK_STREAM автоматически выбирается TCP.

int bind(int sockfd, const struct sockaddr *addr, int addrlen)

Привязывает сокет к адресу. Принимает файловый дескриптор сокета, адрес и размер (в байтах) структуры, на которую указывает addr.

int listen(int sockfd, int backlog)

Переводит сокет в режим ожидания входящих соединений. backlog задаёт максимальный размер очереди установленных, но ещё не принятых вызовом accept() соединений (accept queue).

int accept(int sockfd, struct sockaddr addr, int addrlen)

Используется с сокетами, ориентированными на установление соединения (для которых вызван listen()). Извлекает первый запрос на соединение из очереди ожидающих соединений, создаёт новый сокет для него и возвращает его файловый дескриптор. В addr и addrlen записываются адрес подключившегося клиента и размер этого адреса - это выходные параметры вызова.

int connect(int sockfd, const struct sockaddr *addr, int addrlen)

Создаёт соединение между сокетом с файловым дескриптором sockfd и сокетом по адресу addr. Сокет, с которым устанавливается соединение, должен находиться в режиме ожидания соединений, т. е. для него должен быть выполнен вызов listen().

int recv(int sockfd, void buf[], int len, int flags)

Получает данные из сокета. В качестве аргументов передаются дескриптор сокета, буфер для записи, его размер и флаги. Возвращает число реально полученных байт.

int send(int sockfd, const void buf[], int len, int flags)

Отправляет данные в сокет. Сокет должен быть связан с удалённой стороной - через connect() на стороне клиента или через accept() на стороне сервера.

Используя эти вызовы, можно выстроить следующую схему межпроцессного взаимодействия:

  1. Серверный процесс создаёт сокет;

  2. Привязывает к нему адрес;

  3. Переводит сокет в режим прослушивания;

  4. Клиентский процесс создаёт сокет;

  5. Подключается к серверному сокету, вставая в очередь ожидания;

  6. Серверный сокет принимает соединение, создавая новый сокет для общения с клиентом;

  7. Клиентский и серверный процессы обмениваются данными, пока кто-то из них не закроет соединение;

  8. Один из участников закрывает соединение;

  9. Второй из участников закрывает соединение;

  10. Серверный сокет остаётся открытым для принятия запросов от других клиентских процессов.

Схема межпроцессного взаимодействия через сокеты
Схема межпроцессного взаимодействия через сокеты

Пробуем сокеты на практике

UNIX-сокеты

В Python использовать api сокетов можно через встроенную библиотеку socket. Почти все методы класса сокета из этой библиотеки однозначно соответствуют вызовам библиотеки C, которые мы рассмотрели.

Попробуем воспроизвести описанную нами схему межпроцессного взаимодействия, используя эту библиотеку. Напишем простую серверную программу, которая будет ждать подключения от клиентского процесса, затем прочитает 1024 байт, отправит прочитанные данные в stdout и обратно в клиентский сокет, после чего закроет серверный и клиентский сокеты.

Использовать будем сокет семейства AF_UNIX для локального взаимодействия процессов, адресом в таких сокетах является путь к файлу сокета. Важно отметить, что AF_UNIX не использует стек TCP/IP. Обмен данными происходит внутри ядра через механизм локальных сокетов, без формирования IP-пакетов и прохождения через сетевой стек.

Серверное приложение:

import os
import socket

ADDR = "/tmp/demo.sock"

# Если файл остался от прошлого запуска
if os.path.exists(ADDR):
    os.remove(ADDR)

# Создаем сокет
server_socket = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)

server_socket.bind(ADDR)
server_socket.listen()

print("Python: сервер слушает: ", ADDR)

client_socket, client_address = server_socket.accept()

print(f"Python: клиент подключился, его адрес: {client_address}")


data = client_socket.recv(1024)
print("Python: клиент отправил:", data.decode())
client_socket.send(b"Your data processed")

client_socket.close()
server_socket.close()

Клиентское:

import socket

ADDR = "/tmp/demo.sock"

client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_socket.connect(ADDR)

data = "Hello from UNIX socket client"
client_socket.send(data.encode())
reply = client_socket.recv(1024)
print("Python: ответ сервера:", reply.decode())

client_socket.close()

Запустим серверное приложение и посмотрим на системные вызовы, которые делает процесс, с помощью утилиты strace. С помощью параметра -e trace=network оставим только системные вызовы, связанные с сетью.

strace -e trace=network python3 server.py
socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3
bind(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0
listen(3, 128)                            = 0
Python: сервер слушает:  /tmp/demo.sock
accept4(3

Примечание: флаг SOCK_CLOEXEC в вызове socket() отвечает за автоматическое закрытие сокета, если процесс вызовет execve() (замену другим исполняемым файлом). Это предотвращает утечку дескрипторов. Библиотека Python этот флаг устанавливает автоматически.

А также посмотрим список файловых дескрипторов нашего процесса:

ps aux | grep python3
omelche+  428648  0.0  0.0  16820 11552 pts/4    S+   09:57   0:00 python3 server.py
ls -l /proc/428648/fd total 0
lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 0 -> /dev/pts/4
lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 1 -> /dev/pts/4
lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 2 -> /dev/pts/4
lrwx------ 1 omelchenko_ma omelchenko_ma 64 Feb 14 09:57 3 -> 'socket:[814233]'

Важно понимать, что сокет в пользовательском пространстве - это не отдельная сущность, а объект ядра, на который указывает файловый дескриптор. Все системные вызовы (bind(), listen(), accept(), recv(), send()) фактически работают с числовым идентификатором файлового дескриптора, а не напрямую с сокетом как самостоятельным объектом. Именно поэтому сокеты подчиняются тем же правилам, что и обычные файлы: их можно закрывать через close(), наследовать при fork(), передавать между процессами и т.д.

Как видим, вызов socket() создал новый сокет, на который ссылается файловый дескриптор под номером 3. Номер 3 выбран, потому что под номерами 0, 1 и 2 находятся стандартные потоки stdin, stdout, stderr, которые создаются для каждого процесса автоматически. Все дальнейшие системные вызовы работают с сокетом через номер созданного файлового дескриптора.

Теперь, не выключая сервер, запустим клиентское приложение.

strace -e trace=network python3 client.py socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3
connect(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0
sendto(3, "Hello from UNIX socket client", 29, 0, NULL, 0) = 29
recvfrom(3, "Your data processed", 1024, 0, NULL, NULL) = 19
Python: ответ сервера: Your data processed
+++ exited with 0 +++

Появилось продолжение вывода серверной программы:

strace -e trace=network python3 server.py
socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0) = 3
bind(3, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, 17) = 0
listen(3, 128)                            = 0
Python: сервер слушает:  /tmp/demo.sock
accept4(3, {sa_family=AF_UNIX}, [110 => 2], SOCK_CLOEXEC) = 4
getsockname(4, {sa_family=AF_UNIX, sun_path="/tmp/demo.sock"}, [128 => 17]) = 0
Python: клиент подключился, его адрес: recvfrom(4, "Hello from UNIX socket client", 1024, 0, NULL, NULL) = 29
Python: клиент отправил: Hello from UNIX socket client
sendto(4, "Your data processed", 19, 0, NULL, 0) = 19
+++ exited with 0 +++

Можно заметить, что некоторые вызовы, которые делает библиотека Python, немного отличаются от тех, что мы разобрали. В серверной программе вместо вызова accept() используется вызов accept4(), он позволяет сразу же задать дополнительные параметры (такие как SOCK_CLOEXEC) для создаваемых клиентских сокетов. Также, вместо вызовов send() и recv() используются sendto() и recvfrom(), это просто более общие вызовы, которые могут использоваться для несвязанных сокетов. Последние два аргумента - это адрес и размер адреса удалённого сокета. Если сокеты уже связаны, передаётся NULL, как в нашем случае.

После вызова accept4() идёт вызов getsockname(), его делает библиотека Python для получения адреса клиентского сокета, который вернул вызов accept4().

В остальном наблюдаемые нами системные вызовы соответствуют схеме межпроцессного взаимодействия, которую мы рассмотрели выше и которую воспроизводили в коде. Но, кажется, одного вызова не хватает, где же закрытие сокета вызовом close()? Всё правильно, системный вызов close() является общим для всех файловых дескрипторов и не классифицируется strace как сетевой. Чтобы его увидеть, можете выполнить strace с флагом -e trace=network,close.

TCP-сокеты

Перейдём наконец к сетевому межпроцессному взаимодействию. Единственное, что для этого необходимо изменить - это семейство и адрес сокета.

Сервер:

import socket

ADDR = ("127.0.0.1", 9999)


# Создаем сокет
server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)

# Нужно для повторного использования адреса при перезапуске программы
# Иначе он будет на некоторое время зарезервирован ядром
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_socket.bind(ADDR)
server_socket.listen()

print("Python: сервер слушает: ", ADDR)

client_socket, client_address = server_socket.accept()

print(f"Python: клиент подключился, его адрес: {client_address}")


data = client_socket.recv(1024)
print("Python: клиент отправил:", data.decode())
client_socket.sendall(b"Your data received from TCP socket")

client_socket.close()
server_socket.close()

Клиент:

import socket

ADDR = ("127.0.0.1", 9999)

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(ADDR)

data = "Hello from TCP socket client"
client_socket.send(data.encode())
reply = client_socket.recv(1024)
print("Python: ответ сервера:", reply.decode())

client_socket.close()

Запустим наш сервер:

strace -e trace=network python3 server.py
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
listen(3, 128)                          = 0
Python: сервер слушает:  ('127.0.0.1', 9999)
accept4(3

На уровне системных вызовов тоже поменялись только семейство, адрес и протокол взаимодействия. Тут появляется незнакомый нам ещё системный вызов setsockopt(). Он позволяет менять параметры сокета после его создания. Мы применяем его, чтобы установить параметр SO_REUSEADDR в 1. Он позволяет повторно привязаться к адресу и порту, которые могут находиться в состоянии TIME_WAIT после завершения предыдущего соединения.

Проверим, что наш сокет открыт с помощью утилиты netstat или любой другой, показывающей сетевые соединения:

netstat -tlnp
Proto  Local Address      Foreign Address    State    PID/Program
tcp    127.0.0.1:9999     0.0.0.0:*          LISTEN   566502/python3

Действительно, наше приложение открыло TCP-сокет и готово принимать входящие соединения.

Перед тем, как запускать клиентскую программу, запустим перехватчик сетевого трафика. Можно использовать утилиту tcpdump или любую другую. Я буду использовать wireshark. Выбираем loopback интерфейс, так как клиент и сервер будут находиться на одном хосте, и трафик будет идти через loopback. И ставим фильтр по порту - у нас это 9999. Теперь запускаем клиентскую программу.

strace -e trace=network python3 client.py
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3
connect(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(3, "Hello from TCP socket client", 28, 0, NULL, 0) = 28
recvfrom(3, "Your data received from TCP sock"..., 1024, 0, NULL, NULL) = 34
Python: ответ сервера: Your data received from TCP socket
+++ exited with 0 +++

И видим продолжение вывода серверной программы:

strace -e trace=network python3 server.py
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
listen(3, 128)                          = 0
Python: сервер слушает:  ('127.0.0.1', 9999)
accept4(3, {sa_family=AF_INET, sin_port=htons(51906), sin_addr=inet_addr("127.0.0.1")}, [16], SOCK_CLOEXEC) = 4
getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, [128 => 16]) = 0
Python: клиент подключился, его адрес: ('127.0.0.1', 51906)
recvfrom(4, "Hello from TCP socket client", 1024, 0, NULL, NULL) = 28
Python: клиент отправил: Hello from TCP socket client
sendto(4, "Your data received from TCP sock"..., 34, 0, NULL, 0) = 34
+++ exited with 0 +++

А вот вывод wireshark:

Сетевой трафик при межпроцессном взаимодействии через TCP-сокеты
Сетевой трафик при межпроцессном взаимодействии через TCP-сокеты

Первые три сегмента - это three-way handshake. Three-way handshake инициируется вызовом connect() на стороне клиента. Ядро сервера обрабатывает SYN/SYN-ACK/ACK, и помещает соединение в очередь accept queue. Вызов accept() извлекает установленное соединение из очереди. Затем клиент и сервер обмениваются данными. Получающая сторона при этом в соответствии с протоколом TCP отправляет подтверждение получения (сегмент с флагом ACK). После обмена данными по инициативе сервера происходит корректное завершение TCP-соединения - последние три сегмента.

Таким образом, реализация TCP полностью находится в сетевом стеке ядра. Пользовательская программа работает лишь с сокетом семейства AF_INET через системные вызовы, не взаимодействуя напрямую с сегментами, флагами и состояниями протокола TCP.

Python code
↓
libc
↓
syscall
↓
Kernel TCP/IP stack
↓
Network Interface Card

Обратите внимание: переход от локального взаимодействия к сетевому потребовал изменить только семейство адресов и формат адреса. Вся последовательность вызовов - socket(), bind(), listen(), accept(), recv(), send() - осталась идентичной. В этом и состоит ценность абстракции сокетов: один и тот же интерфейс работает независимо от транспорта.

Сокеты как файловые дескрипторы

Хочу продемонстрировать, что к файловым дескрипторам сокетов можно применять обычные для файловых дескрипторов системные вызовы, такие как read() и write(). Для примера сделаем HTTP-запрос на example.com, используя именно эти системные вызовы.

import os
import socket

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# DNS в этой статье не рассматривается, поэтому используем ip-адрес
# Актуальный адрес домена host example.com можно посмотреть командой
# host example.com
# Скорее всего это будет ip cloudflare
# Поэтому обязательно указываем доменное имя в заголовке запроса
client_socket.connect(("8.6.112.0", 80))

# Получаем номер файлового дескриптора
fd = client_socket.fileno()

data = b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"

# Пишем в сокет, через общий для файловых дескрипторов системный вызов write
os.write(fd, data)

# Читаем из сокета через системный вызов read
response = os.read(fd, 4096)
print(response.decode())

client_socket.close()

Запустим:

strace -e trace=network,read,write,close python3 socket_as_fd.py
...
Системные вызовы, связанные с запуском интерпретатора python
...
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3
connect(3, {sa_family=AF_INET, sin_port=htons(80), sin_addr=inet_addr("8.6.112.0")}, 16) = 0
write(3, "GET / HTTP/1.0\r\nHost: example.co"..., 37) = 37
read(3, "HTTP/1.1 200 OK\r\nDate: Sat, 14 F"..., 4096) = 798
write(1, "HTTP/1.1 200 OK\r\nDate: Sat, 14 F"..., 798HTTP/1.1 200 OK
Date: Sat, 14 Feb 2026 10:25:19 GMT
Content-Type: text/html
Connection: close
CF-RAY: 9cdbdc5e9e66ec39-DME
Last-Modified: Thu, 12 Feb 2026 14:00:39 GMT
Allow: GET, HEAD
Accept-Ranges: bytes
Age: 11273
cf-cache-status: HIT
Server: cloudflare
<!doctype html><html lang="en"><head><title>Example Domain</title><meta name="viewport" content="width=device-width, initial-scale=1"><style>body{background:#eee;width:60vw;margin:15vh auto;font-family:system-ui,sans-serif}h1{font-size:1.5em}div{opacity:0.8}a:link,a:visited{color:#348}</style></head><body><div><h1>Example Domain</h1><p>This domain is for use in documentation examples without needing permission. Avoid use in operations.</p><p><a href="https://iana.org/domains/example">Learn more</a></p></div></body></html>
) = 798
write(1, "\n", 1)                       = 1
close(3)                                = 0
+++ exited with 0 +++

Как видим, всё работает точно так же, как с системными вызовами recv(), send(). Но для сетевых приложений предпочтительно использовать recv() и send(), так как они позволяют передавать дополнительные флаги и правильно обрабатывать особенности сокетов (например, частичные отправки или неблокирующий режим).

Что происходит с процессом во время ожидания входящих подключений?

После того как процесс сделал вызов accept(), но входящих подключений ещё нет, потребляет ли он процессорное время? Ответ - нет.

Системный вызов accept() проверяет очередь ожидающих соединений на сокете. Если очередь пуста, процесс переводится ядром в статус sleeping и не участвует в распределении процессорного времени.


Как только к сокету приходит новое соединение:

  1. ядро добавляет процесс обратно в run queue планировщика;

  2. планировщик видит, что проце��с готов, и выделяет ему CPU;

  3. accept() возвращает новый файловый дескриптор для соединения.

Чтобы увидеть это на практике, немного модифицируем пример с серверным TCP-сокетом. После ответа клиенту и закрытия клиентского соединения добавим бесконечный цикл с CPU-bound операциями. Клиентский сокет оставим без изменений.

import socket

ADDR = ("127.0.0.1", 9999)

# Создаем сокет
server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)

# Нужно для повторного использования адреса при перезапуске программы
# Иначе он будет на некоторое время зарезервирован ядром
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_socket.bind(ADDR)
server_socket.listen()

print("Python: сервер слушает: ", ADDR)

client_socket, client_address = server_socket.accept()

print(f"Python: клиент подключился, его адрес: {client_address}")


data = client_socket.recv(1024)
print("Python: клиент отправил:", data.decode())
client_socket.send(b"Your data received from TCP socket")
client_socket.close()

counter = 0
try:
    while True:
        counter += 1
finally:
    server_socket.close()

Включаем сервер:

strace -e trace=network python3 server.py
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
listen(3, 128)                          = 0
Python: сервер слушает:  ('127.0.0.1', 9999)
accept4(3

Смотрим статус процесса:

ps aux | grep python3
omelche+ 1289282  0.3  0.0  16828 11692 pts/5    S+   16:31   0:00 python3 server.py

Видим, что наш процесс находится в статусе S+, что соответствует sleeping. Теперь запускаем клиентский процесс:

python3 client.py

И проверяем статус нашего процесса, после чего убиваем его:

ps aux | grep python3
omelche+ 1289282 46.7  0.0  16828 11756 pts/5    R+   16:31   0:33 python3 server.py
kill 1289282

Как видим, процесс перешёл в статус R+, что соответствует статусу running. Процесс активно потребляет CPU или стоит в очереди планировщика.

Пишем WSGI-сервер

Начинаем писать TCP-сервер

Наш сервер будет состоять из двух компонентов: непосредственно сам сервер и обработчик. Сервер будет принимать входящие клиентские соединения, читать поток байт, который посылает клиент, и передавать его в обработчик. Обработчик будет реализовывать некоторую логику обработки и отдавать выходной поток байт обратно серверу, после чего сервер будет отправлять выходной поток клиенту. После обработки клиента сервер будет переходить к следующему клиенту.

Схема работы TCP-сервера
Схема работы TCP-сервера

Вот каким будет интерфейс:

from collections.abc import Iterator
from typing import Protocol


class TCPHandlerI(Protocol):
    def handle(self, data: Iterator[bytes]) -> Iterator[bytes]:
        raise NotImplementedError()


class TCPServerI(Protocol):
    def __init__(
        self,
        host: str,
        port: int,
        handler: TCPHandlerI,
    ):
        raise NotImplementedError()

    def serve_forever(self):
        """Запуск сервера."""
        raise NotImplementedError()

Реализуем TCP-сервер, а также напишем простой хендлер с эхо-логикой.

import socket
import traceback
from collections.abc import Iterator

from .interface import TCPHandlerI, TCPServerI


class TCPServer(TCPServerI):
    def __init__(
        self,
        host: str,
        port: int,
        handler: TCPHandlerI,
    ):
        self.address = (host, port)
        self.server_socket = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_STREAM,
        )
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(self.address)

        self.handler = handler

    def serve_forever(self):
        """
        Основной цикл сервера: опрашиваем сокет,
        принимаем и обрабатываем клиентов.
        """
        self.server_socket.listen()
        print(
            "Python: сервер запущен на прослушивание "
            f"{self.address[0]}:{self.address[1]}"
        )

        with self.server_socket as server_socket:
            while True:
                client_socket, addr = server_socket.accept()
                self._handle_request(client_socket, addr)

    def _handle_request(self, client_socket: socket.socket, addr):
        """
        Обработчик клиентского соединения.
        """
        print(f"Python: обрабатываю запрос клиента {addr}")
        try:
            self._process_request(client_socket)
        except Exception:
            self.handle_error(client_socket, addr)
        finally:
            self.shutdown_request(client_socket)

    def _process_request(self, client_socket: socket.socket):
        """
        Читает данные клиента и вызывает бизнес-логику из self.handler.
        """
        client_data = self.read_client_data(client_socket)
        processed_data = self.handler.handle(client_data)
        self.send_to_client(client_socket, processed_data)

    def shutdown_request(self, client_socket: socket.socket):
        """Закрытие соединения с клиентом, отправка FIN."""
        try:
            client_socket.shutdown(socket.SHUT_WR)
            print("Python: отправил (FIN) клиенту")
        except OSError:
            # Если клиент уже закрыл соединение
            pass
        client_socket.close()
        print("Python: закрыл соединение с клиентом")

    def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]:
        """Чтение данных клиента."""

        while True:
            chunk = client_socket.recv(1024)
            if chunk:
                yield chunk
            else:
                print("Python: клиент прислал (FIN).")
                break

    def send_to_client(
        self, client_socket: socket.socket, data: Iterator[bytes]
    ):
        """Отправляет данные клиенту."""
        print("Python: отправляю ответ клиенту")
        for chunk in data:
            try:
                client_socket.sendall(chunk)
            except OSError:
                print("Python: клиент закрыл соединение до получения данных")

    def handle_error(self, client_socket, addr):
        """Обработка ошибок во время выполнения запроса."""
        print(
            f"Python: произошла ошибка во время обработки запроса клиента {addr}"
        )
        traceback.print_exc()


class TCPEchoHandler(TCPHandlerI):
    def handle(self, data: Iterator[bytes]) -> Iterator[bytes]:
        return data


if __name__ == "__main__":
    server = TCPServer("0.0.0.0", 9999, TCPEchoHandler())
    server.serve_forever()

В коде появляются новые для нас методы:

  • sendall() этот метод гарантирует отправку всех передаваемых данных в сокет. В отличие от метода send(), который из-за переполнения буфера ядра может отправить лишь часть данных. Метод sendall() мог быть реализован так:

def sendall(sock, data):
    total_sent = 0
    length = len(data)

    while total_sent < length:
        sent = sock.send(data[total_sent:])
        if sent == 0:
            raise RuntimeError("socket connection broken")
        total_sent += sent
  • shutdown() соответствует одноименному системному вызову, он позволяет частично закрыть сокет, не освобождая файловый дескриптор. Сокет можно частично закрыть в одном направлении. В качестве аргумента передаётся:

    • SHUT_RD - закроет сокет для чтения.

    • SHUT_WR - закроет сокет для записи и в случае TCP-сокета, отправит принимающей стороне сегмент с флагом FIN.

Теперь реализуем простого клиента:

import socket
import sys

SOCK_ADDRESS = ("127.0.0.1", 9999)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    client_socket.connect(SOCK_ADDRESS)
except Exception:
    sys.exit("Сервер недоступен")


with client_socket:
    while True:
        user_input = input("Python: клиентский ввод - ")
        if user_input == "exit":
            break

        client_socket.send(user_input.encode())

        server_response = client_socket.recv(1024)
        if not server_response:
            print("Python: получен (FIN) от сервера")
            break

        print(f"Python: ответ сервера - {server_response.decode()}")

Запускаем наш сервер:

python3 -m simple_tcp_server.server
Python: сервер запущен на прослушивание 0.0.0.0:9999

Запускаем клиента:

python3 -m simple_tcp_server.client
Python: клиентский ввод - hi
Python: ответ сервера - hi
Python: клиентский ввод - who are you?
Python: ответ сервера - who are you?
Python: клиентский ввод - im think you are echo server
Python: ответ сервера - im think you are echo server
Python: клиентский ввод - exit

Мы уже научились читать системные вызовы, поэтому настоятельно рекомендую изучить их самостоятельно.

Итак, всё работает, но какие есть недостатки у нашей реализации?

  1. Первое и самое очевидное - это синхронная работа сервера. Пока не будет обслужен первый клиент, все остальные клиенты, которые хотят подключиться, будут ждать.

  2. Время, которое клиент может занимать соединение, никак не ограничено. Клиент может просто подключиться, удерживать соединение и даже не отправлять никаких данных.

  3. Отсутствие graceful shutdown, при прекращении работы сервера все активные соединения немедленно прервутся, из-за чего возможна потеря данных или неконсистентное состояние системы.

С первой проблемой мы будем разбираться уже во второй части статьи. А последними двумя прямо сейчас и займёмся.

Добавляем idle-timeout и graceful shutdown

Начнём с таймаута на бездействие клиента. Если клиент n-ое время не отправляет данные, мы хотим обрывать соединение. Но как же это сделать, если системный вызов recv() является блокирующим и возвращает результат только после того, как удалённая сторона отправила данные. Пришло время познакомиться с новым системным вызовом - встречайте вызов select(). У вызова есть сильные ограничения, но о них в другой раз. Вызов позволяет проверять сразу множество дескрипторов на готовность к чтению/записи и имеет таймаут, по истечении которого, если ни один дескриптор не готов, вызов возвращает пустое множество. Этим мы и воспользуемся, чтобы задать клиенту таймаут бездействия.

Модифицируем метод read_client_data() добавив туда проверку готовности сокета через select():

import select

class TCPServer(TCPServerI):
	def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]:
		"""Чтение данных клиента."""
	
		while True:
			ready, _, _ = select.select(
				[client_socket],
				[],
				[],
				self.client_idle_timeout,
			)
			if ready:
				chunk = client_socket.recv(1024)
			else:
				print("Python: timeout ожидания клиента")
				break
	
			if chunk:
				yield chunk
			else:
				print("Python: клиент прислал (FIN).")
				break

Включаем сервер и подключаемся клиентом, спустя 5 секунд бездействия клиент будет отключен и сервер сможет принять соединение от нового клиента:

strace -e trace=network,pselect6 python3 -m tcp_server_with_idle_timeout.server
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 9
setsockopt(9, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(9, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(9, 128)                          = 0
Python: сервер запущен на прослушивание 0.0.0.0:9999
accept4(9, {sa_family=AF_INET, sin_port=htons(44420), sin_addr=inet_addr("127.0.0.1")}, [16], SOCK_CLOEXEC) = 10
getsockname(10, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, [128 => 16]) = 0
Python: обрабатываю запрос клиента ('127.0.0.1', 44420)
Python: отправляю ответ клиенту
pselect6(11, [10], NULL, NULL, {tv_sec=5, tv_nsec=0}, NULL) = 0 (Timeout)
Python: timeout ожидания клиента.
shutdown(10, SHUT_WR)                   = 0
Python: отправил (FIN) клиенту
Python: закрыл соединение с клиентом
accept4(9

Библиотека Python делает вызов pselect() вместо select(). Он немного отличается способом задания таймаута и обработкой сигналов.

Теперь займёмся graceful shutdown. Вместо того чтобы сразу же обрывать все соединения при завершении работы сервера, необходимо корректно обработать сигналы завершения.

При получении сигнала завершения необходимо:

  • Прекратить принимать входящие соединения;

  • Дать время на корректное завершение всех существующих соединений.

Обработка сигналов в Python происходит следующим образом:

  1. Сигнал приходит от ядра в процесс;

  2. Ядро помечает процесс как имеющий pending signal (сигнал ожидает обработки);

  3. Если Python находится в ожидании системного вызова, то системный вызов прерывается на время обработки сигнала. Если Python выполняет свой байт-код, выполнение байт-кода будет прервано, т.к. в цикле интерпретатора есть специальная инструкция, проверяющая наличие сигналов;

  4. В главном потоке выполняется обработчик сигнала;

  5. Выполнение байт-кода или системного вызова возобновляются.

Посмотрим, как это работает:

import signal
import socket

def signal_handler(signum, frame):
    print(f"Python: получен сигнал; {signum}")
    print(f"Python: frame: {frame}")

signal.signal(signal.SIGTERM, signal_handler)

ADDR = ("127.0.0.1", 9999)
server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server_socket.bind(ADDR)
server_socket.listen()

client_socket, _ = server_socket.accept()

server_socket.close()
client_socket.close()

В аргумент frame обработчика сигнала, приходит стек вызова на момент получения сигнала. Запускаем и пробуем убить процесс через kill:

strace -e trace=network python3 signals.py
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 9
bind(9, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
listen(9, 128)                          = 0
accept4(9, 0x7ffe95e6fd30, [16], SOCK_CLOEXEC) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGTERM {si_signo=SIGTERM, si_code=SI_USER, si_pid=1275635, si_uid=1000} ---
Python: получен сигнал; 15
Python: frame: <frame at 0x7f03fab4f540, file '/usr/lib/python3.14/socket.py', line 298, code accept>
accept4(9

Как видим, выполнение системного вызова и правда приостанавливается на время обработки сигнала, после чего успешно возобновляется. Теперь убиваем процесс через kill -9.

Применим обработку сигналов на практике и напишем graceful shutdown для нашего сервера:

import select
import signal
import socket
import time
from collections.abc import Iterator

from .interface import TCPHandlerI, TCPServerI


class TCPServer(TCPServerI):
    def __init__(
        self,
        host: str,
        port: int,
        handler: TCPHandlerI,
        poll_interval: float = 0.5,
        client_idle_timeout: float = 5,
        shutdown_timeout: float = 10,
    ):
        self.address = (host, port)
        self.server_socket = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_STREAM,
        )
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(self.address)

        self.poll_interval = poll_interval
        self.client_idle_timeout = client_idle_timeout
        self.shutdown_timeout = shutdown_timeout
        self.handler = handler

        self.is_shutdown = False
        signal.signal(signal.SIGTERM, self._on_shutdown)
        signal.signal(signal.SIGINT, self._on_shutdown)

    def _on_shutdown(self, signum, _):
        """Сигнальный обработчик - выставляет флаг завершения."""
        print("Python: получен сигнал завершения")
        print(
            "Python: после обработки всех активных запросов сервер будет остановлен"
        )
        self.is_shutdown = True

    def serve_forever(self):
        """
        Основной цикл сервера: опрашиваем сокет,
        принимаем и обрабатываем клиентов.
        """
        self.server_socket.listen()
        print(
            "Python: сервер запущен на прослушивание "
            f"{self.address[0]}:{self.address[1]}"
        )
        self.is_shutdown = False

        with self.server_socket as server_socket:
            while not self.is_shutdown:
                ready, _, _ = select.select(
                    [server_socket],
                    [],
                    [],
                    self.poll_interval,
                )
                if ready:
                    client_socket, addr = server_socket.accept()
                    self._handle_request(client_socket, addr)

    def read_client_data(self, client_socket: socket.socket) -> Iterator[bytes]:
        """Чтение данных клиента."""
        deadline = time.perf_counter() + self.client_idle_timeout
        shutdown_deadline_set = False

        while True:
            if self.is_shutdown and not shutdown_deadline_set:
                shutdown_deadline_set = True
                deadline = time.perf_counter() + self.shutdown_timeout

            if time.perf_counter() > deadline:
                print("Python: timeout ожидания клиента")
                break

            ready, _, _ = select.select(
                [client_socket],
                [],
                [],
                self.poll_interval,
            )
            if ready:
                if not self.is_shutdown:
                    deadline = time.perf_counter() + self.client_idle_timeout

                chunk = client_socket.recv(1024)
                if chunk:
                    yield chunk
                else:
                    print("Python: клиент прислал (FIN).")
                    break

Я добавил следующие атрибуты сервера: is_shutdown - флаг начала завершения работы сервера, poll_interval - время опроса сокетов, shutdown_timeout - время, которое сервер даёт клиенту на завершение передачи данных после начала shutdown.
В serve_forever() теперь опрашивается сокет через select(), чтобы между вызовами делать проверку на начало shutdown. Если сервер начал завершать свою работу, serve_forever() завершается и новые соединения больше не принимаются. В методе read_client_data() добавилось вычисление deadline, по истечении которого, данные от клиента больше не принимаются. В остальных методах изменений нет.

Можете проверить, теперь после получения SIGTERM сервер будет работать с клиентом лишь 10 секунд, после чего остановит свою работу.

Реализуем файловый интерфейс над сокетом

Перед тем, как перейти к реализации обработчиков прикладных протоколов поверх нашего TCP-сервера, необходимо ещё кое-что сделать. Сейчас в наш обработчик передаётся поток байтов Iterator[bytes]. Количество байт, вычитываемое из сокета, сейчас захардкожено. Это допустимо, когда мы хотим считать все данные, которые посылает нам клиент. Но представим, что клиент отправил два HTTP-запроса подряд в одном TCP-соединении. С Iterator[bytes] мы вычитываем данные чанками по 1024 байта и не можем остановиться ровно на границе первого запроса - часть второго запроса попадёт в тот же чанк. Нам нужен интерфейс, который позволяет прочитать ровно N байт и сохранить остаток для следующего вызова.

В Python таким интерфейсом для потока байт является RawIOBase из модуля io стандартной библиотеки. Вот его основные методы:

class RawIOBase(IOBase):
    def read(self, size=-1) -> bytes:
        """Читает до size байт. При size < 0 читает до конца потока."""
        raise NotImplementedError()

    def readinto(self, b: bytearray) -> int:
        """
        Читает данные в переданный буфер b.
        Возвращает количество прочитанных байт.
        """
        raise NotImplementedError()

    def readline(self, size=-1) -> bytes:
        """Читает одну строку (до символа \\n включительно)."""
        raise NotImplementedError()

    def readlines(self, hint=-1) -> list[bytes]:
        """Читает строки. hint задаёт примерный лимит прочитанных байт."""
        raise NotImplementedError()

Конечно же, в Python уже есть реализация RawIOBase для сокета - её возвращает метод сокета makefile(). Но наша цель - реализовать веб-сервер с минимальным использованием библиотек, поэтому реализуем этот интерфейс для сокета самостоятельно.

Ключевой момент реализации - внутренний буфер. Системный вызов recv() может вернуть как меньше данных, чем запрошено (если столько ещё не пришло), так и больше, чем нужно вызывающему коду. Например, мы вызвали read(50), а recv(1024) вернул 200 байт - 150 байт нужно где-то сохранить до следующего вызова read(). Для этого SocketIO хранит внутренний bytearray-буфер: при каждом чтении сначала проверяется буфер, и только если данных не хватает - делается recv() из сокета.
Вот и сама реализация:

import io
import logging
import select
import socket
import time
from threading import Event


class SocketIO(io.RawIOBase):
    def __init__(
        self,
        socket: socket.socket,
        shutdown_event: Event,
        poll_interval: float = 0.5,
        idle_timeout: float = 5,
        shutdown_timeout: float = 10,
        recv_chunk_size: int = 1024,
    ):
        self.socket = socket
        self.poll_interval = poll_interval
        self.idle_timeout = idle_timeout
        self.shutdown_timeout = shutdown_timeout
        self.recv_chunk_size = recv_chunk_size

        # Внутренний буфер: recv() может вернуть больше данных, чем запросил read(size).
        # Остаток сохраняется здесь до следующего вызова read().
        self.buffer = bytearray()

        self.deadline = time.perf_counter() + self.idle_timeout
        self.shutdown_event = shutdown_event
        self.shutdown_deadline_set = False

        self.is_socket_end = False

    def _recv(self, chunk_size: int) -> bytes:
        """
        Низкоуровневое чтение из сокета с поддержкой таймаутов.
        Возвращает до chunk_size байт или b"" если соединение закрыто.
        """
        if self.is_socket_end:
            return b""

        while True:
            # При получении сигнала завершения переключаемся на shutdown_timeout
            if self.shutdown_event.is_set() and not self.shutdown_deadline_set:
                self.deadline = time.perf_counter() + self.shutdown_timeout
                self.shutdown_deadline_set = True

            # Ждём готовности сокета не дольше poll_interval,
            # чтобы периодически проверять таймауты и флаг завершения
            ready, _, _ = select.select([self.socket], [], [], self.poll_interval)
            if ready:
                # Клиент прислал данные - сбрасываем idle-таймаут
                if not self.shutdown_deadline_set:
                    self.deadline = time.perf_counter() + self.idle_timeout

                chunk = self.socket.recv(chunk_size)
                if chunk:
                    return chunk

                else:
                    logging.debug("Python: клиент прислал (FIN).")
                    self.is_socket_end = True
                    return b""

            if time.perf_counter() > self.deadline:
                logging.warning("Python: timeout ожидания клиента")
                raise TimeoutError("Клиент слишком долго отправлял данные")

    def read(self, size=-1) -> bytes:
        """
        Возвращает ровно size байт из сокета (или меньше, если соединение закрыто).
        При size < 0 читает всё до конца соединения.
        """

        if size < 0:
            # Режим "читаем всё": забираем буфер и дочитываем сокет до конца
            chunks = [bytes(self.buffer)]
            self.buffer = bytearray()
            while chunk := self._recv(self.recv_chunk_size):
                chunks.append(chunk)
            return b"".join(chunks)
        else:
            # Режим "читаем ровно size байт": дочитываем в буфер,
            # Пока не наберём нужное количество
            while len(self.buffer) < size:
                chunk = self._recv(self.recv_chunk_size)
                if not chunk:
                    break
                self.buffer.extend(chunk)

            to_return = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return bytes(to_return)

    def readinto(self, b: bytearray) -> int:
        """Записывает данные из сокета в байтовый буфер b."""
        data = self.read(len(b))
        n = len(data)
        b[:n] = data
        return n

    def readline(self, size=-1) -> bytes:
        """
        Читает одну строку (до символа \\n включительно).
        Сначала ищем \\n в буфере, если не нашли - дочитываем из сокета чанками.
        """
        while True:
            # Ищем конец строки в том, что уже есть в буфере
            newline_pos = self.buffer.find(b"\n")
            if newline_pos != -1:
                # Нашли \n - отдаём строку включая \n, остаток остаётся в буфере
                end = newline_pos + 1
                if size >= 0:
                    end = min(end, size)
                line = bytes(self.buffer[:end])
                self.buffer = self.buffer[end:]
                return line

            # Если достигли лимита size - отдаём что есть
            if size >= 0 and len(self.buffer) >= size:
                line = bytes(self.buffer[:size])
                self.buffer = self.buffer[size:]
                return line

            # \n не найден - дочитываем из сокета
            chunk = self._recv(self.recv_chunk_size)
            if not chunk:
                # Соединение закрыто - отдаём остаток буфера
                line = bytes(self.buffer)
                self.buffer = bytearray()
                return line
            self.buffer.extend(chunk)

    def readlines(self, hint=-1) -> list[bytes]:
        """
        Возвращает строки, лимит количества прочитанных байт задаётся через hint.
        """
        lines = []
        total = 0
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
            total += len(line)
            if 0 < hint <= total:
                break
        return lines

    def __iter__(self):
        return self

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

В коде самого сервера остаётся возвращать вместо Iterator[bytes] нашу новую структуру SocketIO. С полным кодом сервера можете ознакомиться в моём репозитории. Я также убрал всё логирование через print. Теперь наш сервер готов и можно переходить к реализации серверного интерфейса WSGI в хендлере.

Реализуем WSGI

Важно отметить, что переходя к WSGI, мы говорим об интерфейсе между сервером и приложением в рамках конкретного прикладного протокола - HTTP/S. Сейчас в нашем сервере обрабатывается только TCP и сервер абстрагирован от конкретного прикладного протокола. А для парсинга прикладных протоколов и обработки запросов у нас есть интерфейс TCPHandlerI.

Зачем вообще нужен WSGI? Фреймворки, такие как flask и fastapi, не возятся сами с низкоуровневыми деталями HTTP, их задача обрабатывать HTTP-запросы и передавать ответ серверу, который отправит его клиентам. Чтобы сервера и приложения были совместимы, был разработан интерфейс WSGI, который каждая сторона (сервер и приложение) должны реализовать.

WSGI для Python3 описан в PEP3333. Приложение в нём должно предоставлять функцию:

HELLO_WORLD = b"Hello world!\n"

def app(environ, start_response):
    """Simplest possible application object"""
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]
  • environ - словарь переменных окружения, таких как: метод запроса, заголовки запроса, тело запроса, информация о сервере и версии протоколов;

  • start_response - обратный вызов, через который приложение записывает статус ответа и заголовки.

  • Функция возвращает тело ответа в виде байтовых строк; Такую функцию предоставляют все WSGI-совместимые фреймворки.

Схема взаимодействия WSGI-сервера и WSGI-приложения выглядит следующим образом:

Схема взаимодействия WSGI-сервера и WSGI-приложения
Схема взаимодействия WSGI-сервера и WSGI-приложения
  1. Клиент отправляет запрос серверу;

  2. Сервер парсит HTTP;

  3. Сервер передаёт запрос приложению через интерфейс WSGI, т.е. через функцию app(environ, start_response);

  4. Приложение реализует некоторую бизнес-логику;

  5. Через start_response() приложение передаёт статус и заголовки, а тело - через возвращаемое значение;

  6. На основании ответа от приложения сервер формирует ответ для клиента;

  7. Сервер отправляет ответ клиенту.

Если мы хотим реализовать протокол со стороны сервера, нам нужно реализовать парсинг HTTP-запроса из входного потока байт, вызвать приложение и сформировать HTTP-ответ. Звучит несложно, делаем:

import io
import sys
from collections.abc import Iterator
from dataclasses import dataclass

from final_tcp_server.interface import TCPHandlerI
from final_tcp_server.socket_io import SocketIO


@dataclass
class HTTPRequest:
    method: str
    path: str
    protocol: str
    headers: dict[str, str]
    body: io.BytesIO


class WSGIHandler(TCPHandlerI):
    def __init__(self, app, host: str, port: int):
        self.app = app
        self.port = port
        self.host = host

    def handle(self, data: SocketIO) -> Iterator[bytes]:

        while True:
            try:
                http_request = self._parse_http_request(data)
            except ConnectionError:
                break
            except Exception:
                status, headers_list = (
                    "400 Bad Request",
                    [("Content-Type", "text/plain")],
                )
                body_iterator = [b"400 Bad Request"]
                response = self._generate_http_response(
                    status,
                    headers_list,
                    body_iterator,
                    connection_close=True,
                )
                yield from response
                break

            environ = self._generate_environ(http_request)
            response_headers = []

            def start_response(status, headers_list):
                response_headers.extend([status, headers_list])

            # Вызываем WSGI-приложение
            try:
                body_iterator = self.app(environ, start_response)
                status, headers_list = response_headers
            except Exception:
                status, headers_list = (
                    "500 Internal Server Error",
                    [("Content-Type", "text/plain")],
                )
                body_iterator = [b"Internal Server Error"]

            response = self._generate_http_response(
                status,
                headers_list,
                body_iterator,
            )
            yield from response

    def _parse_http_request(self, request: SocketIO) -> HTTPRequest:
        """Упрощенный парсинг http-запроса."""
        # Парсим request line
        request_line = request.readline()
        if not request_line:
            raise ConnectionError("Клиент закрыл соединение")

        try:
            method, path, protocol = request_line.decode("ascii").strip().split()
        except ValueError:
            raise ValueError("Invalid request line")

        # Парсим headers
        headers: dict[str, str] = {}
        while True:
            line = request.readline()
            if not line or line in (b"\r\n", b"\n"):
                break

            try:
                name, value = line.decode("ascii").split(":", 1)
            except ValueError:
                raise ValueError(f"Invalid header line: {line!r}")

            headers[name.strip()] = value.strip()

        return HTTPRequest(
            method=method,
            path=path,
            protocol=protocol,
            headers=headers,
            # Оставшуюся часть запроса передаем как body
            body=request,
        )

    def _generate_environ(self, http_request: HTTPRequest) -> dict:
        """Формируем environ для wsgi-приложения."""
        path, _, query_string = http_request.path.partition("?")
        environ = {
            "REQUEST_METHOD": http_request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": path,
            "QUERY_STRING": query_string,
            "CONTENT_TYPE": http_request.headers.get("Content-Type", ""),
            "CONTENT_LENGTH": http_request.headers.get("Content-Length", ""),
            "SERVER_NAME": self.host,
            "SERVER_PORT": str(self.port),
            "SERVER_PROTOCOL": http_request.protocol,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": "http",
            "wsgi.input": http_request.body,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": False,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
        }
        # Добавляем http-заголовки
        for key, value in http_request.headers.items():
            environ[f"HTTP_{key.upper().replace('-', '_')}"] = value
        return environ

    def _generate_http_response(
        self,
        status: str,
        headers_list: list[tuple[str, str]],
        body_iterator: Iterator[bytes],
        connection_close: bool = False,
    ) -> Iterator[bytes]:
        """Формируем HTTP-ответ."""
        # Формируем заголовки
        content_length_is_set = False
        response_lines = [f"HTTP/1.1 {status}"]
        for k, v in headers_list:
            response_lines.append(f"{k}: {v}")
            if k.lower() == "content-length":
                content_length_is_set = True
        if not content_length_is_set:
            body_iterator = list(body_iterator)
            content_length = sum(len(chunk) for chunk in body_iterator)
            response_lines.append(f"Content-Length: {content_length}")

        if connection_close:
            response_lines.append("Connection: close")

        # "\r\n\r\n" перед телом
        response_lines.append("")
        response_lines.append("")

        # yield заголовков
        yield ("\r\n".join(response_lines).encode("iso-8859-1"))
        # yield тела
        yield from body_iterator

160 строк на чистом Python и наш WSGI-обработчик готов. В нём у нас цикл для поддержки keep-alive соединений, когда в рамках одного TCP-соединения обрабатывается сразу несколько HTTP-запросов. Цикл заканчивается, когда происходит ошибка парсинга HTTP-запроса. Это может произойти в двух случаях, когда запрос действительно некорректный или когда клиент закрыл соединение и запрос данных из сокета возвращает пустую байтовую строку.

Обратите внимание, что тело запроса сервер не вычитывает из сокета, а передаёт приложению наш file-like интерфейс над сокетом. Ответственным за вычитку ровно Content-length байт из сокета является уже приложение.

Конечно, это не production-ready решение, как минимум не хватает:

  • Ограничений на размер входящих данных. Сейчас строку запроса и заголовки можно вычитывать бесконечно долго.

  • Поддержки SSL

  • Поддержки HTTP2

  • Поддержки chunked encoding

А вот код WSGI-сервера, который наследуется от TCPServer и принимает на вход WSGI-приложение:

from final_tcp_server.server import TCPServer

from .handler import WSGIHandler


class WSGIServer(TCPServer):
    def __init__(
        self,
        host,
        port,
        app,
        poll_interval=0.5,
        client_idle_timeout=5,
        shutdown_timeout=10,
    ):
        handler = WSGIHandler(app, host, port)
        super().__init__(
            host,
            port,
            handler,
            poll_interval,
            client_idle_timeout,
            shutdown_timeout,
        )

Но сервер вполне функционален и мы уже можем запустить на нём любое WSGI-приложение. Для примера возьмём flask.

import logging
import time

from flask import Flask, jsonify, request

from .server import WSGIServer

app = Flask("app")


@app.route("/ping")
def ping():
    return "pong"


@app.route("/sleep")
def sleep():
    time.sleep(0.5)
    return "pong"


@app.route("/post", methods=["POST"])
def submit():
    data = request.json
    name = data.get("name", "Unknown")
    return jsonify({"message": f"Hello, {name}!"})


@app.route("/page")
def page():
    return """
    <!DOCTYPE html>
    <html>
        <head>
            <title>Simple Page</title>
        </head>
        <body>
            <h1>Hello, world!</h1>
            <p>This is a simple HTTP page from Flask.</p>
        </body>
    </html>
    """


if __name__ == "__main__":
    server = WSGIServer("0.0.0.0", 9999, app)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    server.serve_forever()

Тестируем наш сервер

Итоговый код находится в моём репозитории в модуле wsgi

Запустим наш сервер и проверим его в действии. Я использую менеджер зависимостей uv, и запускаю через него.

uv run -m wsgi.app
INFO:root:Python: сервер запущен на прослушивание 0.0.0.0:9999 uv run -m wsgi.app
INFO:root:Python: сервер запущен на прослушивание 0.0.0.0:9999
curl localhost:9999/ping
pong

curl -X POST http://127.0.0.1:9999/post \
    -H "Content-Type: application/json" \
    -d '{"name": "Максим"}'
{"message":"Hello, Максим!"}

curl localhost:9999/page
<!DOCTYPE html>
<html>
    <head>
        <title>Simple Page</title>
    </head>
    <body>
        <h1>Hello, world!</h1>
        <p>This is a simple HTTP page from Flask.</p>
    </body>
</html>

Всё работает ожидаемо. Теперь проведём небольшое нагрузочное тестирование через утилиту wrk. Запускаем тестирование на 10 секунд в один поток и одно подключение:

wrk -t1 -c1 -d10s http://127.0.0.1:9999/ping
Running 10s test @ http://127.0.0.1:9999/ping
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   125.36us   88.38us   5.91ms   99.31%
    Req/Sec     8.04k   250.56     8.79k    71.29%
  80811 requests in 10.10s, 6.32MB read
Requests/sec:   8001.52
Transfer/sec:    640.75KB

Получаем ~8k rps и 125 мкс latency. Неплохой результат для однопоточного сервера на Python. Важно учитывать, что это результат для одного keep-alive соединения — wrk переиспользует TCP-соединение между запросами, что исключает накладные расходы на handshake.

Но что если для обработки запроса необходимо ходить во внешние api в течение 500 мс. Такая ситуация имитируется на эндпоинте /sleep.

wrk -t1 -c1 -d10s http://127.0.0.1:9999/sleep
Running 10s test @ http://127.0.0.1:9999/sleep
  1 threads and 1 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   501.66ms  178.15us 501.90ms   84.21%
    Req/Sec     1.79      0.42     2.00     78.95%
  19 requests in 10.02s, 1.52KB read
Requests/sec:      1.90
Transfer/sec:     155.56B

Получаем ожидаемые 2 rps и latency в 500 мс. При этом если проследить за процессом через htop он почти не использует cpu и почти всё время находится в статусе sleeping. Это подводка к следующей части статьи, которую я когда-нибудь обязательно напишу. В ней мы разберём способы параллельной обработки запросов от нескольких клиентов и кратно увеличим rps сервера и, конечно же, не забудем про системные вызовы и базу Linux, на которой это всё реализуется.

Выводы

Мы прошли путь от системных вызовов socket() и accept() до работающего WSGI-сервера, на котором полноценно запускаются любые WSGI-приложение. По дороге мы:

  • Разобрались, как Linux предоставляет сетевой интерфейс через сокеты и файловые дескрипторы;

  • Посмотрели через strace, что на самом деле делает Python, когда мы вызываем socket.recv() или socket.send();

  • Написали TCP-сервер с idle-timeout и graceful shutdown;

  • Реализовали WSGI-обработчик, который парсит HTTP и вызывает произвольное WSGI-приложение.

Весь код - это чистый Python и системные вызовы Linux. Никаких uvicorn, gunicorn или сторонних библиотек для работы с сетью. Конечно, наш сервер далёк от production: нет SSL, нет ограничений на размер запроса, нет HTTP/2. Но главная цель была другой - убрать «магию» и увидеть, что стоит за абстракциями, которыми мы пользуем��я каждый день.

Главное ограничение нашего сервера - он обрабатывает клиентов строго по одному. Пока один клиент ждёт ответ от базы или внешнего api, все остальные стоят в очереди. В следующей части мы это исправим: разберём потоки, процессы и мультиплексирование ввода-вывода, и посмотрим, какие системные вызовы за ними стоят.

Полный код в репозитории.