Search
Write a publication
Pull to refresh
0
0
Send message
С примерами по многопоточности не то что-то.
Попробуем сначала решить задачу на Java
    private static CompletableFuture<Optional<String>> calcResultOfTwoServices (
            Supplier<Optional<Integer>> getResultFromFirstService,
            Function<Integer, Optional<Integer>> getResultFromSecondService
    ) {
        return CompletableFuture
                .supplyAsync(getResultFromFirstService)
                .thenApplyAsync(firstResultOptional ->
                        firstResultOptional.flatMap(first ->
                                getResultFromSecondService.apply(first).map(second ->
                                    first + " " + second
                                )
                        )
                );
    }


Зачем использовать thenApplyAsync, если мы не меняем Executor? В вашем коде вся логика будет выполняться в ForkJoinPool.commonPool(). Можно тогда и простым thenApply обойтись. К тому же, не будет теряться время на переключение контекста между окончанием работы первого сервиса и началом работы второго. Хотя, возможно, в реализации CompletableFuture есть оптимизации на этот счёт.
А как же Kotlin? Давайте попробуем сделать что-то подобное на корутинах.
val result = async {
        withContext(Dispatchers.Default) { getResultFromFirstService() }?.let { first ->
            withContext(Dispatchers.Default) { getResultFromSecondService(first) }?.let { second ->
                "$first $second"
            }
        }
    }


Тут вы, как и в примере на Java, хотите с помощью функции (не ключевое слово, кстати, как и async) withContext() насильно заставить второй сервис выполняться в отличном от исходного диспетчере, а значит и, потенциально, в другом потоке. Зачем? Ну отправит она эту задачу на выполнение в другой поток (возможно), всё равно же он прерывается до окончания выполнения.
В итоге можно прийти к более простому примеру:
    val result = GlobalScope.async {
        getResultFromFirstService()?.let { first ->
            getResultFromSecondService(first)?.let { second ->
                "$first $second"
            }
        }
    }

Конечно, оговорюсь, что из вашего примера непонятно, на каком СoroutineScope вызывается async, но тогда этот пример тем более неудачный, так как вы про это нигде не говорили.

Не то, чтобы я придирался к мелочам, но эти мелочи мало того, что не дают выигрыша в производительности (а даже, скорее всего, наоборот), но и ещё ухудшают чтение кода.
Про Scala не скажу, так как не имел опыта работы с ней.

Поздравляю, вы изобрели READ COMMITED SNAPSHOT уровень изоляции.

Я лишь использовал оптимистичные блокировки, которые используются повсеместно, включая внутренности различных баз данных.

Ну и не одними РСУБД едиными, можно в качестве event store использовать любую другую подходящую БД, которая сможет реализовать вышеизложенный подход.

У нас, мне кажется, возникло недопонимание. Давайте я более подробно разъясню пример с ES:

-- БД:
SELECT * FROM Events WHERE AggregateType = 'Stock' AND AggregateId = stock_id;

-- приложение:
-- Берем последний EventId. Если он равен, например, 1, то следующий EventId будет 2
-- Собираем из выборки агрегат, проигрывая все события по порядку
-- Проверяем, что quantity в агрегате больше 0
-- Если больше 0, то переходим к вставке события QuantityReduced c EventId = 2
-- Если 0, то отдаем неуспешный ответ

-- БД:
INSERT INTO Events VALUES (next_sequence_id, stock_id, 'Stock', 2, 'QuantityReduced', data);
-- Если вставка прошла успешно, то отдаем успешный ответ клиенту
-- Если вставка фэйлится на уникальности ключа AggregateType + AggregateId + EventId, то сработала оптимистичная блокировка.
-- То есть кто-то уже вставил для этого агрегата событие с EventId = 2, а значит состояние агрегата изменилось.
-- В таком случае повторяем сначала

Там, где я в своем предыдущем сообщении говорил, что лучше повесить уникальный индекс на AggregateType и EventId, то был не прав, так как нужно к индексу добавить еще AggregateId.

Когда "потом"?

Тут я имел ввиду, что собираем после SELECT * FROM Events

Еще кое-что поясню: между select и insert для случая с ES и между select и update для случая без ES происходит какая-то бизнес-логика на стороне приложения, то есть не на уровне БД.

Поэтому я и говорю, что натягиваю сову на глобус. Смотря какой аудит, смотря где и какие требования, тут все верно.

Так эти детали самые важные.

Можно просто взять

SELECT MAX(SequenceId) FROM Events;

А еще лучше, чтобы уменьшить количество конфликтов, можно сделать уникальный индекс по колонкам AggregateType и EventId и использовать EventId в качестве "версии".

То есть в вашем между select и insert нужно обновить агрегат

Не нужно. Вставка события - это и есть "обновление" агрегата. Агрегат уже потом собирается путем проигрывания всех событий из таблицы Events.

Кстати говоря, в моих примерах с оптимистичными блокировками каждый SQL запрос может выполняться в отдельной транзакции. Главное, чтобы для случая "без ES" UPDATE был атомарным.

А что это даст?

Давайте попробуем пофантазировать. Допустим, мы написали 2 прототипа системы (с ES и с традиционным подходом). Есть Ваня и Петя, которые хотят купить один товар, но на складе он последний. Предположим, что у нас есть система хранения, которая понимает SQL, поддерживает уникальные индексы, но не умеет в транзакции (но операцию UPDATE все же считаем атомарной, чтобы предотвратить потерянные обновления). Эту систему хранения мы будем использовать для обоих прототипов. Оба пользователя добавляют товар в корзину, переходят на чекаут, вбивают свои данные и нажимают на кнопку "оплатить". В этом случае нам нужно временно зарезервировать товар на складе, пока производится оплата. Далее имеем примерно следующие SQL запросы к базе, которые генерируются после нажатия на кнопку оплаты:

Без ES с использованием оптимистической блокировки:

SELECT * FROM inventory WHERE id = stock_id;
UPDATE inventory SET quantity = quantity — 1 WHERE id = stock_id AND version = expected_version;

Без ES с использованием семантического лока:

SELECT * FROM inventory WHERE id = stock_id;
UPDATE inventory SET quantity = quantity — 1 WHERE id = stock_id AND quantity > 0;

В случае, если UPDATE возвращает 0 строк, то повторяем запросы. Если при повторении обнаруживается, что товара на складе больше нет, то показываем пользователю, что купить товар он не успел.

Если все-таки транзакции поддерживаем, то можно и через пессемистичную блокировку с использованием SELECT FOR UPDATE

С ES пока вижу только такой вариант, при условии, что есть уникальный индекс на SequenceId (в качестве примера используем таблицу с событиями из статьи):

SELECT * FROM Events WHERE AggregateType = 'Stock' AND AggregateId = stock_id;
INSERT INTO Events VALUES (expected_sequence_id, stock_id, 'Stock', event_id, 'QuantityReduced', data);

Детали получения актуального SequenceId я опустил. В случае, если валимся на уникальном индексе, то делаем то же самое, что и в примере с оптимистичными локами без использование ES.

Во всех примерах это происходит синхронно и консистентно при нажатии на кнопку оплаты.

Аудит возможен и без ES.

Вы правы. И по сути это будет тот же ES, но только вы на каждое событие создаете снимок состояния агрегата и используете его при чтении. Возможно, я натянул сову на глобус сейчас, но тем не менее :)

Вот это я не понял, можно подробнее?

Это если у вас есть, например, брокер сообщений, и вам нужно бросать в него сообщение только в том случае, если транзакция в базе данных успешно закомитилась или не бросать его вовсе. То есть нужны ACID гарантии между базой данных и брокером сообщений. Это достигается либо с использованием распределенных транзакций, что является злом, либо с использованием шаблона Transactional Outbox, где вы не бросаете событие в брокер напрямую, а вставляете его в таблицу в рамках одной транзакции. События из этой таблицы асинхронно будут выгребаться оттуда через какой-нибудь CDC или тупым поллингом с помощью селекта. Само собой, здесь также очень важна упорядоченность событий, если они не являются коммутативными. Например, будет странно, если сначала прилетит событие OrderCancelled, а потом OrderCreated для одного заказа. В случае с ES этот паттерн уже реализован из коробки, так как у вас таблица Events сама по себе является Outbox таблицей. При этом здесь уже нет необходимости делать в рамках одной транзакции обновление строк в разных таблицах, нужна только вставка в одну таблицу, что снимает ряд требований на хранилище событий.

но на практике просто база данных работает быстрее, чем оптимизированный CQRS, в режиме строгой согласованности

К сожалению, у меня нет большого жизненного опыта, чтобы судить, как оно обычно "на практике". Но практика все же подсказывает, что на любой кейс брать один и тот же подход всегда и везде чревато неуспехом.

Я все еще жду пример, где с ES будет работать лучше чем без него.

Все же там, где необходим подробный аудит и где применяется распределенная архитектура, где приложения активно используют асинхронное взаимодействие путем обмена сообщениями, думаю. Возможно, как раз в каких-нибудь очень серьезных финансовых приложениях, где нужно записывать каждое действие над изменением состояния чего угодно. Все-таки пока что мне не приходилось работать с системами, где применяется ES, поэтому могу лишь фантазировать.

Блокировка запси во время чтения создает строгую консистентность.

Можно делать оптимистичную блокировку. Каждое событие - это инкремент версии агрегата. Если есть возможность сохранять событие вместе с версией и повесить уникальный индекс на версию, то получим что-то вроде оптимистичной блокировки. Попытались добавить событие с существующей версией - операция завершится неудачей, и нужно будет повторить.

Во-первых это убивает все преимущества ES, которые обозначены в статье.

Ну почему убивает? Во-первых, наличие аудит лога и возможность "путешествия во времени" точно не убивает. Во-вторых, из коробки механизм атомарной и упорядоченной публикации событий (в системе без ES пришлось бы использовать Transactional Outbox pattern или, не дай боже, распределенные транзакции). В-третьих, CQRS - хороший паттерн, который отделяет мух от котлет, и позволяет более гранулярно оптимизировать различные части приложения. Конечно, ES и CQRS - вещи ортогональные, но в связке интересно выглядит.

Лок требует строгой консистентности, которой нет в ES.

Семантический лок - это немного другое, он больше на оптимистичную блокировку похож, но только на уровне бизнес логики.

Это прекрасный вариант с точки зрения user experience.

Согласен, что это худший из способов, но чисто технически это все же способ :) Если конкретно в вашей системе это большая редкость, то может и прокатить с извинениями и скидочкой за доставленные неудобства.

P.S. Все же согласен, что в 99% можно обойтись без ES. И сразу начинать проектировать приложение с помощью этого подхода не стоит. Серебряной пули не существует, и за всё приходится платить. Однако я уверен, что ES может и находит свое применение.

Конкретно это можно с помощью optimistic locking разруливать кстати и без транзакций. Но для этого нужно, чтобы система хранения поддерживала что-то вроде уникальных индексов. Каждое событие инкрементит версию, которая сохраняется вместе с этим событием. Вот эти версии должны быть уникальными.

Я, конечно, не сварщик, но для решения проблемы двойной оплаты вижу навскидку такие варианты:

в случае с event sourcing Authorize будет командой, а в обработчике этой команды в агрегате мы будем работать с состоянием, собранным из write представления, которое консистентно. В случае, если заказ уже оплачен, то возвращаем робокассе ошибку, заодно можно сохранить событие о том, что была попытка двойной оплаты. В случае успеха, соответственно, меняем состояние заказа на "оплачен". Если представить, что PSP не вызывает наш сервис, чтобы проверить, что деньги можно списать, и списывает их безусловно, то можно пойти немного другим путем. Сделать кнопку оплаты таким образом, чтобы она сперва создала семантической лок в системе (то есть это будет команда) путем перевода заказа в состояние а-ля PAYMENT_IN_PROGRESS, и потом бы происходил редирект на страницу PSP. Но кажется, что конкретно в этой ситуации без event sourcing пришлось сделать то же самое. Ещё один вариант: если задетектили двойную оплату каким-либо способом, то всегда одну из транзакций можно откатить в PSP в автоматическом режиме. Короче, как мне кажется, варианты побороть проблему есть. И способы решения проблем, как всегда, сильно зависят от бизнеса и конкретных кейсов.

Information

Rating
Does not participate
Registered
Activity