Comments 7
Спасибо за статью, но код довольно запутанный и следить за ним не просто (Port вызывает методы AbstractActor и наоборот, каждый из них по отдельности нельзя понять), было бы неплохо добавить разбор того что происходит в простой паре producer-consumer и что означает block, поскольку он не блокирующий.
Пока не ясно чем этот подход лучше чего-то вроде:
class ProducerSubscription<T> extends Subscription<T> with Runnable {
private Subscriber<T> subscriber; // from constructor
private AtomicLong demand = new AtomicLong();
private AtomicLong spawnedId = new AtomicLong();
public void request(long size) {
if (demand.getAndAdd(size) == 0) {
executor.execute(this::send);
}
}
private void send() {
long id = spawnedId.incrementAndGet();
// ensure only one send spawned most of the time
while (spawnedId.get() == id) {
long batch = demand.getAndSet(0);
if (batch == 0) break;
while (batch-- > 0) {
subscriber.onNext(....);
}
}
}
}
block() блокирует алгоритм актора. О блокировке используемого потока исполнения мы вообще не говорим, поскольку нам это делать запрещено.
«чем этот подход лучше чего-то вроде»
в вашем примере нет разделения на системную и пользовательскую часть. Если я захочу сделать Publisher с другой функциональностью, мне нечего будет позаимствовать из вашего примера. А хотелось бы при написании функциональности не думать о системных вещах типа предотвращения нежелательного параллельного запуска одной и той же задачи.
Далее, вы не расписали subscriber.onNext(....). Где вы собрались вычислять очередное значение для onNext? там же, где и вызов onNext? Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями. По хорошему, для вычисления генерируемых значений нужен отдельный актор.
block() блокирует алгоритм актора
вот об этом я и говорю — эта фраза не объясняет ничего, описанию этого алгоритма стоило бы уделить больше внимания
в вашем примере нет разделения на системную и пользовательскую часть
вы не расписали subscriber.onNext(....)
Как раз эта часть может быть например абстрактным методом, реализуемым клиентской частью или лямбдой, передаваемой снаружи.
Это снижает параллелизм в случае недобросовестного клиента, который нагружает свой onNext тяжелыми вычислениями
Тут согласен, циркулярный буфер и отдельная таска отправки решит эту проблему. Этот код призван только показать, что решить задачу можно просто, вопрос в том — что дает усложнение?
и в какой момент он должен исполнятся? У вас для него не предусмотрено места. Он должен работать вне ProducerSubscription.
"описанию этого алгоритма стоило бы уделить больше внимания"
Да, мне как-то говорил один профессор, что ключевые моменты в статье надо повторять по три раза. Я же блокировку актора описал один раз:
Асинхронные программы не могут блокировать потоки… Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.
Итак, какой-то актор имеет ссылку на входной порт следующего актора, как нарисовано на картинках. Он вызывает операцию на акторе, что может привести к изменению его состояния. Так, если порт типа InPort был пуст, и туда поместили токен, то порт переходит из заблокированного состояния в разблокированное, и извещает об этом AbstractActor с помощью вызова decBlocked(). Если в результате число заблокированных портов стало равно нулю, то актор отправляется на исполнение, а контрольный порт блокируется, чтобы число заблокированных портов стало не равно нулю, и не произошло повторной отправки на исполнение.
Начав работать, актор вызывает пользовательскую процедуру whenNext, которая, скорее всего, извлечет токен из входного порта и тем самым его заблокирует. Когда пользовательская процедура закончит свой вызов, контрольный порт автоматически разблокируется. Если к этому моменту входной порт будет снова заполнен, это приведет к повторному запуску актора. Если нет, то актор будет ждать поступления токена на входной порт.
Разобрался немного, но как например контролируется запись в InPort
? Ведь если старое значение еще не прочитано, новое перезапишет старое? Не требует ли такой подход слишком тщательно следить за взаимодействием, писать в каждый порт только раз за круг, не забыть записать в какой-то порт?
Анатомия backpressure в реактивных потоках