Cегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Для хранения групп создадим класс ChannelGroups и объявим глобально его экземпляр:
Чтобы добавить для открытых соединений связь с channel_groups нам необходимо унаследоваться от базового класса WebSocketEndpoint.
Начнем с создания вспомогательного класса Channel:
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.
Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.
В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
«Мониторинг» групп:
Очистка:
Менеджер пакетов pip:
Исходный код решения
**UPD 2022:**
channel-box дорос до версии 0.4 и теперь дружит с последними версиями Starlette.
Чат на channel-box здесь:
channel-box.andrey-sobolev.ru
Вступление
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Для чего нужна группировка вебсокет cоединений?
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Где хранить группы?
Для хранения групп создадим класс ChannelGroups и объявим глобально его экземпляр:
import time import uuid from simple_print.functions import sprint_f from starlette.endpoints import WebSocketEndpoint class ChannelGroups: def __init__(self): self.created = time.time() _CHANNEL_GROUPS = {} created = None async def group_send(self, group, payload): self.clean_expired() for channel in self._CHANNEL_GROUPS.get(group, {}): await channel.send(payload) def groups_show(self): if self._CHANNEL_GROUPS: for group in self._CHANNEL_GROUPS: sprint_f(f"\n{group}", "green") for channel in self._CHANNEL_GROUPS.get(group, {}): sprint_f(channel, "cyan") if channel.is_expired(): sprint_f("expired", "red") else: sprint_f("Channel groups is empty", "yellow") def groups_flush(self): self._CHANNEL_GROUPS = {} def group_add(self, group, channel): self._CHANNEL_GROUPS.setdefault(group, {}) self._CHANNEL_GROUPS[group][channel] = "" def remove_channel(self, channel): for group in self._CHANNEL_GROUPS: if channel in self._CHANNEL_GROUPS[group]: del self._CHANNEL_GROUPS[group][channel] if not any(self._CHANNEL_GROUPS[group]): del self._CHANNEL_GROUPS[group] def clean_expired(self): for group in self._CHANNEL_GROUPS: for channel in self._CHANNEL_GROUPS.get(group, {}): if channel.is_expired(): del self._CHANNEL_GROUPS[group][channel] if not any(self._CHANNEL_GROUPS[group]): del self._CHANNEL_GROUPS[group] channel_groups = ChannelGroups()
Унаследуемся от WebSocketEndpoint
Чтобы добавить для открытых соединений связь с channel_groups нам необходимо унаследоваться от базового класса WebSocketEndpoint.
Начнем с создания вспомогательного класса Channel:
class Channel: def __init__(self, websocket, expires, encoding): self.channel_uuid = str(uuid.uuid1()) self.websocket = websocket self.expires = expires self.encoding = encoding self.created = time.time() async def send(self, payload): websocket = self.websocket if self.encoding == "json": try: await websocket.send_json(payload) except RuntimeError: pass elif self.encoding == "text": try: await websocket.send_text(payload) except RuntimeError: pass elif self.encoding == "bytes": try: await websocket.send_bytes(payload) except RuntimeError: pass else: try: await websocket.send(payload) except RuntimeError: pass self.created = time.time() def is_expired(self): return self.expires + int(self.created) < time.time() def __repr__(self): return f"{self.channel_uuid}"
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.
Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
class ChannelEndpoint(WebSocketEndpoint): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.expires = 60 * 60 * 24 self.encoding = "json" self.channel_groups = channel_groups async def on_connect(self, websocket, **kwargs): await super().on_connect(websocket, **kwargs) self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding) async def on_disconnect(self, websocket, close_code): await super().on_disconnect(websocket, close_code) self.channel_groups.remove_channel(self.channel) async def group_send(self, payload): await self.channel_groups.group_send(self.group, payload) def get_or_create(self, group): assert self._validate_name(group), "Invalid group name" self.channel_groups.group_add(group, self.channel) self.group = group def _validate_name(self, name): if name.isidentifier(): return True raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")
При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.
В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:
- get_or_create(self, group) — для получения или создания группы
- group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.
Пример интеграции
routes = [ Route("/chat/", endpoint=ChatView) Route("/chat/ws", endpoint=ChatChannel) ] html = """ <!DOCTYPE html> <html> <head> <title>ws</title> </head> <body> <h1>ChannelEndpoint</h1> <form action="" onsubmit="sendMessage(event)"> <label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/> <label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/> <label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = new WebSocket("ws://localhost/chat/chat/ws"); ws.onmessage = function(event) { console.log('Message received %s', event.data) var messages = document.getElementById('messages'); var message = document.createElement('li'); var data = JSON.parse(event.data); message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`; messages.appendChild(message); }; function sendMessage(event) { var username = document.getElementById("username"); var group_id = document.getElementById("groupId"); var input = document.getElementById("messageText"); var data = { "group_id": group_id.value, "username": username.value, "message": input.value, }; console.log('Message send %s', data) ws.send(JSON.stringify(data)); event.preventDefault(); } </script> </body> </html> """ class ChatView(HTTPEndpoint): async def get(self, request): return HTMLResponse(html) class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint async def on_receive(self, websocket, data): group_id = data["group_id"] message = data["message"] username = data["username"] if message.strip(): await self.get_or_create(group_id) # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS payload = { "username": username, "message": message, } await self.group_send(payload) # отправляем сообщение всем участникам группы
Методы ChannelGroups
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
from channel_box import channel_groups await channel_groups.group_send('my_chat_1', {"username": "New User", "message": "Hello world"})
«Мониторинг» групп:
from channel_box import channel_groups channel_groups.groups_show()
Очистка:
from channel_box import channel_groups channel_groups.groups_flush()
Установка
Менеджер пакетов pip:
pip install channel-box
Исходный код решения
**UPD 2022:**
channel-box дорос до версии 0.4 и теперь дружит с последними версиями Starlette.
Чат на channel-box здесь:
channel-box.andrey-sobolev.ru
