Как стать автором
Обновить

process mining: 100 строк кода и генератор логов у нас в руках

Python *Data Mining *R *Бизнес-модели *


Продакт-менеджерам посвящается...


Заступая на территорию proccess mining, каждый участник рано или поздно будет нуждаться в наборе логов событий, отражающих те или иные специфические моменты в процессах. Эти логи нужны как на этапе демонстрации решения, подсвечивания определенных вопросов, так и для отработки алгоритмов или же тестов на производительность. Оба рекомендуемых сценария «взять с продуктивных систем» или «взять из интернета» терпят фиаско. Как правило, это очень
малые датасеты, слабо удовлетворяющие потребностям как по наполнению, так и по объему.


Остается вариант — написать генератор правдоподобных логов самостоятельно. Тут тоже есть два варианта.


  • Вариант первый — превратить эту задачу в универсальный монстроподобный продукт, содержащий визуальный конструктор в нотации BPMN 2.0, всевозможные визуальные конструкторы формул и атрибутов, полноценную имитационную машину под капотом. Годы работы, миллионы на ветер, на выходе — файл с логами. КПД близок к нулю.
  • Вариант второй — отнестись к этой задаче как к вспомогательной и создать инструментами data science стека упрощенный генератор в 100 строк кода.

Остановимся далее на втором варианте.


Является продолжением серии предыдущих публикаций.


Постановка задачи


Сформулируем первичный набор требований:


  • описание процессов должно быть в конфигурационном файле, ориентированном на редактирование человеком;
  • генератор должен поддерживать элементы последовательно-параллельных локальных цепочек описываемых процессов;
  • генератор должен создавать файлы логов объемом не менее нескольких гигабайт в сыром виде;
  • генератор должен работать быстро (характерный масштаб времени — десятки секунд);
  • генератор должен быть простым и компактным.

На этом пока остановимся, этих требований более чем достаточно, чтобы сформировать путь решения и отмести заведомо трудозатратные варианты.


Генератор событий


Первая мысль, которая может прийти в голову — давайте напишем имитационный симулятор и будем по нему гонять цифровых человечков. Это хороший вариант, в R есть отличный пакет для DES — simmer. Можно даже AnyLogic прикупить. Но нам это все не годится по двум причинам:


  • процесс надо описывать кодом — какой уж тут конфигурационный файл и BPMN;
  • для наших максимальных объемов это будет чрезвычайно медленно.

Воспользуемся такой идеей. Пойдем от обратного — будем не события генерировать, а трейсы. И создавать мы будем их простым Монте-Карло. Потом эти трейсы расщепим и набьем атрибутикой. Договоримся сразу, что трейс у нас будет простой текстовой строкой в которой последовательности активностей разделены заданным символом, остановимся на -. При подобном подходе нам даже не о чем беспокоиться. Большая выборка даст все возможные комбинации для относительно несложных процессов, а после генерации можно сделать однократную отбраковку по критериям. Пакетная обработка на несколько порядков быстрее штучной возни с каждым событием.


Конфигурация процессов


Раз уж мы определились генерацию вести по трейсам, то и конфигуратор процесса нам тоже стоит сделать в пространстве трейсов. Как договорились выше, нотация BPMN нам не подходит в силу ее тяжеловесности. Все же, нам нужны различные вариативности в компактном виде, поэтому можем воспользоваться двумя трюками, заимствованных из семантики регулярных выражений:


  • для описания ветвлений будем оперировать подпроцессами;
  • для описания циклов воспользуемся квантификаторами.

Естественно, что это ограничивает модели, которые мы можем описать, но простота и компактность и большая гибкость в описании это вполне нивелируют. В силу того, что мы оперируем трейсами, то подпроцессы можно описывать как куски трейсов.


В итоге, остановимся на таком формате:


activity;quantifier
ENTER;1
FLOOR_1|FLOOR_2|FLOOR_3;1
(LOOK-DROP)|(LOOK-TAKE);{1,4}
(COFFEE-CAKE)|(CAKE-COFFEE);{0,1}
(OPEN-CLOSE);{0,2}
PAY|CANCEL;1

  • () — определяют подпроцесс,


  • | — определяет ветвление (ИЛИ),


  • {min, max} — определяет квантификатор этого этапа.



В конце мы убедимся, что даже такой тривиальный формат вполне позволяет описывать сложность процессов, необходимую для задач, описанных в постановке.


Код генератора


В первой версии работать будем в однопотоке. Естественно, что для очень больших объемов надо работать в многопоточном режиме и работать по ссылкам.


Загружаем библиотеки
library(tidyverse)
library(datapasta)
library(tictoc)
library(data.table)
library(stringi)
library(anytime)
library(rTRNG)
library(dqrng)
library(furrr)
library(tsibble)

data.table::setDTthreads(0) # отдаем все ядра в распоряжение data.table
data.table::getDTthreads() # проверим доступное количество потоков

Читаем и разбираем конфигурацию бизнес-процесса
bo_df <- here::here("data", "process_01.txt") %>%
  read_delim(delim = ";") %>%
  # первичный депарсинг
  mutate(across(activity, ~stri_extract_all_regex(., "([A-Z0-9_\\-]+)"))) %>%
  tidyr::extract(quantifier, into = c("q_min", "q_max"), "(\\d+),*(\\d+)?") %>%
  mutate(across(c(q_min, q_max), as.integer)) %>%
  # у одиночных операций нет верхнего квантификатора
  mutate(q_max = if_else(is.na(q_max), q_min, q_max)) %>%
  # для последующей расстановки операций
  mutate(bo_idx = row_number())

Генерируем трейсы быстрым способом
ff <- function(activity, q_min, q_max, bo_idx){
  nums <- q_min:q_max # class nums = 'integer'
  n_cases <- 100000
  dt <- data.table(case_id = 1:n_cases,
                   # на каждый кейс сгенерируем квантификатор
                   quantif = dqsample(nums, n_cases, replace = TRUE)) %>%
    # для одинаковых квантификаторов можем сделать матрицу
    .[, events := {
      m <- matrix(dqsample(activity, .BY[[1]] * .N, replace = TRUE), 
                  nrow = .N, byrow = TRUE);
      # делаем свертку по строкам (MARGIN = 1), получаем вектор 
      apply(m, MARGIN = 1, stri_c, collapse = "-")}, by = quantif] %>%
    # сбросим активности, которые оказались с нулевым квантификатором
    .[events != ""] %>%
    .[, bo_idx := bo_idx]

  dt
}

tic()
# раскрываем квантификаторы для всех трейсов сразу
t_dt <- purrr::transpose(bo_df) %>%
  map(~ff(.$activity, .$q_min, .$q_max, .$bo_idx)) %>%
  rbindlist() %>%
  # соберем паттерны по событиям в исходном порядке
  .[order(bo_idx), .(pattern = stri_c(events, collapse = "-")), by = case_id]
toc()

Сохраним набор трейсов
unique(t_dt[, .(pattern)]) %>%
  write_csv(here::here("data", "model_traces.csv"))

Разворачиваем в событийный лог. Важный момент — чтобы лог был правдоподобным, расчет событий должен вестись сверху вниз, от транзакции к событию. Перекосы случайных распределений исправляем гильотиной в конце. Создаем записи с запасом, все кривые потом просто отбрасываем.


Разворачиваем в событийный лог
# Важное допущение -- нет параллельных операций, все выполняется строго последовательно
tic("Generate events")
# транзакции идут одна за другой, в параллель ничего не исполняется
# для упрощения задачи мы сгенерируем количество записей с запасом, а потом отсечем по временнЫм границам
df2 <- t_dt %>%
  # для каждой транзакции случайным образом сгенерируем общую попугайную длительность
  .[, norm_duration := rnorm_trng(.N, mean = 1, sd = .5, parallelGrain = 1000L)] %>%
  # отбрасываем все транзакции, которые имеют слишком малую длительность или > таймаута
  # таймаут устанавливаем равным 1.2 условных попугая
  .[norm_duration %between% c(0.4, 1.2)] %>%
  # назначим среднюю длительность транзакций (~ 5 секунд на действие) пропорционально числу шагов
  .[, cnt := stri_count_fixed(.BY[[1]], "-"), by = pattern] %>%  
  .[, duration := norm_duration * 5 * cnt] %>%
  as_tibble() %>%
  select(-norm_duration, -cnt) %>%
  # расщепляем транзакции на отдельные состояния, при этом порядок следования состояний сохраняется
  separate_rows(pattern, sep = "-") %>%
  rename(event = pattern)
toc()  

Раскидываем события по реальной временной шкале
set.seed(46572)
RcppParallel::setThreadOptions(numThreads = parallel::detectCores() - 1)

tic("Generate time markers, data.table way")  
# так примерно в 8-10 раз быстрее
samples_tbl <- data.table::as.data.table(df2) %>%
  # делаем в два захода
  # сначала просто генерируем случайные числа от 0 до 1 для каждой записи отдельно 
  # и масштабируем одним вектором
  # в таком раскладе trng быстрее в несколько раз (может даже на порядок получиться)
  # фактически, здесь мы считаем случайные соотношения времен между операциями внутри транзакции
  .[, trand := runif_trng(.N, 0, 1, parallelGrain = 10000L) * duration] %>%
  # делаем сортировку вектора внутри каждой сессии, простая сортировка будет ОЧЕНЬ долгой
  # делаем трюк, формируем составной индекс из case_id, который является монотонным, и смещением по времени
  # поскольку случайные числа генерятся в диапазоне [0, 1], мы их утаскиваем в дробную часть (за запятую)
  .[, t_idx := case_id + trand / max(trand)/10] %>%
  # подтягиваем в колонку сортированное значение вектора с реконструкцией
  # session_id в целой части гарантирует сортировку пропорций в рамках каждой транзакции без доп. группировок
  .[, tshift := (sort(t_idx) - case_id) * 10 * max(trand)] %>%
  # добавим дополнительной реалистичности, между транзакциями могут быть сдвижки (-60+30 сек)
  .[event == "ENTER", tshift := tshift + runif_trng(.N, -60, 30, parallelGrain = 10000L)] %>%
  # удаляем весь промежуточный мусор
  .[, `:=`(duration = NULL, trand = NULL, t_idx = NULL,
           timestamp = as.numeric(anytime("2020-06-01 08:00:00 MSK")))] %>%
  # переводим все в физическое время, начиная от 01.06.2020 (потенциально для каждого объекта отдельно)
  .[, timestamp := timestamp + cumsum(tshift)] %>%
  # добавим метку окончания события и длительность
  .[, duration := round((shift(timestamp, type = "lead") - timestamp) * 
      runif_trng(.N, .2, .6, parallelGrain = 10000L), 2)] %>%
  .[is.na(duration) | duration < 0, duration := 5] %>%
  .[, timestamp_finish := timestamp + duration] %>%
  as_tibble() %>%
  mutate_at(c("timestamp", "timestamp_finish"), anytime, tz = "Europe/Moscow") %>%
  select(case_id, event, timestamp, timestamp_finish, duration)
toc()
gc(full = TRUE)

plot(density(samples_tbl$duration))

Взглянем на результат во временнОм представлении
sample_tsbl <- as_tsibble(samples_tbl, key = case_id, index = timestamp, regular = FALSE)

sample_tsbl %>%
  index_by(year_week = ~ yearweek(.)) %>% # monthly aggregates
  summarise(n = n()) %T>%
  print(n = 100) %>%
  feasts::gg_tsdisplay()

Формируем выходные представления
saveProcessMap <- function(df, type){
  tic(glue::glue("{type} process map"))
  # https://github.com/gertjanssenswillen/processmapR/blob/master/R/process_map.R
  dfg <- df %>%
    mutate(activity_instance_id = dplyr::row_number()) %>%
    bupaR::simple_eventlog(case_id = "case_id",
                           activity_id = "event",
                           timestamp = "timestamp",
                           validate = FALSE) %>%
    # processmapR::process_map(type = processmapR::frequency("relative_case"))
    processmapR::process_map(render = FALSE)

  # прежде чем так сохранять, надо проверить наличие директории images
  checkmate::assertDirectoryExists(here::here("images"))
  title <- paste("Эмулированный процесс с", 
                  ifelse(type == 'complete', 'полным', 'урезанным'),
                  "перечнем событий")
  DiagrammeR::export_graph(
    dfg,
    file_name = here::here("images", glue::glue("{type}_pmap.png")),
    file_type = "png",
    title,
    width = 2560, height = NULL
  )
  toc()
}

# --------------------
# Переходим к реальной жизни
# Окончательно перемешиваем данные и добавим доп. атрибуты
actors_vec <- c("Петров", "Иванов", "Петровская", "Иванова", "Сидоров", "Сидоркина", "Ивановская")

log_tbl <- samples_tbl %>%
  select(case_id, timestamp, event) %>%
  # здесь мы можем сэмулировать частичную потерю элементов транзакций
  sample_frac(1 - .02) %>%
  mutate(actor = sample(!!actors_vec, n(), replace = TRUE))

# ------- Сохраняем сгенерированный лог событий
fst::write_fst(log_tbl, here::here("data", "fuzzy_manual_log.fst"))  

# ------- Сохраним полную карту процесса
saveProcessMap(samples_tbl, "complete")
# ------- Сохраним редуцированную карту процесса
saveProcessMap(log_tbl, "reduced")

При формировании выходных представлений не забываем, что


  • нам нужны еще атрибуты какие-никакие,
  • в реальной жизни иногда могут теряться части событий.

Заключение


Для приведенного кода совокупным объемом ~100 строк весь цикл генерация на «офисном» ноутбуке осуществляется в пределах 30-40 секунд. Картинки процессов даже для такого тривиального конфигурационного файла выглядят очень насыщенными и информативными.


Complete process DFG


Incomplete process DFG


Естественно, что это быстрый прототип. Для более серьезных задач и генератор должен быть
чуть посложнее. В частности, требуется:


  • поддержка параллельных процессов;
  • поддержка быстрой генерации десятков гигабайт логов;
  • расширенное атрибутарное наполнение по типам данных (целые числа, плавающие числа,
    дата, дата-время, строка);
  • расширенное атрибутарное наполнение по точкам привязки (кейс, активность, исполнитель);
  • управление статистическими характеристиками генерируемых логов (число типов трейсов,
    число транзакций, распределения времен и пр.);
  • поддержка NA в атрибутах,
  • еще что-либо...

Это будет несколько сложнее, но при определенных упрощениях можно поддержать почти все без значимой потери в функционале и без существенного усложнения.


Для поддержки параллельных процессов можно ввести понятие основного пути и параллельных веток в конфигурационном файле. Например, так:


path;activity;quantifier
*;ENTER;1
*;FLOOR_1|FLOOR_2|FLOOR_3;1
*;(LOOK-DROP)|(LOOK-TAKE);{1,4}
*;(COFFEE-CAKE)|(CAKE-COFFEE);{0,1}
0;(OPEN-CLOSE);{0,2}
0;(UNCOVER-COVER);{0,1}
1<<;THINK|READ;{1,2}
1;WAIT;{0,1}
2<<;THINK|READ;{1,2}
*;FLOOR_1|FLOOR_2|FLOOR_3;1
0;(OPEN-CLOSE);{0,2}
0;(UNCOVER-COVER);{0,1}
1<<;THINK|READ;{1,2}
1;WAIT;{0,1}
*;PAY|CANCEL;1

где * — основной путь, n — ветка n в идеологии split/join.


И чуть усложнить параметризацию конфиг файл самого скрипта, например, подобным образом:


parallel_1:
  # имя входного файла с моделью бизнес-процесса
  model_filename: 'process_01p.txt'
  # ограничитель по количеству уникальных цепочек, отправляемых на разыменование
  max_unique_traces: 100
  # ограничение сверху по числу генерируемых кейсов
  max_case_id: 50000
  # имя выходного выходного csv лога событий
  eventlog_filename: 'process_01p'
  # максимальный размер выходного csv лога событий, в мегабайтах
  max_log_size_Mb: 7
  # средняя длительность одной активности в цепочке, в минутах
  # 1440 минут = 1 сутки
  average_activity_duration: 2880
  # исключать в финальном логе SPLIT/JOIN
  remove_split_point: TRUE
  # имя файла с параметрами атрибутивного наполнения (если есть), либо пустота
  attr_sample_config: 'attr_samples_default.xlsx'
  # следует ли генерировать перемешанную подвыборку
  generate_fuzzy_log: TRUE  
  # следует ли генерировать графические карты процессов
  generate_process_maps: TRUE  

Есть задел, вполне можно двигаться дальше.
И, кстати, при создании программных продуктов подобные задачки встречаются десятками. Опытный продакт не будет распылять дорогие ресурсы сеньоров на создание "продуктов-сателлитов", а воспользуется инструментами DS.


Предыдущая публикация — «ETL в анализе данных без перерывов на кофе и курилку».

Теги:
Хабы:
Всего голосов 3: ↑3 и ↓0 +3
Просмотры 2.2K
Комментарии 0
Комментарии Комментировать

Публикации

Истории

Работа

Python разработчик
169 вакансий
Data Scientist
113 вакансий