В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.
Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий
⚠️ Важно: Не для Продакшена!
Этот материал представляет собой исключительно образовательный пример и демонстрирует техническую возможность реализации подхода запрос-ответ через брокера. Следует подчеркнуть, что в реальных средах подход запрос-ответ через брокер сообщений часто считается дурной практикой. В общепринятых паттернах архитектуры брокеры сообщений обычно используются для того, чтобы "когда-нибудь" обработать сообщение, а не для моментального ответа на запрос. Если в вашей системе необходимо получать моментальные ответы на запросы, рассмотрите возможность размещения сервисов за балансировщиком нагрузки.
Общий процесс работы
Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.
API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.
Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.
API Gateway извлекает ответ из Kafka и возвращает его клиенту.
Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.

Краткий обзор кода
Конфигурация API Gateway:
Файл api-gateway/main.go:
package main import ( "encoding/json" "log" "sync" "time" "github.com/IBM/sarama" "github.com/gin-gonic/gin" "github.com/google/uuid" ) // MyMessage - структура для нашего сообщения type MyMessage struct { ID string `json:"id"` Name string `json:"name"` Value string `json:"value"` } // responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса // mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels var responseChannels map[string]chan *sarama.ConsumerMessage var mu sync.Mutex func main() { responseChannels = make(map[string]chan *sarama.ConsumerMessage) // Создание продюсера Kafka producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil) if err != nil { log.Fatalf("Failed to create producer: %v", err) } defer producer.Close() // Создание консьюмера Kafka consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil) if err != nil { log.Fatalf("Failed to create consumer: %v", err) } defer consumer.Close() // Подписка на партицию "pong" в Kafka partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest) if err != nil { log.Fatalf("Failed to consume partition: %v", err) } defer partConsumer.Close() // Горутина для обработки входящих сообщений от Kafka go func() { for { select { // Чтение сообщения из Kafka case msg, ok := <-partConsumer.Messages(): if !ok { log.Println("Channel closed, exiting goroutine") return } responseID := string(msg.Key) mu.Lock() ch, exists := responseChannels[responseID] if exists { ch <- msg delete(responseChannels, responseID) } mu.Unlock() } } }() // Инициализация роутера Gin router := gin.Default() router.GET("/ping", func(c *gin.Context) { requestID := uuid.New().String() message := MyMessage{ ID: requestID, Name: "Ping", Value: "Pong", } // Преобразование сообщения в JSON что бы потом отправить через kafka bytes, err := json.Marshal(message) if err != nil { c.JSON(500, gin.H{"error": "failed to marshal JSON"}) return } msg := &sarama.ProducerMessage{ Topic: "ping", Key: sarama.StringEncoder(requestID), Value: sarama.ByteEncoder(bytes), } // отправка сообщения в Kafka _, _, err = producer.SendMessage(msg) if err != nil { log.Printf("Failed to send message to Kafka: %v", err) c.JSON(500, gin.H{"error": "failed to send message to Kafka"}) return } responseCh := make(chan *sarama.ConsumerMessage) mu.Lock() responseChannels[requestID] = responseCh mu.Unlock() select { case responseMsg := <-responseCh: c.JSON(200, gin.H{"message": string(responseMsg.Value)}) case <-time.After(10 * time.Second): mu.Lock() delete(responseChannels, requestID) mu.Unlock() c.JSON(500, gin.H{"error": "timeout waiting for response"}) } }) if err := router.Run(":8080"); err != nil { log.Fatalf("Failed to run server: %v", err) } }
Второй микросервис:
Файл second-microservice/main.go:
package main import ( "encoding/json" "log" "github.com/IBM/sarama" ) // Наша структура для сообщения type MyMessage struct { ID string `json:"id"` Name string `json:"name"` Value string `json:"value"` } func main() { // Создание продюсера Kafka producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil) if err != nil { log.Fatalf("Failed to create producer: %v", err) } defer producer.Close() // Создание консьюмера Kafka consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil) if err != nil { log.Fatalf("Failed to create consumer: %v", err) } defer consumer.Close() // Подписка на партицию "ping" в Kafka partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest) if err != nil { log.Fatalf("Failed to consume partition: %v", err) } defer partConsumer.Close() for { select { // (обработка входящего сообщения и отправка ответа в Kafka) case msg, ok := <-partConsumer.Messages(): if !ok { log.Println("Channel closed, exiting") return } // Десериализация входящего сообщения из JSON var receivedMessage MyMessage err := json.Unmarshal(msg.Value, &receivedMessage) if err != nil { log.Printf("Error unmarshaling JSON: %v\n", err) continue } log.Printf("Received message: %+v\n", receivedMessage) responseText := receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) " // Формируем ответное сообщение resp := &sarama.ProducerMessage{ Topic: "pong", Key: sarama.StringEncoder(receivedMessage.ID), Value: sarama.StringEncoder(responseText), } // Отпровляем ответ в gateway _, _, err = producer.SendMessage(resp) if err != nil { log.Printf("Failed to send message to Kafka: %v", err) } } } }
Конфигурация docker-compose ( ��ля 3 версии )
Файл docker-compose.yml:
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: - ZOOKEEPER_CLIENT_PORT=2181 networks: - kafka-network kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 networks: - kafka-network api-gateway: build: context: ./api-gateway dockerfile: Dockerfile depends_on: - kafka networks: - kafka-network ports: - "8080:8080" second-microservice: build: context: ./second-microservice dockerfile: Dockerfile depends_on: - kafka networks: - kafka-network networks: kafka-network: driver: bridge
Примечание: в 3-й версии Docker Compose функция "depends_on" работает несколько иначе, чем во 2-й версии. Она по-прежнему контролирует порядок запуска контейнеров, но не гарантирует, что зависимый сервис полностью готов к работе, прежде чем запускать зависящий от него сервис. В частности, эта функциональность не будет ожидать готовности зависимых сервисов в рамках кластера при использовании Docker Swarm.
Из-за этой особенности в Docker Compose версии 3 мы используем скрипт wait-for-it в каждом контейнере (Dockerfile, пример которого можно увидеть в репозитории), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.
Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.
