Второй - это создавать транзакцию вызовом Begin у интерфейса репозитория. В слое приложения придется обрабатывать commit и rollback.
Третий - type AtomicOperation func(context.Context, Repository) error из статьи. Я экспериментировал с таким подходом. Если много репозиториев нужно, то будет расти количество аргументов type AtomicOperation func(context.Context, Repository1, Repository2, ..., RepositoryN)
Все остальное, что я видел - это уже, на мой взгляд, вариации этих трех вариантов.
В слое инфраструктуры есть функция, которая в качестве аргумента принимает анонимную функцию. Эта ананонимная функция создется в слое приложения и описывает алгоритм взаимодействия с БД. Перед вызовом анонимной функции открывается транзакция. После выполнения анонимой функции транзакция коммитится. Если анонимная функция вернула ошибку, то транзакция откатывается. Каждая функция, которая делает sql-запрос должна проверять context.Context на наличие транзакции. Если транзакция есть, то в качетве объекта то sql-запрос посылается в рамках транзакции. Если транзакции в контексте нет, то sql-запрос посылается через одно из соединений в пуле соединений с БД.
Если мне нужно информация типа состояния из editing контекста, то я передаю эту информацию на вход команды using контекста, при этом работая по контрактам using контекста
Подскажите, я правильно понял, что в соответствии с этим подходом клиент приложения сначала получает editing todo list с помощью query, а затем передает его в command контексту using? Или, например, контекст editing публикует доменное событие `TodoListPublished`, а контекст using создает у себя модель PublishedTodoList, к которому потом можно направить команду TakeTodoList в контекст using?
Для меня репозиторий - это тоже доменная модель. Модель коллекции агрегатов. Репозитории используются многими command handles и domain services, поэтому я расцениваю репозиторий в своем роде как общий код. Пока удается сохранять небольшой список функций репозитория (add, update, delete, get) храню его рядом с агрегатами.
Однако часто в репозиториях появляются функции вроде `getByName`, `getBySomething`, `updateName` и т.д. Тогда это уже не модель предметной области, а набор функциональных интерфейсов, например `todoListGetterByName`.
В работе я активно использую интерфейсы, объявленные в месте использования (в слое приложения в основном), и активно комбинирую этот подход с репозиториями. Просто в статье не нашел место это показать.
CQS использую как повод отделить функции, изменяющие состояние БД от функций, считывющих состояние БД. Command здесь - это просто название dto. Здесь нет паттернов, только организация структур и функций в слое приложения.
Доброго времени суток! Стрелки для меня - это `import`. В своих проектах делаю импорты направленнми снаружи во внутрь. В статье я делюсь тем, что делаю, поэтому и нарисовл так. Не совсем понял что вы имеете в виду под оберткой. Приведите, пожалуйста, пример.
Я тоже наблюдал подобные сбои и на других конфигурациях и на этом docker-образе kafka в целом. На постоянной основе не воспроизводится, поэтому разобраться тоже пока не удалось.
На вопрос, когда такой подход целесообразно применить, гораздо лучше меня ответит список сценариев использования библиотеки Parallel Consumer: https://github.com/confluentinc/parallel-consumer/blob/master/README.adoc (пункт 3.4. Scenarios). В основном, насколько я понимаю, авторы Parallel Consumer предлагают использовать этот подход, когда увеличивать количество партиций не представляется возможными, либо когда увеличение партиций значительно не ускоренит обработку сообщений.
Потеря сообщений после падения приложения
Потери сообщений можно избежать, если фиксировать пакет прочитанных сообщений явно вызовом метода Commit в коде обработчика сообщений только после того, как сообщения обработаны. Если приложение упадет в процессе обработки пачки сообщений, то после перезапуска приложения все незафиксированные сообщения считаются consumer'ом повторно.
Тут еще надо сказать, что segmentio/kafka неявно для клиента библиотеки считывает из kafka сообщения в буферный канал msgs, и именно из msgs извлекается по одному сообщения вызовом метода Fetch. Думаю, что большинство реализаций consumer поступают аналогично - используют буфер считанных сообщений в оперативной памяти приложения.
Менее активное использование одного канала по сравнению с остальными (если я правильно вас понял).
Думаю, что это проблема не подхода с каналами, а больше проблема балансировки нагрузки и распределения сообщения по очередям. Ведь может оказаться, что и одна из партиций kafka будет пуста, потому что алгоритм распределения сообщений не гарантирует равномерного распределения.
Я согласен, что в любом подходе есть плюсы и минусы. Думаю, что нужно находить компромиссные решения с учетом исходных данных и ограничений решаемой задачи.
Хотелось показать, как можно действовать в условиях, когда есть возможность использовать только ограниченное количество партиций. Пример, конечно, сильно упрощенный. Параллельный consumer имеет смысл использовать, когда обработка сообщения заключается в обращении к нескольким внешним системам и применении проверок предметной области, а не только в записи в базу.
Такие эксперименты не продовил. Попробую порассуждать.
Насколько мне известно, producer всегда записывает сообщения в Leader-партицию.
Пусть producer посылает сообщение в партицию 0. Запись в партицию 0 в данный момент недоступна. Пусть запись недоступна по следующей причине. У consumer установлен параметр acks=all, сообщение записалось в Leader-партицию, и не записалось в необходимое количество Follower-партиций (запись подтвердили меньше Follower-партиций, чем установлено в параметре брокера min.insync.replicas). Тогда сообщение, которое producer пытался записать в партицию 0 не будет записана ни в какую другую партицию - producer получит ошибку.
Получается, что при записи сообщение не попадет в неправильную партицию
Теперь посмотрим на чтениие.
По умолчанию consumer читает сообщения из Leader-партиции. Пусть чтение из Leader-партиции невозможно. Такое могло произойти из-за сетевых задержек или перегрузки брокера. Тогда из Follower-партиций назначается новая Leader-партиция, и из нее продолжит читать сообщения consumer. Если новая Leader-партиция будучи Folower-партицией не успела получить все сообщения от прежней Leader-партиции, то consumer не получит часть сообщений. Насколько я знаю, эти сообщения kafka не восстановит.
Первый вариант - сделать то же самое, что и в статье, только транзакцию передавать явно.
Чтобы не было типа pgx в слое приложения, нужно:
- переименовать type Tx interface в TxExecutor
- рядом с интерфейсом объявить type Tx any
- использовать Tx any во всех пакетах, кроме postgres. В пакете postgres понадобится делать явное приведение типа any к pgx.Tx
Я в работе использую этот первый вариант. С ним нормально. Есть еще у меня проект, где транзакция в контексте. Тоже нормально.
Второй и третий варианты - https://www.angus-morrison.com/blog/atomic-go-repositories
Второй - это создавать транзакцию вызовом Begin у интерфейса репозитория. В слое приложения придется обрабатывать commit и rollback.
Третий - type AtomicOperation func(context.Context, Repository) error из статьи. Я экспериментировал с таким подходом. Если много репозиториев нужно, то будет расти количество аргументов type AtomicOperation func(context.Context, Repository1, Repository2, ..., RepositoryN)
Все остальное, что я видел - это уже, на мой взгляд, вариации этих трех вариантов.
Вот еще пример https://github.com/ThreeDotsLabs/wild-workouts-go-ddd-example/blob/56ef6d5daa2e1f2de82f85278eef1fe5825eb401/internal/trainer/app/command/make_hours_available.go#L39
Попробуйте поискать статьи по фразе 'golang clean transaction', будет много вариантов с примерами.
У всех подходов есть плюсы и минусы. В go я не нашел такого подхода, который можно было бы считать эталоном.
Я бы сказал, что у такого подхода есть свои минусы и свои плюсы.
На мой взгляд основной минус - это неочевидность того, что транзакция находится в контексте.
К счастью реализаций абстрактных транзкакций много. Есть из чего выбрать.
Да, транзакция передается в контексте. Вот production ready решение https://github.com/avito-tech/go-transaction-manager.
В слое инфраструктуры есть функция, которая в качестве аргумента принимает анонимную функцию. Эта ананонимная функция создется в слое приложения и описывает алгоритм взаимодействия с БД. Перед вызовом анонимной функции открывается транзакция. После выполнения анонимой функции транзакция коммитится. Если анонимная функция вернула ошибку, то транзакция откатывается. Каждая функция, которая делает sql-запрос должна проверять context.Context на наличие транзакции. Если транзакция есть, то в качетве объекта то sql-запрос посылается в рамках транзакции. Если транзакции в контексте нет, то sql-запрос посылается через одно из соединений в пуле соединений с БД.
Большое спасибо за пояснение!
Интересное решение. Про оркестратор в таких с ценариях даже не задумывался.
CORS - просто пример. Его нет в demo приложении.
Да, согласен, написано неудачно. Я хотел сказать, что в пакете domain и его вложенных пакетах нельзя писать import из infra.
Доброго времени суток!
Спасибо за комментарий!
Подскажите, я правильно понял, что в соответствии с этим подходом клиент приложения сначала получает editing todo list с помощью query, а затем передает его в command контексту using?
Или, например, контекст editing публикует доменное событие `TodoListPublished`, а контекст using создает у себя модель PublishedTodoList, к которому потом можно направить команду TakeTodoList в контекст using?
Доброго времени суток!
Для меня репозиторий - это тоже доменная модель. Модель коллекции агрегатов. Репозитории используются многими command handles и domain services, поэтому я расцениваю репозиторий в своем роде как общий код. Пока удается сохранять небольшой список функций репозитория (add, update, delete, get) храню его рядом с агрегатами.
Однако часто в репозиториях появляются функции вроде `getByName`, `getBySomething`, `updateName` и т.д. Тогда это уже не модель предметной области, а набор функциональных интерфейсов, например `todoListGetterByName`.
В работе я активно использую интерфейсы, объявленные в месте использования (в слое приложения в основном), и активно комбинирую этот подход с репозиториями. Просто в статье не нашел место это показать.
CQS использую как повод отделить функции, изменяющие состояние БД от функций, считывющих состояние БД. Command здесь - это просто название dto. Здесь нет паттернов, только организация структур и функций в слое приложения.
Доброго времени суток!
Стрелки для меня - это `import`. В своих проектах делаю импорты направленнми снаружи во внутрь. В статье я делюсь тем, что делаю, поэтому и нарисовл так.
Не совсем понял что вы имеете в виду под оберткой. Приведите, пожалуйста, пример.
Я тоже наблюдал подобные сбои и на других конфигурациях и на этом docker-образе kafka в целом. На постоянной основе не воспроизводится, поэтому разобраться тоже пока не удалось.
Балансер находиться в продюсере.
Целесообразность подхода
На вопрос, когда такой подход целесообразно применить, гораздо лучше меня ответит список сценариев использования библиотеки Parallel Consumer: https://github.com/confluentinc/parallel-consumer/blob/master/README.adoc (пункт 3.4. Scenarios).
В основном, насколько я понимаю, авторы Parallel Consumer предлагают использовать этот подход, когда увеличивать количество партиций не представляется возможными, либо когда увеличение партиций значительно не ускоренит обработку сообщений.
Потеря сообщений после падения приложения
Потери сообщений можно избежать, если фиксировать пакет прочитанных сообщений явно вызовом метода Commit в коде обработчика сообщений только после того, как сообщения обработаны. Если приложение упадет в процессе обработки пачки сообщений, то после перезапуска приложения все незафиксированные сообщения считаются consumer'ом повторно.
Тут еще надо сказать, что segmentio/kafka неявно для клиента библиотеки считывает из kafka сообщения в буферный канал msgs, и именно из msgs извлекается по одному сообщения вызовом метода Fetch. Думаю, что большинство реализаций consumer поступают аналогично - используют буфер считанных сообщений в оперативной памяти приложения.
Менее активное использование одного канала по сравнению с остальными (если я правильно вас понял).
Думаю, что это проблема не подхода с каналами, а больше проблема балансировки нагрузки и распределения сообщения по очередям. Ведь может оказаться, что и одна из партиций kafka будет пуста, потому что алгоритм распределения сообщений не гарантирует равномерного распределения.
Я согласен, что в любом подходе есть плюсы и минусы. Думаю, что нужно находить компромиссные решения с учетом исходных данных и ограничений решаемой задачи.
Надо было больше внимания в статье уделить ограничениям, тогда, возможно, решения выглядели бы более оправданными. Учту это в будущем.
Спасибо вам за комментарий.
Мне хотелось рассмотреть решения связанные именно с kafka. Ведь kafka довольно часто применяют для решения подобных задач.
Согласен с вами, есть более оптимальные решения.
Хотелось показать, как можно действовать в условиях, когда есть возможность использовать только ограниченное количество партиций. Пример, конечно, сильно упрощенный. Параллельный consumer имеет смысл использовать, когда обработка сообщения заключается в обращении к нескольким внешним системам и применении проверок предметной области, а не только в записи в базу.
Такие эксперименты не продовил. Попробую порассуждать.
Насколько мне известно, producer всегда записывает сообщения в Leader-партицию.
Пусть producer посылает сообщение в партицию 0. Запись в партицию 0 в данный момент недоступна. Пусть запись недоступна по следующей причине. У consumer установлен параметр acks=all, сообщение записалось в Leader-партицию, и не записалось в необходимое количество Follower-партиций (запись подтвердили меньше Follower-партиций, чем установлено в параметре брокера min.insync.replicas). Тогда сообщение, которое producer пытался записать в партицию 0 не будет записана ни в какую другую партицию - producer получит ошибку.
Получается, что при записи сообщение не попадет в неправильную партицию
Теперь посмотрим на чтениие.
По умолчанию consumer читает сообщения из Leader-партиции. Пусть чтение из Leader-партиции невозможно. Такое могло произойти из-за сетевых задержек или перегрузки брокера. Тогда из Follower-партиций назначается новая Leader-партиция, и из нее продолжит читать сообщения consumer. Если новая Leader-партиция будучи Folower-партицией не успела получить все сообщения от прежней Leader-партиции, то consumer не получит часть сообщений. Насколько я знаю, эти сообщения kafka не восстановит.
Тогда отвечу на ваш вопрос так: не повлияют.
Но, повторюсь, собственноручно я это не проверял.