Как стать автором
Обновить

Синхронизация кеша в распределенных Go (и не только) приложениях с помощью Kafka

Уровень сложностиСредний
Время на прочтение3 мин
Количество просмотров844

Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.

Зачем нам такое может понадобиться? По нескольким причинам:

  • У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.

  • Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.

  • У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc...)

Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):

type db interface {
  Find(key string) string
  Update(key string, value string)
}

type Repository struct {
  cache map[string]string
  db    db
}

func NewRepository(db db) Repository {
  return Repository{db: db, cache: make(map[string]string)}
}

func (r *Repository) Update(key string, value string) {
  r.db.Update(key, value)
  r.updateCacheValue(key, value)
}

func (r *Repository) updateCacheValue(key string, value string)  {
  r.cache[key] = value
}

func (r *Repository) UpdateCache(key string) {
  r.updateCacheValue(key, r.db.Find(key))
}

func (r *Repository) Find(key string) string {
  if val, ok := r.cache[key]; ok {
   return val
  }
  value := r.db.Find(key)
  r.updateCacheValue(key, value)
  return value
}

Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:

Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша
Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша

Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:

type KafkaSync struct {
  reader *kafka.Reader
  writer *kafka.Writer
}

func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync {
  return KafkaSync{reader: reader, writer: writer}
}

func (sync KafkaSync) Sync(ctx context.Context, key string) error {
  return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)})
}

func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error {
  err := sync.reader.SetOffsetAt(ctx, time.Now())
  if err != nil {
   return fmt.Errorf("setting offset: %w", err)
  }
  for ctx.Err() == nil {
    message, err := sync.reader.ReadMessage(ctx)
    if err != nil {
     return fmt.Errorf("read message: %w", err)
    }
    cb(string(message.value))
  }
  return nil
}

Суть компонента в том, чтобы генерировать события в момент обновления данных - метод Sync, а также уметь их отлавливать - метод OnSync.

Псевдокод, который отразит принцип синхронизации:

kafkaSync := NewKafkaSync()
repo := NewRepository()
eg, ctx := errgroup.WithContext(context.Background())

eg.Go(func() error {
  return kafkaSync.OnSync(ctx, func(key string) error {
     // Тут мы отловим новое событие синхронизации на всех репликах
     // и можем обновить кеш на каждой из них
     repo.UpdateCache(key)
  })
})

eg.Go(func() error {
  // Представим что тут сработал HTTP Update эндпойнт, на какой то реплике
  repo.Update("key1", "val1")
  return kafka.sync()
})

eg.Wait()

При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.

В топике мы можем пересылать:

  • Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.

  • Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.

  • Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.

Нужно учитывать важный момент при работе с kafka

Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.

Теги:
Хабы:
+5
Комментарии17

Публикации

Работа

Go разработчик
86 вакансий

Ближайшие события