Зачем использовать thenApplyAsync, если мы не меняем Executor? В вашем коде вся логика будет выполняться в ForkJoinPool.commonPool(). Можно тогда и простым thenApply обойтись. К тому же, не будет теряться время на переключение контекста между окончанием работы первого сервиса и началом работы второго. Хотя, возможно, в реализации CompletableFuture есть оптимизации на этот счёт.
А как же Kotlin? Давайте попробуем сделать что-то подобное на корутинах.
val result = async {
withContext(Dispatchers.Default) { getResultFromFirstService() }?.let { first ->
withContext(Dispatchers.Default) { getResultFromSecondService(first) }?.let { second ->
"$first $second"
}
}
}
Тут вы, как и в примере на Java, хотите с помощью функции (не ключевое слово, кстати, как и async) withContext() насильно заставить второй сервис выполняться в отличном от исходного диспетчере, а значит и, потенциально, в другом потоке. Зачем? Ну отправит она эту задачу на выполнение в другой поток (возможно), всё равно же он прерывается до окончания выполнения.
В итоге можно прийти к более простому примеру:
val result = GlobalScope.async {
getResultFromFirstService()?.let { first ->
getResultFromSecondService(first)?.let { second ->
"$first $second"
}
}
}
Конечно, оговорюсь, что из вашего примера непонятно, на каком СoroutineScope вызывается async, но тогда этот пример тем более неудачный, так как вы про это нигде не говорили.
Не то, чтобы я придирался к мелочам, но эти мелочи мало того, что не дают выигрыша в производительности (а даже, скорее всего, наоборот), но и ещё ухудшают чтение кода.
Про Scala не скажу, так как не имел опыта работы с ней.
У нас, мне кажется, возникло недопонимание. Давайте я более подробно разъясню пример с ES:
-- БД:
SELECT * FROM Events WHERE AggregateType = 'Stock' AND AggregateId = stock_id;
-- приложение:
-- Берем последний EventId. Если он равен, например, 1, то следующий EventId будет 2
-- Собираем из выборки агрегат, проигрывая все события по порядку
-- Проверяем, что quantity в агрегате больше 0
-- Если больше 0, то переходим к вставке события QuantityReduced c EventId = 2
-- Если 0, то отдаем неуспешный ответ
-- БД:
INSERT INTO Events VALUES (next_sequence_id, stock_id, 'Stock', 2, 'QuantityReduced', data);
-- Если вставка прошла успешно, то отдаем успешный ответ клиенту
-- Если вставка фэйлится на уникальности ключа AggregateType + AggregateId + EventId, то сработала оптимистичная блокировка.
-- То есть кто-то уже вставил для этого агрегата событие с EventId = 2, а значит состояние агрегата изменилось.
-- В таком случае повторяем сначала
Там, где я в своем предыдущем сообщении говорил, что лучше повесить уникальный индекс на AggregateType и EventId, то был не прав, так как нужно к индексу добавить еще AggregateId.
Когда "потом"?
Тут я имел ввиду, что собираем после SELECT * FROM Events
Еще кое-что поясню: между select и insert для случая с ES и между select и update для случая без ES происходит какая-то бизнес-логика на стороне приложения, то есть не на уровне БД.
А еще лучше, чтобы уменьшить количество конфликтов, можно сделать уникальный индекс по колонкам AggregateType и EventId и использовать EventId в качестве "версии".
То есть в вашем между select и insert нужно обновить агрегат
Не нужно. Вставка события - это и есть "обновление" агрегата. Агрегат уже потом собирается путем проигрывания всех событий из таблицы Events.
Кстати говоря, в моих примерах с оптимистичными блокировками каждый SQL запрос может выполняться в отдельной транзакции. Главное, чтобы для случая "без ES" UPDATE был атомарным.
Давайте попробуем пофантазировать. Допустим, мы написали 2 прототипа системы (с ES и с традиционным подходом). Есть Ваня и Петя, которые хотят купить один товар, но на складе он последний. Предположим, что у нас есть система хранения, которая понимает SQL, поддерживает уникальные индексы, но не умеет в транзакции (но операцию UPDATE все же считаем атомарной, чтобы предотвратить потерянные обновления). Эту систему хранения мы будем использовать для обоих прототипов. Оба пользователя добавляют товар в корзину, переходят на чекаут, вбивают свои данные и нажимают на кнопку "оплатить". В этом случае нам нужно временно зарезервировать товар на складе, пока производится оплата. Далее имеем примерно следующие SQL запросы к базе, которые генерируются после нажатия на кнопку оплаты:
Без ES с использованием оптимистической блокировки:
SELECT * FROM inventory WHERE id = stock_id;
UPDATE inventory SET quantity = quantity — 1 WHERE id = stock_id AND version = expected_version;
Без ES с использованием семантического лока:
SELECT * FROM inventory WHERE id = stock_id;
UPDATE inventory SET quantity = quantity — 1 WHERE id = stock_id AND quantity > 0;
В случае, если UPDATE возвращает 0 строк, то повторяем запросы. Если при повторении обнаруживается, что товара на складе больше нет, то показываем пользователю, что купить товар он не успел.
Если все-таки транзакции поддерживаем, то можно и через пессемистичную блокировку с использованием SELECT FOR UPDATE
С ES пока вижу только такой вариант, при условии, что есть уникальный индекс на SequenceId (в качестве примера используем таблицу с событиями из статьи):
SELECT * FROM Events WHERE AggregateType = 'Stock' AND AggregateId = stock_id;
INSERT INTO Events VALUES (expected_sequence_id, stock_id, 'Stock', event_id, 'QuantityReduced', data);
Детали получения актуального SequenceId я опустил. В случае, если валимся на уникальном индексе, то делаем то же самое, что и в примере с оптимистичными локами без использование ES.
Во всех примерах это происходит синхронно и консистентно при нажатии на кнопку оплаты.
Аудит возможен и без ES.
Вы правы. И по сути это будет тот же ES, но только вы на каждое событие создаете снимок состояния агрегата и используете его при чтении. Возможно, я натянул сову на глобус сейчас, но тем не менее :)
Вот это я не понял, можно подробнее?
Это если у вас есть, например, брокер сообщений, и вам нужно бросать в него сообщение только в том случае, если транзакция в базе данных успешно закомитилась или не бросать его вовсе. То есть нужны ACID гарантии между базой данных и брокером сообщений. Это достигается либо с использованием распределенных транзакций, что является злом, либо с использованием шаблона Transactional Outbox, где вы не бросаете событие в брокер напрямую, а вставляете его в таблицу в рамках одной транзакции. События из этой таблицы асинхронно будут выгребаться оттуда через какой-нибудь CDC или тупым поллингом с помощью селекта. Само собой, здесь также очень важна упорядоченность событий, если они не являются коммутативными. Например, будет странно, если сначала прилетит событие OrderCancelled, а потом OrderCreated для одного заказа. В случае с ES этот паттерн уже реализован из коробки, так как у вас таблица Events сама по себе является Outbox таблицей. При этом здесь уже нет необходимости делать в рамках одной транзакции обновление строк в разных таблицах, нужна только вставка в одну таблицу, что снимает ряд требований на хранилище событий.
но на практике просто база данных работает быстрее, чем оптимизированный CQRS, в режиме строгой согласованности
К сожалению, у меня нет большого жизненного опыта, чтобы судить, как оно обычно "на практике". Но практика все же подсказывает, что на любой кейс брать один и тот же подход всегда и везде чревато неуспехом.
Я все еще жду пример, где с ES будет работать лучше чем без него.
Все же там, где необходим подробный аудит и где применяется распределенная архитектура, где приложения активно используют асинхронное взаимодействие путем обмена сообщениями, думаю. Возможно, как раз в каких-нибудь очень серьезных финансовых приложениях, где нужно записывать каждое действие над изменением состояния чего угодно. Все-таки пока что мне не приходилось работать с системами, где применяется ES, поэтому могу лишь фантазировать.
Блокировка запси во время чтения создает строгую консистентность.
Можно делать оптимистичную блокировку. Каждое событие - это инкремент версии агрегата. Если есть возможность сохранять событие вместе с версией и повесить уникальный индекс на версию, то получим что-то вроде оптимистичной блокировки. Попытались добавить событие с существующей версией - операция завершится неудачей, и нужно будет повторить.
Во-первых это убивает все преимущества ES, которые обозначены в статье.
Ну почему убивает? Во-первых, наличие аудит лога и возможность "путешествия во времени" точно не убивает. Во-вторых, из коробки механизм атомарной и упорядоченной публикации событий (в системе без ES пришлось бы использовать Transactional Outbox pattern или, не дай боже, распределенные транзакции). В-третьих, CQRS - хороший паттерн, который отделяет мух от котлет, и позволяет более гранулярно оптимизировать различные части приложения. Конечно, ES и CQRS - вещи ортогональные, но в связке интересно выглядит.
Лок требует строгой консистентности, которой нет в ES.
Семантический лок - это немного другое, он больше на оптимистичную блокировку похож, но только на уровне бизнес логики.
Это прекрасный вариант с точки зрения user experience.
Согласен, что это худший из способов, но чисто технически это все же способ :) Если конкретно в вашей системе это большая редкость, то может и прокатить с извинениями и скидочкой за доставленные неудобства.
P.S. Все же согласен, что в 99% можно обойтись без ES. И сразу начинать проектировать приложение с помощью этого подхода не стоит. Серебряной пули не существует, и за всё приходится платить. Однако я уверен, что ES может и находит свое применение.
Конкретно это можно с помощью optimistic locking разруливать кстати и без транзакций. Но для этого нужно, чтобы система хранения поддерживала что-то вроде уникальных индексов. Каждое событие инкрементит версию, которая сохраняется вместе с этим событием. Вот эти версии должны быть уникальными.
Я, конечно, не сварщик, но для решения проблемы двойной оплаты вижу навскидку такие варианты:
в случае с event sourcing Authorize будет командой, а в обработчике этой команды в агрегате мы будем работать с состоянием, собранным из write представления, которое консистентно. В случае, если заказ уже оплачен, то возвращаем робокассе ошибку, заодно можно сохранить событие о том, что была попытка двойной оплаты. В случае успеха, соответственно, меняем состояние заказа на "оплачен". Если представить, что PSP не вызывает наш сервис, чтобы проверить, что деньги можно списать, и списывает их безусловно, то можно пойти немного другим путем. Сделать кнопку оплаты таким образом, чтобы она сперва создала семантической лок в системе (то есть это будет команда) путем перевода заказа в состояние а-ля PAYMENT_IN_PROGRESS, и потом бы происходил редирект на страницу PSP. Но кажется, что конкретно в этой ситуации без event sourcing пришлось сделать то же самое. Ещё один вариант: если задетектили двойную оплату каким-либо способом, то всегда одну из транзакций можно откатить в PSP в автоматическом режиме. Короче, как мне кажется, варианты побороть проблему есть. И способы решения проблем, как всегда, сильно зависят от бизнеса и конкретных кейсов.
Зачем использовать thenApplyAsync, если мы не меняем Executor? В вашем коде вся логика будет выполняться в ForkJoinPool.commonPool(). Можно тогда и простым thenApply обойтись. К тому же, не будет теряться время на переключение контекста между окончанием работы первого сервиса и началом работы второго. Хотя, возможно, в реализации CompletableFuture есть оптимизации на этот счёт.
Тут вы, как и в примере на Java, хотите с помощью функции (не ключевое слово, кстати, как и async) withContext() насильно заставить второй сервис выполняться в отличном от исходного диспетчере, а значит и, потенциально, в другом потоке. Зачем? Ну отправит она эту задачу на выполнение в другой поток (возможно), всё равно же он прерывается до окончания выполнения.
В итоге можно прийти к более простому примеру:
Конечно, оговорюсь, что из вашего примера непонятно, на каком СoroutineScope вызывается async, но тогда этот пример тем более неудачный, так как вы про это нигде не говорили.
Не то, чтобы я придирался к мелочам, но эти мелочи мало того, что не дают выигрыша в производительности (а даже, скорее всего, наоборот), но и ещё ухудшают чтение кода.
Про Scala не скажу, так как не имел опыта работы с ней.
Я лишь использовал оптимистичные блокировки, которые используются повсеместно, включая внутренности различных баз данных.
Ну и не одними РСУБД едиными, можно в качестве event store использовать любую другую подходящую БД, которая сможет реализовать вышеизложенный подход.
У нас, мне кажется, возникло недопонимание. Давайте я более подробно разъясню пример с ES:
Там, где я в своем предыдущем сообщении говорил, что лучше повесить уникальный индекс на
AggregateType
иEventId
, то был не прав, так как нужно к индексу добавить ещеAggregateId
.Тут я имел ввиду, что собираем после
SELECT * FROM Events
Еще кое-что поясню: между select и insert для случая с ES и между select и update для случая без ES происходит какая-то бизнес-логика на стороне приложения, то есть не на уровне БД.
Поэтому я и говорю, что натягиваю сову на глобус. Смотря какой аудит, смотря где и какие требования, тут все верно.
Можно просто взять
SELECT MAX(SequenceId) FROM Events;
А еще лучше, чтобы уменьшить количество конфликтов, можно сделать уникальный индекс по колонкам
AggregateType
иEventId
и использоватьEventId
в качестве "версии".Не нужно. Вставка события - это и есть "обновление" агрегата. Агрегат уже потом собирается путем проигрывания всех событий из таблицы
Events
.Кстати говоря, в моих примерах с оптимистичными блокировками каждый SQL запрос может выполняться в отдельной транзакции. Главное, чтобы для случая "без ES"
UPDATE
был атомарным.Давайте попробуем пофантазировать. Допустим, мы написали 2 прототипа системы (с ES и с традиционным подходом). Есть Ваня и Петя, которые хотят купить один товар, но на складе он последний. Предположим, что у нас есть система хранения, которая понимает SQL, поддерживает уникальные индексы, но не умеет в транзакции (но операцию UPDATE все же считаем атомарной, чтобы предотвратить потерянные обновления). Эту систему хранения мы будем использовать для обоих прототипов. Оба пользователя добавляют товар в корзину, переходят на чекаут, вбивают свои данные и нажимают на кнопку "оплатить". В этом случае нам нужно временно зарезервировать товар на складе, пока производится оплата. Далее имеем примерно следующие SQL запросы к базе, которые генерируются после нажатия на кнопку оплаты:
Без ES с использованием оптимистической блокировки:
Без ES с использованием семантического лока:
В случае, если
UPDATE
возвращает 0 строк, то повторяем запросы. Если при повторении обнаруживается, что товара на складе больше нет, то показываем пользователю, что купить товар он не успел.Если все-таки транзакции поддерживаем, то можно и через пессемистичную блокировку с использованием
SELECT FOR UPDATE
С ES пока вижу только такой вариант, при условии, что есть уникальный индекс на
SequenceId
(в качестве примера используем таблицу с событиями из статьи):Детали получения актуального
SequenceId
я опустил. В случае, если валимся на уникальном индексе, то делаем то же самое, что и в примере с оптимистичными локами без использование ES.Во всех примерах это происходит синхронно и консистентно при нажатии на кнопку оплаты.
Вы правы. И по сути это будет тот же ES, но только вы на каждое событие создаете снимок состояния агрегата и используете его при чтении. Возможно, я натянул сову на глобус сейчас, но тем не менее :)
Это если у вас есть, например, брокер сообщений, и вам нужно бросать в него сообщение только в том случае, если транзакция в базе данных успешно закомитилась или не бросать его вовсе. То есть нужны ACID гарантии между базой данных и брокером сообщений. Это достигается либо с использованием распределенных транзакций, что является злом, либо с использованием шаблона Transactional Outbox, где вы не бросаете событие в брокер напрямую, а вставляете его в таблицу в рамках одной транзакции. События из этой таблицы асинхронно будут выгребаться оттуда через какой-нибудь CDC или тупым поллингом с помощью селекта. Само собой, здесь также очень важна упорядоченность событий, если они не являются коммутативными. Например, будет странно, если сначала прилетит событие
OrderCancelled
, а потомOrderCreated
для одного заказа. В случае с ES этот паттерн уже реализован из коробки, так как у вас таблицаEvents
сама по себе является Outbox таблицей. При этом здесь уже нет необходимости делать в рамках одной транзакции обновление строк в разных таблицах, нужна только вставка в одну таблицу, что снимает ряд требований на хранилище событий.К сожалению, у меня нет большого жизненного опыта, чтобы судить, как оно обычно "на практике". Но практика все же подсказывает, что на любой кейс брать один и тот же подход всегда и везде чревато неуспехом.
Все же там, где необходим подробный аудит и где применяется распределенная архитектура, где приложения активно используют асинхронное взаимодействие путем обмена сообщениями, думаю. Возможно, как раз в каких-нибудь очень серьезных финансовых приложениях, где нужно записывать каждое действие над изменением состояния чего угодно. Все-таки пока что мне не приходилось работать с системами, где применяется ES, поэтому могу лишь фантазировать.
Можно делать оптимистичную блокировку. Каждое событие - это инкремент версии агрегата. Если есть возможность сохранять событие вместе с версией и повесить уникальный индекс на версию, то получим что-то вроде оптимистичной блокировки. Попытались добавить событие с существующей версией - операция завершится неудачей, и нужно будет повторить.
Ну почему убивает? Во-первых, наличие аудит лога и возможность "путешествия во времени" точно не убивает. Во-вторых, из коробки механизм атомарной и упорядоченной публикации событий (в системе без ES пришлось бы использовать Transactional Outbox pattern или, не дай боже, распределенные транзакции). В-третьих, CQRS - хороший паттерн, который отделяет мух от котлет, и позволяет более гранулярно оптимизировать различные части приложения. Конечно, ES и CQRS - вещи ортогональные, но в связке интересно выглядит.
Семантический лок - это немного другое, он больше на оптимистичную блокировку похож, но только на уровне бизнес логики.
Согласен, что это худший из способов, но чисто технически это все же способ :) Если конкретно в вашей системе это большая редкость, то может и прокатить с извинениями и скидочкой за доставленные неудобства.
P.S. Все же согласен, что в 99% можно обойтись без ES. И сразу начинать проектировать приложение с помощью этого подхода не стоит. Серебряной пули не существует, и за всё приходится платить. Однако я уверен, что ES может и находит свое применение.
Конкретно это можно с помощью optimistic locking разруливать кстати и без транзакций. Но для этого нужно, чтобы система хранения поддерживала что-то вроде уникальных индексов. Каждое событие инкрементит версию, которая сохраняется вместе с этим событием. Вот эти версии должны быть уникальными.
Я, конечно, не сварщик, но для решения проблемы двойной оплаты вижу навскидку такие варианты:
в случае с event sourcing
Authorize
будет командой, а в обработчике этой команды в агрегате мы будем работать с состоянием, собранным из write представления, которое консистентно. В случае, если заказ уже оплачен, то возвращаем робокассе ошибку, заодно можно сохранить событие о том, что была попытка двойной оплаты. В случае успеха, соответственно, меняем состояние заказа на "оплачен". Если представить, что PSP не вызывает наш сервис, чтобы проверить, что деньги можно списать, и списывает их безусловно, то можно пойти немного другим путем. Сделать кнопку оплаты таким образом, чтобы она сперва создала семантической лок в системе (то есть это будет команда) путем перевода заказа в состояние а-ляPAYMENT_IN_PROGRESS
, и потом бы происходил редирект на страницу PSP. Но кажется, что конкретно в этой ситуации без event sourcing пришлось сделать то же самое. Ещё один вариант: если задетектили двойную оплату каким-либо способом, то всегда одну из транзакций можно откатить в PSP в автоматическом режиме. Короче, как мне кажется, варианты побороть проблему есть. И способы решения проблем, как всегда, сильно зависят от бизнеса и конкретных кейсов.