Как стать автором
Обновить

Группировка вебсокет соединений для асинхронного фреймворка Starlette

Время на прочтение6 мин
Количество просмотров4K
Cегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.

Вступление


Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.

Для чего нужна группировка вебсокет cоединений?


Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.

Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).

Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).

* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).

Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).

Где хранить группы?


Для хранения групп создадим класс ChannelGroups и объявим глобально его экземпляр:

import time
import uuid
from simple_print.functions import sprint_f
from starlette.endpoints import WebSocketEndpoint

class ChannelGroups:

    def __init__(self):
        self.created = time.time()  

    _CHANNEL_GROUPS = {} 
    created = None

    async def group_send(self, group, payload):
        self.clean_expired()
        for channel in self._CHANNEL_GROUPS.get(group, {}):
            await channel.send(payload)

    def groups_show(self):
        if self._CHANNEL_GROUPS:
            for group in self._CHANNEL_GROUPS:
                sprint_f(f"\n{group}", "green")
                for channel in self._CHANNEL_GROUPS.get(group, {}):
                    sprint_f(channel, "cyan")
                    if channel.is_expired():
                        sprint_f("expired", "red")
        else:
            sprint_f("Channel groups is empty", "yellow")    

    def groups_flush(self):
        self._CHANNEL_GROUPS = {}

    def group_add(self, group, channel):
        self._CHANNEL_GROUPS.setdefault(group, {})
        self._CHANNEL_GROUPS[group][channel] = ""

    def remove_channel(self, channel):
        for group in self._CHANNEL_GROUPS:
            if channel in self._CHANNEL_GROUPS[group]:
                del self._CHANNEL_GROUPS[group][channel]
                if not any(self._CHANNEL_GROUPS[group]):
                    del self._CHANNEL_GROUPS[group]            

    def clean_expired(self):
        for group in self._CHANNEL_GROUPS:
            for channel in self._CHANNEL_GROUPS.get(group, {}):
                if channel.is_expired():
                    del self._CHANNEL_GROUPS[group][channel]
                    if not any(self._CHANNEL_GROUPS[group]):
                        del self._CHANNEL_GROUPS[group]            


channel_groups = ChannelGroups()

Унаследуемся от WebSocketEndpoint


Чтобы добавить для открытых соединений связь с channel_groups нам необходимо унаследоваться от базового класса WebSocketEndpoint.

Начнем с создания вспомогательного класса Channel:
class Channel:

    def __init__(self, websocket, expires, encoding):
        self.channel_uuid = str(uuid.uuid1())
        self.websocket = websocket
        self.expires = expires
        self.encoding = encoding
        self.created = time.time()

    async def send(self, payload):
        websocket = self.websocket
        if self.encoding == "json":
            try:
                await websocket.send_json(payload)
            except RuntimeError:
                pass
        elif self.encoding == "text":
            try:
                await websocket.send_text(payload)
            except RuntimeError:
                pass               
        elif self.encoding == "bytes":
            try:
                await websocket.send_bytes(payload)
            except RuntimeError:
                pass                   
        else:
            try:
                await websocket.send(payload)
            except RuntimeError:
                pass                   
        self.created = time.time()

    def is_expired(self):
        return self.expires + int(self.created) < time.time()

    def __repr__(self):
        return f"{self.channel_uuid}"

В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.

Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:

class ChannelEndpoint(WebSocketEndpoint):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.expires = 60 * 60 * 24  
        self.encoding = "json"
        self.channel_groups = channel_groups

    async def on_connect(self, websocket, **kwargs):
        await super().on_connect(websocket, **kwargs)
        self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding)

    async def on_disconnect(self, websocket, close_code):
        await super().on_disconnect(websocket, close_code)
        self.channel_groups.remove_channel(self.channel)

    async def group_send(self, payload):
        await self.channel_groups.group_send(self.group, payload)

    def get_or_create(self, group):
        assert self._validate_name(group), "Invalid group name"
        self.channel_groups.group_add(group, self.channel)
        self.group = group

    def _validate_name(self, name):
        if name.isidentifier():
            return True
        raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")

При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.

В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:

  • get_or_create(self, group) — для получения или создания группы
  • group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.

Пример интеграции


routes = [
    Route("/chat/", endpoint=ChatView)
    Route("/chat/ws", endpoint=ChatChannel)
]

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>ws</title>
    </head>
    <body>
        <h1>ChannelEndpoint</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/>
            <label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/>       
            <label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost/chat/chat/ws");
            ws.onmessage = function(event) {
                console.log('Message received %s', event.data)
                var messages = document.getElementById('messages');
                var message = document.createElement('li');
                var data = JSON.parse(event.data);
                message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`;
                messages.appendChild(message);
            };
            function sendMessage(event) {
                var username = document.getElementById("username");
                var group_id = document.getElementById("groupId");
                var input = document.getElementById("messageText");
                var data = {
                    "group_id": group_id.value, 
                    "username": username.value,
                    "message": input.value,
                };
                console.log('Message send %s', data)
                ws.send(JSON.stringify(data));
                event.preventDefault();
            }
        </script>
    </body>
</html>
"""

class ChatView(HTTPEndpoint):
    async def get(self, request):
        return HTMLResponse(html)

class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint
    async def on_receive(self, websocket, data):
        group_id = data["group_id"]
        message = data["message"]
        username = data["username"]
        if message.strip():
            await self.get_or_create(group_id)  # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS 
            payload = {
                "username": username,
                "message": message,
            } 
            await self.group_send(payload) # отправляем сообщение всем участникам группы

Методы ChannelGroups


В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».

Отправка сообщения в группу из любого места кода:

from channel_box import channel_groups
await channel_groups.group_send('my_chat_1', {"username": "New User", "message": "Hello world"})


«Мониторинг» групп:
from channel_box import channel_groups
channel_groups.groups_show()


Очистка:
from channel_box import channel_groups
channel_groups.groups_flush()


Установка


Менеджер пакетов pip:
pip install channel-box


Исходный код решения

**UPD 2022:**

channel-box дорос до версии 0.4 и теперь дружит с последними версиями Starlette.

Чат на channel-box здесь:

channel-box.andrey-sobolev.ru
Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+1
Комментарии5

Публикации

Работа

Data Scientist
39 вакансий

Ближайшие события