Как стать автором
Обновить
640.2
Альфа-Банк
Лучший мобильный банк по версии Markswebb

Свой асинхронный tcp-сервер за 15 минут с подробным разбором

Время на прочтение7 мин
Количество просмотров82K

Ранее я представил пару небольших постов о потенциальной роли Spring Boot 2 в реактивном программировании. После этого я получил ряд вопросов о том, как работают асинхронные операции в программировании в целом. Сегодня я хочу разобрать, что такое Non-blocking I/O и как применить это знание для создания небольшого tcp–сервера на python, который сможет обрабатывать множество открытых и тяжелых (долгих) соединений в один поток. Знание python не требуется: все будет предельно просто со множеством комментариев. Приглашаю всех желающих!

Мне, как и многим другим разработчикам, очень нравятся эксперименты, поэтому вся последующая статья будет состоять как раз из серии экспериментов и выводов, которые они несут. Предполагается, что вы недостаточно хорошо знакомы с тематикой, и будете охотно экспериментировать со мной. Исходники примеров можно найти на github.

Начнем с написания очень простого tcp–сервера. Задача сервера будет заключаться в получении и печати данных из сокета и возвращения строки Hello from server!. Примерно так это выглядит:

Синхронный tcp-сервер
import socket

# Задаем адрес сервера
SERVER_ADDRESS = ('localhost', 8686)

# Настраиваем сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(SERVER_ADDRESS)
server_socket.listen(10)
print('server is running, please, press ctrl+c to stop')

# Слушаем запросы
while True:
    connection, address = server_socket.accept()
    print("new connection from {address}".format(address=address))

    data = connection.recv(1024)
    print(str(data))

    connection.send(bytes('Hello from server!', encoding='UTF-8'))

    connection.close()


Здесь все довольно просто. Если вы не знакомы с понятием сокета, то вот очень простая и практическая статья. Мы создаем сокет, ловим входящие соединения и обрабатываем их согласно заданной логике. Здесь стоит обратить внимание на сообщения. При создании нового соединения с клиентом мы пишем об этом в консоль.

Хочу сразу заметить, что не стоит серьезно вникать в листинги программ до полного прочтения статьи. Совершенно нормально, если что-то будет не совсем понятно в самом начале. Просто продолжайте чтение.

Нет особого смысла в сервере без клиента. Поэтому на следующем этапе необходимо написать небольшой клиент для использования сервера:

tcp-клиент
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket

MAX_CONNECTIONS = 20
address_to_server = ('localhost', 8686)

clients = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(MAX_CONNECTIONS)]
for client in clients:
    client.connect(address_to_server)

for i in range(MAX_CONNECTIONS):
    clients[i].send(bytes("hello from client number " + str(i), encoding='UTF-8'))

for client in clients:
    data = client.recv(1024)
    print(str(data))


Важной особенностью здесь является тот факт, что мы в начале устанавливаем максимально возможное количество соединений, а только потом используем их для передачи/хранения данных.

Давайте запустим сервер. Первое, что мы видим:

server is running, please, press ctrl+c to stop

Это означает, что мы успешно запустили наш сервер и он готов принимать входящие запросы. Запустим клиент и сразу увидим в консоли сервера (у вас порты могут быть другими):

server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 39196)
b'hello from client number 0'
new connection from ('127.0.0.1', 39198)
b'hello from client number 1'
...

Что и следовало ожидать. В бесконечном цикле мы получаем новое соединение и сразу же обрабатываем данные из него. В чем тут проблема? Ранее мы использовали опцию server_socket.listen(10) для настройки сервера. Она означает максимальный размер очереди из еще не принятых подключений. Но в этом нет совершенно никакого смысла, ведь мы принимаем по одному соединению. Что предпринять в такой ситуации? На самом деле, существует несколько выходов.

  1. Распараллелить с помощью потоков/процессов (для этого можно, например, использовать fork или пул). Подробнее здесь.
  2. Обрабатывать запросы не по мере их подключения к серверу, а по мере наполнения этих соединений нужным количеством данных. Проще говоря, мы можем открыть сразу максимальное количество ресурсов и читать из них столько, сколько сможем (сколько необходимо на это процессорного времени в идеальном случае).

Вторая идея кажется заманчивой. Всего один поток и обработка множества соединений. Давайте посмотрим, как это будет выглядеть. Не стоит пугаться обилия кода. Если что-то сразу не понятно, то это вполне нормально. Можно попробовать запустить у себя и подебажить:

Асинхронный сервер

import select
import socket

SERVER_ADDRESS = ('localhost', 8686)

# Говорит о том, сколько дескрипторов единовременно могут быть открыты
MAX_CONNECTIONS = 10

# Откуда и куда записывать информацию
INPUTS = list()
OUTPUTS = list()


def get_non_blocking_server_socket():

    # Создаем сокет, который работает без блокирования основного потока
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(0)

    # Биндим сервер на нужный адрес и порт
    server.bind(SERVER_ADDRESS)

    # Установка максимального количество подключений
    server.listen(MAX_CONNECTIONS)

    return server


def handle_readables(readables, server):
    """
    Обработка появления событий на входах
    """
    for resource in readables:

        # Если событие исходит от серверного сокета, то мы получаем новое подключение
        if resource is server:
            connection, client_address = resource.accept()
            connection.setblocking(0)
            INPUTS.append(connection)
            print("new connection from {address}".format(address=client_address))

        # Если событие исходит не от серверного сокета, но сработало прерывание на наполнение входного буффера
        else:
            data = ""
            try:
                data = resource.recv(1024)

            # Если сокет был закрыт на другой стороне
            except ConnectionResetError:
                pass

            if data:

                # Вывод полученных данных на консоль
                print("getting data: {data}".format(data=str(data)))

                # Говорим о том, что мы будем еще и писать в данный сокет
                if resource not in OUTPUTS:
                    OUTPUTS.append(resource)

            # Если данных нет, но событие сработало, то ОС нам отправляет флаг о полном прочтении ресурса и его закрытии
            else:

                # Очищаем данные о ресурсе и закрываем дескриптор
                clear_resource(resource)


def clear_resource(resource):
    """
    Метод очистки ресурсов использования сокета
    """
    if resource in OUTPUTS:
        OUTPUTS.remove(resource)
    if resource in INPUTS:
        INPUTS.remove(resource)
    resource.close()

    print('closing connection ' + str(resource))


def handle_writables(writables):

    # Данное событие возникает когда в буффере на запись освобождается место
    for resource in writables:
        try:
            resource.send(bytes('Hello from server!', encoding='UTF-8'))
        except OSError:
            clear_resource(resource)


if __name__ == '__main__':

    # Создаем серверный сокет без блокирования основного потока в ожидании подключения
    server_socket = get_non_blocking_server_socket()
    INPUTS.append(server_socket)

    print("server is running, please, press ctrl+c to stop")
    try:
        while INPUTS:
            readables, writables, exceptional = select.select(INPUTS, OUTPUTS, INPUTS)
            handle_readables(readables, server_socket)
            handle_writables(writables)
    except KeyboardInterrupt:
        clear_resource(server_socket)
        print("Server stopped! Thank you for using!")


Давайте запустим наш новый сервер и посмотрим на консоль:
Вывод асинхронного сервера
server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 56608)
new connection from ('127.0.0.1', 56610)
new connection from ('127.0.0.1', 56612)
new connection from ('127.0.0.1', 56614)
new connection from ('127.0.0.1', 56616)
new connection from ('127.0.0.1', 56618)
new connection from ('127.0.0.1', 56620)
new connection from ('127.0.0.1', 56622)
new connection from ('127.0.0.1', 56624)
getting data: b'hello from client number 0'
new connection from ('127.0.0.1', 56626)
getting data: b'hello from client number 1'
getting data: b'hello from client number 2'


Как можно понять по выводу, мы принимаем новые коннекты и данные почти параллельно. Более того, мы не ждем данных от нового подключения. Вместо этого мы устанавливаем новое.

Как это работает?


Дело в том, что все наши операции с ресурсами (а обращение к сокету относится к этой категории) происходят через системные вызовы операционной системы. Если говорить коротко, то системные вызовы представляют собой обращение к API операционной системы.

Рассмотрим, что происходит в первом случае и во втором.

Синхронный вызов


Давайте разберем рисунок:



Первая стрелка показывает, что наше приложение обращается к операционной системе для получения данных из ресурса. Далее наша программа блокируется до появления нужного события. Минус очевиден: если у нас один поток, то другие пользователи должны ждать обработку текущего.

Асинхронный вызов


Теперь посмотрим на рисунок, который иллюстрирует асинхронный вызов:



Первая стрелка, как и в первом случае, делает запрос к ОС (операционной системе) на получение данных из ресурсов. Но посмотрите, что происходит далее. Мы не ждем данных из ресурса и продолжаем работу. Что же делать нам? Мы отдали распоряжение ОС и не ждем результат сразу. Простейшим ответом будет самостоятельно опрашивать нашу систему на наличие данных. Таким образом, мы сможем утилизировать ресурсы и не блокировать наш поток.

Но самом деле такая система не является практичной. Такое состояние, в котором мы постоянно смотрим на данные и ждем какого-то события, называется активным ожиданием. Минус очевиден: мы впустую тратим процессорное время в случае, когда информация не обновилась. Более правильным решением было бы оставить блокировку, но сделать ее «умной». Наш поток не просто ждет какого-то определенного события. Вместо этого он ожидает любое изменение данных в нашей программе. Именно так и работает функция select в нашем асинхронном сервере:



Теперь можно вернуться к реализации нашего асинхронного сервера и взглянуть на него с новым знанием. Первое, что бросается в глаза, – метод работы. Если в первом случае наша программа выполнялась “сверху вниз”, то во втором случае мы оперируем событиями. Такой подход в разработке ПО называется событийно-ориентированным (event-driven development).

Сразу стоит отметить, что такой подход не является серебряной пулей. У него имеется масса недостатков. Во-первых, такой код сложнее поддерживать и менять. Во-вторых, у нас есть и всегда будут блокирующие вызовы, которые все портят. Например, в программе выше мы воспользовались функцией print. Дело в том, что такая функция тоже обращается к ОС, следовательно, наш поток выполнения блокируется и другие источники данных терпеливо ждут.

Заключение


Выбор подхода напрямую зависит от решаемой нами задачи. Позвольте задаче самой выбрать наиболее продуктивный подход. Например, популярный Javaвеб -сервер Tomcat использует потоки. Не менее популярный сервер Nginx использует асинхронный подход. Создатели популярного веб-сервера gunicorn на python пошли по пути prefork.

Спасибо, что прочитали статью до конца! В следующий раз (уже скоро) я расскажу о прочих возможных неблокирующих ситуациях в жизни наших программ. Буду рад вас видеть и в следующих постах.
Теги:
Хабы:
Всего голосов 22: ↑19 и ↓3+16
Комментарии17

Публикации

Информация

Сайт
digital.alfabank.ru
Дата регистрации
Дата основания
1990
Численность
свыше 10 000 человек
Местоположение
Россия