Как стать автором
Обновить

19 способов сделать сокет-сервер на Python. Эволюционный подход. Часть 2. Блокирующие сокеты и многозадачность

Время на прочтение13 мин
Количество просмотров17K

В данном материале мы от теоретического рассмотрения сокетов перейдем к практике построения сервера на их основе. Главный вопрос статьи — это как обрабатывать на сервере несколько соединений одновременно.

Резюме:

  • проблема блокирующих сокетов;

  • распараллеливание процессами;

  • заменяем тяжеловесные процессы легковесными потоками;

  • синхронизация потоков и проблема взаимной блокировки;

  • как работает GIL и зачем он нужен;

  • все проблемы решаются использованием select().

Использование блокирующих сокетов

Методы accept(), connect() и recv() класса socket являются блокирующими. Это значит, что после их вызова выполнение программы остановится и не возобновится до тех пор, пока не поступят соответствующие данные из сети. А если программа не выполняется, то и другие соединения и задачи также не обрабатываются.

Проблема усугубляется тем, что у нас соединение не закрывается сразу после выдачи ответа (как в HTTP-серверах), а выполняется в бесконечном цикле, пока пользователь сам не выйдет из программы. Только тогда программа сможет перейти к обработке следующего соединения. Это значит, что сервер может обрабатывать вообще только одно соединение за раз, в чем мы и сможем убедиться, выполнив следующий код:

import socket

HOST = ""  # Использовать все адреса: виден и снаружи, и изнутри
PORT = 50007  # Произвольный незарезервированный порт

# Проверяем, что скрипт был запущен на исполнение, а не импортирован
if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        # Accepting multiple connections, but only one at a time
        while True:
            print("Waiting for connection...")
            sock, addr = serv_sock.accept()
            with sock:
                print("Connected by", addr)
                while True:
                    # Receive
                    try:
                        data = sock.recv(1024)
                    except ConnectionError:
                        print(f"Client suddenly closed while receiving")
                        break
                    print(f"Received: {data} from: {addr}")
                    data = data.upper()
                    # Send
                    print(f"Send: {data} to: {addr}")
                    try:
                        sock.sendall(data)
                    except ConnectionError:
                        print(f"Client suddenly closed, cannot send")
                        break
                print("Disconnected by", addr)

Ясно, что для серверов, в обязанности которых входит обрабатывать десятки и даже сотни и тысячи соединений одновременно, это абсолютно неприемлемо. Нам нужна многозадачность (multitasking) — одновременное выполнение нескольких задач. Рассмотрим вкратце все способы реализовать одновременность (concurrency) вычислений в Python.

Одновременности вычислений можно добиться двумя способами: параллельными вычислениями (parallel computing) и мультиплексированием (multiplexing). Параллельность означает, что разные потоки выполнения могут физически выполняться в одно и то же время — на разных ядрах процессора. А мультиплексирование — это чередование во времени выполнения разных задач в одном и том же потоке выполнения, на одном ядре процессора. Чтобы получше понять, что эти понятия означают, и почему разделение именно такое, нужно обратиться к внутреннему устройству компьютера.

Настоящая одновременность достигается только параллельными вычислениями, где задачи распределяются между разными ядрами процессора или машинами. Но правда жизни такова, что настоящая одновременность в реальных задачах нам не очень-то и нужна. Точнее, редко когда нужна. Дело в том, что центральный процессор (ЦП) и память — это еще не весь компьютер. Есть еще сетевая плата, жесткие диски, клавиатура и другие устройства ввода-вывода (I/O). И если ЦП и память научились делать очень быстрыми, то остальные компоненты системы работают на скоростях совершенно другого порядка. И часто даже не по своей вине. Например, данные по сети могут преодолевать сотни и тысячи километров, проходя через десяток промежуточных узлов и маршрутизаторов. А отклик от клавиатуры вообще зависит от действий человека, который, как известно, может "зависать" на минуты. (Что с него взять — он ведь даже не цифровой.)

Поэтому компьютерную систему можно условно поделить на две основные части: процессор и устройства ввода-вывода. (Память хоть и является достаточно медленной, но по сравнению с I/O, она быстрая, а благодаря кэш-памяти ее и вовсе можно условно принять за одно целое с процессором.) Соответственно этому, и все задачи можно условно поделить на те, которые ограничиваются производительностью процессора (CPU-bound), и те, которые ограничиваются производительностью устройств ввода-вывода (I/O-bound). Первые можно выполнить одновременно только распараллелив их по разным ядрам, а вторые можно мультиплексировать в рамках одного потока выполнения, т.е. на одном ядре процессора. Пока одна задача ждет ответа от удаленной машины или данных с жесткого диска, можно успеть запустить и выполнить еще несколько других задач. А когда и те остановятся на какой-нибудь операции ввода-вывода, можно вернуться обратно к первой. Так, процессорное время ядра разбивается на части и распределяется между разными задачами.

В Python есть несколько способов реализовать многозадачность:

  1. Процессы (создаются копии самого процесса приложения).

  2. Потоки (выполняются в одном процессе).

  3. Системный вызов select и его аналоги (выполняются в одном потоке).

Вытесняющая многозадачность для сокетов
Вытесняющая многозадачность для сокетов

Все они изначально реализованы в ОС, а Python лишь использует эти реализации через соответствующие системные вызовы. В ОС процессы и потоки могут выполнятся как параллельно, используя разные ядра процессора, так и мултиплексироваться в одном потоке выполнения. А select() — это функция, позволяющая мультиплексировать выполнение в ручном режиме, не автоматически, как это делают процессы и потоки. Поэтому о параллельности тут вообще речи не идет.

Однако, в Python из-за использования им глобальной блокировки интерпретатора (Global Interpreter Lock, GIL — подробнее о нем ниже) в один и тот же момент времени может выполнятся только один поток. Поэтому реальная параллельность в Python может быть только у процессов. Но так как большинство приложений не столько вычисляют, сколько ждут ответа от системы I/O, то параллельность и процессы нам мало помогают. Потоки и select из-за их легковесности остаются главными решениями в обеспечении многозадачности.

Тем не менее все же рассмотрим каждый из этих способов. Начнем с процессов.

Процессы

Процессы, запускаемые из скрипта Python — это те же процессы операционной системы (ОС), что и все остальные. Что и сам запущенный интерпретатор Python, в конце концов. Запуск нового процесса можно сравнить с делением живой клетки. Сначала копируются все элементы ее, а потом она разделяется на две самостоятельные сущности. Так и процесс — сначала копируется интерпретатор, все окружение и текущее состояние памяти, а потом выполнение продолжается с того же места, но уже в двух разных процессах. Если мы посмотрим в менеджере задач ОС список процессов, то можем увидеть, что их число увеличилось на единицу.

На Unix-системах процесс можно создать системным вызовом fork. В Python ему соответствует функция os.fork(). В момент вызова os.fork() порождается дочерний процесс, который будет просто копией текущего, который в момент вызова становится родительским:

if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        while True:
            print("Waiting for connection...")
            sock, addr = serv_sock.accept()
            pid = os.fork()
            if pid:
                # Continue parent process
                pass
            else:
                # Start child process
                handle_connection(sock, addr)
                exit()

В результате появляется два одинаковых процесса вместо одного, но с разными pid (process id). Так, строка pid = os.fork() начинает выполняться еще только одним процессом, а заканчивает — уже двумя. И все последующие строки также выполняются уже в отдельных процессах. Направить их в разные русла можно с помощью оператора if, так как os.fork() возвращает разные значения в зависимости от процесса. В родительском процессе возвращается его реальный pid, а для дочернего — всегда 0. Поэтому там, где pid==0 выполняем функцию handle_connection() с бесконечным циклом обработки соединения внутри, а в основном процессе снова переходим к serv_sock.accept(). Когда соединение закроется, и бесконечный цикл остановится, то выполнится следующая после функции команда — exit(), которая и завершит данный дочерний процесс.

На месте вызова handle_connection(sock, addr) можно было бы разместить само содержимое данной функции — fork() это позволяет. Хотя это и выглядело бы не так эстетично. Но в последующих способах разделить потоки, без функций не обойтись.

В терминах ОС задача — это программа. В терминах программы задача — это подпрограмма (функция). Так, можно взять любую функцию и запустить ее как процесс или поток и она будет выполнятся одновременно с основным потоком выполнения. Поэтому мы и вынесли код обработки подключения, который теперь будет соответствовать задаче, в отдельную функцию:

def handle_connection(sock, addr):
	print("Connected by", addr)
	with sock:
		while True:
            try:
                data = sock.recv(1024)
            except ConnectionError:
                print(f"Client suddenly closed while receiving")
                break
            data = data.upper()
            try:
                sock.sendall(data)
            except ConnectionError:
                print(f"Client suddenly closed, cannot send")
                break
	print("Disconnected by", addr)

Другой способ распараллелить процессы помимо os.fork() — это использование модуля multiprocessing. Он уже работает на всех платформах и включает множество других полезных функций, которых мы, впрочем, здесь касаться не будем.

if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        while True:
            print("Waiting for connection...")
            sock, addr = serv_sock.accept()
			p = multiprocessing.Process(target=handle_connection, args=(sock, addr))
			p.start()

При вызове метода p.start() создается дочерний процесс, который будет работать в фоне. Основной цикл продолжит выполняться, как выполнялся, и после p.start() управление перейдет снова в serv_sock.accept(). Как и в предыдущем примере основной цикл сервера только и делает, что вызывает serv_sock.accept() и создает новый процесс для полученного клиентского сокета. Вся прочая логика реализуется в дочерних процессах.

Что происходит при создании нового процесса? А происходит то, что в ОС на одну работающую программу становится больше. (В чем вы можете убедиться, открыв менеджер процессов.) Для нового процесса копируется все страницы памяти старого процесса, запускается новый интерпретатор Python и т.д. Правда страницы копируются отложено. Реальное копирование происходит, только если один из процесс внес в них изменения. Но как бы там ни было, дело это все равно затратное: и по времени и по объемам памяти.

Разделенное адресное пространство удобно в том плане, что один процесс не сможет испортить данные другого. Но использовать общие данные он тоже не может. Приходится для взаимодействия процессов использовать специальные разделяемые объекты/память (shared object/memory), pipes для обмена сообщениями и т.д. Поэтому удобство от разделения адресных пространств процессов нивелируется необходимостью дополнительно обеспечивать обмен информацией между ними.

Потоки

Но все же главный недостаток процессов — это их тяжеловесность. А единственное преимущество — параллельность вычислений. Так как большая часть времени работы процесса приходится на ожидание устройств ввода-вывода, а не на выполнение инструкций в процессоре, то все преимущества от параллельности сходят на нет из-за затрат на переключение процессов.

Гораздо выгоднее использовать легковесные потоки, пусть они в Python и не поддерживают параллельность. Так как количество ядер процессора обычно на несколько порядков меньше, чем число соединений, которые приходится обрабатывать, то параллельность тут отступает на второй план. Если вы хотите задействовать все ядра, проще создать несколько серверов с разными портами и распределять клиенты между ними.

API потоков аналогичный процессам, и чтобы перейти на них, нам понадобится изменить всего два слова:

if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        while True:
            print("Waiting for connection...")
            sock, addr = serv_sock.accept()
            t = threading.Thread(target=handle_connection, args=(sock, addr))
            t.start()

Код по созданию нового подключения достаточно стандартный, и чтобы его каждый раз не дублировать, разработчикам пошли на встречу и сделали специальные классы TCPServer, UDPServer и их Threading- и Forking-версии:

from socketserver import ForkingTCPServer
from socketserver import ThreadingTCPServer

HOST, PORT = "", 50007
if __name__ == "__main__":
    # with ForkingTCPServer((HOST, PORT), ConnectionHandler) as server:
    with ThreadingTCPServer((HOST, PORT), ConnectionHandler) as server:
        server.serve_forever()

Функция-обработчик соединения была обернута в класс, что позволяет нам теперь хранить состояние и создавать более сложную логику, состоящую из нескольких методов, которые можно наследовать и переопределять:

class ConnectionHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print("Connected by", self.client_address)
        while True:
            try:
                data = self.request.recv(1024)
            except ConnectionError:
                print(f"Client suddenly closed while receiving")
                break
            if not data:
                break
            # Process
            data = data.upper()
            try:
                self.request.sendall(data)
            except ConnectionError:
                print(f"Client suddenly closed, cannot send")
                break
        print("Disconnected by", self.client_address)

