Comments 7
Не надо так
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
GroupID: "my-group",
CommitInterval: 0, // Отключаем автоматический коммит
})
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Received: %s\n", string(msg.Value))
// Коммитим оффсет вручную после обработки
err = reader.CommitMessages(context.Background(), msg)
if err != nil {
panic(err)
}
}
Ридер не следит за тем, что коммитит, после реблансировки консьюмеров можно внезапно обнаружить, что произошел коммит в уже чужую партицию.
Лучше создать консьюмер группу и в её рамках обрабатывать старт нового поколения - https://pkg.go.dev/github.com/segmentio/kafka-go#Generation.Start
Плюс в том, что пока горутины, запущенные в Start()
, не завершат работу, клиент не отдаст ОК на перебалансировку. Это даёт время дообработать и закоммитить уже вычитанное.
Говоря о Kafka, мне кажется, стоит уделить внимание поддерживаемым схемам данных (JSON, AVRO, Protobuf) и особенностям их использования
Спасибо за статью, предлагаю рассмотреть как альтернативу https://github.com/zillow/zkafka . Крайне приятный интерфейс, очень удобно пользоваться и построено поверх конфлюент-го.
работа с Kafka в Go: практическое применение