Как стать автором
Обновить

Коммуникация по RESTful API: синхронизируем данные с маркетплейсами

Уровень сложностиСредний
Время на прочтение33 мин
Количество просмотров1.6K

В статье разбирается код на Ruby и в Ruby on Rails, в частности, на примере задачи по разработке web-сервиса синхронизации данных с внешними источниками. Погружение в программное решение начинается с разбора бизнес-задачи. Через освещение реальных API с маркетплейсов OZON и Яндекс.Маркет обосновываются способы принятия архитектурных решений и способы оптимизации кода. Эта статья также является авторской попыткой раскрыть принцы SOLID при реализации логики реального бэкенд приложения по переработке структурированных данных в условиях эксплуатации стороннего REST API.

I. Введение

1. О контексте бизнес-задачи

Мы обращаемся к теме продаж через маркетплейсы по причине её огромной популярности. Поскольку площадка продаж электронная, то и взаимодействие с ней подразумевает использование автоматизированных программных инструментов. В качестве таковых выступает либо личный кабинет клиента маркетплейса, либо стороннее программное решение, общающееся с маркетплейсом через REST API. Нас, как разработчиков, будет интересовать второй вариант в контексте данной статьи.

Попробуем подробнее описать контекст стоящей перед нами задачи по разработке сервиса, помогающего предпринимателям взаимодействовать с маркетплейсами. Вообще маркетплейсы выбираются в качестве площадки продаж по причине ожидаемого гарантированного трафика потенциальных покупателей. Однако доступ к услугам маркетплейсов предполагает некоторые расходы в процентах от продаж. Поэтому у предпринимателей к определенному моменту времени собирается пул вопросов, требующий независимой экспертной оценки. Например: насколько популярна товарная категория продавца на конкретном маркетплейсе; как расширить аналитику по продажам, предоставляемую через маркетплейс; как лучше распределять товарные запасы между складом от маркетплейса и своим собственным. Предполагается, что в данном случае было бы идеально собрать свою собственную независимую статистику с различных маркетплейсов перед принятием дальнейших стратегических решений по выбору идеальной торговой площадки для своей группы товаров. Вот здесь и возникает потребность в некоторой автоматизации, когда привлекают разработчиков к её воплощению.

На данном этапе разработчикам важно понимать, что перед ними не стоит задачи разрабатывать новый программный продукт с нуля, и в тоже время решение должно быть настолько автономным, чтобы заинтересовать предпринимателя в эксплуатации их сервиса на долгосрочной основе. Тут мы приходим к первой идее будущего программного сервиса, которая заключается в копировании структуры данных с маркетплейса в собственное решение. Эта структура будет базироваться на трех фундаментальных сущностях - Товар, Заказ, Транзакция. Под Товаром будем понимать совокупность данных, касающихся номенклатуры продаж. Под Заказом будем рассматривать совокупность данных, характеризующих перемещение товара от продавца к клиенту. И под Транзакцией будем рассматривать совокупность финансовых обязательств, возникающих на момент совершения сделки покупателя с продавцом.

Внимательный читатель может сделать замечание о том, что здесь не хватает ещё одной важной сущности – Склад, покрывающей ряд организационных моментов, связанных с хранением товаров и логистикой. Это справедливо в отношении производителей и глобальных импортёров. Однако на текущий момент мы рассматриваем розничных продавцов, для которых идеальная схема продаж – «с колёс». Поэтому для них эта сущность, очевидно, может быть избыточной, а следовательно, и мы последуем их логике.

2. О предмете автоматизации

Чтобы приблизиться к более детальному исследованию программного решения, которое будет разрабатываться для потенциальных заказчиков-предпринимателей, разберём ряд вопросов, над которыми надо подумать разработчикам перед тем, как приступать к кодингу.

Потенциальными потребителями автоматизированного решения со стороны заказчика будут лица, принимающие решения, менеджеры по продажам и маркетологи. Ещё предполагается, что время от времени к разработчикам будут обращаться представители от заказчика за технической поддержкой. На данном этапе стоит осознать, что у одного заказчика не будет средств на поддержание и развитие сложного автоматизированного решения. С самого начала разработки Web-сервиса стоит иметь ввиду, что им будет пользоваться условно неограниченное количество пользователей. При этом большинство из них уже должны признавать недостаточность для их бизнеса публичных средств от маркетплейсов и нуждаться в дополнительных программных инструментах. То есть разработчикам предстоит создать Web-приложение, обладающее своей собственной системой авторизации и некоторым количеством сервисов по обработке данных с маркетплейсов, а также рядом инструментов, обеспечивающих внутреннюю коммуникацию между разработчиками и пользователями (продавцами).

На данный момент мы определили предмет автоматизации и тип приложения. Далее перейдём к его архитектурным особенностям и инфраструктурным потребностям. Поскольку приложение должно обеспечить коммуникацию с маркетплейсами – поставщиками данных, и коммуникацию с пользователями – потребителями данных, то мы можем воспроизвести такую же модель Web-приложения, состоящего из нескольких относительно автономных частей. То есть требуется создать фоновое приложение, умеющее коммуницировать по REST API в глобальной сети с поставщиками данных, и визуальное приложение, умеющее отображать систематизированную информацию из собранных данных на дэшбордах. Это две активные части Web-приложения. В качестве пассивной части приложения, и фактически связующим звеном между двумя перечисленными, выступает хранилище данных.

Итак, у нас уже начинает складываться структура Web-приложения, которое должно состоять из фронтенда для коммуникации с пользователями и бэкенда, обслуживающего фронтенд данными из БД. И ещё из одного бэкенда, коммуницирующего с маркетплейсами, и из реляционной базы данных, хранящей историю данных клиента. Мы уже понимаем, что по мере увеличения нагрузки на арендуемые сервера приложение будет масштабироваться горизонтально посредством запуска большего количества параллельно работающих модулей над однотипными задачами. По мере увеличения нагрузки на СУБД её можно будет превратить в кластер, чтобы отделить аналитический блок от сервисного блока по сбору и актуализации данных о клиентах.

На этом вводную часть статьи мы завершаем. Дальнейшей темой нашего дискурса будет задача по загрузке данных с разных маркетплейсов в собственную базу данных. В рамках этой задачи обсудим механизм приёма данных, способ обработки данных с отделением бизнесовых вопросов от деталей технической реализации самого процесса загрузки, а также разберём вопрос трансформации данных перед их сохранением в базу данных.

II. REST API. Открываем двери на маркетплейсы

Для коммуникации с маркетплейсами организуем сервис-приложение, которое будет работать в фоновом режиме по расписанию. Далее сконцентрируемся на подзадаче по загрузке Товаров чтобы развернуть детали организуемой нами внутренней структуры приложения. Некоторая часть этой программной структуру также будет использована для загрузки данных о Заказах и Транзакциях. Сразу отметим, что хранение на собственных серверах «сырых» данных с маркетплейсов о каждом заказчике – дорогое мероприятие, поэтому имеет смысл предусмотреть первичную упаковку данных на этапе её загрузки с маркетплейсов в объёме, необходимом строго для будущей аналитической системы.

1. Собираемые данные

Маркетплейсы предоставляют подробное описание API, которое мы будем использовать при проектировании нашего сервиса:

Мы бы хотели сохранять информацию из указанных методов в одну таблицу с привязкой к пользователю. В качестве описания товара было выбрано:

name

Название товара

description

Подробное описание товара

images

Массив ссылок на изображения

category_title

Наименование категории, к которой продавец относит свой товар.

offer_id

Идентификатор товара в системе продавца — артикул

product_id

Уникальный идентификатор товара на маркетплейсе

skus

Массив идентификаторов товара SKU (stock keeping unit — единица складского учёта)

schemes

Перечень наименований схем складского учёта, по которым продаётся товар

barcodes

Все штрихкоды товара

price

Цена товара с учётом скидок — это значение показывается на карточке товара

status

Идентификатор Состояния товара. На разных маркетплейсах это различные величины. Поэтому мы будем сводить их к следующему перечню значений — preliminary, on_moderation, failed_moderation, published, unpublished, archived

/app/services/yandex/products_downloader/importing_scheme.rb

module Yandex
  class ProductsDownloader
    module ImportingScheme
      def prepare_product(product, item)
        offer = item[:offer]
        mapping = item[:mapping]
        product.assign_attributes(
          {
            name:           offer.fetch(:name, ''),
            barcodes:       offer.fetch(:barcodes, []),
            price:          find_price(offer),
            status:         find_status(offer),
            schemes:        find_schemes(offer),
            images:         offer.fetch(:pictures, []),
            description:    offer.fetch(:description, nil),
            product_id:     mapping.fetch(:marketModelId, nil),
            skus:           find_skus(mapping),
            category_title: find_category_title(offer, mapping),
            scrub_status:   'success'
          }
        )
        product
      end
    end
  end
end

/app/services/ozon/products_downloader/importing_scheme.rb

module Ozon
  class ProductsDownloader
    module ImportingScheme
      def prepare_product(product, item)
        product.assign_attributes(
          {
            name:           item.fetch(:name, ''),
            offer_id:       item[:offer_id],
            barcodes:       find_barcodes(item),
            price:          find_price(item),
            status:         find_status(item),
            schemes:        find_schemes(item),
            images:         item[:images],
            skus:           find_skus(item),
            category_title: find_category_title(item),
            stock:          item.dig(:stocks, :present),
            scrub_status:   'success'
          }
        )
        product
      end

При разработке учётной системы товаров с маркетплейсов формирование такой сводной таблицы с характеристиками товара обязывает разработчика учесть несколько нюансов. Далее по тексту перечислим эти нюансы и подробно обсудим их детали.

2. Периодичность обновления информации о товарах

Предполагается, что продавец сможет управлять процессом актуализации базы товаров в приложении вручную по собственному усмотрению. Эта возможность предоставляется из соответствующего контроллера.

/app/controllers/api/v1/credentials_controller.rb

module Api
  module V1
    class CredentialsController < ApplicationController
      before_action :fetch_credential, only: %i[update archive descriptions]
 
      def update
        resp, status = Tasks::ReimportProducts.new(true, @mp_credential, I18n.t('messages.users_control')).call
        render json: resp, status:
      end

      private

      def fetch_credential
        @mp_credential = fetch_credentials.find_by(id: params[:id])
        if @mp_credential.nil?
          render json: {
            errors: [{
              code: 'error',
              title: I18n.t('errors.not_found', class_name: 'MarketplaceCredential')
            }]
          }, status: 404
        else
          @mp_credential
        end
      end

      def fetch_credentials
        current_user.marketplace_credentials
      end
    end
  end
end

Но со своей стороны мы также представляем жизненный цикл описания товара статичным в течении суток, после чего он может измениться. Этот правило положим в основу работы приложения по умолчанию через менеджера задач CRON.

/clock.rb

module Clockwork
  every(1.day, 'Import products', at: '23:00') do
    MarketplaceInteraction::ImportProductsJob.perform_later
  end
end

Из личного опыта работы с маркетплейсами отметим здесь, что иногда информация о товарах продавца перестаёт поступать от маркетплейса и нам нужно контролировать такой процесс деградации данных. Поэтому был введён ещё один технический статус товара:

scrub_status

Статус обработки товара:
unspecified — устанавливается при первичном заведении товара из учётной системы;
success — устанавливается при успешном обновлении информации о товаре;
gone — устанавливается, когда маркет сообщил о завершении доставки данных о товарах, а в базе осталась необработанная позиция.

После введения на маркетплейсах режима загрузки архивных товаров такая опция перестала быть актуальной, но её можно использовать при отключении режима загрузки архивных данных.

/app/services/yandex/products_downloader.rb

module Yandex
  class ProductsDownloader
    attr_reader :mp_credential, :http_client, :parsed_ids

    private

    def tag_lost_products!
      products = Product.where(marketplace_credential_id: mp_credential.id)
      lost_offer_ids = products.pluck(:offer_id) - parsed_ids

      products.where(
        offer_id: lost_offer_ids
      ).update_all(scrub_status: 'gone')
    end
  end
end

/app/services/ozon/products_downloader.rb

module Ozon
  class ProductsDownloader
    attr_reader :mp_credential, :parsed_ids, :http_client_list, :http_client_info, :http_client_description

    private

    def tag_lost_products!
      products = Product.where(marketplace_credential_id: mp_credential.id)
      lost_product_ids = products.pluck(:product_id) - parsed_ids.keys

      products.where(
        product_id: lost_product_ids
      ).update_all(scrub_status: 'gone')
    end
  end
end

3. У товаров на разных маркетплейсах существуют различные способы их уникальной идентификации

На Яндекс.Маркет — это offerId. У OZON — это product_id или даже просто id, как «Номер задания на формирование документов».

Это является причиной разнесения процедуры обновления товаров от разных маркетплейсов в разные классы. Только в таком случае возможно сопоставлять поступившие данные с данными из базы по уникальному id от маркетплейса.

/app/services/ozon/products_downloader/importing_scheme.rb

module Ozon
  class ProductsDownloader
    module ImportingScheme
      def iterate(list_by_id, exists = [], updated_products = [], updated_fields = [])
        Product.where(
          marketplace_credential_id: mp_credential.id,
          product_id: list_by_id.keys
        ).find_each do |product|
          exists << product.product_id
          product = prepare_product(product, list_by_id[product.product_id])
          @parsed_ids[product.product_id] = 1
          # We can record the changes somewhere.
          # pp("product.changes=",product.changes) if product.changed?
          if product.changed?
            updated_products << product
            updated_fields += product.changes.keys
          end
        end
        [updated_products, updated_fields, exists]
      end
    end
  end
end

/app/services/yandex/products_downloader/importing_scheme.rb

module Yandex
  class ProductsDownloader
    module ImportingScheme
      def iterate(list_by_id, exists = [], updated_products = [], updated_fields = [])
        Product.where(
          marketplace_credential_id: mp_credential.id,
          offer_id: list_by_id.keys
        ).find_each do |product|
          exists << product.offer_id
          product = prepare_product(product, list_by_id[product.offer_id])
          # We can record the changes somewhere.
          # pp("product.changes=",product.changes) if product.changed?
          if product.changed? && imported?(product)
            updated_products << product
            updated_fields += product.changes.keys
          end
        end
        [updated_products, updated_fields, exists]
      end
    end
  end
end

Кстати, у Яндекс.Маркет нет однозначной характеристики, соответствующей нашему параметру product_id. Наиболее близким по семантике становится marketModelId — что в описании API определяется как «Идентификатор модели на Маркете, который может отсутствовать в ответе, если товар ещё не привязан к карточке».

4. Тип данных для идентификаторов

У OZON для offer_id, product_id, sku реализован самый очевидный формат — числовой. Однако offerId на Яндекс.Маркет уже строковый. На других маркетплейсах тоже часто используется строковый тип данных для перечисленных параметров. Поэтому у нас идентификаторы будем хранить только в строках. Эта информация является востребованной СУДБ.

/db/migrate/20240106161532_create_products.rb

class CreateProducts < ActiveRecord::Migration[7.1]
  def up
    create_enum :product_scrub_status, ["unspecified", "success", "gone"]
    create_enum :product_status, ["preliminary", "on_moderation", "failed_moderation", "published", "unpublished", "archived"]

    execute <<-SQL
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'monetary_amount') THEN
          CREATE TYPE monetary_amount AS
          (
            value     float,
            currency  VARCHAR(3)
          );
        END IF;
      END$$;
    SQL

    create_table :products do |t|
      t.references :marketplace_credential, type: :uuid, null: false, foreign_key: true
      t.string :offer_id, comment: 'client SKU'
      t.string :product_id, comment: 'marketplace object, articule or model'
      t.string :name, null: false
      t.text :description
      t.string :skus, array: true, comment: 'marketplace SKUs'
      t.string :images, array: true
      t.string :barcodes, array: true
      t.enum :status, enum_type: :product_status, default: "preliminary", null: false
      t.enum :scrub_status, enum_type: :product_scrub_status, default: "unspecified", null: false
      t.column :price, :monetary_amount
      t.integer :stock
      t.string :category_title
      t.string :schemes, array: true, comment: 'sales schemes'

      t.timestamps
    end
  end
end

5. О пакетной загрузке данных

Маркетплейсы имеют разные стратегии обслуживания клиентских приложений по API. Яндекс.Маркет быстро отдаёт необходимые данные, накладывая лишь ограничение на количество товаров в одном запросе в размере 200 позиций и разрешая до 600 запросов в минуту. Это значит, что в минуту можно загружать до 120_000 товарных позиций, чего с лихвой должно хватать для большинства продавцов на маркетплейсе без привлечения специальных алгоритмических техник по соблюдению rate limits при коммуникации с маркетплейсом. Для клиентов с большим количеством данных при установленных лимитах от маркетплейса можно позволить себе оставаться с алгоритмом синхронной обработки данных, и воспользоваться слиппером для пережидания ограничения на запрос к маркетплейсу.

/app/services/yandex/sleeper.rb

module Yandex
  module Sleeper
    # INPUT:
    # headers = {
    #   Date: Tue, 09 Jan 2024 09:14:10 GMT
    #   X-RateLimit-Resource-Until: Tue, 09 Jan 2024 09:15:00 GMT
    # }
    def do_sleep(headers, duration)
      dt =
        Time.parse(headers['X-RateLimit-Resource-Until']) -
        Time.parse(headers['Date']) +
        1
      if dt > duration
        Rails.logger.error I18n.t('errors.duration_of_rate_limit_has_been_changed',
                                  marketplace_name: 'Yandex')
      else
        sleep dt
      end
    rescue StandardError => e
      # We are checking the code. It's fixable
      ErrorLogger.push_trace e
    end
  end
end

При этом можно воспользоваться заголовочными параметрами X-RateLimit-Resource-Remaining и X-RateLimit-Resource-Until из последнего запроса к маркетплейсу.

/app/services/yandex/api/offer_mappings.rb

module Yandex
  class Api
    class OfferMappings < Api
      MAX_REQUESTS_PER_MINUTE = 600
      RATE_LIMIT_DURATION = 60

      include Yandex::Sleeper

      private

      # INPUT:
      # headers = {
      #   X-RateLimit-Resource-Remaining: 600
      # }
      def delay_if_limits(headers)
        if headers.fetch(
          'X-RateLimit-Resource-Remaining',
          MAX_REQUESTS_PER_MINUTE
        ).to_i < limiting_remaining_requests
          do_sleep(headers, RATE_LIMIT_DURATION)
        end
      end
    end
  end
end

У OZON нет лимитов на количество загрузок данных в единицу времени, но при этом в целом ситуация немного хуже по нескольким причинам. Первая, OZON задерживает время отклика на запрос. Вторая, OZON исключает возможность получать данные из одного метода с маркетплейса по ожидаемому перечню параметров. Только из отдельных запросов можно получить данные об описании продукта description и о наименовании категории товара category_title.

Что касается каталог с категориями товаров, то его можно скачать у OZON отдельно.

Дерево категорий и типов товаров (версия 2)

/app/services/ozon/load_categories.rb

Struct.new('CategoryOzon', :name, :id, :disabled)

module Ozon
  class LoadCategories
    def call
      @http_client = http_client
      list = http_client_call
      @imported_list = []
      scalp(
        list: list[:result],
        previous_category: Struct::CategoryOzon.new(name: '')
      )
      OzonCategory.import @imported_list,
                          on_duplicate_key_ignore: true,
                          on_duplicate_key_update: {
                            conflict_target: %i[
                              description_category_id
                              type_id
                            ],
                            columns: %i[
                              category_name
                              category_disabled
                              type_name
                              type_disabled
                            ]
                          }
    end
  end
end

В таком случае можно использовать дополнительные запросы к собственной базе во время загрузки описаний товаров.

/app/services/ozon/products_downloader/importing_scheme.rb

module Ozon
  class ProductsDownloader
    module ImportingScheme
      def find_category_title(item)
        return if item[:description_category_id].blank? && item[:type_id].blank?

        CashOzonCategory.get(
          item[:description_category_id] || 0,
          item[:type_id] || 0
        )
      end
    end
  end
end

Использование дополнительных запросов к базе по каждой товарной позиции является плохой практикой, поэтому стоит прибегнуть к механизму кеширования.

/app/services/cash_ozon_category.rb

class CashOzonCategory
  class << self
    def o_cat
      @o_cat ||= Kredis.hash 'ozon_categories'
    end

    def get(category, type)
      o_cat["#{category}_#{type}"] || take_cat(category, type)
    end

    def take_cat(category, type)
      obj = OzonCategory.find_by(
        description_category_id: category,
        type_id: type
      )
      return if obj.nil?

      o_cat["#{category}_#{type}"] = "#{obj.category_name}/#{obj.type_name}"
    end
  end
end

Загружать на ежедневной основе каталог категорий нет необходимости, и какого-то явного графика в обязательном осуществлении обновления каталога категорий тоже не прослеживается. Поэтому вопрос обновления каталога товаров для OZON оставим до момента появления проблемы с прекращением заполнения позиции с категорией товара в каталоге продавца внутри нашей системы.

Теперь вернёмся к текстовому описанию продукта description, который можно загружать строго под одну товарную единицу, как разрешает OZON.

Получить описание товара

15 тыс. товарных позиций грузится около 5 минут. Это очень долго. Поэтому мы выносим этот процесс в фоновую задачу, отделённую от процедуры загрузки описаний товаров.

/app/services/ozon/products_downloader/import_desriptions.rb

module Ozon
  class ProductsDownloader
    module ImportDesriptions
      def downloading_product_desriptions
        Product.where(
          marketplace_credential_id: @mp_credential.id,
          product_id: @parsed_ids.keys
        ).find_in_batches(batch_size: 100) do |products|
          updated_products = []
          products.each do |product|
            product.description = load_description(product.product_id)
            updated_products << product if product.changed?
          end
          if updated_products.any?
            Product.import(updated_products,
                           on_duplicate_key_ignore: true,
                           on_duplicate_key_update: {
                             conflict_target: %i[
                               marketplace_credential_id
                               product_id
                               offer_id
                             ],
                             columns: %i[
                               description
                             ]
                           })
          end
        end
      end
    end
  end
end

Импорт описаний товаров тоже пусть запускается из CRON задачи:

/clock.rb

module Clockwork
  every(1.day, 'Import product descriptions (for Ozon)', at: '1:00') do
    MarketplaceInteraction::ImportProductDescriptionsJob.perform_later
  end
end

Поскольку OZON считает данные с описаниями товаров несущественными для API приложения, то имеет смысл в предоставлении продавцам самостоятельного решения - осуществлять или нет загрузку описаний товаров с OZON в нашу платформу. Поэтому выносим выключатель загрузчика описаний товаров в эндпоинт приложения.

/app/controllers/api/v1/credentials_controller.rb

module Api
  module V1
    class CredentialsController < ApplicationController
      before_action :fetch_credential, only: %i[update archive descriptions]

      def descriptions
        value = Handles::ProductsDownloader.to_bool(params[:value])
        @mp_credential.autoload_descriptions.value = value if value.in? [true, false]
        render json: {
          marketplace_credential: {
            id: @mp_credential.id,
            autoload_descriptions: @mp_credential.autoload_descriptions.value
          }
        }
      end

      private

      def fetch_credential
        @mp_credential = fetch_credentials.find_by(id: params[:id])
        if @mp_credential.nil?
          render json: {
            errors: [{
              code: 'error',
              title: I18n.t('errors.not_found', class_name: 'MarketplaceCredential')
            }]
          }, status: 404
        else
          @mp_credential
        end
      end
    end
  end
end

6. Привязка списка продуктов к учётным данным пользователя на маркетплейсе

В начале этой главы было упомянуто, что списки товаров должны быть связаны с пользователями сервиса. На самом деле Яндекс.Маркет подсказывает, что у пользователя на маркетплейсе может быть до нескольких бизнесов. Собственно эти бизнесы и являются промежуточным звеном между пользователями и товарами. То есть схема базы данных будет выглядеть следующим образом:

ER-диаграмма для marketplace_credentials
ER-диаграмма для marketplace_credentials

В этом случае коммуникация приложения с маркетплейсами будет устанавливаться через регистрационные данные пользователя marketplace_credentials. Поэтому, чтобы работать с маркетплейсом по REST API, Яндекс.Маркет в личном кабинете пользователя выдаёт token, а OZON - api_key.

Дальнейшая коммуникация с «рабочей средой» маркетплейса осуществляется посредством добавления в заголовочную часть запроса авторизационных данных:

Что такое рабочая среда

/app/services/ozon/api.rb

module Ozon
  class Api < BaseApi
    def headers
      super.merge(
        {
          'Api-Key' => @api_key,
          'Client-Id' => @client_id,
          'x-o3-app-name' => ENV.fetch('OZON_APP_ID')
        }
      )
    end
  end
end

Авторизация для запросов магазина к Маркету

/app/services/yandex/api.rb

module Yandex
  class Api < BaseApi
    def headers
      super.merge(
        {
          'Authorization' => "OAuth oauth_token=\"#{@token}\", oauth_client_id=\"#{ENV.fetch('YANDEX_APP_ID')}\""
        }
      )
    end
  end
end

При длительной эксплуатации ключи доступа могут устаревать по разным причинам, поэтому мы должны предусмотреть автоматическую проверку регистрационных данных на их актуальность. В случае OZON такую проверку можно осуществить через любой запрос к существующему методу API.

В случае Яндекс.Маркет проверка регистрационных данных может быть осуществлена через конкретный URL, который в своём адресе может содержать какой-нибудь дополнительный Id. Например, запрос за описаниями товаров содержит в себе business_id:

https://api.partner.market.yandex.ru/businesses/{businessId}/offer-mappings

Актуальность этого Id можно проверить только через реальный запрос

Информация о товарах в каталоге

чтобы по реальному HTTP-ответу получить статус ошибки или подтверждение об успешности выполнения запроса

Типы ошибок и что с ними делать

В нашем случае можно использовать универсальный механизм для обоих маркетплейсов по обработке статусов HTTP-запросов

/app/services/base_api.rb

class BaseApi
  EXACT_ERROR_RAISER = {
    420 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiLimitError.new(status, msg, mp_id) },
    401 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiAccessDeniedError.new(status, msg, mp_id) },
    403 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiAccessDeniedError.new(status, msg, mp_id) }
  }.freeze

  RANGE_ERROR_RAISER = {
    (400...500) => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiBadRequestError.new(status, msg, mp_id) },
    (500..) => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiError.new(status, msg, mp_id) }
  }.freeze

  def call(method:, raise_an_error: false, params: {}, body: {})
    # Let's define an HTTP method for a specific endpoint
    @raise_an_error = raise_an_error
    send(method, params, body)
  end

  private

  def get(params = {}, _ = {})
    api_call do
      connection.get do |req|
        req.url url
        req.params = params
        req.headers = headers
      end
    end
  end

  def post(params = {}, body = {})
    api_call do
      connection.post do |req|
        req.url url
        req.params = params
        req.headers = headers
        req.body = body.to_json
      end
    end
  end

  def api_call
    response = yield
    @status = response.status
    @response_content_type = content_type(response.headers)
    body = response_parse(response.body)
    raise_error(
      error_message(body)
    )

    [@status, response.headers, body]
  end

  def content_type(headers)
    case headers.[]('content-type')
    when %r{application/json}
      :json
    when %r{text/html}
      :html
    else
      :any
    end
  end

  def response_parse(body)
    if response_content_type == :json
      begin
        JSON.parse body, symbolize_names: true
      rescue JSON::ParserError => e
        ErrorLogger.push e
        {}
      end
    else
      body
    end
  end

  def raise_error(message)
    return unless @raise_an_error

    EXACT_ERROR_RAISER[status]&.call(status, message, @marketplace_credential.id)

    RANGE_ERROR_RAISER.each do |k, v|
      v.call(status, message, @marketplace_credential.id) if k.include?(status)
    end
  end
end

Недостаток проверки регистрационных данных через реальный запрос заключается в сокращении количества допустимых запросов в единицу времени (то есть в расходовании rate limits).

По идее, в разрешении данной проблемы должен помочь HTTP-метод HEAD, вида:

irb(main):01> response = Faraday.head(
    'https://api.partner.market.yandex.ru/businesses/:business_id/offer-mappings.json',
    { limit: 1 },
    {
      'Authorization' => "OAuth oauth_token=\"y0_token\", oauth_client_id=\"#{ENV.fetch('YANDEX_APP_ID')}\"",
      'Content-Type' => 'application/json'
    }
  )

На который мы бы получили статус ответа, вида:

irb(main):02> response.status
=> 200

Но мы, к сожалению, получаем только статус 405:

irb(main):02> response.status
=> 405

что означает, https://developer.mozilla.org/ru/docs/Web/HTTP/Status - Method Not Allowed

Поэтому был найден альтернативный способ проверки регистрационных данных через curl-запрос с параметром -I

/app/services/yandex/check_credentials.rb

module Yandex
  class CheckCredentials < BaseCheckCredentials
    def bash_command
      <<~`BASH`
        curl -I \
        -H 'Authorization: OAuth oauth_token=#{@mp_credential.credentials['token']}, \
        oauth_client_id=#{ENV.fetch('YANDEX_APP_ID')}' \
        -X POST https://api.partner.market.yandex.ru/businesses/\
        #{@mp_credential.credentials['business_id']}/offer-mappings.json?limit=1
      BASH
    end
  end
end

В HTTP-ответе находится только текст со статусом и заголовками. Проверка HTTP-ответа регулярным выражением помогает выяснить реальный статус на запрос с введёнными регистрационными данными:

/app/services/base_check_credentials.rb

class BaseCheckCredentials
  def call
    curl = bash_command

    case message = curl.split("\r\n").first
    when %r{(HTTP/1.1 200)|(HTTP/2 200)}
      { ok: true }
    when %r{(HTTP/1.1 403)|(HTTP/2 403)|(HTTP/1.1 401)|(HTTP/2 401)|(<html>)}
      # find out details from e.message of POST request
      redo_post_request
    else
      { errors: message }
    end
  end
end

Таким образом мы повысили управляемость техническим состоянием сервиса по доставке данных с маркетплейсов.

7. Как достать данные клиента о товарах на маркетплейсе

Разобранный выше (раздел 5) способ загрузки данных с маркетплейса Ozon не заканчивается на освещённых проблемах отдельной загрузки описаний и категорий товаров. Метод выдачи списка товаров «Получить список товаров по идентификаторам» работает только при явном указании перечня идентификаторов для запрашиваемых товаров. На выбор предлагается три категории идентификаторов – offer_id, product_id, sku. Вопрос: откуда их взять? Как узнать от маркетплейса, какие товары вообще размещены пользователем на маркетплейсе?

Ответом на поставленный вопрос становится метод «Список товаров», который в пакетном режиме выдаёт списки offer_id и product_id товаров, размещённых продавцом на маркетплейсе.

В этом методе API также предоставляется возможность указания фильтров по visibility товара. Для нас актуальны два значения — ALL и ARCHIVED.

В итоге полный цикл загрузки характеристик товаров продавца с маркетплейса сводится к следующим трём этапам.

/app/services/ozon/products_downloader/downloading_scheme.rb

Первый этап. Загружаем все товары, кроме архивных.

      def downloading_unarchived_products
        @archive = false
        benchmarking(
          -> { "import: :mp_credential[#{@mp_credential.id}] — actual[#{@parsed_ids.size}] — Done" }
        ) { circle_downloader }
        @total = @parsed_ids.size
      end

Второй этап. Загружаем архивные товары.

      def downloading_archived_products
        if @mp_credential.autoload_archives.value
          @archive = true
          benchmarking(
            -> { "import: :mp_credential[#{@mp_credential.id}] — archived[#{@parsed_ids.size - @total}] — Done" }
          ) { circle_downloader }
        end
      end

Третий этап. Загружаем описания товаров по списку товаров из собственной БД.

      def downloading_desriptions
        if @mp_credential.autoload_descriptions.value
          benchmarking(
            -> { "import: :mp_credential[#{@mp_credential.id}] — desriptions[#{@parsed_ids.size}] — Done" }
          ) { downloading_product_desriptions }
        end
      en

Первые два этапа базируются на одном и том же алгоритме, в котором выполняется цикл:

  • формируется запрос к странице со списком идентификаторов товаров

  • загружается пакет со списками product_id товаров

  • верифицируется успешность загрузки пакета данных с product_id

  • вызывается процедура пакетной загрузки характеристик товаров кроме описаний товаров с последующим импортом данных в локальную базу

  • достаётся токен следующей страницы для загрузки очередного пакета с product_id

  • прерывается цикл, если токен пустой

/app/services/ozon/products_downloader/downloading_scheme.rb

      def circle_downloader
        page_tokens = {}
        loop do
          status, _, body = @http_client_list.call(
            body: {
              filter: {
                visibility: (@archive ? 'ARCHIVED' : 'ALL')
              },
              limit: PAGE_LIMIT
            }.merge(page_tokens)
          )
          break_if_http_error(status) if status != 200

          items = (body&.dig(:result, :items) || []).map { |elem| elem[:product_id] }
          break if items.blank?

          download_product_info_list(items)

          page_token = body&.dig(:result, :last_id)
          if page_token.blank?
            break
          else
            page_tokens = { last_id: page_token }
          end
        end
      end

Для сравнения Яндекс.Маркет работает в два этапа. Первый этап — загрузка всех товаров, кроме архивных. Второй этап — загрузка архивных данных. А цикл загрузки товаров выглядит следующим образом:

  • формируется запрос к странице с характеристиками товаров

  • загружается пакет с характеристиками товаров

  • верифицируется успешность загрузки пакета данных с характеристиками товаров

  • вызывается процедура импорта пакета данных с характеристиками товаров в локальную базу данных

  • достаётся токен следующей страницы для загрузки очередного пакета с характеристиками товаров

  • прерывается цикл, если токен пустой

/app/services/yandex/products_downloader/downloading_scheme.rb

      def circle_downloader
        page_tokens = {}
        loop do
          status, _, body = @http_client.call(
            params: LIMITS.merge(page_tokens),
            body: { archived: @archive }
          )
          if status == 200
            items = body&.dig(:result, :offerMappings) || []
            break if items.blank?

            import_payload(items)
          else # any other status anyway
            # To be safe, but we shouldn't get here.
            # This is possible if the status is < 400 and the status is != 200.
            raise MarketplaceAggregator::ApiError.new(
              status,
              I18n.t('errors.something_went_wrong'),
              mp_credential.id
            )
          end

          page_token = body&.dig(:result, :paging, :nextPageToken)
          if page_token.blank?
            break
          else
            page_tokens = { page_token: }
          end
        end
      end

III. Оптимизация кода. Повышение надёжности

1. Откуда берётся бизнес логика

Загрузка данных с маркетплейсов осуществляется с помощью HTTP-клиента. В нашем случае используется Faraday.

/app/services/connection.rb

class Connection
  def self.start
    @conn ||= Faraday.new do |config|
      config.request :retry, RETRY_OPTIONS
      config.adapter Faraday.default_adapter
    end
  end
end

Предполагается, что в целях обеспечения контакта с маркетплейсом во время обслуживания одной сессии или работы одной фоновой задачи может быть выполнено до нескольких обращений к маркетплейсу. При этом достаточно одного HTTP-клиента, поэтому при объявлении нового HTTP-клиента используется оператор ||= присвоения по условию отсутствия предварительного подключения к нему. Поскольку диапазон задач, для которых будет использован данный клиент, может быть различным, то мы минимизируем на данном этапе его конфигурацию. Конфигурация будет отличаться, например, при GET и POST запросах.

/app/services/base_api.rb

class BaseApi
  attr_accessor :connection

  def initialize(mp_credential, options = {})
    @connection = Connection.start
    @marketplace_credential = mp_credential
    @raise_an_error = false
  end

  private

  def get(params = {}, _ = {})
    api_call do
      connection.get do |req|
        req.url url
        req.params = params
        req.headers = headers
      end
    end
  end

  def post(params = {}, body = {})
    api_call do
      connection.post do |req|
        req.url url
        req.params = params
        req.headers = headers
        req.body = body.to_json
      end
    end
  end
end

Общий каркас настройки HTTP-клиента на этом заканчивается. Далее перейдём к реализации дополнительных параметров подключения к разным маркетплейсам из отдельных классов.

Для установления контакта с маркетплейсом используются параметры url, params, headers, body. Два из них — params и body — являются входными хэшами при вызове соответствующих методов .get и .post при обращении к Web-сервису. Два других параметра — url и headers можно настроить автоматически из отдельных классов для каждого маркетплейса. Headers содержат регистрационные данные на маркетплейсах, и они уже упоминались выше в разделе I.6 — /app/services/ozon/api.rb, /app/services/yandex/api.rb.

Url от API-методов также предполагается объявлять в отдельных классах в целях соблюдения «Принципа единственной ответственности». Так повышается Cohesion внутри кода, обслуживающего HTTP-клиента, и понижается Coupling между HTTP-клиентом и другими частями кода по переработке различных данных, загруженных с маркетплейсов. /Cohesion и Coupling: отличия/

В нашем случае имеется один API-метод на загрузку продуктов с Яндекс.Маркет:

/app/services/yandex/api/offer_mappings.rb

module Yandex
  class Api
    class OfferMappings < Api
      def url
        "#{URL}/businesses/#{@business_id}/offer-mappings.json"
      end
    end
  end
end

и три API-метода на загрузку продуктов с OZON:

/app/services/ozon/api/product_list.rb

module Ozon
  class Api
    class ProductList < Api
      def url
        "#{URL}/v2/product/list"
      end
    end
  end
end

/app/services/ozon/api/product_info_list.rb

module Ozon
  class Api
    class ProductInfoList < Api
      def url
        "#{URL}/v2/product/info/list"
      end
    end
  end
end

/app/services/ozon/api/product_info_description.rb

module Ozon
  class Api
    class ProductInfoDescription < Api
      def url
        "#{URL}/v1/product/info/description"
      end
    end
  end
end

Загрузка данных о товарах осуществляется с каждого маркетплейса по своему алгоритму при уникальном наборе начальных данных.

/app/services/ozon/products_downloader.rb

module Ozon
  class ProductsDownloader
    def initialize(mp_credential)
      @mp_credential = mp_credential
      @http_client_list = Ozon::Api::ProductList.new(mp_credential)
      @http_client_info = Ozon::Api::ProductInfoList.new(mp_credential)
      @http_client_description = Ozon::Api::ProductInfoDescription.new(mp_credential)
      @parsed_ids = {}
    end

    def call
      return false if mp_credential.credentials.nil?

      download_products
      tag_lost_products!
      true
    end
  end
end

/app/services/yandex/products_downloader.rb

module Yandex
  class ProductsDownloader
    def initialize(mp_credential)
      @mp_credential = mp_credential
      @http_client = Yandex::Api::OfferMappings.new(mp_credential)
      @parsed_ids = []
    end

    def call
      return false if mp_credential.credentials.nil?

      download_products
      tag_lost_products!
      true
    end
  end
end

Процедуры импорта товаров на маркетплейсах запускаются из единого источника процессов - фоновая задача по импорту данных с маркетплейсов

/app/jobs/products/import_job.rb

module Products
  class ImportJob < ApplicationJob
    include MaBenchmarking

    queue_as do
      is_client_queue = arguments.first
      if is_client_queue
        :client_grabber_products
      else
        :marketplace_grabber_products
      end
    end

    DOWNLOADER_CLASS = 'ProductsDownloader'

    def perform(is_client_queue, mp_credential_id)
      @is_client_queue = is_client_queue
      @mp_credential = MarketplaceCredential.find_by(id: mp_credential_id)
      return if irrelevant?

      @mp_credential.update(last_sync_at_products: Time.current) if downloadable?
    end

    private

    def downloadable?
      import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
      true
      #... 
    end

    def import(downloader)
      benchmarking(
        -> { "import: :mp_credential[#{@mp_credential.id}] — [#{downloader.parsed_ids.size}] - OK" }
      ) { downloader.call }
    end
  end
end

Здесь важно обратить внимание на использование принципа инверсии зависимостей, когда по имени маркетплейса выбирается класс, в котором реализованы детали процедуры импорта - downloader.call /Принципы SOLID на примере ruby♦️/

Другим не менее значимым алгоритмическим подходом, реализованным в приведённом коде, является разнесение задачи импорта по разным очередям в зависимости от источника инициации процесса импорта. Это может быть клиентский запрос на немедленную загрузку данных, например, при регистрации нового пользователя в нашем Web-приложении. Такие запросы отправляются в очередь :client_grabber_products.

Вызов клиентского запроса на импорт данных осуществляется из соответствующего эндпоинта POST http://localhost:3000/api/v1/credentials

/app/controllers/api/v1/credentials_controller.rb

module Api
  module V1
    class CredentialsController < ApplicationController
      def create
        @mp_credential = fetch_credentials.new(credential_params)
        @mp_credential.fix_credentials!
        Operations::CheckCredentials.new(@mp_credential).call
        if @mp_credential.is_valid
          @mp_credential.save!
          Products::ImportJob.perform_later(true, @mp_credential.id)
          render json: {
            marketplace_credential: @mp_credential
          }
        else
          render json: {
            errors: [{
              code: 'error',
              title: I18n.t('errors.credentials_are_invalid'),
              detail: @mp_credential.credentials['errors']
            }]
          }, status: 400
        end
      end
    end
  end
end

В другом случае инициатором запроса на импорт данных может быть CRON-задача, о чём говорится в разделе I.2. Тогда такие запросы отправляются в очередь :marketplace_grabber_products.

Здесь стоит подчеркнуть, что в автоматической режиме на повторный импорт данных на ежедневной основе отправляется вся клиентская база:

/app/business_logic/tasks/import_products.rb

module Tasks
  class ImportProducts
    def call
      MarketplaceCredential.find_each(batch_size: 100) do |mp_credential|
        # 1. correcting client mistakes
        mp_credential.fix_credentials!
        # 2. verifying the validity of credentials
        Operations::CheckCredentials.new(mp_credential).call
        # 3. refresh the record
        mp_credential.save! if mp_credential.changed?
        # 4. cause direct data import
        Products::ImportJob.perform_later(false, mp_credential.id) if mp_credential.is_valid
      end
    end
  end
end

При этом алгоритм должен пройтись по всем клиентским данным, используемым для авторизации. Очевидно, что нам нужно перепроверить актуальность авторизационных данных перед попыткой запустить процедуру импорта товаров для последующего выявления изменений в базе. По идее, результатом проверки должны быть сделаны рассылки уведомлений тем пользователям, чьи регистрационные данные устарели и когда требуется их ручное обновление из кабинета маркетплейса. Отвечать за логику поведения данного программного блока должен администратор, который по мере накопления опыта взаимодействия с пользователями-продавцами должен иметь возможность оперативно менять данный функционал. В силу высокой зависимости данного участка кода Web-приложения от операционных намерений его менеджеров мы и вынесли этот код в отдельный раздел под названием app/business_logic.

Поскольку задача масс-импорта данных предполагает продолжительную загрузку, то здесь также организуется отдельная фоновая задача в отдельной ветке :marketplace_grabber_products.

/app/jobs/marketplace_interaction/import_products_job.rb

module MarketplaceInteraction
  class ImportProductsJob < ApplicationJob
    queue_as :marketplace_grabber_products

    def perform
      Tasks::ImportProducts.new.call
    end
  end
end

Аналогично организован код масс-импорта описаний по товарам с OZON:

/app/business_logic/tasks/import_product_descriptions.rb, /app/jobs/marketplace_interaction/import_product_descriptions_job.rb

2. Обработка ошибок

Существует несколько способов работы с программными ошибками. В данном приложении активно эксплуатируются четыре способа - перехват ошибок; генерация и перехват ошибок (Railway Oriented Programming); генерация и перехват ошибок, чтобы перезапустить задачу; генерация ошибок.

Под способом «перехвата ошибок» понимается допущение в будущем появления ошибок на этапе разработки. В задачу перехватчиков входит нормальное прерывание выполнения кода с логированием места и причины ошибки.

/app/jobs/products/import_job.rb

module Products
  class ImportJob < ApplicationJob
    def downloadable?
      import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
      true
    rescue NameError => e
      # We are checking the code. It's fixable
      ErrorLogger.push_trace e
      false
      #...
    end
  end
end

Здесь перехватывается ошибка неизвестного наименования класса для нового маркетплейса. Причина может быть в непосредственном имени или в неправильном размещении файла.

Второй способ, связанный с «генерированием и перехватом ошибок», эксплуатируется в двух случаях.

Первый случай — это генерация исключения с целью идентификации основания отказа в исполнении функции по эндпоинту и перехват исключения для возврата на запрос по эндпоинту форматированного ответа с указанием причины отказа

/app/controllers/application_controller.rb

class ApplicationController < ActionController::API
  rescue_from(MarketplaceAggregator::UnauthorizedError) do
    render json: {
      errors: [{
        code: 'error',
        title: I18n.t('errors.unauthorized')
      }]
    }, status: 401
  end

  rescue_from(MarketplaceAggregator::ForbiddenError) do
    render json: {
      errors: [{
        code: 'error',
        title: I18n.t('errors.forbidden')
      }]
    }, status: 403
  end

  def current_user
    raise MarketplaceAggregator::UnauthorizedError unless fetch_user_uuid

    @current_user ||= Client.find_by(id: fetch_user_uuid)
    raise MarketplaceAggregator::ForbiddenError unless @current_user

    @current_user
  end

  private

  def fetch_user_uuid
    request.headers['HTTP_USER']
  end
end

Такой способ генерации исключений до и после проверки заголовка запроса потребовался для соблюдения строгой привязки HTTP-запроса к зарегистрированному пользователю в базе данных.

В этом случае также следует учесть, что для запроса используется нестандартный заголовок запроса, поэтому Ruby on Rails добавляет к нему префикс HTTP_ . /https://github.com/rails/rails/blob/main/actionpack/lib/action_dispatch/http/headers.rb/

Таким образом, заголовок у запроса к эндпоинту выглядит как USER:registered_user_id, а внутри приложения следует использовать обращение за заголовком, как request.headers['HTTP_USER'].

Второй случай «генерирования и перехвата ошибок» предназначен для реализации Railway-ориентированного программирования при возвращении маркетплейсами статусов, отличных от значения 200.

Генерация исключения происходит в классе HTTP-клиента. Пример реализации был рассмотрен в разделе I.6 /app/services/base_api.rb .

Перехват исключения для этого случая осуществляется в методе downloadable? у класса с реализацией фоновой задачи импорта данных.

/app/jobs/products/import_job.rb

module Products
  class ImportJob < ApplicationJob
     def downloadable?
      import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
      true
    rescue NameError => e
      # We are checking the code. It's fixable
      ErrorLogger.push_trace e
      false
    rescue MarketplaceAggregator::ApiAccessDeniedError => e
      ErrorLogger.push e
      # that's all
      false
    rescue MarketplaceAggregator::ApiBadRequestError => e
      ErrorLogger.push e
      # that's all
      false
    rescue MarketplaceAggregator::ApiLimitError => e
      # restart the task taking into account restrictions on limits
      Tasks::ReimportProducts.new(@is_client_queue, @mp_credential, e).call
      false
    rescue MarketplaceAggregator::ApiError => e
      ErrorLogger.push e
      # TODO: restart task after an hour (for example)
      false
    end
  end
end

В большинстве случаев, как видно из кода, прерывание процедуры импорта не предполагает автоматизированного способа решения проблемы.

Кроме Второго способа обработки исключений здесь мы также видим Третий способ «генерирования и перехвата ошибок с целью перезапуска задачи по импорту данных» на строке с rescue MarketplaceAggregator::ApiLimitError. Это тот редкий случай, когда правила маркетплейса позволяют очевидным способом автоматически разрешить проблему последнего отказа в отклике по достижению rate limits.

/app/business_logic/tasks/reimport_products.rb

module Tasks
  class ReimportProducts
    def call
      # 1. check the history of ReimportProducts calls
      # It should be less than three times
      analyse do
        # 2. cause direct data import
        Products::ImportJob.set(wait_until: 1.minute.from_now)
                           .perform_later(@is_client_queue, @mp_credential.id)
      end
    end

  end
end

Здесь стоит рассказать об одном нюансе, учтённом в настройках HTTP-клиента при обработке статусов HTTP-ответов от маркетплейсов. Известно, что ответ по REST API может приходить не только от Web-приложения, но и от Web-сервера. Это ошибки 500-й серии. Некоторые из них являются некритическими для удержания контакта на незначительном промежутке времени посредством повторения запроса без отключения соединения с реципиентом. Пример такой настройки представлен в коде HTTP-клиента:

/app/services/connection.rb

class Connection
  RETRY_OPTIONS = {
    max: 5,                     # Retry a failed request up to 5 times
    interval: 2,                # First retry after 2s
    backoff_factor: 2,          # Double the delay for each subsequent retry
    retry_statuses: [500, 502, 503, 504]
    # Retry only when we get a 500, 502, 503 or 504 response
  }.freeze

  def self.start
    @conn ||= Faraday.new do |config|
      config.request :retry, RETRY_OPTIONS
      config.adapter Faraday.default_adapter
    end
  end
end

В заключении этого раздела разберём Четвёртый способ обработки ошибок, основанный на генерации ошибок без последующей их программной обработки. Это ситуация называется «проектированием по контракту», когда в родительском классе заявляется пул методов, обязательных к реализации в дочерних классах. Чтобы не забывать прописывать реализации обязательных методов и создаются искусственные условия, генерирующие критические прерывания программы. Ещё такой подход в программировании называют Принципом Барбары-Лисков. Пример его применения можно видеть в /app/services/base_check_credentials.rb.

class BaseCheckCredentials
  
  private

  # curl command can invoke POST method with -I/--head option
  # https://curl.se/docs/manpage.html
  def bash_command
    raise NotImplementedError, "#{self.class}.#{__method__}: #{I18n.t('errors.marketplace_has_not_been_selected')}"
  end

  def http_client
    raise NotImplementedError, "#{self.class}.#{__method__}: #{I18n.t('errors.marketplace_has_not_been_selected')}"
  end
end

В наследуемых классах заявленные методы превращаются в следующее содержание

/app/services/yandex/check_credentials.rb

module Yandex
  class CheckCredentials < BaseCheckCredentials

    private

    def bash_command
      <<~`BASH`
        curl -I \
        -H 'Authorization: OAuth oauth_token=#{@mp_credential.credentials['token']}, \
        oauth_client_id=#{ENV.fetch('YANDEX_APP_ID')}' \
        -X POST https://api.partner.market.yandex.ru/businesses/\
        #{@mp_credential.credentials['business_id']}/offer-mappings.json?limit=1
      BASH
    end

    def http_client
      Yandex::Api::OfferMappings.new(@mp_credential)
    end
  end
end

/app/services/ozon/check_credentials.rb

module Ozon
  class CheckCredentials < BaseCheckCredentials

    private

    def bash_command
      <<~`BASH`
        curl -I \
        -H 'Api-Key: #{@mp_credential.credentials['api_key']}' \
        -H 'Client-Id: #{@mp_credential.credentials['client_id']}' \
        -H 'x-o3-app-name: #{ENV.fetch('OZON_APP_ID')}' \
        -X POST https://api-seller.ozon.ru/v2/product/list
      BASH
    end

    def http_client
      Ozon::Api::ProductList.new(@mp_credential)
    end
  end
end

3. Выстраивание задач по загрузке данных в очередь

По тексту статьи неоднократно упоминались очереди. Рассмотрим несколько моментов, связанных с обслуживание очередей при многопоточной обработке данных.

У нас в качестве адаптера очередей используется Sidekiq, о чём и указано в конфигурационном файле /config/application.rb.

Поскольку Sidekiq является Ruby процессом, то он остаётся обладателем только одного ядра процессора на инстанс. Чтобы увеличить количество запущенных инстансов, нужно увеличить количество конфигурационных файлов в папке /config.

Для текущего приложения принято решение выделить два инстанса. Один на клиентские запросы по подключению новых аккаунтов и обслуживанию запросов на повторную, внеплановую перезагрузку данных с маркетплейсов /config/sidekiq_live.yml .

:timeout: 10
:concurrency: <%= ENV.fetch("SIDEKIQ_CONCURRENCY") { 15 } %>
:max_retries: 0
:queues:
  - [client_grabber_products, 5]
  - [client_grabber_orders, 3]
  - [client_grabber_transactions, 2]
  - [default, 1]
  - [client_grabber_product_descriptions, 4]

Второй инстанс выделяется на выполнение регулярной плановой загрузки данных по расписанию и их автоматическую догрузку в случае предусмотренных вариантов такой отложенной обработки /config/sidekiq_scheduled.yml .

:timeout: 10
:concurrency: <%= ENV.fetch("SIDEKIQ_CONCURRENCY") { 15 } %>
:max_retries: 0
:queues:
  - [marketplace_grabber_products, 5]
  - [marketplace_grabber_orders, 3]
  - [marketplace_grabber_transactions, 3]
  - [marketplace_grabber_product_descriptions, 4]

Для каждого инстанса выделяем некоторое число параллельных потоков concurrency, которое позволим одномоментно открыть для увеличения пропускной способности Sidekiq. Мы выбрали значение 15.

Далее распределим выделенное количество потоков между очередями. Здесь нужно учитывать, что чем больше потоков находится в параллельной работе, тем больше они друг другу мешают. Идеального распределения потоков между очередями не существует. В любом случае конфигурацию придётся настраивать под реальные объёмы обрабатываемых данных. Страница административной панели Sidekiq будет нам в помощь.

Теперь оценим процессы, происходящие внутри одного потока Sidekiq.

После ознакомления в данной статье с подробным описанием всех процессов импорта данных с маркетплейсов можно составить представления о расходуемых ресурсах. Одним существенным ресурсом является время загрузки данных по запросу о товарах. Для Яндекс.Маркет тормозящим фактором в имеющейся реализации может стать процесс «засыпания» на очень больших списках товаров в целях соблюдения наложенных маркетплейсом rate limits. На OZON замедляющим выполнение задачи фактором при загрузке данных может становиться загрузка описаний товаров. Для обоих маркетплейсов с течением продолжительной эксплуатации активно торгующим продавцом может оказаться избыточной операция загрузки архивных данных. Просто их может оказаться слишком много. На все эти случаи у нас уже интегрирован в код рукописный бенчмаркинг /lib/ma_benchmarking.rb .

module MaBenchmarking
  def benchmarking(log_string)
    back_time = Time.now
    yield
    Rails.logger.info format("(in %.3f sec) #{log_string.call}", Time.now - back_time)
  end
end

Он логирует события выгрузки данных. На основе логов мы можем проводить периодический анализ выгружаемых данных для последующего принятия организационных мер.

Следующим расходным ресурсом является количество обращений из параллельных потоков к базе данных на запись и обновление данных. На текущий момент в программе реализована наилучшая практика на чтение и запись данных из/в базу пакетами. Так для импорта используется гем activerecord-import. Предполагается соблюдать заданную практику при дальнейшем развитии программного решения.

Последний по упоминанию расходный ресурс, на который стоит обращать внимание, является объём выделяемой памяти на инстанс Sidekiq. Им мы можем управлять через изменение размеров батчей в пакетах данных на загрузку с маркетплейса и на обработку базы данных.

IV. Заключение

В статье детально разобран код Web-сервиса, эксплуатирующего REST API по импорту данных о товарах с маркетплейсов Яндекс.Маркет и OZON. На реальных участках кода показаны примеры реализации принципов SOLID. Проектируемое приложение не предполагает превращение Web-сервиса в большой монолит. Здесь намеренно отсутствует модуль авторизации пользователей. Этот функционал предполагает реализацию в отдельном сервисе. Архитектуру проектируемого приложения можно назвать сервисной. Описанные в документации к Web-сервису эндпоинты предназначены для внутреннего использования внутри приложения.

Представленный Web-сервис построен на фреймворке Ruby on Rails. В качестве СУБД используется PostgreSQL. Для хранения данных активно используются нативные типы данных Postgres. Выполнение фоновых задач вынесено в Sidekiq. В проекте используется системный менеджер задач CRON из гема clockwork. Для кэширования в Redis используется гем Kredis. С полностью рабочим проектом можно ознакомиться в репозитории https://github.com/rubygitflow/marketplace_aggregator.

Проект открыт для вкладов от контрибьютеров.

Приветствуется сохранение выбранного стиля в настройках Rubocop. Одобряется глубокое тестирование в районе 100% с сохранением покрытия кода тестами в Rspec.

Теги:
Хабы:
Рейтинг0
Комментарии1

Публикации

Истории

Работа

Ruby on Rails
5 вакансий

Ближайшие события