Pull to refresh

Изучай observable, создавая observable

Reading time7 min
Views34K
Original author: Ben Lesh

Эта статья — перевод оригинальной статьи Ben Lesh “Learning Observable By Building Observable”. Также я веду телеграм канал “Frontend по-флотски”, где рассказываю про интересные вещи из мира разработки интерфейсов ​

Вступление

Это повторение старой статьи, написанной мною в 2016 году, и доклада, с которым я выступал несколько раз. Я хочу немного модернизировать контент и, надеюсь, упростить его. Цель — помочь людям понять, что такое observable. Не только observable из RxJS, но и любой observable (да, их больше одного) как тип.

Observables — просто функции

Чтобы понять это, я хочу разместить рядом два примера. Один является observable из RxJS, а другой — просто функцией. Оба этих примера имеют одинаковый результат:

RxJS Observable

Это пример простого observable, созданного с помощью RxJS, который выдает три числа и завершается. Если вы уже знакомы с RxJS, это эквивалент (1, 2, 3).

import { Observable } from 'rxjs';
 
const source = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
 
// Usage
console.log('start');
source.subscribe({
  next: console.log,
  complete: () => console.log('done'),
});
console.log('stop');

Функция “observable”

/**
 * A simple object with a `next` and `complete` callback on it.
 */
interface Observer<T> {
  next: (value: T) => void;
  complete: () => void;
}
 
/**
 * A function that takes a simple object with callbacks
 * and does something them.
 */
const source = (subscriber: Observer<number>) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
};
 
// Usage
console.log('start');
source({
  next: console.log,
  complete: () => console.log('done'),
});
console.log('stop');

Вывод (обоих!)

"start"
1
2
3
"done"
"stop"

Я хочу, чтобы вы обратили внимание на сходство. В обоих случаях вы передаете объект с помощью метода next и complete. В обоих случаях вы вызываете next и complete в теле функции. В обоих случаях тело функции не выполняется до тех пор, пока вы не вызовете source.subscribe() или просто вызовете функцию напрямую как source() в другом примере. Это потому, что observables — это просто специализированные функции.

Почему бы просто не использовать функции? Что «особенного» в observable?

Отлично подмечено. Если бы вы были очень осторожны, вы, вероятно, могли бы использовать обычные функции для некоторых из этих случаев. Но проблема в том, что версия с функциями не совсем «безопасна». На основе всего лишь одного примера, если семантика observable такова, что next никогда не должен вызываться после завершения, нам понадобятся некоторые гарантии для этого.

const source = function (subscriber: Observer<number>) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Упс, этого не должно быть
};

В примере выше будет написано «done», а затем сразу 4. Такого быть вообще не должно! Поэтому мы хотим предоставить способ гарантировать, что следующий метод нашего подписчика не будет вызван после завершения. Это можно сделать, заключив нашу функцию в класс.

/**
 * A class used to wrap a user-provided Observer. Since the
 * observer is just a plain objects with a couple of callbacks on it,
 * this type will wrap that to ensure `next` does nothing if called after
 * `complete` has been called, and that nothing happens if `complete`
 * is called more than once.
 */
class SafeSubscriber<T> {
  closed = false;
 
  constructor(private destination: Observer<T>) {}
 
  next(value: T) {
    // Check to see if this is "closed" before nexting.
    if (!this.closed) {
      this.destination.next(value);
    }
  }
 
  complete() {
    // Make sure we're not completing an already "closed" subscriber.
    if (!this.closed) {
      // We're closed now.
      this.closed = true;
      this.destination.complete();
    }
  }
}
 
/**
 * A class to wrap our function, to ensure that when the function is
 * called with an observer, that observer is wrapped with a SafeSubscriber
 */
class Observable<T> {
  constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {}
 
  subscribe(observer: Observer<T>): void {
    // We can wrap our observer in a "safe subscriber" that
    // does the work of making sure it's not closed.
    const subscriber = new SafeSubscriber(observer);
    this._wrappedFunc(subscriber);
  }
}
 
// Usage
// Now 4 won't be nexted after we complete.
const source = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // this does nothing.
});

Обработка Partial Observers

Другой возможный сценарий — «partial» observer. Другими словами, observable, у которого есть только метод next или complete (или, возможно, метод ошибки, но мы вернемся к этому позже). Теперь мы можем легко разобраться с этим сценарием имея наш тип observable, указанный выше, потому что мы можем реализовать это в нашем SafeSubscriber:

class SafeSubscriber<T> {
  closed = false;
 
  constructor(private destination: Partial<Observer<T>>) {}
 
  next(value: T) {
    if (!this.closed) {
      this.destination.next?.(value); // Note the ?. check here.
    }
  }
 
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.destination.complete?.(); // And here.
    }
  }
}

Уведомления об ошибке

Уведомить нашего подписчика об ошибке так же просто, как добавить дополнительный обработчик к нашим Observer и SafeSubscriber. Семантика очень похожа на complete выше. И ошибка, и complete считаются прекращением наблюдения.

Observer просто немного изменится, чтобы появился обработчик ошибок:

interface Observer<T> {
  next: (value: T) => void;
  complete: () => void;
  error: (err: any) => void;
}

После этого мы можем добавить в наш SafeSubscriber error метод:

class SafeSubscriber<T> {
  closed = false;
 
  constructor(private destination: Partial<Observer<T>>) {}
 
  next(value: T) {
    if (!this.closed) {
      this.destination.next?.(value);
    }
  }
 
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.destination.complete?.();
    }
  }
 
  error(err: any) {
    if (!this.closed) {
      this.closed = true;
      this.destination.error?.(err);
    }
  }
}

Источники данных и кейс для отмены подписки

Основной вариант использования observable — обертывание асинхронного источника данных, например WebSocket.

Чтобы сделать что-то подобное, вы можете использовать наш самодельный observable созданный выше, например:

const helloSocket = new Observable<string>((subscriber) => {
  // Open a socket.
  const socket = new WebSocket('wss://echo.websocket.org');
 
  socket.onopen = () => {
    // Once it's open, send some text.
    socket.send('Hello, World!');
  };
 
  socket.onmessage = (e) => {
    // When it echoes the text back (in the case of this particular server)
    // notify the consumer.
    subscriber.next(e.data);
  };
 
  socket.onclose = (e) => {
    // Oh! we closed!
    if (e.wasClean) {
      // ...because the server said it was done.
      subscriber.complete();
    } else {
      // ...because of something bad. Maybe we lost network or something.
      subscriber.error(new Error('Socket closed dirty!'));
    }
  };
});
 
// Start the websocket and log the echoes
helloSocket.subscribe({
  next: console.log,
  complete: () => console.log('server closed'),
  error: console.error,
});

Но теперь у нас появилась проблема. Пользователь, который подписывается на наш observable, не имеет возможности отменить его и закрыть сокет. Нам нужен способ прекратить работу. Если бы мы просто использовали функцию, мы могли бы вернуть функцию, содержащую нашу логику прекращения подписки.

const source = (subscriber: Observer<string>) => {
  const socket = new WebSocket('wss://echo.websocket.org');
 
  socket.onopen = () => {
    socket.send('Hello, World!');
  };
 
  socket.onmessage = (e) => {
    subscriber.next(e.data);
  };
 
  socket.onclose = (e) => {
    if (e.wasClean) {
      subscriber.complete();
    } else {
      subscriber.error(new Error('Socket closed dirty!'));
    }
  };
 
  return () => {
    if (socket.readyState <= WebSocket.OPEN) {
      socket.close();
    }
  };
};
 
const teardown = source({
  next: console.log,
  complete: () => console.log('done'),
  error: console.error,
});
 
// Decide you really don't want to keep the socket open.
teardown();

Еще одна проблема, которая возникает не обязательно связанная с WebSocket, но и с другими ситуациями, — это ситуации, когда автор observable решает, что он столкнулся с ошибкой или состоянием завершения, и он хочет уведомить пользователя, а затем удалить его. Было бы неплохо иметь единое место для этого, чтобы при вызове subscriber.error или subscriber.complete удаление выполнялось как можно быстрее.

Мы можем добиться всего этого с помощью некоторых изменений SafeSubscriber и добавления типа Subscription.

/**
 * Our subscription type. This is to manage teardowns.
 */
class Subscription {
  private teardowns = new Set<() => void>();
 
  add(teardown: () => void) {
    this.teardowns.add(teardown);
  }
 
  unsubscribe() {
    for (const teardown of this.teardowns) {
      teardown();
    }
    this.teardowns.clear();
  }
}
 
class SafeSubscriber<T> {
  closed = false;
 
  constructor(
    private destination: Partial<Observer<T>>,
    private subscription: Subscription,
  ) {
    // Make sure that if the subscription is unsubscribed,
    // we don't let any more notifications through this subscriber.
    subscription.add(() => (this.closed = true));
  }
 
  next(value: T) {
    if (!this.closed) {
      this.destination.next?.(value);
    }
  }
 
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.destination.complete?.();
      this.subscription.unsubscribe();
    }
  }
 
  error(err: any) {
    if (!this.closed) {
      this.closed = true;
      this.destination.error?.(err);
      this.subscription.unsubscribe();
    }
  }
}
 
class Observable<T> {
  constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {}
 
  subscribe(observer: Observer<T>) {
    const subscription = new Subscription();
    const subscriber = new SafeSubscriber(observer, subscription);
    subscription.add(this._wrappedFunc(subscriber));
    return subscription;
  }
}
 
const helloSocket = new Observable<string>((subscriber) => {
  const socket = new WebSocket('wss://echo.websocket.org');
 
  socket.onopen = () => {
    socket.send('Hello, World!');
  };
 
  socket.onmessage = (e) => {
    subscriber.next(e.data);
  };
 
  socket.onclose = (e) => {
    if (e.wasClean) {
      subscriber.complete();
    } else {
      subscriber.error(new Error('Socket closed dirty!'));
    }
  };
 
  return () => {
    if (socket.readyState <= WebSocket.OPEN) {
      socket.close();
    }
  };
});
 
const subscription = helloSocket.subscribe({
  next: console.log,
  complete: () => console.log('server closed'),
  error: console.error,
});
 
// Later, we can unsubscribe!
subscription.unsubscribe();

Это всё! Observables — это просто специализированные функции!

Я знаю, что это было очень МНОГО кода, надеюсь, пошаговое руководство помогло вам понять, что входит в тип observable (например, тип из rxjs), и помогло немного прояснить его. Это уж точно не волшебство.

Тогда я бы посоветовал вам спросить себя: «Для чего нужны observables?» и, честно говоря, самый короткий ответ: вы можете использовать observables для всего, для чего вы можете использовать функцию. Самая большая разница — это небольшой набор гарантий в отношении обратных вызовов и то, что, возможно, наиболее важно, в отношении прекращения работы.

Важно отметить, что описанная выше реализация НЕ является чем-то, что вы должны воссоздавать или использовать в продакшене. Я бы рекомендовал использовать observable из RxJS, поскольку он охватывает гораздо больше ситуаций, чтобы помочь сохранить ваш код надежным и безопасным при составлении сложных потоков данных.

Tags:
Hubs:
Total votes 8: ↑8 and ↓0+8
Comments2

Articles