Cегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Для хранения групп создадим класс ChannelGroups и объявим глобально его экземпляр:
Чтобы добавить для открытых соединений связь с channel_groups нам необходимо унаследоваться от базового класса WebSocketEndpoint.
Начнем с создания вспомогательного класса Channel:
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.
Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.
В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
«Мониторинг» групп:
Очистка:
Менеджер пакетов pip:
Исходный код решения
**UPD 2022:**
channel-box дорос до версии 0.4 и теперь дружит с последними версиями Starlette.
Чат на channel-box здесь:
channel-box.andrey-sobolev.ru
Вступление
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Для чего нужна группировка вебсокет cоединений?
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Где хранить группы?
Для хранения групп создадим класс ChannelGroups и объявим глобально его экземпляр:
import time
import uuid
from simple_print.functions import sprint_f
from starlette.endpoints import WebSocketEndpoint
class ChannelGroups:
def __init__(self):
self.created = time.time()
_CHANNEL_GROUPS = {}
created = None
async def group_send(self, group, payload):
self.clean_expired()
for channel in self._CHANNEL_GROUPS.get(group, {}):
await channel.send(payload)
def groups_show(self):
if self._CHANNEL_GROUPS:
for group in self._CHANNEL_GROUPS:
sprint_f(f"\n{group}", "green")
for channel in self._CHANNEL_GROUPS.get(group, {}):
sprint_f(channel, "cyan")
if channel.is_expired():
sprint_f("expired", "red")
else:
sprint_f("Channel groups is empty", "yellow")
def groups_flush(self):
self._CHANNEL_GROUPS = {}
def group_add(self, group, channel):
self._CHANNEL_GROUPS.setdefault(group, {})
self._CHANNEL_GROUPS[group][channel] = ""
def remove_channel(self, channel):
for group in self._CHANNEL_GROUPS:
if channel in self._CHANNEL_GROUPS[group]:
del self._CHANNEL_GROUPS[group][channel]
if not any(self._CHANNEL_GROUPS[group]):
del self._CHANNEL_GROUPS[group]
def clean_expired(self):
for group in self._CHANNEL_GROUPS:
for channel in self._CHANNEL_GROUPS.get(group, {}):
if channel.is_expired():
del self._CHANNEL_GROUPS[group][channel]
if not any(self._CHANNEL_GROUPS[group]):
del self._CHANNEL_GROUPS[group]
channel_groups = ChannelGroups()
Унаследуемся от WebSocketEndpoint
Чтобы добавить для открытых соединений связь с channel_groups нам необходимо унаследоваться от базового класса WebSocketEndpoint.
Начнем с создания вспомогательного класса Channel:
class Channel:
def __init__(self, websocket, expires, encoding):
self.channel_uuid = str(uuid.uuid1())
self.websocket = websocket
self.expires = expires
self.encoding = encoding
self.created = time.time()
async def send(self, payload):
websocket = self.websocket
if self.encoding == "json":
try:
await websocket.send_json(payload)
except RuntimeError:
pass
elif self.encoding == "text":
try:
await websocket.send_text(payload)
except RuntimeError:
pass
elif self.encoding == "bytes":
try:
await websocket.send_bytes(payload)
except RuntimeError:
pass
else:
try:
await websocket.send(payload)
except RuntimeError:
pass
self.created = time.time()
def is_expired(self):
return self.expires + int(self.created) < time.time()
def __repr__(self):
return f"{self.channel_uuid}"
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.
Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
class ChannelEndpoint(WebSocketEndpoint):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.expires = 60 * 60 * 24
self.encoding = "json"
self.channel_groups = channel_groups
async def on_connect(self, websocket, **kwargs):
await super().on_connect(websocket, **kwargs)
self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding)
async def on_disconnect(self, websocket, close_code):
await super().on_disconnect(websocket, close_code)
self.channel_groups.remove_channel(self.channel)
async def group_send(self, payload):
await self.channel_groups.group_send(self.group, payload)
def get_or_create(self, group):
assert self._validate_name(group), "Invalid group name"
self.channel_groups.group_add(group, self.channel)
self.group = group
def _validate_name(self, name):
if name.isidentifier():
return True
raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")
При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.
В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:
- get_or_create(self, group) — для получения или создания группы
- group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.
Пример интеграции
routes = [
Route("/chat/", endpoint=ChatView)
Route("/chat/ws", endpoint=ChatChannel)
]
html = """
<!DOCTYPE html>
<html>
<head>
<title>ws</title>
</head>
<body>
<h1>ChannelEndpoint</h1>
<form action="" onsubmit="sendMessage(event)">
<label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/>
<label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/>
<label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost/chat/chat/ws");
ws.onmessage = function(event) {
console.log('Message received %s', event.data)
var messages = document.getElementById('messages');
var message = document.createElement('li');
var data = JSON.parse(event.data);
message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`;
messages.appendChild(message);
};
function sendMessage(event) {
var username = document.getElementById("username");
var group_id = document.getElementById("groupId");
var input = document.getElementById("messageText");
var data = {
"group_id": group_id.value,
"username": username.value,
"message": input.value,
};
console.log('Message send %s', data)
ws.send(JSON.stringify(data));
event.preventDefault();
}
</script>
</body>
</html>
"""
class ChatView(HTTPEndpoint):
async def get(self, request):
return HTMLResponse(html)
class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint
async def on_receive(self, websocket, data):
group_id = data["group_id"]
message = data["message"]
username = data["username"]
if message.strip():
await self.get_or_create(group_id) # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS
payload = {
"username": username,
"message": message,
}
await self.group_send(payload) # отправляем сообщение всем участникам группы
Методы ChannelGroups
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
from channel_box import channel_groups
await channel_groups.group_send('my_chat_1', {"username": "New User", "message": "Hello world"})
«Мониторинг» групп:
from channel_box import channel_groups
channel_groups.groups_show()
Очистка:
from channel_box import channel_groups
channel_groups.groups_flush()
Установка
Менеджер пакетов pip:
pip install channel-box
Исходный код решения
**UPD 2022:**
channel-box дорос до версии 0.4 и теперь дружит с последними версиями Starlette.
Чат на channel-box здесь:
channel-box.andrey-sobolev.ru