Привет! На связи Артемий – Analytics Engineer.
Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.
Покажу как этот вопрос решается с помощью современных подходов на примере кейса:
Расширение списка базовых валют
Регулярное обновление и получения актуальных курсов
Обеспечение корректности исторических показателей
Максимальное удобство и простота использования в аналитических инструментах
Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.
Новые требования к сервису валютных курсов
В качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.
Новые требования можно суммировать следующим образом:
Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ
Получение самых актуальных котировок, включая внутридневные курсы
Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)
Задачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:
Интеграция нового API для уже использующихся курсов
Добавление новых базовых валют в выгрузку
Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды
Архивирование курсов из legacy-источника
Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить.
Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.
Интеграция с новым поставщиком данных
Как уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют – https://openexchangerates.org/
Минимальный необходимый план Developer включает в себя:
10.000 запросов ежемесячно (более чем достаточно)
Ежечасные внутридневные обновления курсов
Широкий набор базовых валют, включая криптовалюты
Доступные методы API:
Для получения актуальных курсов валют воспользуемся API endpoint /latest.json
Простой запрос-ответ может выглядеть следующим образом:
Установка на расписание в Airflow
Для регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow – де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами.
Смысловая составляющая графа задачи (DAG):
Сделать запрос к API
Сохранить полученный ответ (например, в виде уникального ключа на S3)
Уведомить в Slack в случае ошибки
Конфигурация DAG:
Базовые валюты (base currency), от которых отсчитываем курсы
Синхронизация расписание запусков с расчетом витрин в Хранилище Данных
Токен доступа к сервису
Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:
TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`
curl -H "Authorization: Token $OXR_TOKEN" \
"https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
| aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json
Вот как выглядит результат регулярной работы скрипта в S3:
Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:
Выгрузка истории по новым валютам
После обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.
К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.
Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:
#!/bin/bash
d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
echo $d
curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)
done
Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:
Архивирование исторических курсов валют
Вся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.
Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.
Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:
Трансформацию legacy pivot-таблицы в двумерную
Запись в колоночный формат PARQUET в AWS S3
Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
SELECT 'EUR' AS base_currency
UNION ALL
SELECT 'GBP'
UNION ALL
SELECT 'RUB'
UNION ALL
SELECT 'USD'
)
SELECT
"day" AS business_dt
,b.base_currency
,CASE b.base_currency
WHEN 'EUR' THEN 1
WHEN 'GBP' THEN gbp_to_eur
WHEN 'RUB' THEN rub_to_eur
WHEN 'USD' THEN usd_to_eur
ELSE NULL
END AS eur
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_gbp
WHEN 'GBP' THEN 1
WHEN 'RUB' THEN rub_to_gbp
WHEN 'USD' THEN usd_to_gbp
ELSE NULL
END AS gbp
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_rub
WHEN 'GBP' THEN gbp_to_rub
WHEN 'RUB' THEN 1
WHEN 'USD' THEN usd_to_rub
ELSE NULL
END AS rub
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_usd
WHEN 'GBP' THEN gbp_to_usd
WHEN 'RUB' THEN rub_to_usd
WHEN 'USD' THEN 1
ELSE NULL
END AS usd
FROM ext.currencies c
CROSS JOIN base b
;
Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.
Доступ к данным из DWH через S3 External Table
А теперь самое интересное – из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.
Оптимальное решение – создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
"timestamp" bigint
, base varchar(3)
, rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;
Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.
Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:
- name: external
schema: spectrum
tags: ["spectrum"]
loader: S3
description: "External data stored in S3 accessed vith Redshift Spectrum"
tables:
- name: currencies_oxr
description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
freshness:
error_after: {count: 15, period: hour}
loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
external:
location: "s3://<BUCKET>/dwh/currencies/"
row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
columns:
- name: timestamp
data_type: bigint
- name: base
data_type: varchar(3)
- name: rates
data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
Немаловажным элементом является проверка своевременности данных – source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.
В случае отставания данных – более 15 часов без свежих обменных курсов – мы тут же получаем уведомление в Slack.
Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:
Объединение исторических и новых данных в единый справочник
{{
config(
materialized='table',
dist='all',
sort=["business_dt", "base_currency"]
)
}}
with cbrf as (
select
business_dt
, null as business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ source('external', 'currencies_cbrf') }}
where business_dt <= '2021-02-18'
),
oxr_all as (
select
(timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
, (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
, o.base as base_currency
, o.rates.aed::decimal(10,4) as aed
, o.rates.eur::decimal(10,4) as eur
, o.rates.gbp::decimal(10,4) as gbp
, o.rates.rub::decimal(10,4) as rub
, o.rates.usd::decimal(10,4) as usd
, row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
from {{ source('external', 'currencies_oxr') }} as o
where business_dt > '2021-02-18'
),
oxr as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ ref('stg_currencies_oxr_all') }}
where rn = 1
),
united as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from cbrf
union all
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from oxr
)
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from united
При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте виде для ускорения работы запросов.
Использование курсов в моделировании данных
В целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:
select
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
from {{ ref('requests') }} r
left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
and r.currency = currencies.base_currency
Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
Практико-ориентированное развитие
Работа с данными – одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.
В конце мая состоится юбилейный запуск курса Data Engineer в OTUS, в котором я принимаю участие в роли преподавателя.
По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов – реальных прикладных проблем инженеров:
Data Architecture
Data Lake
Data Warehouse
NoSQL / NewSQL
MLOps
Детально с программой можно ознакомиться на лендинге курса.
Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.
Благодарю за внимание.