Как стать автором
Обновить

Обновление состояния системы через Server-Sent Events (SSE) без затей

Уровень сложностиСредний
Время на прочтение11 мин
Количество просмотров9.3K

В любом многопользовательском проекте рано или поздно возникает потребность в оперативном уведомлении клиентов об изменении объектов в системе. В данной статье я не буду рассматривать что-то сложное, типа совместного редактирования документов, а напишу о более простой ситуации, которая встречается повсеместно.

Допустим, у нас есть список объектов, к примеру – список эпизодов для шоу. Один клиент на него смотрит, а другой в это время добавляет в список еще один эпизод. Хорошо бы факт добавления нового эпизода сразу отобразить у первого клиента. То же самое относится и к просмотру информации об отдельном эпизоде: если один клиент её просматривает, а другой – редактирует, было бы здорово результат редактирования сразу отображать у первого.

Для решения данной задачи очень удобно использовать механизм Server-Sent Events (SSE). О том как это сделать в проектах, у которых на беке Node, а на фронте React, я и хочу поговорить.

Прежде чем писать код...

...давайте вспомним о том, что обычно в работе с объектами в системе мы используем CRUD-операции. Для уведомления об изменении состояния объекта нам, скорее всего, понадобится только три из них: создание, обновление, удаление. Эти три операции можно разделить на две группы:

  • обновление и удаление,

  • создание.

Различия в том, что обновляется и удаляется уже существующий объект и клиент знает его идентификатор. Этот идентификатор клиент может указать серверу когда будет отправлять запрос на получение информации об изменении его состояния. Получив такой запрос сервер сможет определить его правомочность в рамках действующей модели доступа т.к. и клиент и объект явно определены.

В случае создания объекта максимум из того, что может сообщить клиент серверу – это его тип. Но только типа объекта, скорее всего, будет недостаточно для того, чтобы определить правомочность доступа, т.к. вряд ли в вашей системе все пользователи имеют доступ ко всем объектам. Кроме того, вряд ли вам нужно давать возможность какому-либо пользователю получать информацию о создании любых объектов в системе (если только это не суперпользователь).

К счастью объекты в системе обычно не болтаются сами по себе, а могут быть сгруппированы по какому-либо признаку. В приведенном во вступлении примере эпизоды группируются по принадлежности к шоу, а шоу – по принадлежности к создавшему их пользователю. Одни и те же объекты часто могут быть сгруппированы по разным признакам, которые зависят от семантики системы. Т.е. в общем случае можно говорить о том, что объекты помещаются в контейнеры, которые мы можем каким-либо образом идентифицировать. В примере выше для эпизодов контейнером является шоу (у которого есть свой идентификатор), а для шоу контейнером является пользователь, который их создал.

Тогда, когда речь заходит об отслеживании создания объекта, клиент может сообщить системе его тип и идентификатор контейнера, в котором он ожидает появления. В этом случае сервер уже может проверять права доступа. Да и в целом, обычно, отслеживать появление объекта требуется в контексте какого-либо контейнера, поэтому такая схема естественна.

Почему Server-Sent Events (SSE)?

Когда описываемая задача только возникла в моем проекте, я тут же реализовал самый тупой вариант – периодический опрос информации об объектах. Сердце обливалось кровью при просмотре логов от осознания того, сколько вхолостую потрачено трафика и ресурсов сервера. Утешало лишь то, что все равно общая нагрузка была не настолько высока, что бы начинать беспокоиться. Однако, было понятно, что так жить нельзя и с этим надо что-то делать.

В современном мире существует три способа для решения данного вопроса:

  1. Polling: Short или Long

  2. Server-Sent Events (SSE)

  3. WebSockets

Как настоящий хипста-погромист я, конечно же, перво-наперво решил что делать надо по-большому и сразу схватился за WebSockets. Советую вам сделать так же (нет!).

По началу было довольно увлекательно писать свой протокол, авторизацию, систему подписок, диспетчеризацию, восстановление соединения и вот это вот всё. Однако, довольно скоро стало понятно, что получающаяся в итоге система начинает смахивать на какой-то доморощенный брокер сообщений, а для его женитьбы с фронтэндом надо очень постараться. И я приуныл... примерно на день, пока не вспомнил о наличии альтернатив, а точнее – альтернативы.

Право слово, если выбирать между Polling и SSE, то не очень понятно зачем останавливаться на первом. Это более древний метод, который сейчас рекомендуется разве что в качестве fallback-опции. Поэтому даже не будем акцентироваться на этом моменте.

Делаем сервер

Я собираюсь добавить два эндпойнта: для получения уведомлений об изменении состояния объекта и списка объектов. Прежде чем написать контроллеры, добавим следующий файл sse.ts, который реализует работу с соответствующими списками клиентов. Он довольно простой. Обращу лишь внимание на то, что я отправляю кастомное событие с каждым оповещением и дублирую его в данных. Это позволит в клиенте разбирать события разными способами в зависимости от потребностей. Но об этом позже.

import { Response } from 'express';
import { TEpisode } from '../../../types';
import logger from '../../../utils/logger';

export type TSseClient = {
    res: Response;
    id: number; // идентификатор объекта или контейнера
};

const itemClients: TSseClient[] = []; // клиенты, которые слушают конкретный объект
const listClients: TSseClient[] = []; // клиенты, которые слушают список объектов в контейнере

export const add = (type: 'item'|'list', client: TSseClient) => {
    const clients = type === 'item' ? itemClients : listClients;
    clients.push(client);
    logger.debug(`Добавлен новый клиент в список "${type}". Длина списка: ${clients.length}`);
};

export const remove = (type: 'item'|'list', res: Response) => {
    const clients = type === 'item' ? itemClients : listClients;
    const index = clients.findIndex(client => client.res === res);
    if(index >= 0) {
        clients.splice(index, 1);
        logger.debug(`Удален клиент из списка "${type}". Длина списка: ${clients.length}`);
    }
};

export const notify = (event: 'create'|'update'|'remove', episode: TEpisode) => {
    try {
        // Разошлем всем слушателям списка
        listClients.forEach(client => {
            if(client.id === episode.show_id) {
                client.res.write(`event: ${event}\n`);
                client.res.write(`data: ${JSON.stringify({ event, episode })}\n\n`);
            }
        });
        // Создание объекта рассылается только слушателям списка
        if(event === 'create') return;
        // Разошлем всем слушателям эпизода
        itemClients.forEach(client => {
            if(client.id === episode.id) {
                client.res.write(`event: ${event}\n`);
                client.res.write(`data: ${JSON.stringify({ event, episode })}\n\n`);
            }
        });
    } catch(error) {
        logger.error('SSE notify error\n%o', error);
    }
};

export default { add, remove, notify };

Как видно из кода он заточен на то, что рассылать мы будем информацию об эпизодах, которые могут быть сгруппированы единственным образом. Подобный же код потребуется чтобы работать и с другими объектами и способами группировки. При этом он будет совпадать с вышеприведенным на 99%. Поэтому имеет смысл код слегка модифицировать, для того, чтобы можно было его использовать с любыми типами объектов и контейнеров и не заниматься тупым копипастом. Тут, однако, я не буду приводить эти изменения дабы не усложнять изложение.

Вот как может выглядеть контроллер для оповещения об изменении состояния объекта (в нашем случае – эпизода).

import { NextFunction, Request, Response } from 'express';
import HttpException from '../../../exceptions/http.exception';
import pool from "../../../db/postgres";
import logger from '../../../utils/logger';
import { TEpisode } from '../../../types';
import sse, { TSseClient } from './sse';
/**
 * Запросы на прослушивание изменения эпизода
 */
export const sseItem = async (req: Request, res: Response, next: NextFunction) => {
    // Нужен залогиненый юзер
    if(!req.user) {
        return next(new HttpException(401));
    }
    // Берем идентификатор из запроса
    const episode_id = parseInt(req.params.id);
    // Проверим идентификатор
    if(isNaN(episode_id)) {
        return next(new HttpException(400));
    }
    // Лезем в базу и т.д.
    try {
        const result = await pool.query('SELECT * FROM episodes WHERE id=$1', [ episode_id ]);
        if(result.rows.length === 0) {
            return next(new HttpException(404));
        } 
        
        const episode = result.rows[0] as TEpisode;
        // Только суперпользователь и сам пользователь имют право читать информацию
        if(!req.user.is_root && req.user.id !== episode.owner_id) {
            return next(new HttpException(403));
        }
        
        // Отправим заголовок SSE
        const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
        };
        res.writeHead(200, headers);
        
        // Установим задержку переподключения в 1 секунду
        res.write(`retry: 1000\n`);
        // Отошлем актуальное состояние объекта
        res.write(`event: update\n`);
        res.write(`data: ${JSON.stringify({ event: 'update', episode })}\n\n`);
        
        // Добавим клиента в список
        const newClient: TSseClient = {
            res,
            id: episode.id,
        };
        sse.add('item', newClient);
        
        // При обрыве соединения удалим клиента из списка
        req.on('close', () => {
            logger.debug(`SSE connection closed for ${req.user?.email}`);
            sse.remove('item', res);
        });

    } catch(error) {
        logger.error('%o', error);
        return next(new HttpException(500, 'Server error', 'Ошибка сервера', error));
    }
};

export default sseItem;

