Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.
Зачем нам такое может понадобиться? По нескольким причинам:
У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.
Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.
У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc...)
Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):
type db interface {
Find(key string) string
Update(key string, value string)
}
type Repository struct {
cache map[string]string
db db
}
func NewRepository(db db) Repository {
return Repository{db: db, cache: make(map[string]string)}
}
func (r *Repository) Update(key string, value string) {
r.db.Update(key, value)
r.updateCacheValue(key, value)
}
func (r *Repository) updateCacheValue(key string, value string) {
r.cache[key] = value
}
func (r *Repository) UpdateCache(key string) {
r.updateCacheValue(key, r.db.Find(key))
}
func (r *Repository) Find(key string) string {
if val, ok := r.cache[key]; ok {
return val
}
value := r.db.Find(key)
r.updateCacheValue(key, value)
return value
}
Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:

Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:
type KafkaSync struct {
reader *kafka.Reader
writer *kafka.Writer
}
func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync {
return KafkaSync{reader: reader, writer: writer}
}
func (sync KafkaSync) Sync(ctx context.Context, key string) error {
return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)})
}
func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error {
err := sync.reader.SetOffsetAt(ctx, time.Now())
if err != nil {
return fmt.Errorf("setting offset: %w", err)
}
for ctx.Err() == nil {
message, err := sync.reader.ReadMessage(ctx)
if err != nil {
return fmt.Errorf("read message: %w", err)
}
cb(string(message.value))
}
return nil
}
Суть компонента в том, чтобы генерировать события в момент обновления данных - метод Sync, а также уметь их отлавливать - метод OnSync.
Псевдокод, который отразит принцип синхронизации:
kafkaSync := NewKafkaSync()
repo := NewRepository()
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
return kafkaSync.OnSync(ctx, func(key string) error {
// Тут мы отловим новое событие синхронизации на всех репликах
// и можем обновить кеш на каждой из них
repo.UpdateCache(key)
})
})
eg.Go(func() error {
// Представим что тут сработал HTTP Update эндпойнт, на какой то реплике
repo.Update("key1", "val1")
return kafka.sync()
})
eg.Wait()
При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.
В топике мы можем пересылать:
Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.
Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.
Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.
Нужно учитывать важный момент при работе с kafka
Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.