Сегодня рассмотрю архитектурный паттерн CQRS и его возможное место в вашей архитектуре. Также осуществим его реализацию на языке golang.
Проблематика
В некоторых проектах Каруны мы стремимся к микросервисной архитектуре. У этой концепции много плюсов, но она создаёт некоторые трудности. Одну из таких трудностей и метод её преодоления я хочу рассмотреть в данной статье.
Для простоты возьмём универсальный пример в виде приложения интернет-магазина. Предположим, он имеет микросервисную архитектуру и следует доменной модели. Одной из главных частей нашего приложения является функционал, связанный с заказами пользователей и с товарами в этих заказах. В нашей архитектуре есть два соответствующих сервиса: order и goods. Сервис order отвечает за создание, обновление, удаление и чтение сущностей заказа (order), а сервис goods реализует тот же CRUD с товарами заказа. Наши клиенты (мобильные приложения, браузерное приложение, и т.д.) взаимодействуют с этими сервисами, и для удобства у нас реализован паттерн объединение API. Т.е разработан сервис, выполняющий роль API-композитора, работающий с данными наших сервис-провайдеров order и goods. Общую архитектуру можно представить следующим образом:
Что касается API-композитора, то его роль может выполнять веб-приложение, API-шлюз или отдельный сервис. Но выбор варианта в нашем случае выходит за рамки темы данной статьи.
Вроде бы всё неплохо: есть одна точка входа в приложение, логика разнесена по доменным областям. Но что делать в случае, когда API-композитору нужно выполнить нетривиальные выборки и объединять большие наборы данных?
Бизнес просит, чтобы были реализованы сложные фильтры и пагинация. Например, нужно выбрать заказы, в которых количество товаров больше заданного N. В этом случае нам нужно делать полную выборку из сервиса goods, и на композитор будет ложиться задача объединения и фильтровки большого количества данных. Это неэффективно и задействует большое количество памяти. Как раз в этом случае на помощь приходит шаблон CQRS, который и решает проблему.
Проектирование
Суть шаблона CQRS (command query responsibility segregation — разделение ответственности командных запросов) заключается в разделении модулей и данных на две отдельные части: команды и запросы. Командные модули реализуют операции: create, update, delete. Модуль запросов реализует получение данных (get). Помимо улучшенного разделения ответственностей, преимущество данного шаблона заключается в том, что сервисы могут хранить данные в таком виде, в каком это удобно для более эффективных запросов. В нашем случае сервисы order, goods будут выполнять только команды, а новый сервис order-history возьмёт на себя ответственность в реализации запросов. Посмотрим, как меняется архитектура с внедрением шаблона CQRS:
Теперь сервисы order и goods отвечают только за изменение соответствующих им сущностей: заказов и товаров. Все эти изменения, помимо записи в базы данных сервисов, публикуются в виде событий. Новый сервис order-history отвечает только за запросы на чтение данных. Он подписывается на события из order, goods и заполняет/изменяет свою базу данных.
Реализация
Попробуем реализовать с помощью следующего стэка: golang как язык сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами.
Создадим локальное рабочее окружение, а именно — сервисы order, goods, order-history, БД для каждого сервиса и брокер kafka:
docker-compose.yml
version: '3.9'
services:
order:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: order
environment:
- HTTP_BIND=8080
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
- HOST_DB=db-order
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- ORDER_CREATED_TOPIC=order_created_v1
depends_on:
- db-order
- kafka
volumes:
- ./order:/app/order:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
ports:
- "8080:8080"
networks:
- cqrs
db-order:
image: postgres:14
environment:
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
ports:
- "5441:5432"
volumes:
- data:/var/lib/postgresql
networks:
- cqrs
goods:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: goods
environment:
- HTTP_BIND=8081
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
- HOST_DB=db-goods
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- GOODS_CREATED_TOPIC=goods_created_v1
volumes:
- ./goods:/app/goods:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
depends_on:
- db-goods
- kafka
ports:
- "8081:8081"
networks:
- cqrs
db-goods:
image: postgres:14
environment:
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
ports:
- "5442:5432"
volumes:
- data:/var/lib/postgresql
networks:
- cqrs
order-history:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: order-history
environment:
- HTTP_BIND=8082
- POSTGRES_DB=orders_history
- POSTGRES_USER=orders_history_user
- POSTGRES_PASSWORD=orders_history_password
- HOST_DB=db-order-history
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- GOODS_CREATED_TOPIC=goods_created_v1
- ORDER_CREATED_TOPIC=order_created_v1
depends_on:
- db-order-history
- kafka
volumes:
- ./order-history:/app/order-history:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
ports:
- "8082:8082"
networks:
- cqrs
db-order-history:
image: postgres:14
environment:
- POSTGRES_DB=orders_history
- POSTGRES_USER=orders_history_user
- POSTGRES_PASSWORD=orders_history_password
ports:
- "5443:5432"
volumes:
- data:/var/lib/postgresql
networks:
- cqrs
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- '2181:2181'
networks:
- cqrs
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1
networks:
- cqrs
volumes:
data:
networks:
cqrs:
Сервис order будет сохранять заказы в таблицу:
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
И реализовывать эндпоинт для создания заказа:
func NewServer(db *pgxpool.Pool, kafkaProducer sarama.SyncProducer) Server {
s := Server{}
s.kafkaProducer = kafkaProducer
s.db = db
s.router = mux.NewRouter()
s.router.HandleFunc("/v1/order", s.CreateOrderV1).Methods(http.MethodPost)
return s
}
Который обрабатывает создание заказа, сохраняет его в БД и шлет событие в топик order_created_v1:
обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
log.Error().Err(err).Msg("Data hasn't been parsed.")
w.WriteHeader(http.StatusBadRequest)
return
}
userID := r.Form.Get("userId")
order := model.Order{}
err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, created_at) VALUES ($1, NOW()) RETURNING id, user_id, created_at`, userID).Scan(&order.ID, &order.UserID, &order.CreatedAt)
if err != nil {
log.Error().Err(err).Msg("Order hasn't been created.")
w.WriteHeader(http.StatusInternalServerError)
return
}
msg := model.CreatedOrderMsg{Data: order}
msgStr, err := json.Marshal(msg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been marshaled.")
w.WriteHeader(http.StatusInternalServerError)
return
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = s.kafkaProducer.SendMessage(producerMsg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been sent.")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
Сервис goods реализуется аналогично. Таблица для хранения товаров заказов:
CREATE TABLE goods (
id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
Эндпоинт для создания товара в заказе:
s.router.HandleFunc("/v1/goods", s.CreateGoodsV1).Methods(http.MethodPost)
И обработчик для создания товара в заказе и отсылки события в топик goods_created_v1:
обработчик CreateGoodsV1
func (s Server) CreateGoodsV1(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
log.Error().Err(err).Msg("Data hasn't been parsed.")
w.WriteHeader(http.StatusBadRequest)
return
}
orderID := r.Form.Get("orderId")
goods := model.Goods{}
err = s.db.QueryRow(context.Background(), `INSERT INTO goods (order_id, created_at) VALUES ($1, NOW()) RETURNING id, order_id, created_at`, orderID).Scan(&goods.ID, &goods.OrderID, &goods.CreatedAt)
if err != nil {
log.Error().Err(err).Msg("Goods hasn't been created.")
w.WriteHeader(http.StatusInternalServerError)
return
}
msg := model.CreatedGoodsMsg{Data: goods}
msgStr, err := json.Marshal(msg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been marshaled.")
w.WriteHeader(http.StatusInternalServerError)
return
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = s.kafkaProducer.SendMessage(producerMsg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been sent.")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
БД сервиса order-history имеет следующую структуру:
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
CREATE TABLE goods (
id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders (id)
);
Сервис слушает события order_created_v1 и goods_created_v1 и записывает данные в свою БД:
обработка события order_created_v1
...
type OrderCreatedEvent struct {
Data struct {
ID int64 `json:"id"`
UserID int64 `json:"user_id"`
CreatedAt time.Time `json:"created_at"`
} `json:"data"`
}
func (oh OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
oce := OrderCreatedEvent{}
err := json.Unmarshal(msg.Value, &oce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}
_, err = oh.db.Exec(context.Background(), `INSERT INTO orders (id, user_id, created_at) VALUES ($1, $2, $3)`, oce.Data.ID, oce.Data.UserID, oce.Data.CreatedAt)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been inserted.")
}
session.MarkMessage(msg, "")
}
return nil
}
...
обработка события goods_created_v1
...
type GoodsCreatedEvent struct {
Data struct {
ID int64 `json:"id"`
OrderID int64 `json:"order_id"`
CreatedAt time.Time `json:"created_at"`
} `json:"data"`
}
func (gh GoodsHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
gce := GoodsCreatedEvent{}
err := json.Unmarshal(msg.Value, &gce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}
_, err = gh.db.Exec(context.Background(), `INSERT INTO goods (id, order_id, created_at) VALUES ($1, $2, $3)`, gce.Data.ID, gce.Data.OrderID, gce.Data.CreatedAt)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been inserted.")
}
session.MarkMessage(msg, "")
}
return nil
}
...
Сервис будет реализовывать эндпоинт получения данных:
s.router.HandleFunc("/v1/order-history", s.GetOrderHistoryV1).Methods(http.MethodGet)
И сама реализация метода:
обработчик GetOrderHistoryV1
func (s Server) GetOrderHistoryV1(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
log.Error().Err(err).Msg("Data hasn't been parsed.")
w.WriteHeader(http.StatusBadRequest)
return
}
threshold := r.Form.Get("threshold")
offset := r.Form.Get("offset")
limit := r.Form.Get("limit")
rows, err := s.db.Query(context.Background(), `SELECT orders.id, orders.user_id, orders.created_at FROM orders
INNER JOIN goods ON goods.order_id = orders.id
GROUP BY orders.id
HAVING COUNT(goods.id) > $1
LIMIT $2 OFFSET $3`, threshold, limit, offset)
if err != nil {
log.Error().Err(err).Msg("Goods haven't been got.")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer rows.Close()
data := make([]model.Order, 0)
for rows.Next() {
o := model.Order{}
sErr := rows.Scan(&o.ID, &o.UserID, &o.CreatedAt)
if sErr != nil {
log.Error().Err(err).Msg("Reading error.")
w.WriteHeader(http.StatusInternalServerError)
return
}
data = append(data, o)
}
ordersRsp := model.OrdersResponse{Data: data}
response, err := json.Marshal(ordersRsp)
if err != nil {
log.Error().Err(err).Msg("Response hasn't been marshaled.")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(response)
}
Теперь мы без лишних накладных расходов можем выполнить запрос на получение заказов, в которых количество товаров больше 1.
curl 'http://localhost:8082/v1/order-history?limit=2&threshold=1&offset=0'
Таким образом, мы получаем данные без сложной обработки, вся логика получения данных и обработки реализована в сервисе order-history.
Для простоты мы реализовали только создание сущностей и чтение их в сервисе order-history. Реализация обновления данных может усложнить логику сервисов, т.к нужно поддерживать конкурентное обновление данных и как-то обрабатывать повторяющиеся события.
Полный листинг реализации данного функционала вы можете найти на моем github.
Заключение
Реализация шаблона CQRS позволяет эффективно разделить логику приложения: эффективно реализуются запросы и улучшить общее разделение ответственности
В модуле запросов можно использовать другие СУБД помимо PostgreSQL — в том числе и аналитические. Например, clickhouse или vertica. Также можно использовать NoSQL хранилища типа MongoDB или DynamoDB.
Несмотря на преимущества, CQRS влечёт за собой усложнение архитектуры (администрирование и обслуживание БД). Может появиться рассинхронизация между представлениями для БД команд и запросов. За этим тоже необходимо следить.
Модуль представлений сложен в обслуживании: проблемы конкурентного обновления данных и повторяющихся событий.
CQRS хорошо совместим с event sourcing.