Одним из больших плюсов SSE является то, что можно использовать тот же механизм авторизации клиента, что и для остальных запросов. Я для этого написал небольшое middleware, которое вытаскивает из заголовка запроса токен авторизации, запрашивает по нему информацию о залогиненом юзере из Redis и помещает в запрос в поле user. Затем можно использовать эти данные для проверки прав доступа: в описываемом случае доступ к объекту может получить его владелец и суперпользователь.

Обратите внимание на то, что сразу после подключения клиенту отсылается информация об актуальном состоянии объекта. Сделано это для того, чтобы фронтэнду было проще обрабатывать ситуацию со сбросом соединения при переключении вкладки браузера. Если пользователь уйдет с вкладки с вашим приложением браузер закроет все SSE-соединения и, соответственно, обновления, которые произойдут до возврата пользователя к вкладке, будут потеряны. Как только вкладка с приложением будет вновь активирована, браузер автоматически восстановит SSE-соединение, по которому приложение сразу получит актуальное состояние объекта. При таком подходе нам нет необходимости заморачиваться с идентификаторами событий и сохранением истории изменения объекта.

Контроллер оповещения об изменениях объектов в контейнере очень похож на предыдущий. Отличие в том, что при подключении сразу передаётся актуальный список объектов. Затем будут отправляться уже события с изменениями объектов из списка.

import { NextFunction, Request, Response } from 'express';
import HttpException from '../../../exceptions/http.exception';
import pool from "../../../db/postgres";
import logger from '../../../utils/logger';
import { TEpisode, TShow } from '../../../types';
import sse, { TSseClient } from './sse';
/**
 * Запросы на прослушивание изменения эпизодов в конкретном шоу
 */
export const sseList = async (req: Request, res: Response, next: NextFunction) => {
    // Нужен залогиненый юзер
    if(!req.user) {
        return next(new HttpException(401));
    }
    // Берем идентификатор из запроса
    const show_id = parseInt(req.params.id);
    // Проверим идентификатор
    if(isNaN(show_id)) {
        return next(new HttpException(400));
    }
    // Лезем в базу и т.д.
    try {
        let result = await pool.query('SELECT * FROM shows WHERE id=$1', [ show_id ]);
        if(result.rows.length === 0) {
            return next(new HttpException(404));
        } 
        
        const show = result.rows[0] as TShow;
        // Только суперпользователь и сам пользователь имют право читать информацию
        if(!req.user.is_root && req.user.id !== show.owner_id) {
            return next(new HttpException(403));
        }
        
        // Отправим заголовок SSE
        const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
        };
        res.writeHead(200, headers);

        // Установим задержку переподключения в 1 секунду
        res.write(`retry: 1000\n`);

        // Прочитаем из базы актуальный список объектов
        result = await pool.query('SELECT * FROM episodes WHERE show_id=$1 ORDER BY name', [ show_id ]);
        const episodes = result.rows as TEpisode[];
        // и отошлем его клиенту
        res.write(`event: list\n`);
        res.write(`data: ${JSON.stringify({ event: 'list', episodes })}\n\n`);

        // Добавим клиента в список
        const newClient: TSseClient = {
            res,
            id: show.id,
        };
        sse.add('list', newClient);
        
        // При обрыве соединения удалим клиента из списка
        req.on('close', () => {
            logger.debug(`SSE connection closed for ${req.user?.email}`);
            sse.remove('list', res);
        });

    } catch(error) {
        logger.error('%o', error);
        return next(new HttpException(500, 'Server error', 'Ошибка сервера', error));
    }
};

export default sseList;

Это практически всё, что надо сделать на стороне сервера. Остаются сущие мелочи. Во-первых, в контроллерах создания, удаления и модификации объекта добавить вызовы оповещения:

import sse from './sse/sse';
import { TEpisode } from '../../types';
let episode: TEpisode;
...
// В контроллере создания
sse.notify('create', episode);
...
// В контроллере удаления
sse.notify('remove', episode);
...
// В контроллере модификации
sse.notify('update', episode);

Во-вторых, прописать роутер. И на этом всё, можно переходить к фронтэнду.

Делаем клиент

Для клиента на React просто просится хук, который может выглядеть примерно следующим образом:

import { useEffect, useState } from 'react';
import { useAppSelector } from './store';
import { API_URL } from '../config';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { TEpisode } from '../types';

type TSseMessage = {
	event: 'list'|'create'|'update'|'remove',
	episode?: TEpisode,
	episodes?: TEpisode[],
};

const useSse = (method: string) => {
	const token = useAppSelector(store => store.auth.token);
	const [ message, setMessage ] = useState<TSseMessage|null>(null);

	useEffect(() => {
		const controller = new AbortController();
		const { signal } = controller;

		const fetchData = async () => {
			await fetchEventSource(`${API_URL}/${method}`, {
				headers: {
					'Authorization': `Bearer ${token}`,
				},
				onmessage(event) {
					try {
						const parsedData = JSON.parse(event.data);
						setMessage(parsedData as TSseMessage);
					} catch(error) {
						console.error('useSse parsing error');
					}	
				},
				signal,
			});
		};

		fetchData();
		return () => controller.abort();

	}, [ method, token ]);

	return message;
};

export default useSse;

Важно тут то, что вместо стандартного EventSource используется Fetch Event Source и вот почему: дело в том, что EventSource не позволяет установить заголовки запроса, а без этого невозможно использовать типовую авторизацию с помощью токенов.

Как и в описании сервера, тут мы считаем, что ничего кроме эпизодов отслеживать не собираемся. Не представляет никакого труда модифицировать код для общего случая (добавив в него заодно обработку ошибок).

Компонент, отображающий эпизод мог бы выглядеть как-то так:

import { useEffect } from 'react';
import { useParams } from 'react-router-dom';
import { useAppDispatch, useAppSelector } from '../hooks/store';
import { episodeSet } from '../store/episodeSlice';
import useSse from '../hooks/sse';

const Episode = () => {
	const dispatch = useAppDispatch();
	const episode = useAppSelector(store => store.episode);
	const { episode_id } = useParams();
	const message = useSse(`sse/episodes/${episode_id}`);

    useEffect(() => {
		if(!message || message.event !== 'update' || !message.episode) return;
		dispatch(episodeSet(message.episode));
	}, [ message, dispatch ]);

    if(!episode) return null;
  
	return (
		<div>
			{episode.name}
		</div>
	);
};

export default Episode;

Очень просто! Практически не отличается от типового запроса к API, но при этом отображает всегда актуальное состояние объекта.

Для вывода списка объектов компонент не намного сложнее. Обратите внимание на то, что разбирать событие удобно в самом компоненте и именно для этого мы добавили его в данные, когда писали сервер.

import { FC, useEffect, useState } from 'react';
import { useAppDispatch, useAppSelector } from '../../hooks/store';
import { episodesAdd, episodesDel, episodesPut, episodesSet } from '../../store/episodesSlice';
import useSse from '../../hooks/sse';

type EpisodeListProps = {
    showId: number;
};

const EpisodeList: FC<EpisodeListProps> = ({ showId }) => {
	const dispatch = useAppDispatch();
	const episodes = useAppSelector(store => store.episodes);
	const message = useSse(`sse/episodes/shows/${showId}`);

	useEffect(() => {
		if(!message) return;
		switch(message.event) {
			case 'list':
				if(message.episodes) dispatch(episodesSet(message.episodes));
				break;
			case 'create':
				if(message.episode) dispatch(episodesAdd(message.episode));
				break;
			case 'remove':
				if(message.episode) dispatch(episodesDel(message.episode.id));
				break;
			case 'update':
				if(message.episode) dispatch(episodesPut(message.episode));
				break;
			default:
				return;
		}
	}, [ message, dispatch ]);

	return (
		<ul>
			{episodes.map(episode => <li key={episode.id}>{episode.name}<li/>)}
		</ul>
	);
};

export default EpisodeList;

Ура! Ура! Это всё! Нет :(

К сожалению, есть нюансы

Самый неприятный из них – жадность браузеров. Обычно они не дают установить более 6 SSE-соединений с одним сервером. Причем не в одной вкладке, а вообще. Шесть соединений – это крайне мало.

Для решения данной проблемы обычно предлагается три пути:

  1. Использование LocalStorage

  2. Использование Service Workers

  3. Переход на HTTP/2 на сервере

Третий способ лично мне кажется предпочтительным, хотя не все могут его себе позволить.

Еще один нюанс связан с тем, что при обрыве соединения браузер начнет усердно пытаться его восстанавливать, причем иногда он почему-то напрочь игнорирует задержку переподключения, которую указал сервер. Решение данной проблемы комплексное. Во-первых, необходимо понять по какой причине соединение было разорвано. Если это ошибка клиента, то соединение, скорее всего, не было установлено сразу (и кстати, установка на задержку переподключения могла быть даже не передана). В этом случае, клиент должен перестать долбиться в закрытые двери и принять меры. Возможно токен авторизации перестал быть валидным и необходимо заново авторизовать пользователя, возможно передан некорректный идентификатор и т.п.

Если причиной обрыва стало внезапное падение сервера, то стоит самостоятельно увеличить задержку переподключения, возможно, даже сделать её прогрессивной, если сервер долго не отвечает. А может просто выдать ошибку пользователю и отказаться дальше работать. Всё зависит от системы.

А вот теперь, пожалуй, всё.

Картинку к статье сгенерил бот Kandinsky 2.2

Теги:
Хабы:
Всего голосов 3: ↑3 и ↓0+3
Комментарии5

Публикации

Истории

Работа

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань