Эта статья — перевод оригинальной статьи Ben Lesh “Learning Observable By Building Observable”. Также я веду телеграм канал “Frontend по-флотски”, где рассказываю про интересные вещи из мира разработки интерфейсов
Вступление
Это повторение старой статьи, написанной мною в 2016 году, и доклада, с которым я выступал несколько раз. Я хочу немного модернизировать контент и, надеюсь, упростить его. Цель — помочь людям понять, что такое observable. Не только observable из RxJS, но и любой observable (да, их больше одного) как тип.
Observables — просто функции
Чтобы понять это, я хочу разместить рядом два примера. Один является observable из RxJS, а другой — просто функцией. Оба этих примера имеют одинаковый результат:
RxJS Observable
Это пример простого observable, созданного с помощью RxJS, который выдает три числа и завершается. Если вы уже знакомы с RxJS, это эквивалент (1, 2, 3).
import { Observable } from 'rxjs'; const source = new Observable<number>((subscriber) => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); // Usage console.log('start'); source.subscribe({ next: console.log, complete: () => console.log('done'), }); console.log('stop');
Функция “observable”
/** * A simple object with a `next` and `complete` callback on it. */ interface Observer<T> { next: (value: T) => void; complete: () => void; } /** * A function that takes a simple object with callbacks * and does something them. */ const source = (subscriber: Observer<number>) => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }; // Usage console.log('start'); source({ next: console.log, complete: () => console.log('done'), }); console.log('stop');
Вывод (обоих!)
"start" 1 2 3 "done" "stop"
Я хочу, чтобы вы обратили внимание на сходство. В обоих случаях вы передаете объект с помощью метода next и complete. В обоих случаях вы вызываете next и complete в теле функции. В обоих случаях тело функции не выполняется до тех пор, пока вы не вызовете source.subscribe() или просто вызовете функцию напрямую как source() в другом примере. Это потому, что observables — это просто специализированные функции.
Почему бы просто не использовать функции? Что «особенного» в observable?
Отлично подмечено. Если бы вы были очень осторожны, вы, вероятно, могли бы использовать обычные функции для некоторых из этих случаев. Но проблема в том, что версия с функциями не совсем «безопасна». На основе всего лишь одного примера, если семантика observable такова, что next никогда не должен вызываться после завершения, нам понадобятся некоторые гарантии для этого.
const source = function (subscriber: Observer<number>) { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); subscriber.next(4); // Упс, этого не должно быть };
В примере выше будет написано «done», а затем сразу 4. Такого быть вообще не должно! Поэтому мы хотим предоставить способ гарантировать, что следующий метод нашего подписчика не будет вызван после завершения. Это можно сделать, заключив нашу функцию в класс.
/** * A class used to wrap a user-provided Observer. Since the * observer is just a plain objects with a couple of callbacks on it, * this type will wrap that to ensure `next` does nothing if called after * `complete` has been called, and that nothing happens if `complete` * is called more than once. */ class SafeSubscriber<T> { closed = false; constructor(private destination: Observer<T>) {} next(value: T) { // Check to see if this is "closed" before nexting. if (!this.closed) { this.destination.next(value); } } complete() { // Make sure we're not completing an already "closed" subscriber. if (!this.closed) { // We're closed now. this.closed = true; this.destination.complete(); } } } /** * A class to wrap our function, to ensure that when the function is * called with an observer, that observer is wrapped with a SafeSubscriber */ class Observable<T> { constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {} subscribe(observer: Observer<T>): void { // We can wrap our observer in a "safe subscriber" that // does the work of making sure it's not closed. const subscriber = new SafeSubscriber(observer); this._wrappedFunc(subscriber); } } // Usage // Now 4 won't be nexted after we complete. const source = new Observable((subscriber) => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); subscriber.next(4); // this does nothing. });
Обработка Partial Observers
Другой возможный сценарий — «partial» observer. Другими словами, observable, у которого есть только метод next или complete (или, возможно, метод ошибки, но мы вернемся к этому позже). Теперь мы можем легко разобраться с этим сценарием имея наш тип observable, указанный выше, потому что мы можем реализовать это в нашем SafeSubscriber:
class SafeSubscriber<T> { closed = false; constructor(private destination: Partial<Observer<T>>) {} next(value: T) { if (!this.closed) { this.destination.next?.(value); // Note the ?. check here. } } complete() { if (!this.closed) { this.closed = true; this.destination.complete?.(); // And here. } } }
Уведомления об ошибке
Уведомить нашего подписчика об ошибке так же просто, как добавить дополнительный обработчик к нашим Observer и SafeSubscriber. Семантика очень похожа на complete выше. И ошибка, и complete считаются прекращением наблюдения.
Observer просто немного изменится, чтобы появился обработчик ошибок:
interface Observer<T> { next: (value: T) => void; complete: () => void; error: (err: any) => void; }
После этого мы можем добавить в наш SafeSubscriber error метод:
class SafeSubscriber<T> { closed = false; constructor(private destination: Partial<Observer<T>>) {} next(value: T) { if (!this.closed) { this.destination.next?.(value); } } complete() { if (!this.closed) { this.closed = true; this.destination.complete?.(); } } error(err: any) { if (!this.closed) { this.closed = true; this.destination.error?.(err); } } }
Источники данных и кейс для отмены подписки
Основной вариант использования observable — обертывание асинхронного источника данных, например WebSocket.
Чтобы сделать что-то подобное, вы можете использовать наш самодельный observable созданный выше, например:
const helloSocket = new Observable<string>((subscriber) => { // Open a socket. const socket = new WebSocket('wss://echo.websocket.org'); socket.onopen = () => { // Once it's open, send some text. socket.send('Hello, World!'); }; socket.onmessage = (e) => { // When it echoes the text back (in the case of this particular server) // notify the consumer. subscriber.next(e.data); }; socket.onclose = (e) => { // Oh! we closed! if (e.wasClean) { // ...because the server said it was done. subscriber.complete(); } else { // ...because of something bad. Maybe we lost network or something. subscriber.error(new Error('Socket closed dirty!')); } }; }); // Start the websocket and log the echoes helloSocket.subscribe({ next: console.log, complete: () => console.log('server closed'), error: console.error, });
Но теперь у нас появилась проблема. Пользователь, который подписывается на наш observable, не имеет возможности отменить его и закрыть сокет. Нам нужен способ прекратить работу. Если бы мы просто использовали функцию, мы могли бы вернуть функцию, содержащую нашу логику прекращения подписки.
const source = (subscriber: Observer<string>) => { const socket = new WebSocket('wss://echo.websocket.org'); socket.onopen = () => { socket.send('Hello, World!'); }; socket.onmessage = (e) => { subscriber.next(e.data); }; socket.onclose = (e) => { if (e.wasClean) { subscriber.complete(); } else { subscriber.error(new Error('Socket closed dirty!')); } }; return () => { if (socket.readyState <= WebSocket.OPEN) { socket.close(); } }; }; const teardown = source({ next: console.log, complete: () => console.log('done'), error: console.error, }); // Decide you really don't want to keep the socket open. teardown();
Еще одна проблема, которая возникает не обязательно связанная с WebSocket, но и с другими ситуациями, — это ситуации, когда автор observable решает, что он столкнулся с ошибкой или состоянием завершения, и он хочет уведомить пользователя, а затем удалить его. Было бы неплохо иметь единое место для этого, чтобы при вызове subscriber.error или subscriber.complete удаление выполнялось как можно быстрее.
Мы можем добиться всего этого с помощью некоторых изменений SafeSubscriber и добавления типа Subscription.
/** * Our subscription type. This is to manage teardowns. */ class Subscription { private teardowns = new Set<() => void>(); add(teardown: () => void) { this.teardowns.add(teardown); } unsubscribe() { for (const teardown of this.teardowns) { teardown(); } this.teardowns.clear(); } } class SafeSubscriber<T> { closed = false; constructor( private destination: Partial<Observer<T>>, private subscription: Subscription, ) { // Make sure that if the subscription is unsubscribed, // we don't let any more notifications through this subscriber. subscription.add(() => (this.closed = true)); } next(value: T) { if (!this.closed) { this.destination.next?.(value); } } complete() { if (!this.closed) { this.closed = true; this.destination.complete?.(); this.subscription.unsubscribe(); } } error(err: any) { if (!this.closed) { this.closed = true; this.destination.error?.(err); this.subscription.unsubscribe(); } } } class Observable<T> { constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {} subscribe(observer: Observer<T>) { const subscription = new Subscription(); const subscriber = new SafeSubscriber(observer, subscription); subscription.add(this._wrappedFunc(subscriber)); return subscription; } } const helloSocket = new Observable<string>((subscriber) => { const socket = new WebSocket('wss://echo.websocket.org'); socket.onopen = () => { socket.send('Hello, World!'); }; socket.onmessage = (e) => { subscriber.next(e.data); }; socket.onclose = (e) => { if (e.wasClean) { subscriber.complete(); } else { subscriber.error(new Error('Socket closed dirty!')); } }; return () => { if (socket.readyState <= WebSocket.OPEN) { socket.close(); } }; }); const subscription = helloSocket.subscribe({ next: console.log, complete: () => console.log('server closed'), error: console.error, }); // Later, we can unsubscribe! subscription.unsubscribe();
Это всё! Observables — это просто специализированные функции!
Я знаю, что это было очень МНОГО кода, надеюсь, пошаговое руководство помогло вам понять, что входит в тип observable (например, тип из rxjs), и помогло немного прояснить его. Это уж точно не волшебство.
Тогда я бы посоветовал вам спросить себя: «Для чего нужны observables?» и, честно говоря, самый короткий ответ: вы можете использовать observables для всего, для чего вы можете использовать функцию. Самая большая разница — это небольшой набор гарантий в отношении обратных вызовов и то, что, возможно, наиболее важно, в отношении прекращения работы.
Важно отметить, что описанная выше реализация НЕ является чем-то, что вы должны воссоздавать или использовать в продакшене. Я бы рекомендовал использовать observable из RxJS, поскольку он охватывает гораздо больше ситуаций, чтобы помочь сохранить ваш код надежным и безопасным при составлении сложных потоков данных.
