Привет, Хабр!

На связи Паша Емельянов, тимлид в AGIMA. В этой статье расскажу, как на одном из проектов мы переписывали старый функционал, разработанный когда-то на PHP, на Golang, с какими проблемами столкнулись и как их решали. Статья будет интересна и начинающим, и миддлам, а еще системным архитекторам, т. к. здесь затронем как инфраструктурные вопросы, так и вопросы реализации конкретного сервиса под потребности бизнес-заказчика.

Что было

Проект, о котором сейчас пойдет речь, это большой онлайн-магазин техники. С самого начала наша основная цель была оптимизировать и переписать старый легаси-код. Старый функционал работал на стеке PHP с базой данных MySQL. В ней хранились все товары, категории, характеристики — короче, вся информация, которая требовалась для решения бизнес-задач.

Сама бизнес-задача была такой: у нас есть маркетплейсы — Яндекс.Маркет, Goods и другие платформы, и всем этим площадкам нужны данные о товарах разных категорий, которые магазин может предоставить покупателям в разных регионах. Прежний вариант реализации не подходил по ряду причин:

  1. большое потребление ресурсов;

  2. длительная генерация;

  3. много легаси-кода, высокая сложность внесения доработок;

  4. невозможность горизонтального масштабирования.

Дело в том, что при вертикальном масштабировании мы можем буквально вставлять железки в кластер и реплицировать это всё, либо разворачивать новые копии монолитного приложения. Это очень долго и неэффективно. При горизонтальном мы можем легко, добавив больше ресурсов в кластер, получить большую пропускную способность и вычислительную мощность. 

Новый сервис

После нескольких встреч с маркетологами и бизнесом наши аналитики зафиксировали продуктовые задачи на разработку:

  1. Нужно было получить быструю генерацию множества файлов по множеству регионов и категорий для всех потребителей без ограничений. В старом сервисе была проблема с производительностью. Поэтому это условие было невыполнимо.

  2. Им хотелось, чтобы можно было управлять тем, в какое время и какие файлы будут обновляться.

  3. Они хотели в первую очередь генерировать файлы по требованию или с отложенным запуском.

  4. Еще нужна была возможность высокой кастомизации этих шаблонов и файлов с выгрузками, которые они будут отдавать в маркетплейсы.

Получилось, что фактически наша задача была сделать всё с нуля и продумать архитектуру.

Из продуктовых требований вытекали и технические. Кроме горизонтальной масштабируемости, было важно, чтобы новая система без проблем интегрировалась с основной на тот момент сервисной инфраструктурой. Связано это было с тем, что у заказчика уже существовала сервисная платформа. Это некий каскад сервисов, которые обеспечивают те или иные критические бизнес-функции. 

В рамках этого фасада и этой сервисной инфраструктуры было создано несколько микросервисов, которые уже успешно функционировали на протяжении нескольких месяцев и лет. Поэтому самым разумным решением было подбирать технологии исходя из текущей сервисной платформы, так как накладные расходы на создание новой сервисной платформы для одного микросервиса могли быть достаточно большими.

Еще система должна была обеспечивать высокую производительность. То есть чтобы сервис позволял запускать в час хоть сто файлов на 100 регионов, хоть 200 файлов на 200 без каких-либо ограничений. Иными словами, было важно, чтобы не возникало никаких накладок при поставке информации партнерам. При этом система должна была потреблять минимальное количество ресурсов: чем больше ресурсов нужно, тем выше нагрузка на железо; чем выше нагрузка на железо, тем выше накладные расходы. Это бы как раз увеличивало время на генерацию файлов.

Исходя из всех вводных данных мы и принимали решение о том, как реализовать новый сервис. На текущей сервисной платформе использовали Kubernetes. Еще в команде была хорошая экспертиза по Golang — мы любим его за высокую скорость работы и низкое потребление ресурсов, большое количество наработок для ускорения разработки, хорошую совместимость с оркестраторами и количеством собираемых метрик производительности из коробки. И в качестве базы данных для хранения настроек самое популярное для этого стека была PostgreSQL. 

Для оркестрации будущего сервиса в итоге мы использовали уже готовый и отлаженный Kubernetes, на котором уже работает несколько десятков микросервисов. А учитывая релевантный опыт и наработки, решили писать на GoLang. СУБД — PostgreSQL; вместе с этим — Aerospike для кэша, S3 — для хранения файлов.

Скелет микросервиса через Go-kit

При разработке любого микросервиса первым делом мы размечаем API. Это значит, что мы должны указать, какие ссылки пользователь, SPA или какой-нибудь другой сервис снаружи сможет использовать. Всё это мы описали в соответствии со стандартами Google в формате Protobuf3.

В качестве инструмента, который позволяет нам оптимизировать и ускорять разработку, мы использовали наработки по кодогенерации на Go-kit и protoc-gen. Он позволяет практически полностью сгенерировать микросервис, который уже может быть проинтегрирован в сервисную платформу. При этом использует размеченный proto-файл или их комплект, в каждом из которых размечены эндпоинты. Сервис, созданный таким образом, на выходе имеет все работающие эндпоинты. Несмотря на то, что на этом этапе они возвращают какие-то обезличенные и рандомные данные, это уже рабочая версия микросервиса, которую мы затем дорабатываем. Фактически это скелет будущего микросервиса, и сейчас расскажу, из чего он состоит.

Пример Protobuf-файла:

syntax = "proto3";
package prmexportpb;
option go_package = "internal/prmexportpb";
import "google/api/annotations.proto";
import "protoc-gen-swagger/options/annotations.proto";
import "health.proto";
option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
 info: {
   title: "Export service";
   version: "1.0";
 };
 schemes: HTTP;
 consumes: "application/json";
 produces: "application/json";
 responses: {
   key: "404";
   value: {
     description: "Returned when the resource does not exist.";
     schema: {
       json_schema: {
         type: STRING;
       }
     }
   }
 }
};
service HealthService {
 // returns a error if service doesn`t live. The kubelet uses liveness probes to know when to restart a Container.
 rpc Liveness (LivenessRequest) returns (LivenessResponse) {
   option (google.api.http) = {
     get: "/liveness"
   };
   option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
     tags: "HealthCheck"
   };
 }
 // returns a error if service doesn`t ready. Service doesn`t ready by default.
 rpc Readiness (ReadinessRequest) returns (ReadinessResponse) {
   option (google.api.http) = {
     get: "/readiness"
   };
   option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
     tags: "HealthCheck"
   };
 }
 // returns buid time, last commit and version app
 rpc Version (VersionRequest) returns (VersionResponse) {
   option (google.api.http) = {
     get: "/version"
   };
   option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
     tags: "HealthCheck"
   };
 }
}

Health.proto

syntax = "proto3";
package prmexportpb;
option go_package = "internal/prmexportpb";

message LivenessRequest {}

message LivenessResponse {
  string status = 1;
}

message ReadinessRequest {}

message ReadinessResponse {
  string status = 1;
}

message VersionRequest {}

message VersionResponse {
  string BuildTime = 1;
  string Commit = 2;
  string Version = 3;
}

Почитать про proto3 можно тут.

Во-первых, на этом этапе сервис уже обернут в трейсинг. Это вещь, которая нужна, чтобы трассировать запросы внутри и снаружи сервиса. Можно видеть наглядно цепочку вызова компонентов и затраченное на их выполнение время. Это позволяет быстро находить узкие места.

Рис. 1. Список трейсов
Рис. 2. Трейсы ПО в разрезе траспорта и эндпоинта

Пример переопределяющей функции для трейса GET /liveness:

// NewTracingService returns an instance of an instrumenting Service.
func NewTracingService(ctx context.Context, s Service) Service {
  tracer := tracing.FromContext(ctx)
  return &tracingService{tracer, s}
}
type tracingService struct {
  tracer opentracing.Tracer
  Service
}
func (s *tracingService) Liveness(ctx context.Context, req *LivenessRequest) (resp *LivenessResponse, err error) {
  span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, s.tracer, "Liveness")
  defer span.Finish()
  return s.Service.Liveness(ctx, req)
}

Во-вторых, к скелету подключена Sentry, это платформа для отлова ошибок. То есть, если у нас происходит какая-то ошибка внутри кода или наш сервис возвращает 500-ю ошибку, мы ее ловим и отправляем в Sentry для дальнейшего разбирательства. И еще к сервису прикручены метрики производительности: в какой момент времени и какая метрика у нас подсчитывается. Например, количество запросов в секунду в тот или иной момент времени.

Пример переопределяющей функции для метода GET /liveness:

func (s *sentryService) Liveness(ctx context.Context, req *LivenessRequest) (resp *LivenessResponse, err error) {
  defer func() {
     if err != nil {
        log := s.getSentryLog(req, resp)
        sentry.ConfigureScope(func(scope *sentry.Scope) {
           scope.SetTag("code", strconv.Itoa(getHTTPStatusCode(err)))
           scope.SetTag("method", "Liveness")
           scope.SetExtra("request", log["request"])
           scope.SetExtra("response", log["response"])
        })
        sentry.CaptureException(err)
     }
  }()
  return s.Service.Liveness(ctx, req)
}

Сюда же следует добавить, что в сервисе есть логирование всех компонентов через Go-kit logging с уровнями. Получается, что на старте мы уже сразу достигаем принципа Observability, что сыграет на руку при поддержке и обслуживании сервиса в будущем. 

При генерации скелета сервиса мы сразу же получаем спецификацию в формате OpenAPI v3. Это общемировой стандарт, который используют для интеграции с огромным количеством API. Это yaml-спецификация, которую любой разработчик может добавить в утилиту для тестирования API и получить карту всех запросов и всех данных, которые API получает на вход и на выход. Это очень удобно для интеграции, так как впоследствии на наш микросервис мы прикрутили приложение SPA на ангуляре, админку, через которую происходит управление выгрузками и бизнес-задачами.

С примером можно ознакомиться тут.

А еще при генерации сервиса у нас появляется дополнительный транспорт — GRPC. Это протокол, который работает параллельно с HTTP и помогает другим микросервисам с ним интегрироваться, просто используя его как зависимость: в другом микросервисе подключается зависимость, и не нужно вручную интегрироваться с каждым эндпоинтом. 

Когда добавили зависимость от другого сервиса, где есть клиент, всё, что нам нужно, просто использовать метод. Всё остальное уже готово:

func (s *service) GetPeriodicalExports(ctx context.Context) (models []model.Setting, err error) {
  resp, err := s.setting.ReadPeriodicalSetting(ctx, &setting.ReadPeriodicalSettingRequest{})
  if err != nil {
     return
  }
  for _, settingRow := range resp.Data {
     models = append(models, settingRow.Attributes.ToRepo(settingRow.Id))
  }
  return
}

Интеграция с источниками данных

Важной задачей на проекте была интеграция с источниками данных. У заказчика был сервис, который работал на Java. Он предоставлял в развернутом виде всю информацию о категориях, товарах и их характеристиках — всё, что нам было нужно. 

Поскольку эта интеграция была реализована ранее, при создании нового сервиса мы воспользовались уже существующими функциями. Правда, впоследствии  для нас сделали доработку, чтобы мы могли получать весь каталог товаров, а это 700 тысяч наименований. Ранее приходилось делать тяжеловесные запросы в БД c характеристиками, и это занимало много времени. А сейчас весь каталог можно получить менее, чем за 30 минут. 

Тут-то нам и пришел на помощь Aerospike. Судя по бенчмаркам, работает он быстрее Redis, и TCO (Total cost of ownership) у него ниже. Но мы решили проверить эти данные, поэтому заблаговременно написали программу, добавили туда основных вендоров и сравнивали скорость работы через Go bench совместно с хранением в памяти. Aerospike понравился больше всех, и мы оставили его. 

Делаем запрос, добавляем товары, идем дальше. Обязательно добавляем трассировку и логирование для достижения Observability.

func (s *service) ImportProducts(ctx context.Context) error {
  replMap := s.GetReplacementMap()
  resp, err := s.semService.GetProductsContent(ctx, &sem.GetProductsContentRequest{})
  if err != nil {
     return errors.Wrap(err, "sem initial getProducts err")
  }
  dataSize := len(resp.Products)
  for dataSize > 0 {
     for _, p := range resp.Products {
        s.ReplaceSymbols(&p, replMap)
        err = s.repository.Add(ctx, p)
        if err != nil {
           return errors.Wrap(err, "aerospike getProducts add err")
        }
     }
     resp, err = s.semService.GetProductsContent(ctx, &sem.GetProductsContentRequest{ScrollId: resp.ScrollId})
     if err != nil {
        return errors.Wrap(err, "sem cycle getProductsContent err")
     }
     dataSize = len(resp.Products)
  }
  return nil
}

А чтобы создавать региональные выгрузки, нам нужна была информация по регионам. Регионы все хранились в другом Golang-сервисе. Мы с ним проинтегрировались по протоколу GRPC. На этом этапе это были две необходимые интеграции, с которыми нам нужно было работать, чтобы обеспечить агрегацию данных.

Пример:

func NewGrpcConn(ctx context.Context, conf configs.Controller) (*grpc.ClientConn, error) {
  var opts []grpc.DialOption
  if conf.GRPC.TLS.Enabled {
     opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
        InsecureSkipVerify: conf.GRPC.TLS.InsecureSkipVerify,
     })))
  } else {
     opts = append(opts, grpc.WithInsecure())
  }
  conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", conf.GRPC.Host, conf.GRPC.Port), opts...)
  if err != nil {
     return nil, errors.Wrap(err, "Unable to connect to controller")
  }
  return conn, nil
}

И получаем немного данных:

func (s *service) GetById(ctx context.Context, id uint64) (model model.Setting, err error) {
  resp, err := s.setting.ReadSettingById(ctx, &setting.ReadSettingByIdRequest{Id: id})
  if err != nil {
     return
  }
  model = resp.Data.Attributes.ToRepo(id)
  return
}

Кэш от Aerospike

Еще нам нужны были некоторые зависимости. Чтобы мы могли хранить где-то данные (например, настройки по регионам, настройки шаблонов, частоту генерации), требовалась база данных.

При разработке мы решили использовать ORM. Это зависимость, но уже для кода, которая позволяет облегчить доступ к базе данных и упростить работу с ней. Мы использовали ORM gorm. Она здорово показала себя: с ней было легко интегрироваться, и все результаты и накладные расходы на каждый запрос были довольно низкими. Мы будем ей и дальше пользоваться в менее нагруженных проектах. В высоконагруженных, очевидно, следует использовать pgx.

Из проблем. Как мы уже сказали, мы использовали Key-value от Aerospike. Этот кэш нам нужен был, чтобы хранить наши 700 тысяч товаров, потому что каждый раз при генерации каждой выгрузки было нелогично бегать в сервис с товарами и запрашивать всё повторно. 

Тем более среднее время получения информации по ним составляло плюс-минус 30 минут, так как товаров много и все пользуются сервисом с товарами. Поэтому нам нужен был кэш с данными, которые мы можем периодически обновлять, при этом не останавливая наши бизнес-процессы.

Если нам требовалось наполнить кэш новыми данными или перезаписать старый, мы могли столкнуться с проблемой: если в источнике данных изменился какой-то тип — например, строка стала числом или число стало стройкой, — то при закачке товаров у нас будут возникать ошибки. Причем неявные, которые отлавливаются не сразу.

Как мы с этим боролись: мы сделали функции для ручной очистки хранилища. Теперь,  когда у нас импорт товаров не работает, мы получаем ошибку в Sentry и понимаем, что у нас поменялась структура и нужно полностью очищать хранилище. При этом после очистки придется останавливать генерацию, так как мы не сможем генерить никакие файлы. Это решение наилучшее, которое нам удалось придумать. Но если вы сталкивались с такой же проблемой, то будет интересно узнать, как вы решили. Напишите, пожалуйста, в комментариях.

Работа всей системы

Теперь расскажу, как это всё работает. На проекте мы используем Cron и фоновый Runner. Кто не знает, Cron — это такая штука, которая выполняет задачи по расписанию: каждую минуту, или каждые 5 минут, или каждую 5-ю минуту каждого 12-го часа, например. 

В первой версии сервиса, которую мы реализовали, мы использовали функционал для выполнения фоновых задач. Это значит, что внутри запущенного сервиса, когда хотели, мы по крону (например, каждый час) запускали какую-то функцию. Она нам генерила файлы и загружала их на S3. 

Это решение было массово распространено на другие микросервисы. И для ускорения разработки мы его позаимствовали. Но так как дальше мы столкнулись с проблемами, нам пришлось его переделать. Там в принципе не было механизма очередей, поэтому мы в рамках микросервиса добавили небольшое FIFO (first in — first out). Фактически это очередь: первый пришел — первый ушел, выполнение в порядке попадания в очередь. Но в результате работ с этим функционалом и очередью мы были ограничены ресурсами на каждом pod’e.

Pod — это абстрактный объект Kubernetes, представляющий собой группу из одного или нескольких контейнеров приложения. У нас были большие проблемы при блокировке запуска задач на остальных pod’ах, так как они работают в режиме обеспечения максимальной доступности. В данном случае у нас работает 2 или 3 реплики микросервиса.

С какими проблемами столкнулись

Вместе с этим, когда мы инициируем сервис, у нас появляется проблема, когда у нас 3 pod’а параллельно генерят одни и те же файлы. Это очень нелогично и неудобно, с этим приходилось бороться, делать блокировки, применять механизм Service Discovery. Это всё были некорректные решения, которые вскоре мы переработали.

Соответственно, у нас терялась возможность параллельно выполнять задачи. То есть мы оставляем один pod, у нас генерируется, допустим, 15 задач. Притом они генерируются по порядку, так как в процессе генерации у нас все ресурсы, которые выделяются для pod’а, заимствуются при генерации задач. Это не очень хорошо, с этим тоже впоследствии нужно было бороться.

Еще одна проблема. Когда сервис падал (например, на кластере что-то сломалось и у нас pod упал), у нас также падали и все запущенные задачи. Плюс у нас не было механизма ограничения задач. Это значит, что на одном сервисе мы не могли ограничить количество одновременно выполняемых задач, причем достаточно простым способом. С этой проблемой тоже надо было бороться. Плюс зависшие задачи: одна зависла, остальные не двигаются, мы не можем ее просто грохнуть, нам нужно с ней что-то сделать.

Как решали

Что мы придумали. Сделали запуск задач через сам Kubernetes. То есть мы создаем отдельную программу (aka репозиторий), выносим всю бизнес-логику, связанную с генерацией, в это новое приложение. Иными словами, это у нас уже будет не сервис, а программа. Принципиальное отличие сервиса от программы в том, что сервис работает непрерывно и слушает входящие подключения или выполняет какую-то фоновую обработку, а программа единожды запускается, выполняет работу и завершает свою работу. Соответственно, эти микрозадачи стоит запускать внутри кластера Kubernetes.

Также мы планировали сделать механизм очередного запуска задач (first in — first out), чтобы задачи всегда шли в рамках своей позиции и при падении сервиса ничего не аффектилось, а при рестарте возобновлялась работа. Требовалось также сделать ограничитель параллельного запуска одинаковых задач. То есть эту логику тоже нужно было обязательно реализовать.

Исходя из этого, нам потребовалось сделать Garbage Collector, который будет осуществлять чистку старых зависших задач, потому что задача может зависнуть и в кластере Kubernetes — от этого никто не застрахован.

Что дальше

Теперь всё. В статье я рассказал, какой сервис мы сделали, с какими проблемами столкнулись и как эти проблемы будем решать. На данный момент можно сказать, что в планах стоит реализация механизма взаимодействия с KubeAPI и полировка бизнес-логики для соответствия актуальным задачам, а также их решение. Обязательно напишу вторую часть, в которой расскажу о проблемах при новой реализации, как мы их решали и обкатывали на проде. А еще поделюсь мыслями о том, как бы мы их решали, если бы делали это еще раз.