Pull to refresh

Comments 7

Не надо так

reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092"},
    Topic:          "my-topic",
    GroupID:        "my-group",
    CommitInterval: 0, // Отключаем автоматический коммит
})

for {
    msg, err := reader.ReadMessage(context.Background())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Received: %s\n", string(msg.Value))

    // Коммитим оффсет вручную после обработки
    err = reader.CommitMessages(context.Background(), msg)
    if err != nil {
        panic(err)
    }
}

Ридер не следит за тем, что коммитит, после реблансировки консьюмеров можно внезапно обнаружить, что произошел коммит в уже чужую партицию.

Лучше создать консьюмер группу и в её рамках обрабатывать старт нового поколения - https://pkg.go.dev/github.com/segmentio/kafka-go#Generation.Start

Плюс в том, что пока горутины, запущенные в Start(), не завершат работу, клиент не отдаст ОК на перебалансировку. Это даёт время дообработать и закоммитить уже вычитанное.

Спасибо! Надо было этот момент подробнее расписать.

Говоря о Kafka, мне кажется, стоит уделить внимание поддерживаемым схемам данных (JSON, AVRO, Protobuf) и особенностям их использования

Спасибо за статью, предлагаю рассмотреть как альтернативу https://github.com/zillow/zkafka . Крайне приятный интерфейс, очень удобно пользоваться и построено поверх конфлюент-го.

За просто так затащить CGO под капот приложению?!?
Спасибо не надо.

Хм, а можете просвятить почему такое отношение к CGO? Кроме времени билда я не успел заметить каких-то проблем.

Sign up to leave a comment.

Articles