Анатомия backpressure в реактивных потоках

Читая многочисленные статьи по теме реактивных потоков, читатель может прийти к выводу, что:

  • backpressure это круто
  • backpressure доступно только в библиотеках, реализующих спецификацию reactive streams
  • эта спецификация настолько сложна, что не стоит и пытаться ее реализовать самому

В этой статье я попытаюсь показать, что:

  • backpressure — это очень просто
  • для реализации асинхронного backpressure достаточно сделать асинхронный вариант семафора
  • при наличии реализации асинхронного семафора, интерфейс org.reactivestreams.Publisher реализуется в несколько десятков строк кода

Backpressure (обратное давление) — это обратная связь, регулирующая скорость работы производителя данных для согласования со скоростью работы потребителя. В отсутствие такой связи более быстрый производитель может переполнить буфер потребителя либо, если буфер безразмерный, исчерпать всю оперативную память.

В многопоточном программировании эта проблема была решена Дейкстрой, предложившим новый механизм синхронизации — семафор. Семафор можно трактовать как счетчик разрешений. Предполагается, что перед совершением ресурсоемкого действия производитель запрашивает разрешение у семафора. Если семафор пуст, то поток производителя блокируется.

Асинхронные программы не могут блокировать потоки, поэтому не могут обращаться за разрешением к пустому семафору (но все остальные операции с семафором делать могут). Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.

Наиболее элегантный способ организовать приостановку и возобновление работы асинхронной программы — это структурировать ее как dataflow актор с портами:



A dataflow model – actors with ports, the directed connections between their ports, and initial tokens. Взято из: A Structured Description Of Dataflow Actors And Its Application

Порты бывают входными и выходными. Во входные порты поступают токены (сообщения и сигналы) от выходных портов других акторов. Если входной порт содержит токены, а выходной имеет место для размещения токенов, то он считается активным. Если все порты актора активны, он отправляется на исполнение. Таким образом, при возобновлении своей работы программа актора может безопасно считывать токены из входных портов и записывать в выходные. В этом нехитром механизме таится вся мудрость асинхронного программирования. Выделение портов как отдельных подобъектов акторв резко упрощает кодирование асинхронных программ и позволяет увеличить их разнообразие комбинированием портов разных типов.

Классический актор Хьюитта содержит 2 порта — один видимый, с буфером для входящих сообщений, другой скрытый бинарный, блокирующийся, когда актор отправляется на исполнение и, таким образом, препятствующий повторному запуску актора до окончания первоначального запуска. Искомый асинхронный семафор — нечто среднее между этими двумя портами. Как и буфер для сообщений, он может хранить много токенов, и как у скрытого порта, эти токены — черные, то есть неразличимые, как в сетях Петри, и для их хранения достаточно счетчика токенов.

На первом уровне иерархии у нас определен класс AbstractActor c тремя вложенными классами — базовый Port и производные AsyncSemaPort и InPort, a также с механизмом запуска актора на исполнение при отсутствии заблокированных портов. Вкратце это выглядит так:

public abstract class AbstractActor {
    /** счетчик заблокированных портов */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /** головной метод */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}

В него вложен минимальный набор классов-портов:

Port — базовый класс всех портов

    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }

Асинхронный семафор:

    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                // поток исполнения не блокируется
                // но актор не зайдет на следующий раунд исполнения,
                // пока счетчик разрешений не станет опять положительным
                block();
            }
        }
    }

InPort — минимальный буфер для одного входящего сообщения:

    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }

Полную версию класса AbstractActor можно посмотреть здесь.

На следующем уровне иерархии мы имеем три абстрактных актора с определенными портами, но с неопределенной процедурой обработки:

  • класс AbstractProducer — это актор с одним портом типа асинхронный семафор (и внутренним контрольным портом, присутствует у всех акторов по умолчанию).
  • класс AbstractTransformer — обычный актор Хьюита, со ссылкой на входной порт следующего актора в цепочке, куда он отправляет преобразованные токены.
  • класс AbstractConsumer — также обычный актор, но преобразованные токены он никуда не отправляет, при этом он имеет ссылку на семафор производителя, и открывает этот семафор после поглощения входного токена. Таким образом, количество находящихся в обработке токенов поддерживается постоянным, и никакого переполнения буферов не происходит.

На последнем уровне, уже в директории test, определены конкретные акторы, используемые в тестах:

  • класс ProducerActor генерирует конечный поток целых чисел.
  • класс TransformerActor принимает очередное число из потока и отправляет его дальше по цепочке.
  • класс ConsumerActor — принимает и печатает полученные числа

Теперь мы можем построить цепочку асинхронных, параллельно работающих обработчиков следующим образом: производитель — любое количество трансформеров — потребитель



Тем самым мы реализовали backpressure, и даже в более общем виде, чем в спецификации reactive streams — обратная связь может охватывать произвольное число каскадов обработки, а не только соседние, как в спецификации.

Чтобы реализовать спецификацию, надо определить выходной порт, чувствительный к количеству переданных ему с помощью метода request() разрешений — это будет Publisher, и дополнить уже существующий InPort вызовом этого метода — это будет Subscriber. То есть, мы принимаем, что интерфейсы Publisher и Subscriber описывают поведение портов, а не акторов. Но судя по тому, что в списке интерфейсов присутствует также Processor, который никак не может быть интерфейсом порта, авторы спецификации считают свои интерфейсы интерфейсами акторов. Ну что же, мы можем сделать акторы, реализующие все эти интерфейсы, с помощью делегирования исполнения интерфейсных функций соответствующим портам.

Для простоты пусть наш Publisher не имеет собственного буфера и будет писать сразу в буфер Subscriber'а. Для этого нужно, чтобы какой-либо Subscriber подписался и выполнил request(), то есть, у нас есть 2 условия и, соответственно, нам нужно 2 порта — InPort<Subscriber> и AsyncSemaPort. Ни один из них не подходит в качестве базового для реализации Publisher'а, так как содержит ненужные методы, поэтому мы сделаем эти порты внутренними переменными:

public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}

На этот раз мы определили класс ReactiveOutPort не как вложенный, поэтому ему понадобился параметр конструктора — ссылка на объемлющий актор, чтобы создать экземпляры портов, определенных как вложенные классы.

Метод subscribe(Subscriber subscriber) сводится к сохранению подписчика и вызову subscriber.onSubscribe():

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }

что обычно приводит к вызову Publisher.request(), который сводится к поднятию семафора с помощью вызова AsyncSemaPort.release():

    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }

И теперь нам осталось не забыть опускать семафор с помощью вызова AsyncSemaPort.aquire() в момент использования ресурса:

    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }

Проект AsyncSemaphore был специально разработан для этой статьи. Он намеренно сделан максимально компактным, чтобы не утомлять читателя. Как результат, он содержит существенные ограничения:

  • Одновременно к Publisher'у может быть подписано не более одного Subscriber
  • размер входного буфера Subscriber'а равен 1

Кроме того, AsyncSemaPort не является полным аналогом синхронного семафора — только один клиент может выполнять операцию aquire() у AsyncSemaPort (имеется в виду объемлющий актор). Но это не является недостатком — AsyncSemaPort хорошо выполняет свою роль. В принципе, можно сделать и по другому — взять java.util.concurrent.Semaphore и дополнить его асинхронным интерфейсом подписки (см AsyncSemaphore.java из проекта DF4J). Такой семафор может связывать акторы и потоки исполнения в любом порядке.

Вообще, каждый вид синхронного (блокирующего) взаимодействия имеет свой асинхронный (неблокирующий) аналог. Так, в том же проекте DF4J имеется реализация BlockingQueue, дополненная асинхронным интерфейсом. Это открывает возможность поэтапного преобразования многопоточной программы в асинхронную, по частям заменяя потоки на акторы.
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 7

    +1

    Спасибо за статью, но код довольно запутанный и следить за ним не просто (Port вызывает методы AbstractActor и наоборот, каждый из них по отдельности нельзя понять), было бы неплохо добавить разбор того что происходит в простой паре producer-consumer и что означает block, поскольку он не блокирующий.


    Пока не ясно чем этот подход лучше чего-то вроде:


    class ProducerSubscription<T> extends Subscription<T> with Runnable {
      private Subscriber<T> subscriber; // from constructor
      private AtomicLong demand = new AtomicLong();
      private AtomicLong spawnedId = new AtomicLong();
    
      public void request(long size) {
        if (demand.getAndAdd(size) == 0) {
          executor.execute(this::send);
        }
      }
    
      private void send() {
        long id = spawnedId.incrementAndGet();
        // ensure only one send spawned most of the time
        while (spawnedId.get() == id) {
          long batch = demand.getAndSet(0);
          if (batch == 0) break;
          while (batch-- > 0) {
            subscriber.onNext(....);
          }
        }
      }
    }
      0
      «что означает block, поскольку он не блокирующий»
      block() блокирует алгоритм актора. О блокировке используемого потока исполнения мы вообще не говорим, поскольку нам это делать запрещено.

      «чем этот подход лучше чего-то вроде»
      в вашем примере нет разделения на системную и пользовательскую часть. Если я захочу сделать Publisher с другой функциональностью, мне нечего будет позаимствовать из вашего примера. А хотелось бы при написании функциональности не думать о системных вещах типа предотвращения нежелательного параллельного запуска одной и той же задачи.

      Далее, вы не расписали subscriber.onNext(....). Где вы собрались вычислять очередное значение для onNext? там же, где и вызов onNext? Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями. По хорошему, для вычисления генерируемых значений нужен отдельный актор.
        0
        block() блокирует алгоритм актора

        вот об этом я и говорю — эта фраза не объясняет ничего, описанию этого алгоритма стоило бы уделить больше внимания


        в вашем примере нет разделения на системную и пользовательскую часть
        вы не расписали subscriber.onNext(....)

        Как раз эта часть может быть например абстрактным методом, реализуемым клиентской частью или лямбдой, передаваемой снаружи.


        Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями

        Тут согласен, циркулярный буфер и отдельная таска отправки решит эту проблему. Этот код призван только показать, что решить задачу можно просто, вопрос в том — что дает усложнение?

          0
          "эта часть может быть например абстрактным методом"
          и в какой момент он должен исполнятся? У вас для него не предусмотрено места. Он должен работать вне ProducerSubscription.

          "описанию этого алгоритма стоило бы уделить больше внимания"
          Да, мне как-то говорил один профессор, что ключевые моменты в статье надо повторять по три раза. Я же блокировку актора описал один раз:

          Асинхронные программы не могут блокировать потоки… Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.

      0
      «было бы неплохо добавить разбор того что происходит в простой паре producer-consumer»
      Итак, какой-то актор имеет ссылку на входной порт следующего актора, как нарисовано на картинках. Он вызывает операцию на акторе, что может привести к изменению его состояния. Так, если порт типа InPort был пуст, и туда поместили токен, то порт переходит из заблокированного состояния в разблокированное, и извещает об этом AbstractActor с помощью вызова decBlocked(). Если в результате число заблокированных портов стало равно нулю, то актор отправляется на исполнение, а контрольный порт блокируется, чтобы число заблокированных портов стало не равно нулю, и не произошло повторной отправки на исполнение.
      Начав работать, актор вызывает пользовательскую процедуру whenNext, которая, скорее всего, извлечет токен из входного порта и тем самым его заблокирует. Когда пользовательская процедура закончит свой вызов, контрольный порт автоматически разблокируется. Если к этому моменту входной порт будет снова заполнен, это приведет к повторному запуску актора. Если нет, то актор будет ждать поступления токена на входной порт.
        0

        Разобрался немного, но как например контролируется запись в InPort? Ведь если старое значение еще не прочитано, новое перезапишет старое? Не требует ли такой подход слишком тщательно следить за взаимодействием, писать в каждый порт только раз за круг, не забыть записать в какой-то порт?

          0
          любой нормальный буфер имеет ограниченный размер, и неважно, 1 или 1000, все равно надо следить за переполнением. И да, за один раунд можно безопасно прочитать не более одного значения из входного порта и записать не более одного значения в выходной порт. Чтобы прочитать или записать больше, надо опрашивать состояние портов и быть готовым к тому, что это не удастся. Так что лучше и не опрашивать вовсе, а сразу выходить на следующий раунд.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое