WebSockets в Angular. Часть 2. Продуктовые решения

  • Tutorial
image

В предыдущей статье речь шла об общем решении для вебсокетов в Angular, где мы на основе WebSocketSubject построили шину с реконнектом и сервисом для использования в компонентах. Подобная реализация подходит для большинства простых случаев, например, приема и отправки сообщений в чате и т.д., но её возможностей может быть недостаточно там, где нужно построить нечто более гибкое и контролируемое. В этой статье я раскрою некоторые особенности при работе с вебсокетами и расскажу о тех требованиях, с которыми сталкивался сам и, возможно, столкнетесь вы.

Часто в больших проектах с высокой посещаемостью перед фронтендом встают задачи, которые в других обстоятельствах привычнее видеть на бэкенде. В условиях жесткой экономии серверных ресурсов часть проблем мигрирует на территорию фронтенда, по этой причине в проект закладывается максимум расширяемости и контроля.

Вот список основных требований для вебсокет-клиента, которые будут рассматриваться в этой статье:

  • Автоматический «умный» реконнект;
  • Режим дебага;
  • Система подписок на события на основе RxJs;
  • Прием и парсинг бинарных данных;
  • Проецирование (маппинг) получаемой информации на модели;
  • Контроль над изменениями моделей по мере прихода новых событий;
  • Игнорирование произвольных событий и отмена игнорирования.

Рассмотрим каждый пункт подробнее.

Реконнект/Дебаг


Про реконнект я писал в предыдущей статье, поэтому просто процитирую часть текста:

Реконнект, или организация переподключения к серверу, это первостепенный фактор при работе с вебсокетами, т.к. обрывы сети, падения сервера или другие ошибки, вызывающие обрыв коннекта способны обрушить работу приложения.
Важно учесть, что попытки переподключения не должны быть слишком частыми и не должны продолжаться до бесконечности, т.к. такое поведение способно подвесить клиент.

Сам по себе вебсокет не умеет восстанавливать соединение при обрыве. Следовательно, если перезагрузился или упал сервер, или же у пользователя переподключился интернет, то для продолжения работы нужно переподключиться и вебсокету.

В этой статье для реконнекта и дебага будет использоваться Reconnecting WebSocket, который содержит нужный функционал и прочие опции, такие как смена url вебсокета между переподключениями, выбор произвольного конструктора WebSocket и т.д. Также подойдут и другие альтернативные решения. Реконнект же из предыдущей статьи не подходит, т.к. он написан под WebSocketSubject, который в этот раз не применяется.

Система подписок на события на основе RxJs


Для использования вебсокетов в компонентах нужно подписываться на события и отписываться от них, когда потребуется. Для этого воспользуемся распространенным дизайн-паттерном Pub/Sub.
«Издатель-подписчик (англ. publisher-subscriber или англ. pub/sub) — поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей.»

Подписчик обращается к издателю не напрямую, а через промежуточную шину — сервис вебсокетов. Также должна быть возможность подписаться на несколько событий с одинаковым типом возвращаемых данных. Для каждой подписки создается собственный Subject, который добавляется к объекту listeners, что позволяет адресовать события вебсокета нужным подпискам. При работе с RxJs Subject, возникают некоторые сложности с отписками, поэтому создадим несложный сборщик мусора, который будет удалять сабжекты из объекта listeners в случае, когда у них отсутствуют observers.

Прием и парсинг бинарных данных


WebSocket поддерживает передачу бинарных данных, файлов или стримов, что часто используется в больших проектах. Это выглядит примерно следующим образом:
0x80, <длина — один или несколько байт>, <тело сообщения>

Чтобы не создавать ограничений на длину передаваемого сообщения и в то же время не расходовать байты нерационально, разработчики протокола использовали следующий алгоритм. Каждый байт в указании длины рассматривается по отдельности: старший указывает на то, последний ли это байт (0) или же за ним идут другие (1), а младшие 7 битов содержат передаваемые данные. Следовательно, когда появляется признак бинарного дата-фрейма 0x80, то берется следующий байт и откладывается в отдельную «копилку». Потом следующий байт, если у него установлен старший бит, тоже переносится в «копилку» и так далее до тех пор, пока не встретится байт с нулевым старшим битом. Этот байт — последний в указателе длины и также складывается в «копилку». Теперь из байтов в «копилке» убираются старшие биты, и остаток объединяется. Вот это и будет длина тела сообщения — 7-битные числа без старшего бита.

Механизм парсинга и бинарного стрима на фронтенде сложен и связан с маппингом данных на модели. Этому можно посвятить отдельную статью. В этот раз разберем простой вариант, а сложные случаи оставим для следующих публикаций, если будет интерес к теме.

Проецирование (маппинг) получаемой информации на модели


Независимо от типа передачи принимаемое требуется безопасно читать и изменять. Нет единого мнения как это лучше делать, я придерживаюсь теории модели данных, так как считаю её логичной и надежной для программирования в ООП-стиле.
«Модель данных — это абстрактное, самодостаточное, логическое определение объектов, операторов и прочих элементов, в совокупности составляющих абстрактную машину доступа к данным, с которой взаимодействует пользователь. Эти объекты позволяют моделировать структуру данных, а операторы — поведение данных.»

Всевозможные популярные шины, которые не дают представление объекта, как класса, в котором определяется поведение, структура и т.д., создают путаницу, хуже контролируются и иногда обрастают тем, что им не свойственно. Например, класс собаки при любых условиях должен описывать собаку. Если собаку воспринимать в виде набора полей: хвост, цвет, морда и т.д., то у собаки может вырасти лишняя лапа, а вместо головы появиться другая собака.

image

Контроль над изменениями моделей по мере прихода новых событий


В этом пункте опишу задачу, с которой столкнулся при работе над веб-интерфейсом мобильного приложения спортивных ставок. API приложения работало через вебсокеты, через которые получали: обновление коэффициентов, добавление и удаление новых типов ставок, уведомления о начале или окончании матча и т.д. — итого около трёхсот событий вебсокета. Во время матча ставки и информация непрерывно обновляются, иногда 2–3 раза в секунду, таким образом проблема заключалась в том, что вслед за ними без промежуточного контроля обновлялся и интерфейс.

Когда пользователь следил за ставкой с мобильного устройства, и в это же время на его дисплее обновлялись списки, то ставка исчезала из поля видимости, поэтому пользователю приходилось искать отслеживаемую ставку заново. Такое поведение повторялось на каждое обновление.

image

Для решения потребовалась иммутабельность для объектов, которые отображались на экране, но при этом коэффициенты ставок должны были меняться, неактуальные ставки приобретать неактивный вид, а новые не добавляться до тех пор, пока пользователь не проскроллит экран. На бэкенде устаревшие варианты не хранились, поэтому такие линии требовалось запоминать и помечать флагом «deleted», для чего было создано промежуточное хранилище данных между вебсокетом и подпиской, что обеспечило контроль за изменениями.

В новом сервисе также создадим слой-заместитель и в этот раз воспользуемся Dexie.js — обертку над IndexedDB API, но подойдет любая другая виртуальная или браузерная БД. Допустимо использование Redux.

Игнорирование произвольных событий и отмена игнорирования


В одной компании часто существуют сразу несколько однотипных проектов: мобильная и веб версии, версии с различными настройками для разных групп пользователей, расширенные и урезанные варианты одного и того же приложения.

Часто все они используют единую кодовую базу, поэтому иногда требуется отключать ненужные события в рантайме или при DI, не удаляя подписки и снова включать, т.е. игнорировать часть из них, чтобы не обрабатывать ненужные события. Это простая, но полезная функция, которая добавляет гибкости шине Pub/Sub.

Начнем с описания интерфесов:

export interface IWebsocketService { // публичный интерфейс сервиса
    addEventListener<T>(topics: string[], id?: number): Observable<T>;
    runtimeIgnore(topics: string[]): void;
    runtimeRemoveIgnore(topics: string[]): void;
    sendMessage(event: string, data: any): void;
}

export interface WebSocketConfig { // конфиг при DI
    url: string;
    ignore?: string[];
    garbageCollectInterval?: number;
    options?: Options;
}

export interface ITopic<T> { // Топик для Pub/Sub
    [hash: string]: MessageSubject<T>;
}

export interface IListeners { // объект с топиками
    [topic: string]: ITopic<any>;
}

export interface IBuffer { // бинарный буфер из ws.message
    type: string;
    data: number[];
}

export interface IWsMessage { // ws.message
    event: string;
    buffer: IBuffer;
}

export interface IMessage { // Для демо
    id: number;
    text: string;
}

export type ITopicDataType = IMessage[] | number | string[]; // типизируем callMessage в сервисе

Отнаследуем Subject, чтобы создать сборщик мусора:

export class MessageSubject<T> extends Subject<T> {

    constructor(
        private listeners: IListeners, // объект с топиками
        private topic: string, // текущий топик
        private id: string // id сабжекта
    ) { 
        super();
    }

    /*
    * переопределяем стандартный next, 
    * теперь на очередное обращение при отсутствии подписок, 
    * будет вызываться garbageCollect
    */
    public next(value?: T): void {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }

        if (!this.isStopped) {
            const {observers} = this;
            const len = observers.length;
            const copy = observers.slice();

            for (let i = 0; i < len; i++) {
                copy[i].next(value);
            }

            if (!len) {
                this.garbageCollect(); // выносим мусор
            }
        }
    }

    /*
    * garbage collector
    * */
    private garbageCollect(): void {
        delete this.listeners[this.topic][this.id]; // удаляем Subject
        if (!Object.keys(this.listeners[this.topic]).length) { // удаляем пустой топик
            delete this.listeners[this.topic];
        }
    }

}

В отличие от прошлой реализации websocket.events.ts сделаем частью модуля вебсокетов

export const WS_API = {
    EVENTS: {
        MESSAGES: 'messages',
        COUNTER: 'counter',
        UPDATE_TEXTS: 'update-texts'
    },
    COMMANDS: {
        SEND_TEXT: 'set-text',
        REMOVE_TEXT: 'remove-text'
    }
};

Для конфигурирования при подключении модуля создадим websocket.config:

import { InjectionToken } from '@angular/core';

export const config: InjectionToken<string> = new InjectionToken('websocket');

Создадим модель для Proxy:

import Dexie from 'dexie';

import { IMessage, IWsMessage } from './websocket.interfaces';
import { WS_API } from './websocket.events';

class MessagesDatabase extends Dexie { // это стандартное использование Dexie с typescript
    public messages!: Dexie.Table<IMessage, number>; // id is number in this case
    constructor() {
        super('MessagesDatabase'); // имя хранилища
        this.version(1).stores({  // модель стора
            messages: '++id,text'
        });
    }
}

Простой парсер моделей, в реальных условиях его лучше разделить на несколько файлов:

export const modelParser = (message: IWsMessage) => {
    if (message && message.buffer) {
        /* парсим */
        const encodeUint8Array = String.fromCharCode
            .apply(String, new Uint8Array(message.buffer.data));

        const parseData = JSON.parse(encodeUint8Array);

        let MessagesDB: MessagesDatabase; // IndexedDB
        if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[]
            if (!MessagesDB) {
                MessagesDB = new MessagesDatabase();
            }

                parseData.forEach((messageData: IMessage) => {
                    /* создаем транзакцию */
                    MessagesDB.transaction('rw', MessagesDB.messages, async () => {
                        
                        /* создаем, если запись отсутствует */
                        if ((await MessagesDB.messages
                            .where({id: messageData.id}).count()) === 0) {

                            const id = await MessagesDB.messages
                                .add({id: messageData.id, text: messageData.text});

                            console.log(`Addded message with id ${id}`);
                        }

                    }).catch(e => {
                        console.error(e.stack || e);
                    });
                });

            return MessagesDB.messages.toArray(); // возвращаем массив IMessage[]
        }

        if (message.event === WS_API.EVENTS.COUNTER) { // counter
            return new Promise(r => r(parseData)); // промис с счетчиком
        }

        if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text
            const texts = [];

            parseData.forEach((textData: string) => {
                texts.push(textData);
            });

            return new Promise(r => r(texts)); // промис с массивом строк
        }

    } else {
        console.log(`[${Date()}] Buffer is "undefined"`);
    }
};

WebsocketModule:

@NgModule({
    imports: [
        CommonModule
    ]
})
export class WebsocketModule {
    public static config(wsConfig: WebSocketConfig): ModuleWithProviders {
        return {
            ngModule: WebsocketModule,
            providers: [{provide: config, useValue: wsConfig}]
        };
    }
}

Начнем создавать сервис:

private listeners: IListeners; // список топиков
private uniqueId: number; // соль для id подписки
private websocket: ReconnectingWebSocket; // объект вебсокета

constructor(@Inject(config) private wsConfig: WebSocketConfig) {
    this.uniqueId = -1;
    this.listeners = {};
    this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : [];

    // коннектимся
    this.connect();
}

ngOnDestroy() {
    this.websocket.close(); // убиваем вебсокет при дестрое
}

Метод connect:

private connect(): void {
    // ReconnectingWebSocket config
    const options = {
        connectionTimeout: 1000, // таймаут реконнекта, если не задано
        maxRetries: 10, // попытки реконнекта, если не задано
        ...this.wsConfig.options
    };

    // Коннектимся
    this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options);

    this.websocket.addEventListener('open', (event: Event) => {
        // соединение открыто
        console.log(`[${Date()}] WebSocket connected!`);
    });

    this.websocket.addEventListener('close', (event: CloseEvent) => {
        // соединение закрыто
        console.log(`[${Date()}] WebSocket close!`);
    });

    this.websocket.addEventListener('error', (event: ErrorEvent) => {
        // ошибка соединения
        console.error(`[${Date()}] WebSocket error!`);
    });

    this.websocket.addEventListener('message', (event: MessageEvent) => {
        // диспатчим события в подписки
        this.onMessage(event);
    });

    setInterval(() => {
        // дублируем сборщик мусора
        this.garbageCollect();
    }, (this.wsConfig.garbageCollectInterval || 10000));
}

Дублируем сборщик мусора, будет проверять подписки по таймауту:

private garbageCollect(): void {
    for (const event in this.listeners) {
        if (this.listeners.hasOwnProperty(event)) {
            const topic = this.listeners[event];

            for (const key in topic) {
                if (topic.hasOwnProperty(key)) {
                    const subject = topic[key];

                    // удаляем Subject если нет подписок
                    if (!subject.observers.length) {
                        delete topic[key];
                    }
                }
            }

            Удаляем топик, если пуст
            if (!Object.keys(topic).length) {
                delete this.listeners[event];
            }
        }
    }
}

Смотрим в какую подписку слать событие:

private onMessage(event: MessageEvent): void {
    const message = JSON.parse(event.data); 

    for (const name in this.listeners) {
        if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) {

            const topic = this.listeners[name];
            const keys = name.split('/'); // если подписаны на несколько событий
            const isMessage = keys.includes(message.event);
            const model = modelParser(message); // получаем промис с моделями
            if (isMessage && typeof model !== 'undefined') {
                model.then((data: ITopicDataType) => {

                    // отправляем в Subject
                    this.callMessage<ITopicDataType>(topic, data);
                });
            }
        }
    }
}

Шлем событие в Subject:

private callMessage<T>(topic: ITopic<T>, data: T): void {
    for (const key in topic) {
        if (topic.hasOwnProperty(key)) {
            const subject = topic[key];

            if (subject) {
                // отправляем подписчику
                subject.next(data);
            } else {
                console.log(`[${Date()}] Topic Subject is "undefined"`);
            }
        }
    }
}

Создаем топик Pub/Sub:

private addTopic<T>(topic: string, id?: number): MessageSubject<T> {
    const token = (++this.uniqueId).toString();
    const key = id ? token + id : token; // уникальный id для токена
    const hash = sha256.hex(key); // SHA256-хэш в качестве id топика
    if (!this.listeners[topic]) {
        this.listeners[topic] = <any>{};
    }

    return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash);
}

Подписка на одно или несколько событий:

public addEventListener<T>(topics: string | string[], id?: number): Observable<T> {
    if (topics) {
        // подписка на одно или несколько событий
        const topicsKey = typeof topics === 'string' ? topics : topics.join('/');

        return this.addTopic<T>(topicsKey, id).asObservable();
    } else {
        console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`);
    }
}

Здесь все намеренно упрощено, но можно преобразовывать в бинарные сущности, как и в случае с сервером. Отсылка команд на сервер:

public sendMessage(event: string, data: any = {}): void {
    // если соединение активно, шлем имя события и информацию
    if (event && this.websocket.readyState === 1) {
        this.websocket.send(JSON.stringify({event, data}));
    } else {
        console.log('Send error!');
    }
}

Добавляем события в игнорлист в рантайме:

public runtimeIgnore(topics: string[]): void {
    if (topics && topics.length) { 
        // добавляем в игнорлист
        this.wsConfig.ignore.push(...topics);
    }
}

Удаляем события из игнорлиста:

public runtimeRemoveIgnore(topics: string[]): void {
    if (topics && topics.length) {
        topics.forEach((topic: string) => {
            // ищем событие в списке топиков
            const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic);

            if (topicIndex > -1) {
                // снова слушаем собтия
                this.wsConfig.ignore.splice(topicIndex, 1);
            }
        });
    }
}

Подключаем модуль вебсокетов:

@NgModule({
    declarations: [
        AppComponent
    ],
    imports: [
        BrowserModule,
        ReactiveFormsModule,
        WebsocketModule.config({
            url: environment.ws, // или "ws://mywebsocketurl"
            // список игнорируемых событий
            ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2],
            garbageCollectInterval: 60 * 1000, // интервал сборки мусора
            options: {
                connectionTimeout: 1000, // таймаут реконнекта
                maxRetries: 10 // попытки реконнекта
            }
        })
    ],
    providers: [],
    bootstrap: [AppComponent]
})
export class AppModule {
}

Используем в компонентах:

@Component({
    selector: 'app-root',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {

    private messages$: Observable<IMessage[]>;
    private messagesMulti$: Observable<IMessage[]>;
    private counter$: Observable<number>;
    private texts$: Observable<string[]>;

    public form: FormGroup;

    constructor(
      private fb: FormBuilder, 
      private wsService: WebsocketService) {
    }

    ngOnInit() {
        this.form = this.fb.group({
            text: [null, [
                Validators.required
            ]]
        });

        // get messages
        this.messages$ = this.wsService
            .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES);
        
        // get messages multi
        this.messagesMulti$ = this.wsService
            .addEventListener<IMessage[]>([
                WS_API.EVENTS.MESSAGES, 
                WS_API.EVENTS.MESSAGES_1
             ]);

        // get counter
        this.counter$ = this.wsService
            .addEventListener<number>(WS_API.EVENTS.COUNTER);

        // get texts
        this.texts$ = this.wsService
            .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS);
    }

    ngOnDestroy() {

    }

    public sendText(): void {
        if (this.form.valid) {
            this.wsService
                .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text);
                
            this.form.reset();
        }
    }

    public removeText(index: number): void {
        this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index);
    }

}

Сервис готов к использованию.



Пример из статьи хоть не является универсальным решением для каждого проекта, но демонстрирует один из подходов работы с вебсокетами в больших и сложных приложениях. Вы можете взять его на вооружение и модифицировать в зависимости от текущих задач.

Полную версию сервиса можно найти на GitHub.

По всем вопросам можете обращаться в комментарии, ко мне в Телеграм или на канал Angular там же.
Поделиться публикацией

Комментарии 1

    0

    Всё это очень похоже на JSON-RPC с добавлением лишних сущностей.

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое