Как стать автором
Обновить
0

SAGA на golang

Время на прочтение9 мин
Количество просмотров12K

После того, как я написал статью про паттерн CQRS, мне захотелось описать ещё один интересный шаблон для микросервисной архитектуры, а именно saga (он же повествование).

Проблематика

Проекты Каруны, как и многие современные приложения, следуют парадигме микросервисной архитектуры. Поэтому паттерн в настоящее время актуален для нас как никогда. Для начала рассмотрим какой-нибудь простой кейс на примере интернет-магазина. Предположим, что некоторая часть нашей архитектуры разделена на микросервисы: order, который отвечает за создание самого заказа, и сервис goods, который отвечает за создание товаров в этом заказе. Создание заказа можно представить следующей схемой:

Клиент обращается непосредственно к сервису order, в котором создаётся сущность заказа. Сервис order создает в своей БД сущность заказа и посылает запрос на создание товаров в этом заказе в сервис goods. Если в сервисе goods произойдет ошибка, то нарушается согласованность данных в нашей системе — заказ уже записан в базу данных сервиса order, но товары не будут записаны в БД сервиса goods.

Методы решения

Многие слышали о так называемых распределённых транзакциях, которые, как предполагается, должны решать описанную выше проблему. Для управления такими транзакциями используется стандарт X/Open XA, следование которому гарантирует как сохранение всей транзакции, так и её откат. Одной из главных проблем распределённых транзакций является то, что их не поддерживают многие современные инструменты — такие как MongoDB, Kafka и т. д. 

Другая проблема связана с жёстким требованием: 100-процентная доступность всех сервисов во время исполнения такой транзакции. А так как каждый новый сервис понижает доступность системы в целом (доступность системы = произведению доступности каждого сервиса), использование распределённых транзакций будет хуже и хуже сказываться на приложении в целом при масштабировании. По этим причинам использование данного метода в современных приложениях не желательно.

Для решения нашей задачи отлично подходит паттерн saga. Он обеспечивает согласованность данных между сервисами, используя локальные транзакции и асинхронные сообщения. Логика исполнения паттерна размазана на несколько сервисов: после фиксации транзакции в одном сервисе он публикует асинхронное сообщение о завершении транзакции. Это сообщение инициализирует следующий этап выполнения паттерна, и так до конца "повествования".

Для отката изменений на N-ом этапе повествование использует компенсирующие транзакции, которые откатывают изменения и устраняют последствия выполнения предыдущих N-1 транзакций. Выполнение таких транзакций тоже происходит последовательно, но в обратном порядке. Паттерн также оперирует такими понятиями как поворотная транзакция (дальнейшие транзакции никогда не отказывают) и доступная для повторения транзакция (всегда заканчивается успешно).

Управление (координация) повествований может осуществляться 2 способами:

  • Хореография — решения о следующих шагах и откатах принимаются распределённо каждым сервисом по отдельности.

  • Оркестрация — логика координации централизуется в одном месте, из которого шлются сообщения в другие сервисы.

Проектирование

В нашем случае реализация паттерна будет выглядеть так:

Сервис

Транзакция

Компенсирующая транзакция

order

CreateOrder()

Доступная для компенсации

RemoveOrder()

goods

CreateGoods()

Поворотная

-

order

AcceptOrder()

Доступная для повторения

-

Реализуем saga паттерн, используя хореографию. Она проще в реализации, чем оркестрация, и хорошо ложится на наш случай небольшого количества сервисов. Применительно к описанной задаче, повествование можно представить следующей схемой:

Рассмотрим две возможных ситуации для нашего функционала . В первом случае клиент посылает запрос в сервис order POST /orders, и всё проходит отлично:

  1. Успешное создание сущности заказа в сервисе order инициализирует событие order_created_v1, на которое подписан сервис goods.

  2. После чтения этого события и успешного создания сущностей товаров в сервисе goods посылается событие goods_created_v1 в сервис order.

  3. При получении этого события сервис order меняет статус заказа в своей БД на approved.

Другой случай, это когда во время заказа что-то пошло не так:

  1. Снова создаётся заказ в сервисе order, и генерируется событие order_created_v1.

  2. Сервис goods, получив событие, пытается создать товары для этого заказа. Но что-то идёт не так, и товары не заносятся в БД. Посылается событие goods_rejected_v1 в сервис order.

  3. При получении этого события сервис order откатывает свои изменения из шага 1 — удаляет заказ.

Реализация

Реализовывать проект будем с помощью golang как языка сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами. Создадим локальное рабочее окружение, а именно — сервисы order, goods, БД для каждого сервиса и брокер kafka:

docker-compose.yml
version: '3.9'

services:
  order:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order
    environment:
      - HTTP_BIND=8080
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
      - HOST_DB=db-order
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
      - ORDER_CREATED_TOPIC=goods_created_v1
      - GOODS_REJECTED_TOPIC=goods_rejected_v1
    depends_on:
      - db-order
      - kafka
    volumes:
      - ./order:/app/order:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8080:8080"
    networks:
      - saga

  db-order:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
    ports:
      - "5441:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - saga

  goods:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: goods
    environment:
      - HTTP_BIND=8081
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
      - HOST_DB=db-goods
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
      - GOODS_CREATED_TOPIC=goods_created_v1
      - GOODS_REJECTED_TOPIC=goods_rejected_v1
    volumes:
      - ./goods:/app/goods:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    depends_on:
      - db-goods
      - kafka
    ports:
      - "8081:8081"
    networks:
      - saga

  db-goods:
    image: postgres:14
    environment:
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
    ports:
      - "5442:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - saga

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    networks:
      - saga

  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1,goods_rejected_v1:1:1
    networks:
      - saga

volumes:
  data:

networks:
  saga:

Сервис order включает в себя таблицы со статусами заказов и самими заказами:

CREATE TABLE statuses (
    id   BIGSERIAL PRIMARY KEY,
    name TEXT NOT NULL
);

INSERT INTO statuses (id, name) VALUES (1, 'PENDING'), (2, 'CREATED');

CREATE TABLE orders (
    id         BIGSERIAL PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    status_id  BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,

    FOREIGN KEY (status_id) REFERENCES statuses (id)
);

И реализует эндпоинт создания заказа:

...
server.router.HandleFunc("/v1/orders", s.CreateOrderV1).Methods(http.MethodPost)
...

В котором сохраняется сущность заказа в БД и продьюсится сообщение в kafka:

обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been read.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	orderData := model.OrderData{}
	err = json.Unmarshal(body, &orderData)
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var orderID int64
	err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, status_id, created_at) VALUES ($1, 1, NOW()) RETURNING id`, orderData.UserID).Scan(&orderID)
	if err != nil {
		log.Error().Err(err).Msg("Order hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedOrderMsg{Data: model.Order{
		ID:       orderID,
		GoodsIds: orderData.GoodsIds,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
}

Cервис слушает топики goods_created_v1 и goods_rejected_v1 и в зависимости от полученного изменяет созданную сущность заказа:

обработка события goods_created_v1
func (gch GoodsCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gce := GoodsCreatedEvent{}
		err := json.Unmarshal(msg.Value, &gce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = gch.db.Exec(context.Background(), `UPDATE orders SET status_id = 2 WHERE id = $1`, gce.Data.OrderID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
обрабокта события goods_rejected_v1
func (grh GoodsRejectedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gre := GoodsRejectedEvent{}
		err := json.Unmarshal(msg.Value, &gre)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = grh.db.Exec(context.Background(), `DELETE FROM orders WHERE id = $1`, gre.Data.OrderID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}

Сервис goods содержит таблицу с заказами:

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    goods_id   BIGINT NOT NULL,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

И слушает событие order_created_v1, которое обрабатывает следующим образом:

обработrа события order_created_v1
func (och OrderCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		oce := OrderCreatedEvent{}
		err := json.Unmarshal(msg.Value, &oce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		ctx := context.Background()
		tx, err := och.db.Begin(ctx)
		for _, goodsID := range oce.Data.GoodsIds {
			_, err = och.db.Exec(context.Background(), `INSERT INTO goods (goods_id, order_id, created_at) VALUES ($1, $2, NOW())`, goodsID, oce.Data.ID)
			if err != nil {
				tx.Rollback(ctx)
				log.Error().Err(err).Msg("Inserting error.")

				err := och.sendRejected(oce.Data.ID)
				if err != nil {
					log.Error().Err(err).Msg("Event hasn't been sent.")
				}
				session.MarkMessage(msg, "")
				continue
			}
		}

		err = tx.Commit(ctx)
		if err != nil {
			err := tx.Rollback(ctx)
			log.Error().Err(err).Msg("Transaction commit error.")
			rErr := och.sendRejected(oce.Data.ID)
			if rErr != nil {
				log.Error().Err(rErr).Msg("Event hasn't been sent.")
			}
			session.MarkMessage(msg, "")
			continue
		}

		err = och.sendCreated(oce.Data.ID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been sent.")
		}
		session.MarkMessage(msg, "")
	}

	return nil
}

func (och OrderCreatedHandler) sendRejected(orderID int64) error {
	msg := model.RejectedGoodsMsg{Data: model.Goods{
		OrderID: orderID,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_REJECTED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = och.producer.SendMessage(producerMsg)
	return err
}

func (och OrderCreatedHandler) sendCreated(orderID int64) error {
	msg := model.CreatedGoodsMsg{Data: model.Goods{
		OrderID: orderID,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = och.producer.SendMessage(producerMsg)
	return err
}

Теперь мы может сделать запрос к сервису и создать заказ:

curl --request POST \
   --header "Content-Type: application/json" \
   --data '{"user_id":1,"goods_ids":[1,2]}' \
   'http://localhost:8080/v1/orders'

В успешном варианте развития событий мы получаем полностью сформированную согласованную сущность заказа с соответствующими товарами. В противном случае неудача в сервисе goods влечёт за собой откат транзакции в order. Таким образом мы добились, чего хотели — согласованности данных в распределённой системе. На моем github представлен полный листинг описанной архитектуры. Для реализации повествования на основе оркестрации есть готовая библиотека здесь, заодно можно подробнее ознакомиться с принципами её работы.

Заключение

  • Паттерн saga хорошо решает проблему согласованности в распределённых системах сервисов.

  • Одна из главных проблем повествований — что они являются ACD. У них нет изолированности, это вызывает аномалии (по аналогии с аномалиями в СУБД). Одни повествования могут влиять на данные, с которыми работают другие повествования. Для компенсации этого недостатка паттерн должен реализовывать контрмеры.

  • Если сравнивать повествование на основе оркестрации и хореографии, то второй метод проще в реализации, сложнее для понимания и подходит лучше для простых кейсов. Среди явных недостатков можно выделить возможность возникновения жёсткого связывания, т.к каждый сервис подписывается на все события, которые на него влияют.

  • Повествование на основе оркестрации не создает циклических зависимостей, и бизнес-логика значительно проще, чем в хореографии. Меньше связывания — у каждого сервиса своё API, которое вызывается оркестратором. Лучше подходит для сложных повествований.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Используете ли вы паттерн saga в своей архитектуре?
30.51% Да18
1.69% Нет, мы используем распределенные транзакции1
35.59% Нет, нам это не нужно21
23.73% Нет, мы реализуем согласованность данных другим способом14
13.56% Другое8
Проголосовали 59 пользователей. Воздержались 15 пользователей.
Теги:
Хабы:
Всего голосов 6: ↑4 и ↓2+5
Комментарии15

Публикации

Информация

Сайт
karuna.group
Дата регистрации
Дата основания
Численность
201–500 человек
Местоположение
Россия

Истории