Как стать автором
Обновить

Ещё немного о параллельных вычислениях в R

Время на прочтение 6 мин
Количество просмотров 1.5K

Публикация очень краткая. Многие думают, что параллельные вычисления в R -- это очень сложно и неприменимо к их текущим задачам.

И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта. Приведенные примеры сознательно похожи на продуктивные задачи, а не выхолощенные пара строчек синтетики.

Является продолжением серии предыдущих публикаций.

Используемые пакеты

Загрузка пакетов
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) # будем использовать его рутовый логгер `lgr`

library(hrbrthemes)

Паттерны параллелизации

Паттерн 1. Параллелизация tidyverse вычислений

Ситуация. Есть скрипт, содержащий множество пайплайнов на tidyverse.

Пример задачи. Подсчитаем среднее от суммы квадратов чисел. Для повышения эффективности параллельных вычислений важно уменьшить объемы перекачки данных между потоками. Используем пакет furrr.

`tidyverse` pipeline
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic("Считаем последовательно")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic("Считаем параллельно, вариант 1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic("Считаем параллельно, вариант 2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()

Естественно, результат зависит от аппаратной платформы и ОС, на которой все запускается. На тестовом прогоне у меня такая раскладка:

Считаем последовательно: 7.23 sec elapsed
Считаем параллельно, вариант 1: 3.43 sec elapsed
Считаем параллельно, вариант 2: 0.64 sec elapsed

Windows и Linux достаточно сильно отличаются по методам параллелизации. Linux в продуктиве сильно предпочтительнее Windows.

Паттерн 2. Локальная ручная параллелизация

Ситация. В ряде случаев при работе скрипта необходимо выполнить незначительное число разовых неунифицируемых операций. Например, загрузка справочников и различных первичных данных. Есть возможное решение, функция %<-%.

Генерация сэмплов
# создаем последовательность, матрица 20 атрибутов на 10^5 событий
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  # сгенерируем версию объекта
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

# сохраним в файл для последующей демонстрации параллелизации
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
Два варианта загрузки файлов
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# считаем, что воркеров у нас 2
tic("Объединяем последовательно обработанные файлы")
tic("Читаем файлы последовательно")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()

tic("Объединяем параллельно обработанные файлы")
tic("Читаем файлы параллельно")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)

Некая разница наблюдается. Пример исключительно для демонстрации принципа. На тестовом прогоне у меня такая раскладка:

Объединяем последовательно обработанные файлы: 46.23 sec elapsed
Объединяем параллельно обработанные файлы: 37.82 sec elapsed

Паттерн 3. Параллелизация сложного процессинга

Ситуация. Много вычислительного данных, много кода, процессинг потенциально независим.

Пример.
Сделаем общее задание. Будем считать число сочетаний $C_n^k$. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.

Генерация списка заданий.
jobs_tbl <- tibble(idx = 1:200) %>%
  mutate(n = sample(100:1000, n(), replace = TRUE)) %>%
  rowwise() %>%
  mutate(k = sample(1:n, 1)) %>%
  ungroup() %>%
  mutate(idx_str = numform::f_pad_zero(idx))
Подготовка логгеров
# подготовка логгеров
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
Многопоточные расчеты
"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

# инициализируем параллельную обработку
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          # инициализируем логгер в потоке
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          # вернем тайминги тоже
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

# терминируем параллельную обработку --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))

Для иллюстрации процесса нарисуем график запуска (точка) и завершения (крестик) задач на вычислителях. Хорошо видны первичные затраты на старт потоков windows.

Код построения графика.
# посмотрим графически на порядок запуска вычислителей
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))

Заключение

При параллелизации задач, для достижения максимальной эффективности вычислений следует учитывать ряд важных моментов, вытекающих из принципов работы компьютеров, ОС и теоретических пределов. Если не погружаться глубоко в детали, резюмируем в виде «проверочных пунктов»:

  1. Инициализация вычислителей (worker) является достаточно дорогостоящей. Требуется породить новое окружение (поток, кластер, …), его инициализировать. Для коротких вычислений (секунды) затраты на инициализацию могут оказаться существенно выше однопоточного вычисления.

  2. При выделении потоков на одной машине, рекомендуется отдавать под вычислители core - 1, или чуть меньше. Один поток выполняет роль мастера, раздающего задания и выполняющего reduce ответов, получаемых от вычислителей. Ну и самой операционке тоже могут быть нужны ресурсы.

  3. Дескрипторы файлов и коннектов к БД не переходят границы потоков.

  4. Накладные расходы на перегон больших объемов данных из мастер потока в вычислитель и обратно могут оказаться по времени существенно выше, чем время вычисления. Оптимально, если мастер поток дает метаинформацию по заданию, а вычислитель уже сам загружает эти данные (из БД, из файлов, из API и т.д.). Ну и результат наверх должен уходить максимально агрегированный.

  5. Состав заданий надо максимально готовить в мастер потоке, а вниз спускать грубую вычислительную подзадачу. Повышается прозрачность и воспроизводимость.

  6. Для ряда задач, связанных с длинными синхронными запросами внешних системы (типичные представители -- REST API/Web scrapping), можно создавать вычислителей много больше чем доступных ядер. Они все равно висят большую часть времени в режиме ожидания. Можно запускать в виде отдельных процессов ОС с помощью настройки соответствующего бэкенда registerDoFuture(); plan(future.callr::callr). Это оставшаяся 1/2 рецепта.

Предыдущая публикация -- «Нюансы эксплуатации R решений в enterprise окружении?».

Теги:
Хабы:
+4
Комментарии 1
Комментарии Комментарии 1

Публикации

Истории

Работа

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн