Синхронизация баз данных между монолитом и микросервисами с помощью Kafka. Наше решение

    В этой статье я расскажу про готовое решение для поддержки консистентности данных между растущей микросервисной и унаследованной архитектурой. Под катом код для репликации двух баз данных с проверкой синхронизации, который может пригодиться для решения аналогичных задач.



    С проблемой консистентности данных мы столкнулись при разработке микросервиса под названием Profile. Он отвечает за регистрацию новых пользователей, хранение данных о них и синхронизируется с монолитной базой. Именно в синхронизации двух баз оказалось несколько проблем.

    Синхронизация данных


    Чтобы система работала стабильно, когда данные меняются в одной базе данных, изменения должны автоматически отразиться на другой. Для этого мы создали очередь из таких изменений в Kafka.

    Мы начали работу с таблицы студентов. К базовым колонкам с именами, фамилиями и классами добавили дополнительные — revision и foreign_revision — для работы с очередью. Теперь при добавлении или изменении данных вызывается триггер в постгресе, который записывает в поле revision текущее время с точностью до миллисекунды. Привожу код добавления колонок и триггера:
    ALTER TABLE "students" ADD "revision" timestamp(6)
    ALTER TABLE "students" ALTER COLUMN "revision" SET DEFAULT timezone('utc', now());
    
    ALTER TABLE "students" ADD "foreign_revision" timestamp(6)
    
    CREATE OR REPLACE FUNCTION increase_revision() RETURNS trigger AS $$
      BEGIN
        NEW.revision := timezone('utc', now());
        RETURN NEW;
      END
    $$ LANGUAGE PLPGSQL;
    
    CREATE TRIGGER update_revision
      BEFORE UPDATE ON students
      FOR EACH ROW
      WHEN (old.foreign_revision is not distinct FROM new.foreign_revision and
           row_to_json(old)::jsonb - 'revision' is distinct FROM row_to_json(new)::jsonb - 'revision')
      EXECUTE PROCEDURE increase_revision();

    После добавления студента или изменения его персональных данных формируем и отправляем сообщение в Kafka. Однако если отправить такие сообщения до закрытия транзакции, база пострадает: закончатся коннекты, из-за ошибки сети транзакция откатится. Чтобы этого не происходило, в модели мы использовали after_commit:
    after_commit :push_to_exchange, on: [:create, :update]

    Сервис Profile подписан на общую очередь в Kafka и либо обновляет существующую запись в таблице, либо добавляет новую.
    class StudentConsumer
      def consume(payload, metadata)
        if record = Student.where(id: payload.id).first
          record.update!(params(payload))
        else
          Student.create!(params(payload))
        end
      end
    
      def params(payload)
        hash = payload.to_h
        hash[:foreign_revision] = hash[:revision]
        hash.slice(*Student.column_names.map(&:to_sym))
      end
    end
    

    Таким образом мы добились того, что данные консистентны в двух разных базах. Процесс состоит из четырех шагов:
    • Добавляем или обновляем студента в монолите.
    • Триггер проставляет текущее время в поле revision.
    • Отправляем сообщение в Kafka.
    • Получаем сообщение и сохраняем данные о студенте в базе сервиса Profile.

    Этот алгоритм работает хорошо, пока не возникнут проблемы: упадет сеть, появятся массовые изменения через update_all или единичные — через update_column, Kafka не будет работать или будет работать медленно.

    Чтобы все это не влияло на процесс синхронизации, монолит также подписан на эту очередь и записывает в поле foreign_revision ревизию, которую прочитал из Kafka:
    class StudentConsumer
      def consume(payload, metadata)
          Student.where(id: payload.id).update_all(foreign_revision: payload.revision)
      end
    end
    

    Каждые пять минут в монолите запускается воркер, который находит все строки, у которых поля ревизий не совпадают, и заново досылает их в Kafka:
    module Profile::SyncShareable
      def run
        Student.where("foreign_revision is null or revision != foreign_revision").
          where("revision < ?", Time.now - 1.minute).
          order(revision: :desc).
          limit(5000).
          each(&:push_to_exchange)
      end
    end
    

    Для ускорения этого процесса нужен условный индекс. Он будет маленького размера, потому что у большинства записей ревизии будут совпадать:
    CREATE  INDEX  "index_studends_on_revision" ON "students"  ("revision") WHERE revision <> foreign_revision

    Таким образом актуальная информация о всех студентах стала доступна для чтения в сервисе Profile. Однако для изменения данных мы были вынуждены ходить в API монолита.

    Чтобы вносить изменения прямо в Profile, мы задумались о двусторонней синхронизации.

    Двусторонняя синхронизация


    Двустороннюю синхронизацию можно обеспечить, если зеркально повторить весь предыдущий код в сервисе Profile. Но придется решить несколько проблем.

    1. Генерация уникального идентификатора


    Мы не можем создать нового студента в Profile, если в монолите использован числовой ID. Решит проблему переход на строчный UUID вместо числового инкремента.

    2. Синхронизация занимает существенное время


    Проблема заключается в том, что данные могут обновляться в двух местах сразу. Например, если в 48 секунд произошло изменение имени в монолите, а в 49 секунд — фамилии в Profile. Теоретически это возможно при исправлениях, дополнениях, автоматической коррекции. Обмен сообщениями через Kafka может занимать дольше трех секунд, и в таком случае изменение имени потеряется из-за более новой версии данных с обновленной фамилией.

    Чтобы при двусторонней синхронизации такого не происходило, можно заменить Kafka на что-то более быстрое, например, на RabbitMQ. Но в Kafka хранится журнал транзакций, и мы можем вернуться, проанализировать нашу синхронизацию, в случае аварии проработать транзакции заново. К тому же он умеет работать с двумя разными ЦОД. Для нас это было важно, и мы остались с Kafka. Хотя для кого-то, возможно, актуальнее будет быстрый Rabbit, в котором синхронизация происходит за миллисекунды, а количество воркеров можно регулировать динамически.

    3. Асинхронная синхронизация


    Когда мы пишем изменения в Profile, нет гарантии, что прочитаем их в монолите, — данные синхронизируются с задержкой. Это надо учитывать, когда разные части приложения написаны поверх разных сервисов. В таких местах приходится отказываться от двусторонней синхронизации и переходить на синхронное взаимодействие через REST API или gRPC.

    Таким образом мы можем распределить нагрузку между монолитом и отдельным сервисом, улучшить кодовую базу, делать деплой этого сервиса независимо от монолита.

    UPD: архитектура решения примерно такая.


    ***
    Как вы решали проблему консистентности данных в микросервисной архитектуре? Какой опыт бесшовного распиливания монолита у вас был?
    Учи.ру
    Крупнейший EdTech в школьном образовании

    Комментарии 9

      0
      А можно картинку с архитектурой?
        0
        Присоединяюсь к коллеге хотелось бы увидеть картинку с архитектурой.
          0
          Добавил
          0
          Примерно тоже самое под Rails github.com/umbrellio/table_sync
            +1

            На after_commit недостаточно работать, например update all не будет колбеков вызывать

              0
              справедливости ради могу отметить что update_all является антипаттерном
                0
                Да, это так, но запретить его не получится, поэтому перехват событий лучше поручить триеру в postgresql.
            0
            А вы смотрели в сторону CDC решений, например, debezium?
              0
              Нет не смотрели, из похожего в голову приходит логическая репликация данных. Преимущество нашего решения, что сообщение в очередь ты отправляешь сам, поэтому можно отправить все взаимосвязанные сущности, например родителя у ученика или его аватар.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое