Как стать автором
Обновить

Забираем большие маленькие данные по REST API

Время на прочтение6 мин
Количество просмотров8.4K


Кадр из мультфильма «Смешарики: 132 серия (Пылесос)»


При проведении различной ad-hoc аналитики или же создания интеграций между DS решением и внешними системами очень часто приходится использовать REST API для получения данных. Ситуация, когда все помещается в один запрос — идеальна, но редка как единорог. Как правило, приходится тянуть большие объемы, тянуть по частям и в режиме многоходовок, возможно, с использованием курсоров. Внешняя система может лечь при большой нагрузке или же там включатся механизмы пропуска запросов (троттлинг). Вопросы «почему у меня не работает» и «как мне сделать, чтобы работало» возникают с завидной регулярностью.


Ниже приведен блочный разбор типового скрипта для получению данных из внешней системы через REST API. Его можно рассматривать как первое приближение решения задачи подобного класса.


Является продолжением серии предыдущих публикаций.


Определяемся с требованиями


Чтобы обеспечить более или менее комфортную и предсказуемую работу по сбору данных с внешней системы определимся с минимальным набором функциональных требований. Изначально считаем, что исполнение может быть достаточно долгим, поскольку для коротких запросов все делается гораздо проще — перезапрос неудачной сессии дешевле обвязки.


  1. Необходимо обеспечить детальное логирование процесса сбора.
  2. Необходимо обеспечить многопоточный сбор данных.
  3. Необходимо обеспечить примерную количественную валидацию получаемых данных.
  4. Необходимо обеспечить локальное кэширование для иммитации инкрементального апдейта.

Разберем шаги на примере анализа данных для нужд маркетинга с сервиса (пусть это будет dashamail) по рассылке кампаний. Как минимум, есть иерархическая зависимость кампания — список рассылки. Т.е. сначала необходимо определиться со списком интересующих рекламных кампаний, а потом уже по каждой кампании определяться с ее списком рассылки. Скрипт минимально простой, исключительно для выполнения своих функций и не более. Желающие улучшить всегда могут это сделать самостоятельно.


Инициализация


Несмотря на то, что мы сами будем сохранять файловый кэш по запрошенным кампаниям, допустим, что у нас еще есть куммулятивный кэш файл по ранее собранным рассылкам. Исторически достался и мы его тоже попробуем приложить к делу.


Код инициализации
library(tidyverse)
library(magrittr)
library(httr)
library(rvest)
library(glue)
library(jsonlite)
library(stringi)
library(checkmate)
library(tictoc)
library(iterators)
library(future)
library(doFuture)
library(doRNG)
library(futile.logger)
future::plan(sequential)

user_id <- "тут мог бы быть ваш логин"
user_pwd <- "тут мог бы быть ваш пароль"
base_url <- glue("https://api.dashamail.com/?username={user_id}&password={user_pwd}")

common_logname <- "dashamail.log"
dasha_cache <- here::here("precalc", "dasha_cache")

flog.appender(appender.file(common_logname))
flog.threshold(INFO)
flog.info("Start batch processing")

tic("Full DashaMail processing workflow")

Запрашиваем список кампаний


Обычно необходимо получить список всех проведенных кампаний, отсеять из него интересующие и потом уже по каждому идентификатору кампании начать запрашивать детальные параметры.


Код сбора данных по списку проведенных кампаний
# вспомогательная функция по нормализации ответа
cleanCampaign <- function(x){
  # тэг relay_id может быть NULL и он не дает сделать bind. Он для нас неважен, поэтому удалим
  x$relay_id <- NULL
  x$sqleseg <- NULL
  x
}

# для ускорения процесса берем архив уже собранных данных
arch_tbl <- here::here("precalc", "dashamail_arch.fst") %>%
  fst::read_fst()

# получаем список кампаний средствами API ----------
tic("Requesting campaigns' info through API")
resp <- GET(glue("{base_url}&method=campaigns.get"))
flog.info(capture.output(toc()))

# формируем табличку интересующих нас кампаний
camps_tbl <- content(resp)[["response"]][["data"]] %>%
  map_dfr(cleanCampaign) %>%
  mutate_at("emails_sent", as.numeric) %>%
  filter(status == "SENT") %>%
  filter(emails_sent > 0) %>%
  mutate_at(vars(delivery_time), lubridate::ymd_hms, truncated = 3, tz = "Europe/Moscow") %>%
  arrange(desc(id))

Сверка с локальным кэшем


Чтобы сократить время сбора, нагрузку на внешний сервис и обеспечить, возможную историчность, мы ранее сохраняли все удачные ответы в файловый кэш. Сверим его с полученной таблицой рассылок и сформируем инкрементальное задание на опрос.


Код сверки с локальным кэшем
# сделаем реестр закэшированных данных
cached_tbl <- fs::dir_ls(path = dasha_cache, glob = "*.qs") %>%
  tibble(path = ., camp_id = str_match(fs::path_file(path), "camp_(\\d+).*\\.qs")[, 2])

Подготовка заданий для многопоточного опроса кампаний


Cформируем табличное описание заданий для генерации итератора. Учтем что кампания кампании рознь. Перемешаем случайным образом кампании, чтобы устранить потенциальную разбалансировку по нагрузке потоков.


Это мы все делаем в основном потоке.


Код подготовки заданий
iter_tbl <- camps_tbl %>%
  # перемешаем случайным образом кампании
  sample_n(n()) %>%
  select(camp_id = id, emails_sent, delivery_time) %>%
  # из запроса исключаем успешно закэшированные
  anti_join(cached_tbl, by = "camp_id") %>% 
  # исключаем те, что есть уже в архиве
  anti_join(mutate(arch_tbl, camp_id = as.character(campaign_id)), by = "camp_id") %>%
  # исключаем старые рассылки за пределами квартала
  filter(as.Date(delivery_time) > lubridate::dmy("29-12-2020")) %>%
  mutate(camp_url = glue("{base_url}&method=reports.sent&",
                         "campaign_id={camp_id}&limit=1000000000&order=email%20asc")) %>%
  mutate(idx = paste0(row_number(), "/", n())) %>%
  mutate(pad_idx = numform::f_pad_zero(row_number()))
# sample_n(8)
flog.info(glue("Requesting {nrow(iter_tbl)} campaigns, {nrow(cached_tbl)} are cached"))

Сбор данных по кампаниям в многопоточном режиме


Поскольку 99% времени потоки будут находиться в режиме ожидания ответа от внешней системы, количество потоков может быть больше числа доступных ядер и определяться лишь возможностями внешней системы. Тут же ведем проверку соответствия реально полученного объема данных с задекларированным по списку кампаний. Проверить полезно, но не всегда эти показатели могут совпадать. Не из-за проблем на канале связи, а из-за неконсистентности данных во внешней системе. Это тоже стоит учесть.


Детально про %dorng% можно почитать здесь.


Код сбора данных по кампаниям
tic("Requesting campaigns' full report")
# инициализируем параллельную обработку
registerDoFuture()
future::plan(future.callr::callr, workers = 4) # REST 99% времени молчит
rm(resp, ll)
gc(full = TRUE)

start_time <- Sys.time()
camp_list <- 
  foreach(it = iter(iter_tbl, by = "row"), .verbose = FALSE,
          .inorder = FALSE) %dorng% 
  {
    # собираем список отчетов
    flog.appender(appender.file(common_logname))
    flog.info(glue("INIT: campaign #{it$camp_id}, {it$idx}"))

    tic(glue("Report request for campaign #{it$camp_id}, {it$idx} completed"))
    cont <- GET(it$camp_url) %>%
      httr::content()
    body_tbl <- cont$response$data %>%
      data.table::rbindlist() %>%
      as_tibble()
    flog.info(capture.output(toc()))

    elapsed_time <- round(difftime(Sys.time(), start_time, units = "mins"), 1)
    flog.info(glue("Total elapsed time = {elapsed_time} min(s),", 
                        "mem_used = {capture.output(pryr::mem_used())}"))

    # TODO !!! Надо сверить число строк ответа с полем emails_sent из кампании !!!
    if(checkmate::testDataFrame(body_tbl, min.rows = 1, null.ok = FALSE)) {
      # сохраняем только в случае получения полного объема данных
      flog.info(glue("#{it$camp_id} data size = {capture.output(pryr::object_size(body_tbl))}"))
      delta_n <- it$emails_sent - nrow(body_tbl)
      f_suffix <- ifelse(delta_n == 0, "", glue("_(lost_{delta_n})"))
      if(delta_n != 0) {
        flog.info(glue("#{it$camp_id} response object is INCONSISTENT!",
                       "{nrow(body_tbl)}/{it$emails_sent} rows (fact/plan)", .sep = " "))
      }
      # сразу подожмем данные
      body_tbl %>%
        mutate_at(vars(sent_time), lubridate::ymd_hms, truncated = 3, tz = "Europe/Moscow") %>%
        mutate(sent_date = as.Date(sent_time)) %>%
        mutate_at(vars(id, member_id, campaign_id, list_id), as.numeric) %>%
        select(-open_time) %>%
        qs::qsave(here::here(dasha_cache, glue("camp_{it$camp_id}{f_suffix}.qs")))

    } else flog.info(glue("#{it$camp_id} response object is EMPTY"))
    rm(cont, body_tbl)
    gc()

    NULL
  }
# закрываем все дочерние сессии, они едят память
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
gcinfo(FALSE)

flog.info("Full request finished")
flog.info(capture.output(toc()))

Финализируем результат


Собираем все данные вместе, сохраняем, считаем время


Код финализации
# теперь собираем всю закэшированную информацию по кампаниям
report_tbl <- cached_tbl$path %>%
  purrr::map_dfr(qs::qread) %>%
  # подтянем старые данные
  bind_rows(arch_tbl)

tic("Saving email activity @ FST")
fst::write_fst(report_tbl, here::here("precalc", "dashamail.fst"), compress = 100)
flog.info(capture.output(toc()))

# общее время работы скрипта
flog.info(capture.output(toc())) 

Заключение


Приведенный каркас является достаточно универсальным и требует весьма незначительной адаптации под специфику того или иного сервиса. Может авторизацию чуть придется подправить или пароли вытащить наружу — но это частные технические моменты. При парсинге сильно вариативных json можно использовать доп. инструменты, например jqr, детальнее писал здесь.


Предыдущая публикация — «Data Science 'по ту сторону изгороди'».

Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+7
Комментарии0

Публикации

Истории

Работа

Data Scientist
78 вакансий
Python разработчик
123 вакансии

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
14 сентября
Конференция Practical ML Conf
МоскваОнлайн
19 сентября
CDI Conf 2024
Москва
20 – 22 сентября
BCI Hack Moscow
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн
25 сентября
Конференция Yandex Scale 2024
МоскваОнлайн
28 – 29 сентября
Конференция E-CODE
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
30 сентября – 1 октября
Конференция фронтенд-разработчиков FrontendConf 2024
МоскваОнлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн