Как стать автором
Обновить
101.69
KTS
Создаем цифровые продукты для бизнеса

Websocket-сервер для геолокации на asyncio

Уровень сложностиСредний
Время на прочтение20 мин
Количество просмотров18K

Привет!

Меня зовут Артем, я занимаюсь бэкенд-разработкой в KTS и веду курс по асинхронному программированию в школе Metaclass. 

Чтобы показать, чему учатся наши студенты на курсе, мы провели live-code-вебинар, на котором написали геолокацию в режиме реального времени на вебсокетах. Эта статья написана по мотивам вебинара.


Вебсокеты нужны там, где идет взаимодействие с пользователем в режиме реального времени. С их помощью клиент может послать запрос к серверу, как в обычном HTTP. Но самое интересное, что используя вебсокеты, сервер тоже может послать данные клиенту, не дожидаясь от него HTTP-запроса. Это взаимодействие чем-то похоже на чат. Кстати, вебсокеты часто используются для реализации чата в вебе, но этим область применения технологии совсем не ограничивается.

Что будет в статье:

Какой сервис будем создавать?

В статье мы создадим веб-сервис, который позволяет делиться своим местоположением и отслеживать местоположение всех остальных подключенных к нему людей на карте мира в режиме реального времени. 

Ссылка на демонстрацию.

Выглядит это так: пользователь заходит в сервис. Если он разрешает отслеживание своего местоположения, сервис показывает его позицию на карте. Иначе он появится на карте в случайном месте и сможет менять свою геопозицию случайным образом при нажатии на специальную кнопку.

С помощью общения браузера с сервером по вебсокет-каналу каждый пользователь видит, где находятся другие пользователи, зашедшие на этот сайт, в режиме реального времени. На гифке ниже видно, как происходит изменение позиции активных пользователей:

Зеленым маркером обозначена позиция текущего пользователя, а красными - всех остальных
Зеленым маркером обозначена позиция текущего пользователя, а красными - всех остальных

Недостатки вебсокетов 

Вебсокеты — очень мощный инструмент, но применять его во всех задачах подряд не стоит. Например, если на вашем сайте просто запрашиваются данные пользователя после его авторизации, применять эту технологию не стоит. Рассмотрим минусы использования вебсокетов.

Первый недостаток — обработка большого количества вебсокет-соединений создает большую нагрузку на сервер. Когда вы делаете HTTP-запрос, вы обычно открываете соединение, делаете запрос, а затем закрываете соединение. А когда используете вебсокеты, вы должны поддерживать все соединения на сервере и иногда самостоятельно закрывать соединения отпавших клиентов. Может получиться такое, что у какого-нибудь крупного интернет-провайдера из-за каких-то неполадок сеть отпала на одну секунду. Тогда все клиенты этого провайдера отсоединятся, не закрыв свои каналы, и подключатся заново, но уже к другим сокетам. Таким образом, вы какое-то время (скорее всего до TCP-таймаута) будете обслуживать как неактивные, так и дублирующие их активные соединения, что может привести к проблемам производительности сервера.

Второй недостаток — у вебсокетов нет собственного протокола. В HTTP есть заголовки, коды ответа, методы запроса и т.д. В вебсокетах вы должны сами все это придумать и разработать. Вы должны подружить ваши сервер и клиент так, чтобы они могли понимать друг друга. К примеру, отправлять заголовки через заранее условленные поля в вебсокет-сообщениях.

Однако это может быть и преимуществом — вебсокет позволяет передавать только данные, которые вам нужны, не тратя пропускную способность канала на стандартные и чаще всего ненужные в вебсокетах заголовки: Host, Content-Length, Content-Type и т.д.  

Третий недостаток — соединение может долго оставаться открытым после потери связи с клиентом. У вас есть TCP-канал, который обрабатывает вебсокет-соединение. Возникает вопрос: что, если в это время пользователь шел по улице и его устройство переключилось на другую сотовую вышку? Соединение так и останется открытым, пока его не закроет сервер. По умолчанию соединение держится до проверки, обеспеченной TCP-протоколом, но до этой проверки вы можете не знать, что клиент отключился. Эту проблему чаще всего можно решить с помощью самописных ping-сообщений, чтобы каждые несколько секунд проверять: клиент на связи или уже нет.

В нашем сервисе именно это и будет: каждую секунду все пользователи будут посылать на сервер сообщение-подтверждение, что они еще здесь. Необязательно отправлять ping-сообщения каждую секунду, но сейчас мы реализуем это так просто для наглядности. Если вы разрабатываете чат, интервал между сообщениями может быть в районе 10-30 секунд.

Чтобы просто посылать на сервер сигналы, можно применять и другие технологии. Вебсокеты нужны именно в том случае, когда вам нужно в реальном времени как посылать данные на сервер, так и получать данные от него. Существуют другие технологии для реализации режима реального времени в вебе такие как: polling, long polling, server sent events (SSE).

Если вам нужно реализовать, скажем, простой центр уведомлений на сайте с всплывающими окнами, для этого хорошо подойдет SSE. Это такая технология, когда только сервер посылает пакеты данных. Получается связь в одну сторону, но такая схема может иметь преимущество перед вебсокетами в конкретных задачах, ведь вы используете обычный протокол HTTP, а значит, ничего не нужно придумывать самостоятельно.

Вид из консоли

Посмотрим в Chrome Dev Tools, во вкладку Network, чтобы понять как устроено вебсокет-подключение клиента к серверу:

Пример запроса на установку WebSocket-соединение
Пример запроса на установку WebSocket-соединение
  1. Клиент посылает GET-запрос на адрес wss://lms.metaclass.studio/websocket_webinar/connect/. Примечательно в адресе то, что в начале стоит не https, а wss — web socket secure. Также клиент посылает заголовок "Connection: Upgrade", что говорит о его желании установить websocket-соединение.

  2. Сервер, если поддерживает вебсокеты, отвечает статусом 101 в подтверждение смены протокола и в заголовках посылает "Connection: upgrade" и "Upgrade: websocket"

Готово, мы установили соединение — этот процесс установки называется рукопожатием.

В Chrome Dev Tools удобно то, что мы можем перейти во вкладку Messages, выбрав запрос на общение по веб-сокетам, и увидеть все сообщения, которые приходят и отправляются. Красной стрелкой обозначаются входящие сообщения, а зеленой — исходящие:

Вид на websocket-сообщения из вкладки Chrome Dev Tools
Вид на websocket-сообщения из вкладки Chrome Dev Tools

Давайте рассмотрим, по какому алгоритму будет работать наш сервис:

  1. При установке соединения клиент отправляет серверу GET-запрос на /connect, и происходит рукопожатие по описанному выше алгоритму.

  2. После этого сервер отправляет клиенту сообщение с типом "initial". Стоит обратить внимание на то, что такие обозначения придумал я и в вебсокетах нет типов сообщений по умолчанию. Чуть ниже я привожу описание разработанного протокола. Первое сообщение с типом "initial" нужно, чтобы назначить клиенту уникальный id на время сессии. Он нужен для того, чтобы идентифицировать клиентов. Кроме того, в initial-сообщении сервер присылает массив пользователей, которые уже есть на карте на данный момент.

  3. Когда клиент готов (установил соединение, загрузил карту, отрисовал все метки), он посылает серверу сообщение "connect", которое говорит о готовности к общению. В этом сообщение отправляются выданный клиенту id , его имя, широта и долгота. После обработки данного сообщения сервер рассылает всем активным пользователям "add"-сообщение о том, что присоединился новый пользователь и его нужно показать на карте.

  4. После этого начинается непрерывное общение клиента с сервером для поддержания соединения. При этом раз в секунду клиент запрашивает текущее местоположение у браузера и отсылает полученные координаты на сервер. Такое общение позволяет не только проверять сервер и клиент на работоспособность, но и отслеживать перемещение пользователя.

  5. Если пользователь инициирует закрытие соединения самостоятельно, то клиент отправляет серверу сообщение с типом "disconnect" и после этого маркер этого пользователя удаляется с карты у всех других пользователей с помощью рассылки "remove"-сообщения.

  6. При перемещении пользователя на достаточно большое расстояние отправляется событие "move", сообщающее всем активным клиентам о том, что нужно передвинуть маркер. Если перемещение незначительное, маркер не перемещается — иначе пришлось бы каждую секунду переставлять точки на карте из-за погрешностей при определении позиции браузером.

Итак:

События от сервера — “initial”, “add”, “remove” и “move” 
События от клиента — “connect”, “ping” и “disconnect

Учимся работать с вебсокетами

Чтобы сократить повествование, мы подготовили основу заранее. Дальше будет показан путь от нее и до конечного сервиса. Финальный код можно посмотреть здесь: https://github.com/ktsstudio/metaclass_websocket_webinar 

Заранее была написана логика для раздачи aiohttp-сервером статических файлов: index.html и дополнительных js-скриптов, лежащих в папке /client/static. Статика раздается aiohttp-сервером только в целях удобства демонстрации.

Заранее написанный view для раздачи index.html | app/core/views.py
import os
from aiohttp import web
from app import BASE_DIR
from app.base.application import View


class IndexView(View):
   async def get(self):
       with open(os.path.join(BASE_DIR, 'client', 'index.html'), 'r') as f:
           file = f.read()

       return web.Response(body=file, headers={
           'Content-Type': 'text/html',
       })

Заранее написанные routes для работы со статикой | app/core/routes.py
import os
from app import BASE_DIR
from app.base.application import Application
from app.core.views import IndexView


def setup_routes(app: Application):
   app.router.add_static("/static", os.path.join(BASE_DIR, "client", "static"))
   app.router.add_view("/", IndexView)

Начнем с того, что напишем view для вебсокет-соединения:

class ConnectView(View):
   async def get(self):
       ws = await self.store.ws.handle_request(self.request)
       return ws


Обратите внимание, что я наследую этот View и некоторые другие компоненты от заранее написаных — в них совсем немного общей логики, которую не хотелось бы дублировать, например добавление поля store. Код родительских компонентов можно найти в репозитории.

View получилось очень лаконичным: вся логика по установке и поддержке соединения «спрятана» в Store. Но при этом и достаточно понятным. 

Не забудем добавить наш View в app/core/routes.py:setup_routes():

   app.router.add_view("/connect", ConnectView)

Для работы с вебсокетами мы напишем отдельный accessor. Сущность accessor позволяет инкапсулировать логику по работе с внешним источником данных, в нашем случае с вебсокет-соединениями. В accessor принято оставлять только логику по взаимодействию с источником данных, то есть исключать бизнес-логику — это позволит заменить accessor, не меняя остального кода в приложении: например, вы решили использовать long polling вместо websocket. Бизнес-логику — например, какое послать сообщение, если он отправил свою позицию? — желательно оставлять в менеджерах и view, но в этой статье мы немного нарушим это правило в целях упрощения.

Создадим WSAccessor в app/store/ws/ws_accessor.py и сразу напишем метод handle_request, который мы используем в ConnectView:

class WSAccessor(BaseAccessor):
   class Meta:
       name = 'ws'


   def _init_(self):
       self._connections: dict[str, Any] = {}


   async def handle_request(self, request: 'Request'):
       ws_response = web.WebSocketResponse()
       await ws_response.prepare(request)
       connection_id = str(uuid.uuid4())
       self._connections[connection_id] = ws_response
       await self.read(id_=connection_id)
       await self.close(connection_id)
       return ws_response

Не удивляйтесь, что в методе _init_ только два символа нижнего подчеркивания — этот метод вызывается после метода __init__ в BaseAccessor. Внутренний класс Meta тоже служит своей цели.

Метод handle_request() напрямую отображает все то, что нам нужно сделать с вебсокет-соединением:

  1. Подготовить ответ на запрос установки соединения. Конечно, надо использовать специальный тип ответа aiohttp.web.WebSocketResponseон уже содержит все нужные заголовки (например, Upgrade: websocket) и нужный статус ответа (101) для ответа на вебсокет-рукопожатия

  2. Отправить заголовки и статус ответа клиенту — этим занимается метод .prepare(). Он отправляет статус ответа и заголовки клиенту, не заканчивая при этом обработку запроса

  3. Выдать соединению уникальный id, чтобы использовать его в бизнес-логике. UUID отлично подходит на роль формата этого id

  4. Сохранить соединение к себе в память (в self._connections). В данном случае соединение — это обычный python-объект, и мы можем хранить его в словаре

  5. Асинхронно читать сообщение клиента, пока он не отключится или мы не отключим его сами. Асинхронно — значит не блокируясь, ожидая нового сообщения (про эту самую асинхронность мы рассказываем в нашем курсе)

  6. Закрыть соединение и удалить его из нашего словаря a.k.a inmemory базы данных self._connections

  7. Вернуть python-объект ответа, тем самым закрыв соединение и отпустив клиента на волю

Да, методов self.read() и self.close() еще нет, но мы их скоро напишем. 

Вспоминаем, что ConnectView дергает ws.handle_request() из self.store и добавляем строчку к нашему Store. Можно воспринимать Store просто как набор аксессоров, доступный из любого места в приложении:

import typing
from app.store.ws.ws_accessor import WSAccessor
if typing.TYPE_CHECKING:
   from app.base.application import Application

class Store:
   def __init__(self, app: "Application"):
       self.app = app
       self.ws = WSAccessor(self)

Кому-то могут показаться странными строчки  3-4. Они позволяют импортировать сущности только при проверке типов (как в IDE), тем самым сохраняя удобство разработки и устраняя циклические зависимости. 

Давайте напишем метод read() в app/store/ws/ws_accessor:

async def read(self, id_: str):
   async for message in self._connections[id_]:
       await self.logger.info(json.loads(message.data))

Мы асинхронно итерируемся по соединению с заданным id и, как только нам придет новое сообщение, логгируем его содержимое. Мы заранее договорились с клиентом, что он присылает нам строки, содержащие json-данные, поэтому можем сразу распарсить их. Этот цикл будет работать до тех пор, пока websocket-соединение не начнет закрываться или мы вручную не прервем его.

Если канал начал закрываться, то наш цикл прервется, и мы выйдем из метода read(), перейдя в метод close(). В методе close() у нас уже, скорее всего, неработоспособный канал — из-за чего-то же прервался метод read(). Поэтому в close() мы не можем ничего отправлять в него, но можем почистить ресурсы и «правильно» закрыть соединение:

async def close(self, id_: str):
   try:
       connection = self._connections.pop(id_)
       await connection.close()
   except KeyError:
       return None
   return None

На строчках 3-4 мы находим соединение в нашем словаре соединений и закрываем его, используя специальный метод close() у объекта соединения.

Мы написали всю необходимую логику по установлению websocket-соединения. Рассмотрим путь запроса:

  1. Есть route /connect, который ведет в WSConnectView. Клиент посылает GET-запрос на /connect и инициирует установку websocket-соединения

  2. Выполняется python-метод WSConnectView.get()

  3. Выполняется метод self.store.ws.handle_request(), в который мы прокидываем self.request — пришедший HTTP-запрос в виде python-объекта

  4. В методе self.store.ws.handle_request() мы формируем WebSocketResponse

  5. Отсылаем заголовки, оставляя канал открытым

  6. Выдаем соединению случайно сгенерированный connection_id

  7. Добавляем пару id/соединение в словарь, чтобы всегда иметь доступ ко всем соединениям

  8. Начинаем читать сообщения из websocket-канала

  9. Когда соединение будет закрыто или случится ошибка, чтение закончится и мы выйдем из цикла

  10. Выполнится self.close() и корректно закроет соединение

  11. Объект response вернется в WSConnectView

Из WSConnectView ответ вернется пользователю, окончательно завершая обработку websocket-соединения.

Если мы сейчас запустим наш код и перейдём на http://localhost:8080, то увидим ответ 101 и нужные заголовки. Соединение установлено.

Пишем сообщения клиенту

Итак, читать из вебсокет-соединения мы научились. Однако для полноценного общения клиента и сервера нам не хватает методов. Давайте их напишем, но для начала немного поговорим о протоколе взаимодействия.

В общем смысле протокол означает набор некоторых правил, которые позволяют наладить общение между разными компонентами одной системы. Например, когда человеку говорят «Привет», скорее всего он поздоровается в ответ — получается своего рода протокол общения. Вы точно знакомы с протоколом HTTP, который часто служит для общения клиента с сервером и имеет в себя строго установленный стандарт общения — статусы ответов, общепринятые заголовки, методы запросов и т.д. А вот протокол WebSocket не имеет никакого стандарта, поэтому приходится придумывать его самостоятельно. 

Давайте установим правила для нашего протокола:

  1. Все данные приходят в формате JSON

  2. Все данные, как передаваемые, так и и получаемые, состоят из двух полей: kind и payload, где kind — строка, а payload — словарь, причем kind может принимать одно из нескольких заранее согласованных значений (“add”, “remove” и т.д.)

  3. Клиент сможет отправлять серверу сообщения только после того, как сервер первым ему напишет и выдаст уникальный id

  4. Клиент в каждом сообщении посылает свой уникальный id

Для поддержки 2 и 3 пункта протокола напишем в app/store/ws/ws_accessor.py dataclass, который будет называться Event:

@dataclass
class Event:
   kind: str
   payload: dict

Теперь нам нужен метод ws_accessor, который будет принимать Event, преобразовывать его в JSON и писать в вебсокет-канал. Мы помним, что вебсокеты поддерживают только строки (еще бинарные данные, но сейчас про них не говорим), поэтому мы должны преобразовать наши данные в строку и только после этого отправить:

async def push(self, id_: str, event: Event):
   json_data = json.dumps(asdict(event))
   await self._push(id_, json_data)

Напишем приватный метод self._push(). Он вынесен отдельно от self.push(), чтобы не было соблазна случайно отправить клиенту неправильно сформированную строку. Все сообщения отправляются только через self.push() — этот метод выступает некоторым гарантом валидности переданных данных. Вот так выглядит self._push():

async def _push(self, id_: str, data: str):
   await self._connections[id_].send_str(data)

Находим соединение и вызываем send_str(). Тут еще стоит добавить обработку ошибок — например KeyError, ConnectionResetError — но для упрощения сейчас мы это опускаем. 

Если мы вызовем метод self.push(), например в handle_request(), увидим, что в браузере действительно появится входящее сообщение:

Работает! Сервер прислал сообщение клиенту.

Напишем еще один метод — раз у нас несколько клиентов, то при изменении позиции одного из них нужно сообщить об этом всем остальным. 

Вызывать в цикле push — не очень приятно. Поэтому напишем self.push_all(), чтобы сообщать о своей локации всем, кроме определенных пользователей (нет смысла сообщать клиенту данные, которые он сам и передал): 

async def push_all(self, event: Event, except_of: Optional[list[str]] = None):
   if except_of is None:
       except_of = []
   json_data = json.dumps(asdict(event))
   results = await asyncio.gather(
       *[
           self._push(id_, json_data)
           for id_ in self._connections.keys()
           if str(id_) not in except_of
       ], return_exceptions=True,
   )
   for result in results:
       if isinstance(result, Exception):
           self.logger.warning(result)

Из необычного здесь — использование asyncio.gather (что позволяет выполнить запросы одновременно, а не один за другим) с аргументом  return_exceptions=True. Это нужно для того, чтобы мы точно залогировали все ошибки, а не только первую попавшуюся. В продакшн-решениях, конечно, надо не только логировать ошибки, но и обрабатывать их.

Итак, мы написали все методы, которые нужны для работы с вебсокетами, исходя из бизнес-логики приложения. Не хватает только самой бизнес-логики.

Реализуем бизнес-логику

Для нее создадим python-модуль app/store/geo.

Мы в рамках протокола оперируем полем kind — типом события. Давайте перечислим все возможные типы для клиента и сервера в app/store/geo/events.py:

class ServerEventKind:
    # отправляется при установке соединения с клиентом, сообщает ему выданный id
    INITIAL = 'initial'
    # сообщает клиенту, что к сервису подключился новый пользователь
    ADD = 'add'
    # сообщает клиенту, что определенный пользователь изменил свою геопозицию
    MOVE = 'move'
    # сообщает клиенту, что от сервиса отключился пользователь
    REMOVE = 'remove'


class ClientEventKind:
    # отправляется в ответ на ServerEventKind.INITIAL, содержит данные о клиенте
    CONNECT = 'connect'
    # отправляется клиентом раз в n секунд, содержит данные о текущей геопозиции
    PING = 'ping'
    # отправляется клиентом при ручном отсоединении от сервиса
    DISCONNECT = 'disconnect'

Чтобы правильно обрабатывать сообщения клиента, я буду использовать manager. Manager — это такая сущность, которая умеет работать с разными accessor для достижения поставленной бизнес-задачи. Например, при авторизации пользователя manager может закинуть его сессию в redis, а дату последнего логина в postgreSQL, используя соответствующие accessor, которые понятия не имеют друг о друге.

Внутри модуля app/store/geo, в файле geo_manager.py создадим GeoManager, вместе с которым сразу создадим dataclass User и поле self._users, которое будет хранить данные о подключенных пользователях:

@dataclass
class User:
   id: str
   name: str
   latitude: float
   longitude: float

class GeoManager(BaseAccessor):
   class Meta:
       name = 'geo'

   def _init_(self) -> None:
       self._users: dict[str, User] = {}

Сразу добавляем его в app/store/store.py:

class Store:
   def __init__(self, app: "Application"):
       ...
       self.geo = GeoManager(self)

Наш менеджер должен уметь обрабатывать события всех типов, пришедшие из вебсокет-соединения. Для этого напишем метод handle_event():

async def handle_event(self, event: Event):
   user_id = event.payload['id']  # клиент всегда присылает поле id
   if event.kind == ClientEventKind.CONNECT:
       await self._on_connect(user_id, event.payload)
   elif event.kind == ClientEventKind.DISCONNECT:
       await self._on_disconnect(user_id)
   elif event.kind == ClientEventKind.PING:
       await self._on_ping(user_id, event.payload['latitude'], event.payload['longitude'])
   else:
       raise NotImplementedError(event.kind)

Для каждого типа события мы вызовем свой обработчик: например, для ClientEventKind.CONNECT вызовем self.on_connect. К обработчикам вернемся чуть позже. Не забываем про вариант, когда от клиента пришло какое-то незнакомое событие — в таком случае лучше выкинуть исключение NotImplementedError, чтобы не столкнуться с неочевидным поведением. 

Теперь нам нужно как-то вызывать метод handle_event(). Не будем усложнять и вызовем бизнес-логику в коде ws_accessor, изменив метод read():

async def read(self, id_: str):
   async for message in self._connections[id_]:
       raw_event = json.loads(message.data)
       await self.store.geo.handle_event(event=Event(
           kind=raw_event['kind'],
           payload=raw_event['payload'],
       ))

Мы написали handle_event(), но помним, что по нашему протоколу сервер первым должен отправить сообщение. Поэтому напишем еще метод handle_open(), который и будет отправлять это сообщение. Добавим его в GeoManager:

async def handle_open(self, user_id: str):
   await self.store.ws.push(
       user_id,
       event=Event(
           kind=ServerEventKind.INITIAL,
           payload={
               'id': str(user_id),
               'users': [asdict(_user) for _user in self._users.values()],
           },
       ),
   )

Этот метод отправляет клиенту сообщение при установке соединение. Он посылает выданный клиенту id соединения и список пользователей, уже подключенных к сервису, чтобы этот клиент мог заранее расставить маркеры у себя на карте.

Вызовем его опять же в методе ws_accessor, изменив handle_request():

async def handle(self, request: 'Request'):
   …
   self._connections[connection_id] = ws_response
   await self.store.geo.handle_open(str(connection_id))
   await self.read(id_=connection_id)
   …

Теперь перезапустим сервер и проверим, как это все работает:

Сообщение от сервера
Сообщение от сервера
Сообщение от клиента
Сообщение от клиента

Действительно, сервер при установке соединение первым отправил id и список пользователей. Список пока пустой, так как мы будем добавлять пользователей в методе _on_connect.

Обрабатываем сообщения от клиента

При отправке клиенту сообщения “initial” он ответит нам сообщением “connect”, если у него все хорошо. После “connect” мы будем добавлять пользователя в self._users и сообщать о нем всем остальным пользователям. Вот так будет выглядеть метод GeoManager._on_connect:

async def _on_connect(self, user_id: str, payload: dict):
   latitude, longitude, name = payload['latitude'], payload['longitude'], payload['name']
   self.logger.info(f'{name} joined! {latitude=}, {longitude=}')
   self._users[user_id] = User(
       id=user_id,
       name=name,
       latitude=latitude,
       longitude=longitude,
   )
   await self.store.ws.push_all(Event(
       kind=ServerEventKind.ADD,
       payload=asdict(self._users[user_id]),
   ), except_of=[user_id])

Сразу допишем метод _on_ping. Клиент будет присылать нам событие “ping” и свои текущие координаты каждую секунду — так настроено в клиенте для более наглядной демонстрации. В методе _on_ping мы должны сравнить пришедшие координаты и те, которые мы уже положили в self._users. И конечно, надо обновить координаты пользователя в базе:

async def _on_ping(self, user_id, latitude, longitude):
   user = self._users[user_id]
   self.logger.info(f'ping from {user.name}')

   if abs(user.latitude - latitude) > 0.05 or abs(user.longitude - longitude) > 0.05:
       self.logger.info(f'{user.name} moved!')
       user.latitude, user.longitude = latitude, longitude
       await self.store.ws.push_all(Event(
           kind=ServerEventKind.MOVE,
           payload=asdict(user),
       ), except_of=[user_id])
   else:
       user.latitude, user.longitude = latitude, longitude
Зачем нужно условие на строке 5? И как вообще браузер определяет мою геопозицию, если у меня в ноутбуке нет GPS ?

Для устройств без GPS-модуля определение геопозиции происходит с помощью Wi-Fi-сетей. Ваш компьютер знает силу сигналов от разных сетей рядом с вами, а у компаний вроде Google и Apple есть свои базы данных этих точек: то есть SSID точки и ее примерное местоположение. Если вы подключились к точке доступа Wi-Fi с телефона, скорее всего, в базу добавились id этой точки доступа + координаты вашего телефона с корректировкой по силе и направлению сигнала точки. Ваш компьютер также по силе сигнала способен оценить расстояние и направление до роутера и на основе этих данных вычислить примерную геопозицию.

Из-за этого способа оценки latitude и longitude будут постоянно меняться: сигналы от Wi-Fi-точек то усиливаются, то слабеют. Будет неоптимально, если наш сервис начнет оповещать всех клиентов о том, что широта пользователя изменилась на 0,001 из-за погрешности при ее определении. Условие на строке 6 нужно именно для того, чтобы не делать оповещений при малых изменениях, а вот если перемещение значительное, рассылаем пользователям новую информацию о местоположении юзера.

Метод _on_disconnect пишется по аналогии: получаем сообщение, удаляем пользователя из self._users, пишем всем активным клиентам, что данный пользователь ушел с радаров. Вот так он выглядит:

async def _on_disconnect(self, user_id: str):
   user = self._users.pop(user_id)
   self.logger.info(f'{user.name} left, bye bye!')
   await self.store.ws.push_all(Event(
       kind=ServerEventKind.REMOVE,
       payload={
           'id': user_id,
       }
   ), except_of=[user_id])

Перезапускаем сервер и смотрим, что происходит в браузере и в консоли сервера:

Клиент успешно установил соединение и начал отправлять ping-сообщения
Клиент успешно установил соединение и начал отправлять ping-сообщения
Сервер успешно обрабатывает сообщения клиента и логирует их
Сервер успешно обрабатывает сообщения клиента и логирует их

Подключим еще одного клиента, переместим его и посмотрим, что будет приходить первому:

Сообщения, приходящие первому клиенту
Сообщения, приходящие первому клиенту

Поздравляю, мы реализовали основную функцию нашего сервиса!

Теперь клиенты теперь в реальном времени получают информацию друг о друге и могут отслеживать взаимные передвижения.

Однако осталась еще одна проблема: что, если у пользователя отключили Интернет и он не послал сообщение “disconnect”? Он так и будет висеть на карте других пользователей? Конечно, это неправильно. Давайте исправлять.

Избавляемся от фантомных пользователей

Из-за особенностей вебсокет-соединения ушедший клиент продолжает числиться активным еще некоторое время — соединение может оставаться открытым какое-то количество секунд после физического отключения клиента. Обычно это время около 75 секунд — это примерный интервал между проверками TCP-соединения на жизнеспособность в Unix-системах. После этого времени отправляется несколько контрольных запросов, на которые клиент должен ответить, чтоб сохранить соединение. Если он не отвечает, то соединение считается закрытым и разрывается. Может случиться ситуация, что клиент отключится сразу же после такой проверки и соединение останется висеть открытым еще около 75 секунд. Точка пользователя на карте все еще будет показываться, но на самом деле его уже и след простыл.

Однако мы не можем разрывать соединение как только клиент отключился:

  • во-первых, для этого потребовалось бы делать проверку почти каждое мгновение, и проверок было бы слишком много

  • во-вторых, возможно, у клиента просто случился моментный сбой в сети, а сервер его уже отключил

Потому хотелось бы просто уменьшить 75 секунд фантомного существования пользователя до более приемлемого значения. Самый простой способ решить эту проблему — создавать фоновую задачи, которая будет спать какое-то время и обрывать соединение после его истечения. Если пользователь все-таки проявит активность, эта задача будет отменена, а на ее место придет новая с обновленным таймаутом. То есть если от пользователя не будет ответа на ping-сообщения в течение 10 секунд, сервер будет считать, что он отключился. Время таймаута можно ставить на свое усмотрение — с его помощью вы просто уменьшаете свою зону неопределенности до приемлемого для вас значения.

Для начала добавим в ws_accessor.py:init словарь self._timeout_tasks: dict[str, Task]. Это словарь, в котором будут id соединения и asyncio-задача. Когда соединение будет открыто, мы положим в словарь задачу и через какое-то время она выполнится. Заодно добавим константу CONNECTION_TIMEOUT и установим ее равной 10 секундам:

class WSAccessor(BaseAccessor):
   CONNECTION_TIMEOUT = 10

   def _init_(self):
       self._connections: dict[str, Any] = {}
       self._timeout_tasks: dict[str, Task] = {}

Напишем два метода для достижения нашей цели:

def _refresh_timeout(self, connection_id: str):
   task = self._timeout_tasks.get(connection_id)
   if task:
       task.cancel()

   self._timeout_tasks[connection_id] = asyncio.create_task(self._close_by_timeout(connection_id))

async def _close_by_timeout(self, connection_id: str):
   await asyncio.sleep(self.CONNECTION_TIMEOUT)
   await self.store.geo.handle_close(connection_id)
   await self.store.ws.close(connection_id)

Метод _refresh_timeout позволит нам обновлять таймауты при появлении нового сообщения от пользователя, а метод _close_by_timeout как раз будет являться нашей фоновой задачей для закрытия соединения.

Осталось только добавить вызов метода _refresh_timeout. Самым оптимальным местом является метод read():

async def read(self, id_: str):
   async for message in self._connections[id_]:
       self._refresh_timeout(id_)
	     …

Однако может быть и такое, что только что присоединившийся клиент сразу же отключился. Обработаем и это в WSAccessor.handle_request():

async def handle_request(self, request: 'Request'):
   …
   self._connections[connection_id] = ws_response
   self._refresh_timeout(connection_id)
   await self.store.geo.handle_open(str(connection_id))
   …

Теперь, если клиент закрыл браузер, не разорвав соединение, мы узнаем об этом максимум через 10 секунд:

Заключение

В результате мы написали сервис, который позволяет пользователям делиться геолокацией друг с другом с помощью вебсокетов. В данном случае эта технология действительно полезна, поскольку сервису необходимо уметь как отправлять позицию текущего пользователя, так и получать позиции других, плюс делать это в режиме реального времени, чтобы можно было отслеживать перемещение пользователей.

В дальнейшем такому сервису можно добавить разделение пользователей по комнатам, чтобы, например, видеть перемещения только своих друзей, собирающихся на встречу, или показывать траектории движения участников соревнований по спортивному ориентированию, накладывая их пути на туристическую карту местности. В общем, на основе этого примера можно сделать различные интересные сервисы.

Мы убедились, что вебсокеты — мощная технология с множеством возможностей, к использованию которой необходимо подходить ответственно и осторожно. Не стоит забывать про ее минусы, а так же лучше ознакомиться с ее уязвимостями перед серьезным использованием.


Другие наши статьи по бэкенду и асинхронному программированию для начинающих:

Другие наши статьи по бэкенду и асинхронному программированию для продвинутого уровня:


👉 Если статья была интересной, приходите к нам на курс, где мы решаем подобные задачи: «Асинхронное программирование для начинающих»

Теги:
Хабы:
Всего голосов 12: ↑12 и ↓0+12
Комментарии4

Публикации

Информация

Сайт
kts.tech
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия