Кадр из мультфильма «Смешарики: 132 серия (Пылесос)»
При проведении различной ad-hoc аналитики или же создания интеграций между DS решением и внешними системами очень часто приходится использовать REST API для получения данных. Ситуация, когда все помещается в один запрос — идеальна, но редка как единорог. Как правило, приходится тянуть большие объемы, тянуть по частям и в режиме многоходовок, возможно, с использованием курсоров. Внешняя система может лечь при большой нагрузке или же там включатся механизмы пропуска запросов (троттлинг). Вопросы «почему у меня не работает» и «как мне сделать, чтобы работало» возникают с завидной регулярностью.
Ниже приведен блочный разбор типового скрипта для получению данных из внешней системы через REST API. Его можно рассматривать как первое приближение решения задачи подобного класса.
Является продолжением серии предыдущих публикаций.
Определяемся с требованиями
Чтобы обеспечить более или менее комфортную и предсказуемую работу по сбору данных с внешней системы определимся с минимальным набором функциональных требований. Изначально считаем, что исполнение может быть достаточно долгим, поскольку для коротких запросов все делается гораздо проще — перезапрос неудачной сессии дешевле обвязки.
- Необходимо обеспечить детальное логирование процесса сбора.
- Необходимо обеспечить многопоточный сбор данных.
- Необходимо обеспечить примерную количественную валидацию получаемых данных.
- Необходимо обеспечить локальное кэширование для иммитации инкрементального апдейта.
Разберем шаги на примере анализа данных для нужд маркетинга с сервиса (пусть это будет dashamail) по рассылке кампаний. Как минимум, есть иерархическая зависимость кампания — список рассылки. Т.е. сначала необходимо определиться со списком интересующих рекламных кампаний, а потом уже по каждой кампании определяться с ее списком рассылки. Скрипт минимально простой, исключительно для выполнения своих функций и не более. Желающие улучшить всегда могут это сделать самостоятельно.
Инициализация
Несмотря на то, что мы сами будем сохранять файловый кэш по запрошенным кампаниям, допустим, что у нас еще есть куммулятивный кэш файл по ранее собранным рассылкам. Исторически достался и мы его тоже попробуем приложить к делу.
library(tidyverse)
library(magrittr)
library(httr)
library(rvest)
library(glue)
library(jsonlite)
library(stringi)
library(checkmate)
library(tictoc)
library(iterators)
library(future)
library(doFuture)
library(doRNG)
library(futile.logger)
future::plan(sequential)
user_id <- "тут мог бы быть ваш логин"
user_pwd <- "тут мог бы быть ваш пароль"
base_url <- glue("https://api.dashamail.com/?username={user_id}&password={user_pwd}")
common_logname <- "dashamail.log"
dasha_cache <- here::here("precalc", "dasha_cache")
flog.appender(appender.file(common_logname))
flog.threshold(INFO)
flog.info("Start batch processing")
tic("Full DashaMail processing workflow")
Запрашиваем список кампаний
Обычно необходимо получить список всех проведенных кампаний, отсеять из него интересующие и потом уже по каждому идентификатору кампании начать запрашивать детальные параметры.
# вспомогательная функция по нормализации ответа
cleanCampaign <- function(x){
# тэг relay_id может быть NULL и он не дает сделать bind. Он для нас неважен, поэтому удалим
x$relay_id <- NULL
x$sqleseg <- NULL
x
}
# для ускорения процесса берем архив уже собранных данных
arch_tbl <- here::here("precalc", "dashamail_arch.fst") %>%
fst::read_fst()
# получаем список кампаний средствами API ----------
tic("Requesting campaigns' info through API")
resp <- GET(glue("{base_url}&method=campaigns.get"))
flog.info(capture.output(toc()))
# формируем табличку интересующих нас кампаний
camps_tbl <- content(resp)[["response"]][["data"]] %>%
map_dfr(cleanCampaign) %>%
mutate_at("emails_sent", as.numeric) %>%
filter(status == "SENT") %>%
filter(emails_sent > 0) %>%
mutate_at(vars(delivery_time), lubridate::ymd_hms, truncated = 3, tz = "Europe/Moscow") %>%
arrange(desc(id))
Сверка с локальным кэшем
Чтобы сократить время сбора, нагрузку на внешний сервис и обеспечить, возможную историчность, мы ранее сохраняли все удачные ответы в файловый кэш. Сверим его с полученной таблицой рассылок и сформируем инкрементальное задание на опрос.
# сделаем реестр закэшированных данных
cached_tbl <- fs::dir_ls(path = dasha_cache, glob = "*.qs") %>%
tibble(path = ., camp_id = str_match(fs::path_file(path), "camp_(\\d+).*\\.qs")[, 2])
Подготовка заданий для многопоточного опроса кампаний
Cформируем табличное описание заданий для генерации итератора. Учтем что кампания кампании рознь. Перемешаем случайным образом кампании, чтобы устранить потенциальную разбалансировку по нагрузке потоков.
Это мы все делаем в основном потоке.
iter_tbl <- camps_tbl %>%
# перемешаем случайным образом кампании
sample_n(n()) %>%
select(camp_id = id, emails_sent, delivery_time) %>%
# из запроса исключаем успешно закэшированные
anti_join(cached_tbl, by = "camp_id") %>%
# исключаем те, что есть уже в архиве
anti_join(mutate(arch_tbl, camp_id = as.character(campaign_id)), by = "camp_id") %>%
# исключаем старые рассылки за пределами квартала
filter(as.Date(delivery_time) > lubridate::dmy("29-12-2020")) %>%
mutate(camp_url = glue("{base_url}&method=reports.sent&",
"campaign_id={camp_id}&limit=1000000000&order=email%20asc")) %>%
mutate(idx = paste0(row_number(), "/", n())) %>%
mutate(pad_idx = numform::f_pad_zero(row_number()))
# sample_n(8)
flog.info(glue("Requesting {nrow(iter_tbl)} campaigns, {nrow(cached_tbl)} are cached"))
Сбор данных по кампаниям в многопоточном режиме
Поскольку 99% времени потоки будут находиться в режиме ожидания ответа от внешней системы, количество потоков может быть больше числа доступных ядер и определяться лишь возможностями внешней системы. Тут же ведем проверку соответствия реально полученного объема данных с задекларированным по списку кампаний. Проверить полезно, но не всегда эти показатели могут совпадать. Не из-за проблем на канале связи, а из-за неконсистентности данных во внешней системе. Это тоже стоит учесть.
Детально про %dorng%
можно почитать здесь.
tic("Requesting campaigns' full report")
# инициализируем параллельную обработку
registerDoFuture()
future::plan(future.callr::callr, workers = 4) # REST 99% времени молчит
rm(resp, ll)
gc(full = TRUE)
start_time <- Sys.time()
camp_list <-
foreach(it = iter(iter_tbl, by = "row"), .verbose = FALSE,
.inorder = FALSE) %dorng%
{
# собираем список отчетов
flog.appender(appender.file(common_logname))
flog.info(glue("INIT: campaign #{it$camp_id}, {it$idx}"))
tic(glue("Report request for campaign #{it$camp_id}, {it$idx} completed"))
cont <- GET(it$camp_url) %>%
httr::content()
body_tbl <- cont$response$data %>%
data.table::rbindlist() %>%
as_tibble()
flog.info(capture.output(toc()))
elapsed_time <- round(difftime(Sys.time(), start_time, units = "mins"), 1)
flog.info(glue("Total elapsed time = {elapsed_time} min(s),",
"mem_used = {capture.output(pryr::mem_used())}"))
# TODO !!! Надо сверить число строк ответа с полем emails_sent из кампании !!!
if(checkmate::testDataFrame(body_tbl, min.rows = 1, null.ok = FALSE)) {
# сохраняем только в случае получения полного объема данных
flog.info(glue("#{it$camp_id} data size = {capture.output(pryr::object_size(body_tbl))}"))
delta_n <- it$emails_sent - nrow(body_tbl)
f_suffix <- ifelse(delta_n == 0, "", glue("_(lost_{delta_n})"))
if(delta_n != 0) {
flog.info(glue("#{it$camp_id} response object is INCONSISTENT!",
"{nrow(body_tbl)}/{it$emails_sent} rows (fact/plan)", .sep = " "))
}
# сразу подожмем данные
body_tbl %>%
mutate_at(vars(sent_time), lubridate::ymd_hms, truncated = 3, tz = "Europe/Moscow") %>%
mutate(sent_date = as.Date(sent_time)) %>%
mutate_at(vars(id, member_id, campaign_id, list_id), as.numeric) %>%
select(-open_time) %>%
qs::qsave(here::here(dasha_cache, glue("camp_{it$camp_id}{f_suffix}.qs")))
} else flog.info(glue("#{it$camp_id} response object is EMPTY"))
rm(cont, body_tbl)
gc()
NULL
}
# закрываем все дочерние сессии, они едят память
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
gcinfo(FALSE)
flog.info("Full request finished")
flog.info(capture.output(toc()))
Финализируем результат
Собираем все данные вместе, сохраняем, считаем время
# теперь собираем всю закэшированную информацию по кампаниям
report_tbl <- cached_tbl$path %>%
purrr::map_dfr(qs::qread) %>%
# подтянем старые данные
bind_rows(arch_tbl)
tic("Saving email activity @ FST")
fst::write_fst(report_tbl, here::here("precalc", "dashamail.fst"), compress = 100)
flog.info(capture.output(toc()))
# общее время работы скрипта
flog.info(capture.output(toc()))
Заключение
Приведенный каркас является достаточно универсальным и требует весьма незначительной адаптации под специфику того или иного сервиса. Может авторизацию чуть придется подправить или пароли вытащить наружу — но это частные технические моменты. При парсинге сильно вариативных json можно использовать доп. инструменты, например jqr
, детальнее писал здесь.
Предыдущая публикация — «Data Science 'по ту сторону изгороди'».