Эта статья — перевод оригинальной статьи Ben Lesh “Learning Observable By Building Observable”. Также я веду телеграм канал “Frontend по-флотски”, где рассказываю про интересные вещи из мира разработки интерфейсов
Вступление
Это повторение старой статьи, написанной мною в 2016 году, и доклада, с которым я выступал несколько раз. Я хочу немного модернизировать контент и, надеюсь, упростить его. Цель — помочь людям понять, что такое observable. Не только observable из RxJS, но и любой observable (да, их больше одного) как тип.
Observables — просто функции
Чтобы понять это, я хочу разместить рядом два примера. Один является observable из RxJS, а другой — просто функцией. Оба этих примера имеют одинаковый результат:
RxJS Observable
Это пример простого observable, созданного с помощью RxJS, который выдает три числа и завершается. Если вы уже знакомы с RxJS, это эквивалент (1, 2, 3).
import { Observable } from 'rxjs';
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Usage
console.log('start');
source.subscribe({
next: console.log,
complete: () => console.log('done'),
});
console.log('stop');
Функция “observable”
/**
* A simple object with a `next` and `complete` callback on it.
*/
interface Observer<T> {
next: (value: T) => void;
complete: () => void;
}
/**
* A function that takes a simple object with callbacks
* and does something them.
*/
const source = (subscriber: Observer<number>) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
};
// Usage
console.log('start');
source({
next: console.log,
complete: () => console.log('done'),
});
console.log('stop');
Вывод (обоих!)
"start"
1
2
3
"done"
"stop"
Я хочу, чтобы вы обратили внимание на сходство. В обоих случаях вы передаете объект с помощью метода next и complete. В обоих случаях вы вызываете next и complete в теле функции. В обоих случаях тело функции не выполняется до тех пор, пока вы не вызовете source.subscribe() или просто вызовете функцию напрямую как source() в другом примере. Это потому, что observables — это просто специализированные функции.
Почему бы просто не использовать функции? Что «особенного» в observable?
Отлично подмечено. Если бы вы были очень осторожны, вы, вероятно, могли бы использовать обычные функции для некоторых из этих случаев. Но проблема в том, что версия с функциями не совсем «безопасна». На основе всего лишь одного примера, если семантика observable такова, что next никогда не должен вызываться после завершения, нам понадобятся некоторые гарантии для этого.
const source = function (subscriber: Observer<number>) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Упс, этого не должно быть
};
В примере выше будет написано «done», а затем сразу 4. Такого быть вообще не должно! Поэтому мы хотим предоставить способ гарантировать, что следующий метод нашего подписчика не будет вызван после завершения. Это можно сделать, заключив нашу функцию в класс.
/**
* A class used to wrap a user-provided Observer. Since the
* observer is just a plain objects with a couple of callbacks on it,
* this type will wrap that to ensure `next` does nothing if called after
* `complete` has been called, and that nothing happens if `complete`
* is called more than once.
*/
class SafeSubscriber<T> {
closed = false;
constructor(private destination: Observer<T>) {}
next(value: T) {
// Check to see if this is "closed" before nexting.
if (!this.closed) {
this.destination.next(value);
}
}
complete() {
// Make sure we're not completing an already "closed" subscriber.
if (!this.closed) {
// We're closed now.
this.closed = true;
this.destination.complete();
}
}
}
/**
* A class to wrap our function, to ensure that when the function is
* called with an observer, that observer is wrapped with a SafeSubscriber
*/
class Observable<T> {
constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {}
subscribe(observer: Observer<T>): void {
// We can wrap our observer in a "safe subscriber" that
// does the work of making sure it's not closed.
const subscriber = new SafeSubscriber(observer);
this._wrappedFunc(subscriber);
}
}
// Usage
// Now 4 won't be nexted after we complete.
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // this does nothing.
});
Обработка Partial Observers
Другой возможный сценарий — «partial» observer. Другими словами, observable, у которого есть только метод next или complete (или, возможно, метод ошибки, но мы вернемся к этому позже). Теперь мы можем легко разобраться с этим сценарием имея наш тип observable, указанный выше, потому что мы можем реализовать это в нашем SafeSubscriber:
class SafeSubscriber<T> {
closed = false;
constructor(private destination: Partial<Observer<T>>) {}
next(value: T) {
if (!this.closed) {
this.destination.next?.(value); // Note the ?. check here.
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.(); // And here.
}
}
}
Уведомления об ошибке
Уведомить нашего подписчика об ошибке так же просто, как добавить дополнительный обработчик к нашим Observer и SafeSubscriber. Семантика очень похожа на complete выше. И ошибка, и complete считаются прекращением наблюдения.
Observer просто немного изменится, чтобы появился обработчик ошибок:
interface Observer<T> {
next: (value: T) => void;
complete: () => void;
error: (err: any) => void;
}
После этого мы можем добавить в наш SafeSubscriber error метод:
class SafeSubscriber<T> {
closed = false;
constructor(private destination: Partial<Observer<T>>) {}
next(value: T) {
if (!this.closed) {
this.destination.next?.(value);
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.();
}
}
error(err: any) {
if (!this.closed) {
this.closed = true;
this.destination.error?.(err);
}
}
}
Источники данных и кейс для отмены подписки
Основной вариант использования observable — обертывание асинхронного источника данных, например WebSocket.
Чтобы сделать что-то подобное, вы можете использовать наш самодельный observable созданный выше, например:
const helloSocket = new Observable<string>((subscriber) => {
// Open a socket.
const socket = new WebSocket('wss://echo.websocket.org');
socket.onopen = () => {
// Once it's open, send some text.
socket.send('Hello, World!');
};
socket.onmessage = (e) => {
// When it echoes the text back (in the case of this particular server)
// notify the consumer.
subscriber.next(e.data);
};
socket.onclose = (e) => {
// Oh! we closed!
if (e.wasClean) {
// ...because the server said it was done.
subscriber.complete();
} else {
// ...because of something bad. Maybe we lost network or something.
subscriber.error(new Error('Socket closed dirty!'));
}
};
});
// Start the websocket and log the echoes
helloSocket.subscribe({
next: console.log,
complete: () => console.log('server closed'),
error: console.error,
});
Но теперь у нас появилась проблема. Пользователь, который подписывается на наш observable, не имеет возможности отменить его и закрыть сокет. Нам нужен способ прекратить работу. Если бы мы просто использовали функцию, мы могли бы вернуть функцию, содержащую нашу логику прекращения подписки.
const source = (subscriber: Observer<string>) => {
const socket = new WebSocket('wss://echo.websocket.org');
socket.onopen = () => {
socket.send('Hello, World!');
};
socket.onmessage = (e) => {
subscriber.next(e.data);
};
socket.onclose = (e) => {
if (e.wasClean) {
subscriber.complete();
} else {
subscriber.error(new Error('Socket closed dirty!'));
}
};
return () => {
if (socket.readyState <= WebSocket.OPEN) {
socket.close();
}
};
};
const teardown = source({
next: console.log,
complete: () => console.log('done'),
error: console.error,
});
// Decide you really don't want to keep the socket open.
teardown();
Еще одна проблема, которая возникает не обязательно связанная с WebSocket, но и с другими ситуациями, — это ситуации, когда автор observable решает, что он столкнулся с ошибкой или состоянием завершения, и он хочет уведомить пользователя, а затем удалить его. Было бы неплохо иметь единое место для этого, чтобы при вызове subscriber.error или subscriber.complete удаление выполнялось как можно быстрее.
Мы можем добиться всего этого с помощью некоторых изменений SafeSubscriber и добавления типа Subscription.
/**
* Our subscription type. This is to manage teardowns.
*/
class Subscription {
private teardowns = new Set<() => void>();
add(teardown: () => void) {
this.teardowns.add(teardown);
}
unsubscribe() {
for (const teardown of this.teardowns) {
teardown();
}
this.teardowns.clear();
}
}
class SafeSubscriber<T> {
closed = false;
constructor(
private destination: Partial<Observer<T>>,
private subscription: Subscription,
) {
// Make sure that if the subscription is unsubscribed,
// we don't let any more notifications through this subscriber.
subscription.add(() => (this.closed = true));
}
next(value: T) {
if (!this.closed) {
this.destination.next?.(value);
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.();
this.subscription.unsubscribe();
}
}
error(err: any) {
if (!this.closed) {
this.closed = true;
this.destination.error?.(err);
this.subscription.unsubscribe();
}
}
}
class Observable<T> {
constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {}
subscribe(observer: Observer<T>) {
const subscription = new Subscription();
const subscriber = new SafeSubscriber(observer, subscription);
subscription.add(this._wrappedFunc(subscriber));
return subscription;
}
}
const helloSocket = new Observable<string>((subscriber) => {
const socket = new WebSocket('wss://echo.websocket.org');
socket.onopen = () => {
socket.send('Hello, World!');
};
socket.onmessage = (e) => {
subscriber.next(e.data);
};
socket.onclose = (e) => {
if (e.wasClean) {
subscriber.complete();
} else {
subscriber.error(new Error('Socket closed dirty!'));
}
};
return () => {
if (socket.readyState <= WebSocket.OPEN) {
socket.close();
}
};
});
const subscription = helloSocket.subscribe({
next: console.log,
complete: () => console.log('server closed'),
error: console.error,
});
// Later, we can unsubscribe!
subscription.unsubscribe();
Это всё! Observables — это просто специализированные функции!
Я знаю, что это было очень МНОГО кода, надеюсь, пошаговое руководство помогло вам понять, что входит в тип observable (например, тип из rxjs), и помогло немного прояснить его. Это уж точно не волшебство.
Тогда я бы посоветовал вам спросить себя: «Для чего нужны observables?» и, честно говоря, самый короткий ответ: вы можете использовать observables для всего, для чего вы можете использовать функцию. Самая большая разница — это небольшой набор гарантий в отношении обратных вызовов и то, что, возможно, наиболее важно, в отношении прекращения работы.
Важно отметить, что описанная выше реализация НЕ является чем-то, что вы должны воссоздавать или использовать в продакшене. Я бы рекомендовал использовать observable из RxJS, поскольку он охватывает гораздо больше ситуаций, чтобы помочь сохранить ваш код надежным и безопасным при составлении сложных потоков данных.