Как стать автором
Обновить

Простейший пример kafka + golang

Уровень сложностиПростой
Время на прочтение5 мин
Количество просмотров24K

В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.

Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий

⚠️ Важно: Не для Продакшена!

Этот материал представляет собой исключительно образовательный пример и демонстрирует техническую возможность реализации подхода запрос-ответ через брокера. Следует подчеркнуть, что в реальных средах подход запрос-ответ через брокер сообщений часто считается дурной практикой. В общепринятых паттернах архитектуры брокеры сообщений обычно используются для того, чтобы "когда-нибудь" обработать сообщение, а не для моментального ответа на запрос. Если в вашей системе необходимо получать моментальные ответы на запросы, рассмотрите возможность размещения сервисов за балансировщиком нагрузки.

Общий процесс работы

  1. Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.

  2. API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.

  3. Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.

  4. API Gateway извлекает ответ из Kafka и возвращает его клиенту.

Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.

Краткий обзор кода

Конфигурация API Gateway:

Файл api-gateway/main.go:

package main

import (
	"encoding/json"
	"log"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
)

// MyMessage - структура для нашего сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

// responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса
// mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels
var responseChannels map[string]chan *sarama.ConsumerMessage
var mu sync.Mutex

func main() {
	responseChannels = make(map[string]chan *sarama.ConsumerMessage)

	// Создание продюсера Kafka
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Подписка на партицию "pong" в Kafka
	partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to consume partition: %v", err)
	}
	defer partConsumer.Close()

	// Горутина для обработки входящих сообщений от Kafka
	go func() {
		for {
			select {
			// Чтение сообщения из Kafka
			case msg, ok := <-partConsumer.Messages():
				if !ok {
					log.Println("Channel closed, exiting goroutine")
					return
				}
				responseID := string(msg.Key)
				mu.Lock()
				ch, exists := responseChannels[responseID]
				if exists {
					ch <- msg
					delete(responseChannels, responseID)
				}
				mu.Unlock()
			}
		}
	}()

	// Инициализация роутера Gin
	router := gin.Default()
	router.GET("/ping", func(c *gin.Context) {
		requestID := uuid.New().String()

		message := MyMessage{
			ID:    requestID,
			Name:  "Ping",
			Value: "Pong",
		}

		// Преобразование сообщения в JSON что бы потом отправить через kafka
		bytes, err := json.Marshal(message)
		if err != nil {
			c.JSON(500, gin.H{"error": "failed to marshal JSON"})
			return
		}

		msg := &sarama.ProducerMessage{
			Topic: "ping",
			Key:   sarama.StringEncoder(requestID),
			Value: sarama.ByteEncoder(bytes),
		}

		// отправка сообщения в Kafka
		_, _, err = producer.SendMessage(msg)
		if err != nil {
			log.Printf("Failed to send message to Kafka: %v", err)
			c.JSON(500, gin.H{"error": "failed to send message to Kafka"})
			return
		}

		responseCh := make(chan *sarama.ConsumerMessage)
		mu.Lock()
		responseChannels[requestID] = responseCh
		mu.Unlock()

		select {
		case responseMsg := <-responseCh:
			c.JSON(200, gin.H{"message": string(responseMsg.Value)})
		case <-time.After(10 * time.Second):
			mu.Lock()
			delete(responseChannels, requestID)
			mu.Unlock()
			c.JSON(500, gin.H{"error": "timeout waiting for response"})
		}
	})

	if err := router.Run(":8080"); err != nil {
		log.Fatalf("Failed to run server: %v", err)
	}
}

Второй микросервис:

Файл second-microservice/main.go:

package main

import (
	"encoding/json"
	"log"

	"github.com/IBM/sarama"
)

// Наша структура для сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

func main() {
	// Создание продюсера Kafka
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Подписка на партицию "ping" в Kafka
	partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to consume partition: %v", err)
	}
	defer partConsumer.Close()

	for {
		select {
		// (обработка входящего сообщения и отправка ответа в Kafka)
		case msg, ok := <-partConsumer.Messages():
			if !ok {
				log.Println("Channel closed, exiting")
				return
			}

			// Десериализация входящего сообщения из JSON
			var receivedMessage MyMessage
			err := json.Unmarshal(msg.Value, &receivedMessage)

			if err != nil {
				log.Printf("Error unmarshaling JSON: %v\n", err)
				continue
			}

			log.Printf("Received message: %+v\n", receivedMessage)

			responseText := receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) "

			// Формируем ответное сообщение
			resp := &sarama.ProducerMessage{
				Topic: "pong",
				Key:   sarama.StringEncoder(receivedMessage.ID),
				Value: sarama.StringEncoder(responseText),
			}
			// Отпровляем ответ в gateway
			_, _, err = producer.SendMessage(resp)
			if err != nil {
				log.Printf("Failed to send message to Kafka: %v", err)
			}
		}
	}
}

Конфигурация docker-compose ( для 3 версии )

Файл docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    networks:
      - kafka-network

  api-gateway:
    build:
      context: ./api-gateway
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network
    ports:
      - "8080:8080"


  second-microservice:
    build:
      context: ./second-microservice
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

Примечание: в 3-й версии Docker Compose функция "depends_on" работает несколько иначе, чем во 2-й версии. Она по-прежнему контролирует порядок запуска контейнеров, но не гарантирует, что зависимый сервис полностью готов к работе, прежде чем запускать зависящий от него сервис. В частности, эта функциональность не будет ожидать готовности зависимых сервисов в рамках кластера при использовании Docker Swarm.

Из-за этой особенности в Docker Compose версии 3 мы используем скрипт wait-for-it в каждом контейнере (Dockerfile, пример которого можно увидеть в репозитории), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.

Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.

Рабочий код в репозитории

Теги:
Хабы:
Всего голосов 7: ↑4 и ↓3+4
Комментарии4

Публикации

Истории

Работа

Go разработчик
108 вакансий

Ближайшие события

22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань