В статье разбирается код на Ruby и в Ruby on Rails, в частности, на примере задачи по разработке web-сервиса синхронизации данных с внешними источниками. Погружение в программное решение начинается с разбора бизнес-задачи. Через освещение реальных API с маркетплейсов OZON и Яндекс.Маркет обосновываются способы принятия архитектурных решений и способы оптимизации кода. Эта статья также является авторской попыткой раскрыть принцы SOLID при реализации логики реального бэкенд приложения по переработке структурированных данных в условиях эксплуатации стороннего REST API.
I. Введение
1. О контексте бизнес-задачи
Мы обращаемся к теме продаж через маркетплейсы по причине её огромной популярности. Поскольку площадка продаж электронная, то и взаимодействие с ней подразумевает использование автоматизированных программных инструментов. В качестве таковых выступает либо личный кабинет клиента маркетплейса, либо стороннее программное решение, общающееся с маркетплейсом через REST API. Нас, как разработчиков, будет интересовать второй вариант в контексте данной статьи.
Попробуем подробнее описать контекст стоящей перед нами задачи по разработке сервиса, помогающего предпринимателям взаимодействовать с маркетплейсами. Вообще маркетплейсы выбираются в качестве площадки продаж по причине ожидаемого гарантированного трафика потенциальных покупателей. Однако доступ к услугам маркетплейсов предполагает некоторые расходы в процентах от продаж. Поэтому у предпринимателей к определенному моменту времени собирается пул вопросов, требующий независимой экспертной оценки. Например: насколько популярна товарная категория продавца на конкретном маркетплейсе; как расширить аналитику по продажам, предоставляемую через маркетплейс; как лучше распределять товарные запасы между складом от маркетплейса и своим собственным. Предполагается, что в данном случае было бы идеально собрать свою собственную независимую статистику с различных маркетплейсов перед принятием дальнейших стратегических решений по выбору идеальной торговой площадки для своей группы товаров. Вот здесь и возникает потребность в некоторой автоматизации, когда привлекают разработчиков к её воплощению.
На данном этапе разработчикам важно понимать, что перед ними не стоит задачи разрабатывать новый программный продукт с нуля, и в тоже время решение должно быть настолько автономным, чтобы заинтересовать предпринимателя в эксплуатации их сервиса на долгосрочной основе. Тут мы приходим к первой идее будущего программного сервиса, которая заключается в копировании структуры данных с маркетплейса в собственное решение. Эта структура будет базироваться на трех фундаментальных сущностях - Товар, Заказ, Транзакция. Под Товаром будем понимать совокупность данных, касающихся номенклатуры продаж. Под Заказом будем рассматривать совокупность данных, характеризующих перемещение товара от продавца к клиенту. И под Транзакцией будем рассматривать совокупность финансовых обязательств, возникающих на момент совершения сделки покупателя с продавцом.
Внимательный читатель может сделать замечание о том, что здесь не хватает ещё одной важной сущности – Склад, покрывающей ряд организационных моментов, связанных с хранением товаров и логистикой. Это справедливо в отношении производителей и глобальных импортёров. Однако на текущий момент мы рассматриваем розничных продавцов, для которых идеальная схема продаж – «с колёс». Поэтому для них эта сущность, очевидно, может быть избыточной, а следовательно, и мы последуем их логике.
2. О предмете автоматизации
Чтобы приблизиться к более детальному исследованию программного решения, которое будет разрабатываться для потенциальных заказчиков-предпринимателей, разберём ряд вопросов, над которыми надо подумать разработчикам перед тем, как приступать к кодингу.
Потенциальными потребителями автоматизированного решения со стороны заказчика будут лица, принимающие решения, менеджеры по продажам и маркетологи. Ещё предполагается, что время от времени к разработчикам будут обращаться представители от заказчика за технической поддержкой. На данном этапе стоит осознать, что у одного заказчика не будет средств на поддержание и развитие сложного автоматизированного решения. С самого начала разработки Web-сервиса стоит иметь ввиду, что им будет пользоваться условно неограниченное количество пользователей. При этом большинство из них уже должны признавать недостаточность для их бизнеса публичных средств от маркетплейсов и нуждаться в дополнительных программных инструментах. То есть разработчикам предстоит создать Web-приложение, обладающее своей собственной системой авторизации и некоторым количеством сервисов по обработке данных с маркетплейсов, а также рядом инструментов, обеспечивающих внутреннюю коммуникацию между разработчиками и пользователями (продавцами).
На данный момент мы определили предмет автоматизации и тип приложения. Далее перейдём к его архитектурным особенностям и инфраструктурным потребностям. Поскольку приложение должно обеспечить коммуникацию с маркетплейсами – поставщиками данных, и коммуникацию с пользователями – потребителями данных, то мы можем воспроизвести такую же модель Web-приложения, состоящего из нескольких относительно автономных частей. То есть требуется создать фоновое приложение, умеющее коммуницировать по REST API в глобальной сети с поставщиками данных, и визуальное приложение, умеющее отображать систематизированную информацию из собранных данных на дэшбордах. Это две активные части Web-приложения. В качестве пассивной части приложения, и фактически связующим звеном между двумя перечисленными, выступает хранилище данных.
Итак, у нас уже начинает складываться структура Web-приложения, которое должно состоять из фронтенда для коммуникации с пользователями и бэкенда, обслуживающего фронтенд данными из БД. И ещё из одного бэкенда, коммуницирующего с маркетплейсами, и из реляционной базы данных, хранящей историю данных клиента. Мы уже понимаем, что по мере увеличения нагрузки на арендуемые сервера приложение будет масштабироваться горизонтально посредством запуска большего количества параллельно работающих модулей над однотипными задачами. По мере увеличения нагрузки на СУБД её можно будет превратить в кластер, чтобы отделить аналитический блок от сервисного блока по сбору и актуализации данных о клиентах.
На этом вводную часть статьи мы завершаем. Дальнейшей темой нашего дискурса будет задача по загрузке данных с разных маркетплейсов в собственную базу данных. В рамках этой задачи обсудим механизм приёма данных, способ обработки данных с отделением бизнесовых вопросов от деталей технической реализации самого процесса загрузки, а также разберём вопрос трансформации данных перед их сохранением в базу данных.
II. REST API. Открываем двери на маркетплейсы
Для коммуникации с маркетплейсами организуем сервис-приложение, которое будет работать в фоновом режиме по расписанию. Далее сконцентрируемся на подзадаче по загрузке Товаров чтобы развернуть детали организуемой нами внутренней структуры приложения. Некоторая часть этой программной структуру также будет использована для загрузки данных о Заказах и Транзакциях. Сразу отметим, что хранение на собственных серверах «сырых» данных с маркетплейсов о каждом заказчике – дорогое мероприятие, поэтому имеет смысл предусмотреть первичную упаковку данных на этапе её загрузки с маркетплейсов в объёме, необходимом строго для будущей аналитической системы.
1. Собираемые данные
Маркетплейсы предоставляют подробное описание API, которое мы будем использовать при проектировании нашего сервиса:
Мы бы хотели сохранять информацию из указанных методов в одну таблицу с привязкой к пользователю. В качестве описания товара было выбрано:
name | Название товара |
description | Подробное описание товара |
images | Массив ссылок на изображения |
category_title | Наименование категории, к которой продавец относит свой товар. |
offer_id | Идентификатор товара в системе продавца — артикул |
product_id | Уникальный идентификатор товара на маркетплейсе |
skus | Массив идентификаторов товара SKU (stock keeping unit — единица складского учёта) |
schemes | Перечень наименований схем складского учёта, по которым продаётся товар |
barcodes | Все штрихкоды товара |
price | Цена товара с учётом скидок — это значение показывается на карточке товара |
status | Идентификатор Состояния товара. На разных маркетплейсах это различные величины. Поэтому мы будем сводить их к следующему перечню значений — preliminary, on_moderation, failed_moderation, published, unpublished, archived |
/app/services/yandex/products_downloader/importing_scheme.rb
module Yandex
class ProductsDownloader
module ImportingScheme
def prepare_product(product, item)
offer = item[:offer]
mapping = item[:mapping]
product.assign_attributes(
{
name: offer.fetch(:name, ''),
barcodes: offer.fetch(:barcodes, []),
price: find_price(offer),
status: find_status(offer),
schemes: find_schemes(offer),
images: offer.fetch(:pictures, []),
description: offer.fetch(:description, nil),
product_id: mapping.fetch(:marketModelId, nil),
skus: find_skus(mapping),
category_title: find_category_title(offer, mapping),
scrub_status: 'success'
}
)
product
end
end
end
end
/app/services/ozon/products_downloader/importing_scheme.rb
module Ozon
class ProductsDownloader
module ImportingScheme
def prepare_product(product, item)
product.assign_attributes(
{
name: item.fetch(:name, ''),
offer_id: item[:offer_id],
barcodes: find_barcodes(item),
price: find_price(item),
status: find_status(item),
schemes: find_schemes(item),
images: item[:images],
skus: find_skus(item),
category_title: find_category_title(item),
stock: item.dig(:stocks, :present),
scrub_status: 'success'
}
)
product
end
При разработке учётной системы товаров с маркетплейсов формирование такой сводной таблицы с характеристиками товара обязывает разработчика учесть несколько нюансов. Далее по тексту перечислим эти нюансы и подробно обсудим их детали.
2. Периодичность обновления информации о товарах
Предполагается, что продавец сможет управлять процессом актуализации базы товаров в приложении вручную по собственному усмотрению. Эта возможность предоставляется из соответствующего контроллера.
/app/controllers/api/v1/credentials_controller.rb
module Api
module V1
class CredentialsController < ApplicationController
before_action :fetch_credential, only: %i[update archive descriptions]
def update
resp, status = Tasks::ReimportProducts.new(true, @mp_credential, I18n.t('messages.users_control')).call
render json: resp, status:
end
private
def fetch_credential
@mp_credential = fetch_credentials.find_by(id: params[:id])
if @mp_credential.nil?
render json: {
errors: [{
code: 'error',
title: I18n.t('errors.not_found', class_name: 'MarketplaceCredential')
}]
}, status: 404
else
@mp_credential
end
end
def fetch_credentials
current_user.marketplace_credentials
end
end
end
end
Но со своей стороны мы также представляем жизненный цикл описания товара статичным в течении суток, после чего он может измениться. Этот правило положим в основу работы приложения по умолчанию через менеджера задач CRON.
module Clockwork
every(1.day, 'Import products', at: '23:00') do
MarketplaceInteraction::ImportProductsJob.perform_later
end
end
Из личного опыта работы с маркетплейсами отметим здесь, что иногда информация о товарах продавца перестаёт поступать от маркетплейса и нам нужно контролировать такой процесс деградации данных. Поэтому был введён ещё один технический статус товара:
scrub_status | Статус обработки товара: |
После введения на маркетплейсах режима загрузки архивных товаров такая опция перестала быть актуальной, но её можно использовать при отключении режима загрузки архивных данных.
/app/services/yandex/products_downloader.rb
module Yandex
class ProductsDownloader
attr_reader :mp_credential, :http_client, :parsed_ids
private
def tag_lost_products!
products = Product.where(marketplace_credential_id: mp_credential.id)
lost_offer_ids = products.pluck(:offer_id) - parsed_ids
products.where(
offer_id: lost_offer_ids
).update_all(scrub_status: 'gone')
end
end
end
/app/services/ozon/products_downloader.rb
module Ozon
class ProductsDownloader
attr_reader :mp_credential, :parsed_ids, :http_client_list, :http_client_info, :http_client_description
private
def tag_lost_products!
products = Product.where(marketplace_credential_id: mp_credential.id)
lost_product_ids = products.pluck(:product_id) - parsed_ids.keys
products.where(
product_id: lost_product_ids
).update_all(scrub_status: 'gone')
end
end
end
3. У товаров на разных маркетплейсах существуют различные способы их уникальной идентификации
На Яндекс.Маркет — это offerId
. У OZON — это product_id
или даже просто id
, как «Номер задания на формирование документов».
Это является причиной разнесения процедуры обновления товаров от разных маркетплейсов в разные классы. Только в таком случае возможно сопоставлять поступившие данные с данными из базы по уникальному id от маркетплейса.
/app/services/ozon/products_downloader/importing_scheme.rb
module Ozon
class ProductsDownloader
module ImportingScheme
def iterate(list_by_id, exists = [], updated_products = [], updated_fields = [])
Product.where(
marketplace_credential_id: mp_credential.id,
product_id: list_by_id.keys
).find_each do |product|
exists << product.product_id
product = prepare_product(product, list_by_id[product.product_id])
@parsed_ids[product.product_id] = 1
# We can record the changes somewhere.
# pp("product.changes=",product.changes) if product.changed?
if product.changed?
updated_products << product
updated_fields += product.changes.keys
end
end
[updated_products, updated_fields, exists]
end
end
end
end
/app/services/yandex/products_downloader/importing_scheme.rb
module Yandex
class ProductsDownloader
module ImportingScheme
def iterate(list_by_id, exists = [], updated_products = [], updated_fields = [])
Product.where(
marketplace_credential_id: mp_credential.id,
offer_id: list_by_id.keys
).find_each do |product|
exists << product.offer_id
product = prepare_product(product, list_by_id[product.offer_id])
# We can record the changes somewhere.
# pp("product.changes=",product.changes) if product.changed?
if product.changed? && imported?(product)
updated_products << product
updated_fields += product.changes.keys
end
end
[updated_products, updated_fields, exists]
end
end
end
end
Кстати, у Яндекс.Маркет нет однозначной характеристики, соответствующей нашему параметру product_id
. Наиболее близким по семантике становится marketModelId
— что в описании API определяется как «Идентификатор модели на Маркете, который может отсутствовать в ответе, если товар ещё не привязан к карточке».
4. Тип данных для идентификаторов
У OZON для offer_id, product_id, sku реализован самый очевидный формат — числовой. Однако offerId на Яндекс.Маркет уже строковый. На других маркетплейсах тоже часто используется строковый тип данных для перечисленных параметров. Поэтому у нас идентификаторы будем хранить только в строках. Эта информация является востребованной СУДБ.
/db/migrate/20240106161532_create_products.rb
class CreateProducts < ActiveRecord::Migration[7.1]
def up
create_enum :product_scrub_status, ["unspecified", "success", "gone"]
create_enum :product_status, ["preliminary", "on_moderation", "failed_moderation", "published", "unpublished", "archived"]
execute <<-SQL
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'monetary_amount') THEN
CREATE TYPE monetary_amount AS
(
value float,
currency VARCHAR(3)
);
END IF;
END$$;
SQL
create_table :products do |t|
t.references :marketplace_credential, type: :uuid, null: false, foreign_key: true
t.string :offer_id, comment: 'client SKU'
t.string :product_id, comment: 'marketplace object, articule or model'
t.string :name, null: false
t.text :description
t.string :skus, array: true, comment: 'marketplace SKUs'
t.string :images, array: true
t.string :barcodes, array: true
t.enum :status, enum_type: :product_status, default: "preliminary", null: false
t.enum :scrub_status, enum_type: :product_scrub_status, default: "unspecified", null: false
t.column :price, :monetary_amount
t.integer :stock
t.string :category_title
t.string :schemes, array: true, comment: 'sales schemes'
t.timestamps
end
end
end
5. О пакетной загрузке данных
Маркетплейсы имеют разные стратегии обслуживания клиентских приложений по API. Яндекс.Маркет быстро отдаёт необходимые данные, накладывая лишь ограничение на количество товаров в одном запросе в размере 200 позиций и разрешая до 600 запросов в минуту. Это значит, что в минуту можно загружать до 120_000 товарных позиций, чего с лихвой должно хватать для большинства продавцов на маркетплейсе без привлечения специальных алгоритмических техник по соблюдению rate limits при коммуникации с маркетплейсом. Для клиентов с большим количеством данных при установленных лимитах от маркетплейса можно позволить себе оставаться с алгоритмом синхронной обработки данных, и воспользоваться слиппером для пережидания ограничения на запрос к маркетплейсу.
/app/services/yandex/sleeper.rb
module Yandex
module Sleeper
# INPUT:
# headers = {
# Date: Tue, 09 Jan 2024 09:14:10 GMT
# X-RateLimit-Resource-Until: Tue, 09 Jan 2024 09:15:00 GMT
# }
def do_sleep(headers, duration)
dt =
Time.parse(headers['X-RateLimit-Resource-Until']) -
Time.parse(headers['Date']) +
1
if dt > duration
Rails.logger.error I18n.t('errors.duration_of_rate_limit_has_been_changed',
marketplace_name: 'Yandex')
else
sleep dt
end
rescue StandardError => e
# We are checking the code. It's fixable
ErrorLogger.push_trace e
end
end
end
При этом можно воспользоваться заголовочными параметрами X-RateLimit-Resource-Remaining
и X-RateLimit-Resource-Until
из последнего запроса к маркетплейсу.
/app/services/yandex/api/offer_mappings.rb
module Yandex
class Api
class OfferMappings < Api
MAX_REQUESTS_PER_MINUTE = 600
RATE_LIMIT_DURATION = 60
include Yandex::Sleeper
private
# INPUT:
# headers = {
# X-RateLimit-Resource-Remaining: 600
# }
def delay_if_limits(headers)
if headers.fetch(
'X-RateLimit-Resource-Remaining',
MAX_REQUESTS_PER_MINUTE
).to_i < limiting_remaining_requests
do_sleep(headers, RATE_LIMIT_DURATION)
end
end
end
end
end
У OZON нет лимитов на количество загрузок данных в единицу времени, но при этом в целом ситуация немного хуже по нескольким причинам. Первая, OZON задерживает время отклика на запрос. Вторая, OZON исключает возможность получать данные из одного метода с маркетплейса по ожидаемому перечню параметров. Только из отдельных запросов можно получить данные об описании продукта description
и о наименовании категории товара category_title
.
Что касается каталог с категориями товаров, то его можно скачать у OZON отдельно.
Дерево категорий и типов товаров (версия 2)
/app/services/ozon/load_categories.rb
Struct.new('CategoryOzon', :name, :id, :disabled)
module Ozon
class LoadCategories
def call
@http_client = http_client
list = http_client_call
@imported_list = []
scalp(
list: list[:result],
previous_category: Struct::CategoryOzon.new(name: '')
)
OzonCategory.import @imported_list,
on_duplicate_key_ignore: true,
on_duplicate_key_update: {
conflict_target: %i[
description_category_id
type_id
],
columns: %i[
category_name
category_disabled
type_name
type_disabled
]
}
end
end
end
В таком случае можно использовать дополнительные запросы к собственной базе во время загрузки описаний товаров.
/app/services/ozon/products_downloader/importing_scheme.rb
module Ozon
class ProductsDownloader
module ImportingScheme
def find_category_title(item)
return if item[:description_category_id].blank? && item[:type_id].blank?
CashOzonCategory.get(
item[:description_category_id] || 0,
item[:type_id] || 0
)
end
end
end
end
Использование дополнительных запросов к базе по каждой товарной позиции является плохой практикой, поэтому стоит прибегнуть к механизму кеширования.
/app/services/cash_ozon_category.rb
class CashOzonCategory
class << self
def o_cat
@o_cat ||= Kredis.hash 'ozon_categories'
end
def get(category, type)
o_cat["#{category}_#{type}"] || take_cat(category, type)
end
def take_cat(category, type)
obj = OzonCategory.find_by(
description_category_id: category,
type_id: type
)
return if obj.nil?
o_cat["#{category}_#{type}"] = "#{obj.category_name}/#{obj.type_name}"
end
end
end
Загружать на ежедневной основе каталог категорий нет необходимости, и какого-то явного графика в обязательном осуществлении обновления каталога категорий тоже не прослеживается. Поэтому вопрос обновления каталога товаров для OZON оставим до момента появления проблемы с прекращением заполнения позиции с категорией товара в каталоге продавца внутри нашей системы.
Теперь вернёмся к текстовому описанию продукта description
, который можно загружать строго под одну товарную единицу, как разрешает OZON.
15 тыс. товарных позиций грузится около 5 минут. Это очень долго. Поэтому мы выносим этот процесс в фоновую задачу, отделённую от процедуры загрузки описаний товаров.
/app/services/ozon/products_downloader/import_desriptions.rb
module Ozon
class ProductsDownloader
module ImportDesriptions
def downloading_product_desriptions
Product.where(
marketplace_credential_id: @mp_credential.id,
product_id: @parsed_ids.keys
).find_in_batches(batch_size: 100) do |products|
updated_products = []
products.each do |product|
product.description = load_description(product.product_id)
updated_products << product if product.changed?
end
if updated_products.any?
Product.import(updated_products,
on_duplicate_key_ignore: true,
on_duplicate_key_update: {
conflict_target: %i[
marketplace_credential_id
product_id
offer_id
],
columns: %i[
description
]
})
end
end
end
end
end
end
Импорт описаний товаров тоже пусть запускается из CRON задачи:
module Clockwork
every(1.day, 'Import product descriptions (for Ozon)', at: '1:00') do
MarketplaceInteraction::ImportProductDescriptionsJob.perform_later
end
end
Поскольку OZON считает данные с описаниями товаров несущественными для API приложения, то имеет смысл в предоставлении продавцам самостоятельного решения - осуществлять или нет загрузку описаний товаров с OZON в нашу платформу. Поэтому выносим выключатель загрузчика описаний товаров в эндпоинт приложения.
/app/controllers/api/v1/credentials_controller.rb
module Api
module V1
class CredentialsController < ApplicationController
before_action :fetch_credential, only: %i[update archive descriptions]
def descriptions
value = Handles::ProductsDownloader.to_bool(params[:value])
@mp_credential.autoload_descriptions.value = value if value.in? [true, false]
render json: {
marketplace_credential: {
id: @mp_credential.id,
autoload_descriptions: @mp_credential.autoload_descriptions.value
}
}
end
private
def fetch_credential
@mp_credential = fetch_credentials.find_by(id: params[:id])
if @mp_credential.nil?
render json: {
errors: [{
code: 'error',
title: I18n.t('errors.not_found', class_name: 'MarketplaceCredential')
}]
}, status: 404
else
@mp_credential
end
end
end
end
end
6. Привязка списка продуктов к учётным данным пользователя на маркетплейсе
В начале этой главы было упомянуто, что списки товаров должны быть связаны с пользователями сервиса. На самом деле Яндекс.Маркет подсказывает, что у пользователя на маркетплейсе может быть до нескольких бизнесов. Собственно эти бизнесы и являются промежуточным звеном между пользователями и товарами. То есть схема базы данных будет выглядеть следующим образом:
В этом случае коммуникация приложения с маркетплейсами будет устанавливаться через регистрационные данные пользователя marketplace_credentials
. Поэтому, чтобы работать с маркетплейсом по REST API, Яндекс.Маркет в личном кабинете пользователя выдаёт token
, а OZON - api_key
.
Дальнейшая коммуникация с «рабочей средой» маркетплейса осуществляется посредством добавления в заголовочную часть запроса авторизационных данных:
module Ozon
class Api < BaseApi
def headers
super.merge(
{
'Api-Key' => @api_key,
'Client-Id' => @client_id,
'x-o3-app-name' => ENV.fetch('OZON_APP_ID')
}
)
end
end
end
Авторизация для запросов магазина к Маркету
module Yandex
class Api < BaseApi
def headers
super.merge(
{
'Authorization' => "OAuth oauth_token=\"#{@token}\", oauth_client_id=\"#{ENV.fetch('YANDEX_APP_ID')}\""
}
)
end
end
end
При длительной эксплуатации ключи доступа могут устаревать по разным причинам, поэтому мы должны предусмотреть автоматическую проверку регистрационных данных на их актуальность. В случае OZON такую проверку можно осуществить через любой запрос к существующему методу API.
В случае Яндекс.Маркет проверка регистрационных данных может быть осуществлена через конкретный URL, который в своём адресе может содержать какой-нибудь дополнительный Id. Например, запрос за описаниями товаров содержит в себе business_id
:
https://api.partner.market.yandex.ru/businesses/{businessId}/offer-mappings
Актуальность этого Id можно проверить только через реальный запрос
Информация о товарах в каталоге
чтобы по реальному HTTP-ответу получить статус ошибки или подтверждение об успешности выполнения запроса
Типы ошибок и что с ними делать
В нашем случае можно использовать универсальный механизм для обоих маркетплейсов по обработке статусов HTTP-запросов
class BaseApi
EXACT_ERROR_RAISER = {
420 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiLimitError.new(status, msg, mp_id) },
401 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiAccessDeniedError.new(status, msg, mp_id) },
403 => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiAccessDeniedError.new(status, msg, mp_id) }
}.freeze
RANGE_ERROR_RAISER = {
(400...500) => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiBadRequestError.new(status, msg, mp_id) },
(500..) => ->(status, msg, mp_id) { raise MarketplaceAggregator::ApiError.new(status, msg, mp_id) }
}.freeze
def call(method:, raise_an_error: false, params: {}, body: {})
# Let's define an HTTP method for a specific endpoint
@raise_an_error = raise_an_error
send(method, params, body)
end
private
def get(params = {}, _ = {})
api_call do
connection.get do |req|
req.url url
req.params = params
req.headers = headers
end
end
end
def post(params = {}, body = {})
api_call do
connection.post do |req|
req.url url
req.params = params
req.headers = headers
req.body = body.to_json
end
end
end
def api_call
response = yield
@status = response.status
@response_content_type = content_type(response.headers)
body = response_parse(response.body)
raise_error(
error_message(body)
)
[@status, response.headers, body]
end
def content_type(headers)
case headers.[]('content-type')
when %r{application/json}
:json
when %r{text/html}
:html
else
:any
end
end
def response_parse(body)
if response_content_type == :json
begin
JSON.parse body, symbolize_names: true
rescue JSON::ParserError => e
ErrorLogger.push e
{}
end
else
body
end
end
def raise_error(message)
return unless @raise_an_error
EXACT_ERROR_RAISER[status]&.call(status, message, @marketplace_credential.id)
RANGE_ERROR_RAISER.each do |k, v|
v.call(status, message, @marketplace_credential.id) if k.include?(status)
end
end
end
Недостаток проверки регистрационных данных через реальный запрос заключается в сокращении количества допустимых запросов в единицу времени (то есть в расходовании rate limits).
По идее, в разрешении данной проблемы должен помочь HTTP-метод HEAD, вида:
irb(main):01> response = Faraday.head(
'https://api.partner.market.yandex.ru/businesses/:business_id/offer-mappings.json',
{ limit: 1 },
{
'Authorization' => "OAuth oauth_token=\"y0_token\", oauth_client_id=\"#{ENV.fetch('YANDEX_APP_ID')}\"",
'Content-Type' => 'application/json'
}
)
На который мы бы получили статус ответа, вида:
irb(main):02> response.status
=> 200
Но мы, к сожалению, получаем только статус 405:
irb(main):02> response.status
=> 405
что означает, https://developer.mozilla.org/ru/docs/Web/HTTP/Status - Method Not Allowed
Поэтому был найден альтернативный способ проверки регистрационных данных через curl-запрос с параметром -I
/app/services/yandex/check_credentials.rb
module Yandex
class CheckCredentials < BaseCheckCredentials
def bash_command
<<~`BASH`
curl -I \
-H 'Authorization: OAuth oauth_token=#{@mp_credential.credentials['token']}, \
oauth_client_id=#{ENV.fetch('YANDEX_APP_ID')}' \
-X POST https://api.partner.market.yandex.ru/businesses/\
#{@mp_credential.credentials['business_id']}/offer-mappings.json?limit=1
BASH
end
end
end
В HTTP-ответе находится только текст со статусом и заголовками. Проверка HTTP-ответа регулярным выражением помогает выяснить реальный статус на запрос с введёнными регистрационными данными:
/app/services/base_check_credentials.rb
class BaseCheckCredentials
def call
curl = bash_command
case message = curl.split("\r\n").first
when %r{(HTTP/1.1 200)|(HTTP/2 200)}
{ ok: true }
when %r{(HTTP/1.1 403)|(HTTP/2 403)|(HTTP/1.1 401)|(HTTP/2 401)|(<html>)}
# find out details from e.message of POST request
redo_post_request
else
{ errors: message }
end
end
end
Таким образом мы повысили управляемость техническим состоянием сервиса по доставке данных с маркетплейсов.
7. Как достать данные клиента о товарах на маркетплейсе
Разобранный выше (раздел 5) способ загрузки данных с маркетплейса Ozon не заканчивается на освещённых проблемах отдельной загрузки описаний и категорий товаров. Метод выдачи списка товаров «Получить список товаров по идентификаторам» работает только при явном указании перечня идентификаторов для запрашиваемых товаров. На выбор предлагается три категории идентификаторов – offer_id
, product_id
, sku
. Вопрос: откуда их взять? Как узнать от маркетплейса, какие товары вообще размещены пользователем на маркетплейсе?
Ответом на поставленный вопрос становится метод «Список товаров», который в пакетном режиме выдаёт списки offer_id
и product_id
товаров, размещённых продавцом на маркетплейсе.
В этом методе API также предоставляется возможность указания фильтров по visibility
товара. Для нас актуальны два значения — ALL
и ARCHIVED
.
В итоге полный цикл загрузки характеристик товаров продавца с маркетплейса сводится к следующим трём этапам.
/app/services/ozon/products_downloader/downloading_scheme.rb
Первый этап. Загружаем все товары, кроме архивных.
def downloading_unarchived_products
@archive = false
benchmarking(
-> { "import: :mp_credential[#{@mp_credential.id}] — actual[#{@parsed_ids.size}] — Done" }
) { circle_downloader }
@total = @parsed_ids.size
end
Второй этап. Загружаем архивные товары.
def downloading_archived_products
if @mp_credential.autoload_archives.value
@archive = true
benchmarking(
-> { "import: :mp_credential[#{@mp_credential.id}] — archived[#{@parsed_ids.size - @total}] — Done" }
) { circle_downloader }
end
end
Третий этап. Загружаем описания товаров по списку товаров из собственной БД.
def downloading_desriptions
if @mp_credential.autoload_descriptions.value
benchmarking(
-> { "import: :mp_credential[#{@mp_credential.id}] — desriptions[#{@parsed_ids.size}] — Done" }
) { downloading_product_desriptions }
end
en
Первые два этапа базируются на одном и том же алгоритме, в котором выполняется цикл:
формируется запрос к странице со списком идентификаторов товаров
загружается пакет со списками
product_id
товаровверифицируется успешность загрузки пакета данных с
product_id
вызывается процедура пакетной загрузки характеристик товаров кроме описаний товаров с последующим импортом данных в локальную базу
достаётся токен следующей страницы для загрузки очередного пакета с
product_id
прерывается цикл, если токен пустой
/app/services/ozon/products_downloader/downloading_scheme.rb
def circle_downloader
page_tokens = {}
loop do
status, _, body = @http_client_list.call(
body: {
filter: {
visibility: (@archive ? 'ARCHIVED' : 'ALL')
},
limit: PAGE_LIMIT
}.merge(page_tokens)
)
break_if_http_error(status) if status != 200
items = (body&.dig(:result, :items) || []).map { |elem| elem[:product_id] }
break if items.blank?
download_product_info_list(items)
page_token = body&.dig(:result, :last_id)
if page_token.blank?
break
else
page_tokens = { last_id: page_token }
end
end
end
Для сравнения Яндекс.Маркет работает в два этапа. Первый этап — загрузка всех товаров, кроме архивных. Второй этап — загрузка архивных данных. А цикл загрузки товаров выглядит следующим образом:
формируется запрос к странице с характеристиками товаров
загружается пакет с характеристиками товаров
верифицируется успешность загрузки пакета данных с характеристиками товаров
вызывается процедура импорта пакета данных с характеристиками товаров в локальную базу данных
достаётся токен следующей страницы для загрузки очередного пакета с характеристиками товаров
прерывается цикл, если токен пустой
/app/services/yandex/products_downloader/downloading_scheme.rb
def circle_downloader
page_tokens = {}
loop do
status, _, body = @http_client.call(
params: LIMITS.merge(page_tokens),
body: { archived: @archive }
)
if status == 200
items = body&.dig(:result, :offerMappings) || []
break if items.blank?
import_payload(items)
else # any other status anyway
# To be safe, but we shouldn't get here.
# This is possible if the status is < 400 and the status is != 200.
raise MarketplaceAggregator::ApiError.new(
status,
I18n.t('errors.something_went_wrong'),
mp_credential.id
)
end
page_token = body&.dig(:result, :paging, :nextPageToken)
if page_token.blank?
break
else
page_tokens = { page_token: }
end
end
end
III. Оптимизация кода. Повышение надёжности
1. Откуда берётся бизнес логика
Загрузка данных с маркетплейсов осуществляется с помощью HTTP-клиента. В нашем случае используется Faraday.
class Connection
def self.start
@conn ||= Faraday.new do |config|
config.request :retry, RETRY_OPTIONS
config.adapter Faraday.default_adapter
end
end
end
Предполагается, что в целях обеспечения контакта с маркетплейсом во время обслуживания одной сессии или работы одной фоновой задачи может быть выполнено до нескольких обращений к маркетплейсу. При этом достаточно одного HTTP-клиента, поэтому при объявлении нового HTTP-клиента используется оператор ||=
присвоения по условию отсутствия предварительного подключения к нему. Поскольку диапазон задач, для которых будет использован данный клиент, может быть различным, то мы минимизируем на данном этапе его конфигурацию. Конфигурация будет отличаться, например, при GET и POST запросах.
class BaseApi
attr_accessor :connection
def initialize(mp_credential, options = {})
@connection = Connection.start
@marketplace_credential = mp_credential
@raise_an_error = false
end
private
def get(params = {}, _ = {})
api_call do
connection.get do |req|
req.url url
req.params = params
req.headers = headers
end
end
end
def post(params = {}, body = {})
api_call do
connection.post do |req|
req.url url
req.params = params
req.headers = headers
req.body = body.to_json
end
end
end
end
Общий каркас настройки HTTP-клиента на этом заканчивается. Далее перейдём к реализации дополнительных параметров подключения к разным маркетплейсам из отдельных классов.
Для установления контакта с маркетплейсом используются параметры url
, params
, headers
, body
. Два из них — params
и body
— являются входными хэшами при вызове соответствующих методов .get
и .post
при обращении к Web-сервису. Два других параметра — url
и headers
можно настроить автоматически из отдельных классов для каждого маркетплейса. Headers
содержат регистрационные данные на маркетплейсах, и они уже упоминались выше в разделе I.6 — /app/services/ozon/api.rb, /app/services/yandex/api.rb.
Url
от API-методов также предполагается объявлять в отдельных классах в целях соблюдения «Принципа единственной ответственности». Так повышается Cohesion внутри кода, обслуживающего HTTP-клиента, и понижается Coupling между HTTP-клиентом и другими частями кода по переработке различных данных, загруженных с маркетплейсов. /Cohesion и Coupling: отличия/
В нашем случае имеется один API-метод на загрузку продуктов с Яндекс.Маркет:
/app/services/yandex/api/offer_mappings.rb
module Yandex
class Api
class OfferMappings < Api
def url
"#{URL}/businesses/#{@business_id}/offer-mappings.json"
end
end
end
end
и три API-метода на загрузку продуктов с OZON:
/app/services/ozon/api/product_list.rb
module Ozon
class Api
class ProductList < Api
def url
"#{URL}/v2/product/list"
end
end
end
end
/app/services/ozon/api/product_info_list.rb
module Ozon
class Api
class ProductInfoList < Api
def url
"#{URL}/v2/product/info/list"
end
end
end
end
/app/services/ozon/api/product_info_description.rb
module Ozon
class Api
class ProductInfoDescription < Api
def url
"#{URL}/v1/product/info/description"
end
end
end
end
Загрузка данных о товарах осуществляется с каждого маркетплейса по своему алгоритму при уникальном наборе начальных данных.
/app/services/ozon/products_downloader.rb
module Ozon
class ProductsDownloader
def initialize(mp_credential)
@mp_credential = mp_credential
@http_client_list = Ozon::Api::ProductList.new(mp_credential)
@http_client_info = Ozon::Api::ProductInfoList.new(mp_credential)
@http_client_description = Ozon::Api::ProductInfoDescription.new(mp_credential)
@parsed_ids = {}
end
def call
return false if mp_credential.credentials.nil?
download_products
tag_lost_products!
true
end
end
end
/app/services/yandex/products_downloader.rb
module Yandex
class ProductsDownloader
def initialize(mp_credential)
@mp_credential = mp_credential
@http_client = Yandex::Api::OfferMappings.new(mp_credential)
@parsed_ids = []
end
def call
return false if mp_credential.credentials.nil?
download_products
tag_lost_products!
true
end
end
end
Процедуры импорта товаров на маркетплейсах запускаются из единого источника процессов - фоновая задача по импорту данных с маркетплейсов
/app/jobs/products/import_job.rb
module Products
class ImportJob < ApplicationJob
include MaBenchmarking
queue_as do
is_client_queue = arguments.first
if is_client_queue
:client_grabber_products
else
:marketplace_grabber_products
end
end
DOWNLOADER_CLASS = 'ProductsDownloader'
def perform(is_client_queue, mp_credential_id)
@is_client_queue = is_client_queue
@mp_credential = MarketplaceCredential.find_by(id: mp_credential_id)
return if irrelevant?
@mp_credential.update(last_sync_at_products: Time.current) if downloadable?
end
private
def downloadable?
import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
true
#...
end
def import(downloader)
benchmarking(
-> { "import: :mp_credential[#{@mp_credential.id}] — [#{downloader.parsed_ids.size}] - OK" }
) { downloader.call }
end
end
end
Здесь важно обратить внимание на использование принципа инверсии зависимостей, когда по имени маркетплейса выбирается класс, в котором реализованы детали процедуры импорта - downloader.call
/Принципы SOLID на примере ruby♦️/
Другим не менее значимым алгоритмическим подходом, реализованным в приведённом коде, является разнесение задачи импорта по разным очередям в зависимости от источника инициации процесса импорта. Это может быть клиентский запрос на немедленную загрузку данных, например, при регистрации нового пользователя в нашем Web-приложении. Такие запросы отправляются в очередь :client_grabber_products
.
Вызов клиентского запроса на импорт данных осуществляется из соответствующего эндпоинта POST http://localhost:3000/api/v1/credentials
/app/controllers/api/v1/credentials_controller.rb
module Api
module V1
class CredentialsController < ApplicationController
def create
@mp_credential = fetch_credentials.new(credential_params)
@mp_credential.fix_credentials!
Operations::CheckCredentials.new(@mp_credential).call
if @mp_credential.is_valid
@mp_credential.save!
Products::ImportJob.perform_later(true, @mp_credential.id)
render json: {
marketplace_credential: @mp_credential
}
else
render json: {
errors: [{
code: 'error',
title: I18n.t('errors.credentials_are_invalid'),
detail: @mp_credential.credentials['errors']
}]
}, status: 400
end
end
end
end
end
В другом случае инициатором запроса на импорт данных может быть CRON-задача, о чём говорится в разделе I.2. Тогда такие запросы отправляются в очередь :marketplace_grabber_products
.
Здесь стоит подчеркнуть, что в автоматической режиме на повторный импорт данных на ежедневной основе отправляется вся клиентская база:
/app/business_logic/tasks/import_products.rb
module Tasks
class ImportProducts
def call
MarketplaceCredential.find_each(batch_size: 100) do |mp_credential|
# 1. correcting client mistakes
mp_credential.fix_credentials!
# 2. verifying the validity of credentials
Operations::CheckCredentials.new(mp_credential).call
# 3. refresh the record
mp_credential.save! if mp_credential.changed?
# 4. cause direct data import
Products::ImportJob.perform_later(false, mp_credential.id) if mp_credential.is_valid
end
end
end
end
При этом алгоритм должен пройтись по всем клиентским данным, используемым для авторизации. Очевидно, что нам нужно перепроверить актуальность авторизационных данных перед попыткой запустить процедуру импорта товаров для последующего выявления изменений в базе. По идее, результатом проверки должны быть сделаны рассылки уведомлений тем пользователям, чьи регистрационные данные устарели и когда требуется их ручное обновление из кабинета маркетплейса. Отвечать за логику поведения данного программного блока должен администратор, который по мере накопления опыта взаимодействия с пользователями-продавцами должен иметь возможность оперативно менять данный функционал. В силу высокой зависимости данного участка кода Web-приложения от операционных намерений его менеджеров мы и вынесли этот код в отдельный раздел под названием app/business_logic
.
Поскольку задача масс-импорта данных предполагает продолжительную загрузку, то здесь также организуется отдельная фоновая задача в отдельной ветке :marketplace_grabber_products
.
/app/jobs/marketplace_interaction/import_products_job.rb
module MarketplaceInteraction
class ImportProductsJob < ApplicationJob
queue_as :marketplace_grabber_products
def perform
Tasks::ImportProducts.new.call
end
end
end
Аналогично организован код масс-импорта описаний по товарам с OZON:
/app/business_logic/tasks/import_product_descriptions.rb, /app/jobs/marketplace_interaction/import_product_descriptions_job.rb
2. Обработка ошибок
Существует несколько способов работы с программными ошибками. В данном приложении активно эксплуатируются четыре способа - перехват ошибок; генерация и перехват ошибок (Railway Oriented Programming); генерация и перехват ошибок, чтобы перезапустить задачу; генерация ошибок.
Под способом «перехвата ошибок» понимается допущение в будущем появления ошибок на этапе разработки. В задачу перехватчиков входит нормальное прерывание выполнения кода с логированием места и причины ошибки.
/app/jobs/products/import_job.rb
module Products
class ImportJob < ApplicationJob
def downloadable?
import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
true
rescue NameError => e
# We are checking the code. It's fixable
ErrorLogger.push_trace e
false
#...
end
end
end
Здесь перехватывается ошибка неизвестного наименования класса для нового маркетплейса. Причина может быть в непосредственном имени или в неправильном размещении файла.
Второй способ, связанный с «генерированием и перехватом ошибок», эксплуатируется в двух случаях.
Первый случай — это генерация исключения с целью идентификации основания отказа в исполнении функции по эндпоинту и перехват исключения для возврата на запрос по эндпоинту форматированного ответа с указанием причины отказа
/app/controllers/application_controller.rb
class ApplicationController < ActionController::API
rescue_from(MarketplaceAggregator::UnauthorizedError) do
render json: {
errors: [{
code: 'error',
title: I18n.t('errors.unauthorized')
}]
}, status: 401
end
rescue_from(MarketplaceAggregator::ForbiddenError) do
render json: {
errors: [{
code: 'error',
title: I18n.t('errors.forbidden')
}]
}, status: 403
end
def current_user
raise MarketplaceAggregator::UnauthorizedError unless fetch_user_uuid
@current_user ||= Client.find_by(id: fetch_user_uuid)
raise MarketplaceAggregator::ForbiddenError unless @current_user
@current_user
end
private
def fetch_user_uuid
request.headers['HTTP_USER']
end
end
Такой способ генерации исключений до и после проверки заголовка запроса потребовался для соблюдения строгой привязки HTTP-запроса к зарегистрированному пользователю в базе данных.
В этом случае также следует учесть, что для запроса используется нестандартный заголовок запроса, поэтому Ruby on Rails добавляет к нему префикс HTTP_
. /https://github.com/rails/rails/blob/main/actionpack/lib/action_dispatch/http/headers.rb/
Таким образом, заголовок у запроса к эндпоинту выглядит как USER:registered_user_id
, а внутри приложения следует использовать обращение за заголовком, как request.headers['HTTP_USER']
.
Второй случай «генерирования и перехвата ошибок» предназначен для реализации Railway-ориентированного программирования при возвращении маркетплейсами статусов, отличных от значения 200
.
Генерация исключения происходит в классе HTTP-клиента. Пример реализации был рассмотрен в разделе I.6 /app/services/base_api.rb .
Перехват исключения для этого случая осуществляется в методе downloadable? у класса с реализацией фоновой задачи импорта данных.
/app/jobs/products/import_job.rb
module Products
class ImportJob < ApplicationJob
def downloadable?
import(@mp_credential.marketplace.to_constant_with(DOWNLOADER_CLASS).new(@mp_credential))
true
rescue NameError => e
# We are checking the code. It's fixable
ErrorLogger.push_trace e
false
rescue MarketplaceAggregator::ApiAccessDeniedError => e
ErrorLogger.push e
# that's all
false
rescue MarketplaceAggregator::ApiBadRequestError => e
ErrorLogger.push e
# that's all
false
rescue MarketplaceAggregator::ApiLimitError => e
# restart the task taking into account restrictions on limits
Tasks::ReimportProducts.new(@is_client_queue, @mp_credential, e).call
false
rescue MarketplaceAggregator::ApiError => e
ErrorLogger.push e
# TODO: restart task after an hour (for example)
false
end
end
end
В большинстве случаев, как видно из кода, прерывание процедуры импорта не предполагает автоматизированного способа решения проблемы.
Кроме Второго способа обработки исключений здесь мы также видим Третий способ «генерирования и перехвата ошибок с целью перезапуска задачи по импорту данных» на строке с rescue MarketplaceAggregator::ApiLimitError
. Это тот редкий случай, когда правила маркетплейса позволяют очевидным способом автоматически разрешить проблему последнего отказа в отклике по достижению rate limits.
/app/business_logic/tasks/reimport_products.rb
module Tasks
class ReimportProducts
def call
# 1. check the history of ReimportProducts calls
# It should be less than three times
analyse do
# 2. cause direct data import
Products::ImportJob.set(wait_until: 1.minute.from_now)
.perform_later(@is_client_queue, @mp_credential.id)
end
end
end
end
Здесь стоит рассказать об одном нюансе, учтённом в настройках HTTP-клиента при обработке статусов HTTP-ответов от маркетплейсов. Известно, что ответ по REST API может приходить не только от Web-приложения, но и от Web-сервера. Это ошибки 500-й серии. Некоторые из них являются некритическими для удержания контакта на незначительном промежутке времени посредством повторения запроса без отключения соединения с реципиентом. Пример такой настройки представлен в коде HTTP-клиента:
class Connection
RETRY_OPTIONS = {
max: 5, # Retry a failed request up to 5 times
interval: 2, # First retry after 2s
backoff_factor: 2, # Double the delay for each subsequent retry
retry_statuses: [500, 502, 503, 504]
# Retry only when we get a 500, 502, 503 or 504 response
}.freeze
def self.start
@conn ||= Faraday.new do |config|
config.request :retry, RETRY_OPTIONS
config.adapter Faraday.default_adapter
end
end
end
В заключении этого раздела разберём Четвёртый способ обработки ошибок, основанный на генерации ошибок без последующей их программной обработки. Это ситуация называется «проектированием по контракту», когда в родительском классе заявляется пул методов, обязательных к реализации в дочерних классах. Чтобы не забывать прописывать реализации обязательных методов и создаются искусственные условия, генерирующие критические прерывания программы. Ещё такой подход в программировании называют Принципом Барбары-Лисков. Пример его применения можно видеть в /app/services/base_check_credentials.rb.
class BaseCheckCredentials
private
# curl command can invoke POST method with -I/--head option
# https://curl.se/docs/manpage.html
def bash_command
raise NotImplementedError, "#{self.class}.#{__method__}: #{I18n.t('errors.marketplace_has_not_been_selected')}"
end
def http_client
raise NotImplementedError, "#{self.class}.#{__method__}: #{I18n.t('errors.marketplace_has_not_been_selected')}"
end
end
В наследуемых классах заявленные методы превращаются в следующее содержание
/app/services/yandex/check_credentials.rb
module Yandex
class CheckCredentials < BaseCheckCredentials
private
def bash_command
<<~`BASH`
curl -I \
-H 'Authorization: OAuth oauth_token=#{@mp_credential.credentials['token']}, \
oauth_client_id=#{ENV.fetch('YANDEX_APP_ID')}' \
-X POST https://api.partner.market.yandex.ru/businesses/\
#{@mp_credential.credentials['business_id']}/offer-mappings.json?limit=1
BASH
end
def http_client
Yandex::Api::OfferMappings.new(@mp_credential)
end
end
end
/app/services/ozon/check_credentials.rb
module Ozon
class CheckCredentials < BaseCheckCredentials
private
def bash_command
<<~`BASH`
curl -I \
-H 'Api-Key: #{@mp_credential.credentials['api_key']}' \
-H 'Client-Id: #{@mp_credential.credentials['client_id']}' \
-H 'x-o3-app-name: #{ENV.fetch('OZON_APP_ID')}' \
-X POST https://api-seller.ozon.ru/v2/product/list
BASH
end
def http_client
Ozon::Api::ProductList.new(@mp_credential)
end
end
end
3. Выстраивание задач по загрузке данных в очередь
По тексту статьи неоднократно упоминались очереди. Рассмотрим несколько моментов, связанных с обслуживание очередей при многопоточной обработке данных.
У нас в качестве адаптера очередей используется Sidekiq, о чём и указано в конфигурационном файле /config/application.rb.
Поскольку Sidekiq является Ruby процессом, то он остаётся обладателем только одного ядра процессора на инстанс. Чтобы увеличить количество запущенных инстансов, нужно увеличить количество конфигурационных файлов в папке /config.
Для текущего приложения принято решение выделить два инстанса. Один на клиентские запросы по подключению новых аккаунтов и обслуживанию запросов на повторную, внеплановую перезагрузку данных с маркетплейсов /config/sidekiq_live.yml .
:timeout: 10
:concurrency: <%= ENV.fetch("SIDEKIQ_CONCURRENCY") { 15 } %>
:max_retries: 0
:queues:
- [client_grabber_products, 5]
- [client_grabber_orders, 3]
- [client_grabber_transactions, 2]
- [default, 1]
- [client_grabber_product_descriptions, 4]
Второй инстанс выделяется на выполнение регулярной плановой загрузки данных по расписанию и их автоматическую догрузку в случае предусмотренных вариантов такой отложенной обработки /config/sidekiq_scheduled.yml .
:timeout: 10
:concurrency: <%= ENV.fetch("SIDEKIQ_CONCURRENCY") { 15 } %>
:max_retries: 0
:queues:
- [marketplace_grabber_products, 5]
- [marketplace_grabber_orders, 3]
- [marketplace_grabber_transactions, 3]
- [marketplace_grabber_product_descriptions, 4]
Для каждого инстанса выделяем некоторое число параллельных потоков concurrency, которое позволим одномоментно открыть для увеличения пропускной способности Sidekiq. Мы выбрали значение 15.
Далее распределим выделенное количество потоков между очередями. Здесь нужно учитывать, что чем больше потоков находится в параллельной работе, тем больше они друг другу мешают. Идеального распределения потоков между очередями не существует. В любом случае конфигурацию придётся настраивать под реальные объёмы обрабатываемых данных. Страница административной панели Sidekiq будет нам в помощь.
Теперь оценим процессы, происходящие внутри одного потока Sidekiq.
После ознакомления в данной статье с подробным описанием всех процессов импорта данных с маркетплейсов можно составить представления о расходуемых ресурсах. Одним существенным ресурсом является время загрузки данных по запросу о товарах. Для Яндекс.Маркет тормозящим фактором в имеющейся реализации может стать процесс «засыпания» на очень больших списках товаров в целях соблюдения наложенных маркетплейсом rate limits. На OZON замедляющим выполнение задачи фактором при загрузке данных может становиться загрузка описаний товаров. Для обоих маркетплейсов с течением продолжительной эксплуатации активно торгующим продавцом может оказаться избыточной операция загрузки архивных данных. Просто их может оказаться слишком много. На все эти случаи у нас уже интегрирован в код рукописный бенчмаркинг /lib/ma_benchmarking.rb .
module MaBenchmarking
def benchmarking(log_string)
back_time = Time.now
yield
Rails.logger.info format("(in %.3f sec) #{log_string.call}", Time.now - back_time)
end
end
Он логирует события выгрузки данных. На основе логов мы можем проводить периодический анализ выгружаемых данных для последующего принятия организационных мер.
Следующим расходным ресурсом является количество обращений из параллельных потоков к базе данных на запись и обновление данных. На текущий момент в программе реализована наилучшая практика на чтение и запись данных из/в базу пакетами. Так для импорта используется гем activerecord-import
. Предполагается соблюдать заданную практику при дальнейшем развитии программного решения.
Последний по упоминанию расходный ресурс, на который стоит обращать внимание, является объём выделяемой памяти на инстанс Sidekiq. Им мы можем управлять через изменение размеров батчей в пакетах данных на загрузку с маркетплейса и на обработку базы данных.
IV. Заключение
В статье детально разобран код Web-сервиса, эксплуатирующего REST API по импорту данных о товарах с маркетплейсов Яндекс.Маркет и OZON. На реальных участках кода показаны примеры реализации принципов SOLID. Проектируемое приложение не предполагает превращение Web-сервиса в большой монолит. Здесь намеренно отсутствует модуль авторизации пользователей. Этот функционал предполагает реализацию в отдельном сервисе. Архитектуру проектируемого приложения можно назвать сервисной. Описанные в документации к Web-сервису эндпоинты предназначены для внутреннего использования внутри приложения.
Представленный Web-сервис построен на фреймворке Ruby on Rails. В качестве СУБД используется PostgreSQL. Для хранения данных активно используются нативные типы данных Postgres. Выполнение фоновых задач вынесено в Sidekiq. В проекте используется системный менеджер задач CRON из гема clockwork. Для кэширования в Redis используется гем Kredis. С полностью рабочим проектом можно ознакомиться в репозитории https://github.com/rubygitflow/marketplace_aggregator.
Проект открыт для вкладов от контрибьютеров.
Приветствуется сохранение выбранного стиля в настройках Rubocop. Одобряется глубокое тестирование в районе 100% с сохранением покрытия кода тестами в Rspec.