Синхронизация потоков и взаимная блокировка

Однако, и потоки не панацея. Да, они занимают в памяти мало места, быстро создаются и переключаются. Чтобы обмениваться данными между ними, не нужно использовать дополнительные средства, вроде pipes — все потоки работают в едином пространстве памяти, а значит, они могут использовать одни и те же переменные. Но последний пункт является как удобством, так и большим недостатком.

Дело в том, что потоки могут переключиться в любой момент. Впрочем как и процессы. Но если процесс всегда уверен, что при возобновлении работы все его переменные остались такими же, как были, то потоки этим похвастаться не могут.

Рассмотрим такой пример. Допустим, хочет один поток добавить единицу в переменную x=5. Для этого нужно выполнить три операции: считать значение, изменить его, записать результат обратно. Вот мы считываем значение переменной x, потом... потом поток переключается на другой, где это значение умножается на 2. И вот эта переменная уже равна 10-ти, хотя только что равнялось 5-ти. Дальше управление снова возвращается первому потоку: "Прибавляю я, значит, к пяти единицу...", — говорит он, а в это время другой поток с ужасом глядит на него: "Какую к пяти единицу? Там уже десятка давно!" Да куда там... Была десятка, а стала шестерка. Вся работа второго потока пошла насмарку. Вот и думай, как тут работать, когда каждый делает, что хочет и когда хочет.

Конечно, на всякую проблему можно придумать решение. Вот и тут нам нужно всего лишь навсего заблокировать доступ для других потоков к переменной, пока все изменения окончательно не будут записаны в память.

Код будет выглядеть примерно так:

lock.acquire()  # Wait if already acquired
x = x + 1
lock.release()

Блокировщик устроен так, чтобы метод acquire() нельзя было вызывать 2 раза подряд без вызова release() в промежутке. При первом вызове устанавливается флаг — блокировщик занят. Если из другого потока будет вызван acquire() того же объекта lock, выполнение потока остановится, пока в первом потоке выполнение не дойдет до вызова release(). Тогда только объект lock освободится, и его сможет занять другой поток.

Все это называется синхронизацией потоков.

Так как Lock реализован как менеджер контекста и может использоваться с оператором with, то запись можно упростить:

# Thread 1
with lock:
	x = x + 1
# Thread 2
with lock:
	x = x * 2

Функции acquire() и release() будут вызваны автоматически при входе и выходе из блока with, соответственно, так как:

class Lock:
	# ...
    def __enter__(self):
        self.acquire()

    def __exit__(self, *args):
        self.release()

Но тут появляется другая опасность. Если, допустим, используется несколько блокировщиков, есть вероятность взаимной блокировки двух потоков и их полной остановки на веки вечные. Например, первый поток занимает (acquire) блокировщик A, а второй — B, после чего продолжают выполнение. Потом, не освободив предыдущие, они доходят до блокировщиков B и A, соответственно. Первый никогда не пройдет через B, потому что он уже занят вторым потоком, а второй — A, потому что тот занят первым. Такая ситуация называется взаимная блокировка, или deadlock.

# Thread 1
with lock_a:
	with lock_b:
		x = x + 1
# Thread 2 (deadlock)
with lock_b:
	with lock_a:
		x = x * 2

Чтобы такого не случалось, нужно создать иерархию блокировщиков и не использовать нижележащие без блокировки сначала вышележащих. То есть блокировщик B всегда должен идти после A, даже если A тут по логике и не нужен:

# Thread 2 (no deadlocks)
with lock_a:
	with lock_b:
		x = x * 2

Именно для решения этих двух проблем — необходимости синхронизации и опасности взаимной блокировки — и был введен в Python механизм глобальной блокировки интерпретатора (GIL). Чтобы внутренние данные интерпретатора, такие как счетчики использования ссылок, не могли изменяться из разных потоков одновременно, используется блокировщик, а чтобы избежать взаимной блокировки (deadlock), этот блокировщик сделан единственным. Так как он один на весь интерпретатор, то можно назвать его глобальным.

В начале исполнения потока блокировщик захватывается (acquire), в конце отпускается (release), после чего все повторяется для другого потока. Когда ОС переключает по своему усмотрению поток на другой, он, естественно, также пытается захватить блокировщик. Но так как он уже занят другим потоком, то вынужден ждать, пока GIL его не освободит. Именно поэтому разные потоки не могут работать в одно и то же время, и это уже GIL, а не ОС будет решать, когда переключать потоки, а когда нет. А так как в GIL время на каждый поток ограничивается по количеству выполненных инструкций и таймеру (в зависимости от того, что наступит раньше), то это событие никак не зависит от кода и его невозможно предсказать. Оно может наступить, как и с потоками ОС, в совершенно произвольный момент времени, и из программы его не угадать.

Мультиплексирование с помощью select

Другое дело — использование системного вызова select (в Python он вызывается функцией select.select()). Тут переключение между задачами осуществляется вручную и полностью управляется из кода. Никаких неожиданных перескакиваний из задачи в задачу, как это было в потоках или процессах, нет. Поэтому здесь не нужно никакой синхронизации и защиты от взаимных блокировок. Весь код выполняется синхронно — инструкция за инструкцией. Переключение задач — это тоже просто очередная последовательность инструкций, и нам заранее известно, когда она выполнится.

Принцип работы функции select() довольно простой — в аргументах передается список всех имеющихся сокетов, а возвращается список всех готовых. В соответствии с полученным сокетом мы сами выбираем, какую следующую задачу нужно выполнять. Когда задача выполнилась или перешла в режим ожидания, функция select() вызывается снова, и все повторяется.

Так, все процессорное время распределяется между разными задачами. То есть несколько потоков выполнения мультиплексируются в одном потоке выполнения процессора. Никаких случайных перескакиваний и при этом — никаких простоев. Если есть хоть один готовый сокет, всегда будет что-то выполняться. Так, с помощью полностью синхронного кода мы добились реальной асинхронности в выполнении задач. Вот так синхронность перешла в асинхронность.

Многозадачность, реализованная через select(), называется совместной, или кооперативной (cooperative), потому что, прежде, чем начала выполняться следующая задача, нужно, чтобы предыдущая явно объявила о своей готовности отдать процессорное время другим задачам. То есть задачи очевидным образом согласуют между собой момент переключения задач.

В противоположность кооперативной, процессы и потоки реализуют вытесняющую (pre-emptive) многозадачность. Она так называется, потому что если один из процессов или потоков выполняется слишком долго, то он может быть прерван при окончании выделенного ему интервала времени или при исчерпании лимита выполненных инструкций (time-slicing). Один процесс, таким образом, вытесняет другой, даже если тот еще может выполняться и выполняться.

Выводы

В реальной разработке необходимость осуществлять синхронизацию потоков может быть весьма дорогим и сомнительным удовольствием. Во-первых, это дополнительная работа и напряжение серых клеточек. А кто хочет работать (или платить) больше? Во-вторых, это сложно, а, значит, тут обязательно будут ошибки. А в случае потоков — трудноповторимые и трудноуловимые. Так что многочасовой багфиксинг обеспечен, при том — на пустом месте. Не говоря уже о постоянных сомнениях: в логике ли баг на этот раз или опять ошибка в синхронизации? И что самое неприятное, можно 1000 раз прогнать этот код и не получить вообще никакой ошибки. Так что никогда нет уверенности, что программа написана правильно, даже если 100% кода покрыта unit-тестами и все они зеленые.

Именно по этой причине использование select() и решений, построенных на его основе, оказывается куда предпочтительнее потоков. Ведь момент переключения выполнения всегда известен заранее, а значит, никакой синхронизации проводить не нужно. Все уже синхронизировано по определению. А использование интерпретатором GIL лишает потоки единственного их преимущества — параллельности исполнения.

Вот почему потоки, как и процессы, нужно знать, но никогда не использовать! А применять функцию select() и асинхронное программирование. Но об этом в следующий раз.

Исходники

< Назад | Начало | Вперед >

Подробнее:

Speed Up Your Python Program With Concurrency (RealPython.com)

Зачем нужен Python Global Interpreter Lock и как он работает (Tproger.ru)

Теги:
Хабы:
Всего голосов 8: ↑7 и ↓1+8
Комментарии1

Публикации

Истории

Работа

Data Scientist
80 вакансий
Python разработчик
118 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань