Коллеги, всем привет! На связи команда Magnit Tech, меня зовут Михаил, я продуктовый аналитик в команде Foodtech мобильного приложения Магнит. В этой статье мы обсудим, как понять каким образом ваши пользователи доходят до выполнения целевых действий и как выявить в этом пользовательском пути негативные паттерны. А чтобы процесс обсчета пользовательских путей не занимал у вас десятки часов, мы научимся применять для решения этой задачи алгоритмы PrefixSpan и Seq2Pat.

Перед тем как мы перейдем к деталям, давайте для начала разберемся с тем, что такое путь пользователя. Путь пользователя — это последовательность действий клиента для решения своей задачи. Например, если ваш продукт — мобильное приложение для заказа продуктов, то основным путем пользователя может быть путь от запуска приложения до оформления заказа.
Тут-то и начинаются трудности — между запуском приложения и оформлением заказа могут лежат бесчисленные множества вариантов прохождения различных этапов. Наиболее распространенный вариант решения этой проблемы — формирование воронки событий. Мы можем выделить ключевые события и посчитать воронку. В результате у нас получится нечто подобное:

В целом, подход не так уж плох. Он дает верхнеуровневое понимание того, как пользователи продвигаются в рамках заданной системы координат. Проблема в том, что сформированная воронка является абстрактным представлением, как пользователи должны использовать сервис, а не реальным отображением пути. Например, один клиент после запуска приложения добавит пару товаров из каталога и сразу перейдет к оформлению, а другой воспользуется поиском и добавит пару товаров оттуда.
Было бы хорошо взять все возможные пути пользователей и посчитать, как чаще всего пользователи доходят до выполнения целевого действия. На словах звучит просто, но на практике это выливается в достаточно трудоемкую задачу. Давайте разберем на примере.
Пример
Вновь обратимся к нашему гипотетическому приложению для заказа продуктов. Представим, что в нем есть всего 7 событий:
Название события | Описание |
start | Запуск приложения |
main_page_view | Просмотр главной страницы |
catalogue_page_view | Просмотр каталога товаров |
cart_page_view | Просмотр корзины |
item_page_view | Просмотр страницы товара |
checkout_page_view | Просмотр страницы оплаты |
payment | Оплата |
Нашазадача — понять, каким путем пользователи чаще всего доходят от start до payment. Прямолинейный подход может выглядеть так:
Сформировать перечень всех возможных комбинаций событий между start и payment
Последовательно посчитать количество пользователей, которые прошли воронку событий
Сравнить полученные значения и выделить ту воронку, которой пользовались чаще всего
С самого начала мы сталкиваемся с достаточно очевидной проблемой — неизвестно, какое количество событий учитывать в комбинациях. Попробуем ее обойти выбрав относительно большое число, например 10 событий, и учтем все комбинации событий длиной от 3 до 10 включительно. Реализуем это в Python:
import itertools
#Формируем список событий, который может находиться между start и payment
events = [
'main_page_view',
'catalogue_page_view',
'cart_page_view',
'item_page_view',
'checkout_page_view'
]
combs = []
#Циклом обходим все возможные длины последовательностей и наполняем список combs возможными вариациями пути пользователя
for i in range(3, 11):
combs = combs + [['start'] + list(i) + ['payment'] for i in itertools.product(events, repeat=i)]
На выходе мы получаем список, который содержит в себе все теоретически возможные пути пользователей в рамках нашего сервиса. Но вот проблема — он содержит 12 миллионов комбинаций. Даже имея хорошее железо, реализовать подсчет 12 миллионов воронок будет практически невозможно. Так что же делать?
Варианты решения
В рамках этой статьи будет рассмотрено два варианта решения:
Оптимизационный - мы постараемся сократить количество комбинаций и используем библиотеку Polars для более быстрого подсчета воронок
Алгоритмический - мы изучим алгоритмы PrefixSpan и Seq2Pat и используем их для решения поставленной задачи
В результате мы сравним скорость работы обоих подходов и выберем наиболее быстрый. Для проверки будет использован случайно сгенерированный кликстрим событий, который вы можете составить у себя запустив следующий скрипт:
import datetime
import random
import pandas as pd
from random import randrange, choice
from datetime import timedelta
import networkx as nx
# Функция для генерации случайных дат
def random_date(start, end):
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)
# Функция для генерирации случайного пути в графе, начиная с заданной вершины
def random_path(graph, start_node, max_length):
# Инициализация списка для хранения пути, начиная с начальной вершины
path = [start_node]
# Установка текущей вершины как начальной
current_node = start_node
# Цикл для генерации пути до max_length
for _ in range(max_length):
# Получение списка соседей текущей вершины
neighbors = list(graph.neighbors(current_node))
# Если у текущей вершины нет соседей, выходим из цикла
if not neighbors:
break
# Выбор случайного соседа как следующей вершины
current_node = choice(neighbors)
# Добавление выбранной вершины в путь
path.append(current_node)
# Возвращаем сгенерированный путь
return path
# Зададим словарь с возможными вариантами пути в сервисе (ключ - текущая точка, значение - возможные варианты следующего шага
ps = {'start' :['main_page_view', 'catalogue_page_view', 'cart_page_view', 'checkout_page_view'],
'main_page_view':['catalogue_page_view', 'cart_page_view', 'checkout_page_view'],
'catalogue_page_view':['main_page_view', 'item_page_view', 'cart_page_view'],
'cart_page_view':['checkout_page_view', 'main_page_view', 'catalogue_page_view', 'checkout_page_view'],
'item_page_view':['catalogue_page_view'],
'checkout_page_view':['payment', 'cart_page_view'],
'payment':[]}
# Сформируем карту пользователского пути используя направленный граф
g = nx.from_dict_of_lists(ps, create_using=nx.DiGraph())
# Генерируем датафрейм с идентификатором пользователя, событием, которое он совершил и временем этого события
users = [i for i in range(1000, 10000)]
df = pd.DataFrame(columns=['user_id', 'event', 'timestamp'])
for i in range(10000):
dt = random_date(datetime.datetime.strptime('2024-12-01', '%Y-%m-%d'), datetime.datetime.strptime('2024-12-07', '%Y-%m-%d'))
user = random.choice(users)
row = []
for z in random_path(g, 'start', random.randint(3, 10):
row.append([user, z, dt])
dt += timedelta(seconds=random.randint(1,10))
df = pd.concat([df, pd.DataFrame(row, columns=['user_id', 'event', 'timestamp'])])
df.to_csv('user_events.csv', index=False)
Оптимизационный подход
Мы уже выделили две главные проблемы предложенной методики:
Слишком большое количество комбинаций
Слишком долгий подсчет воронок
Попробуем сократить эти два параметра и посмотрим, что нам это даст.
Сокращение количества комбинаций
В первую очередь займемся уменьшением возможного количества комбинаций. Внимательный читатель уже в прошлом разделе мог увидеть одну важную проблему, которая приводит к увеличению количества комбинаций — это учет ВСЕХ возможных комбинаций событий. К примеру, в рамках подсчете мы учтем такую комбинацию:
start → main_page_view → main_page_view → main_page_view — > main_page_view → payment
Наш продукт технически не позволяет выполнить подобную комбинацию, но в рамках нашей методики он обсчитывает и тратит наше время. Попробуем сформировать «карту событий» нашего приложения используя графы и библиотеку networkx. Таким образом мы зададим систему координат, по которой пользователь технически может перемещаться.
import networkx as nx
import pandas as pd
import numpy as np
# Функция для генерации уникальных пар событий в кликстриме
def uniq_pairs():
events = pd.read_csv('user_events.csv')
events['timestamp'] = pd.to_datetime(events['timestamp'])
events.sort_values(by=['user_id', 'timestamp'],inplace=True)
events['next_timestamp'] = events.groupby('user_id')['timestamp'].shift(-1)
events['time_diff'] = events['next_timestamp'] - events['timestamp']
events['next_event'] = events.groupby('user_id')['event'].shift(-1)
events.loc[events['time_diff'] > pd.Timedelta(hours=1), 'next_event'] = np.nan
events.dropna(inplace=True)
return events[['event','next_event']].rename(columns={'event':'source', 'next_event':'target'}).drop_duplicates()
# Функция для создания карты пользовательского пути, путем создания графа
def comb():
df = uniq_pairs()
g = nx.from_pandas_edgelist(df, 'source', "target", create_using=nx.DiGraph())
return g
А теперь, используя алгоритм Deep-First Search (DFS), сформируем перечень всех возможных комбинаций событий длины n, проходя по которым пользователь может добраться из точки start в точку payment:
import pandas as pd
import numpy as np
import networkx as nx
# Функция для генерации уникальных пар событий в кликстриме
def uniq_pairs():
events = pd.read_csv('user_events.csv')
events['timestamp'] = pd.to_datetime(events['timestamp'])
events.sort_values(by=['user_id', 'timestamp'],inplace=True)
events['next_timestamp'] = events.groupby('user_id')['timestamp'].shift(-1)
events['time_diff'] = events['next_timestamp'] - events['timestamp']
events['next_event'] = events.groupby('user_id')['event'].shift(-1)
events.loc[events['time_diff'] > pd.Timedelta(hours=1), 'next_event'] = np.nan
events.dropna(inplace=True)
return events[['event','next_event']].rename(columns={'event':'source', 'next_event':'target'}).drop_duplicates()
# Функция для создания карты пользовательского пути, путем создания графа
def comb():
df = uniq_pairs()
g = nx.from_pandas_edgelist(df, 'source', "target", create_using=nx.DiGraph())
return g
# Функция для поиска возможных путей в рамках графа из точки start в точку end
def find_fixed_length_paths(n, start='start', end='payment'):
graph = n[0]
length = n[1]
# Хранение всех найденных путей
all_paths = []
# Вспомогательная функция для поиска путей
def dfs(current_node, path):
# Если достигли конечной точки и длина пути соответствует заданной
if len(path) == length + 1: # +1, чтобы включить конечный узел
if current_node == end:
all_paths.append(path.copy())
return
# Продолжаем поиск по соседям
for neighbor in graph.neighbors(current_node):
path.append(neighbor)
dfs(neighbor, path)
path.pop() # Возврат к предыдущему состоянию
# Начинаем поиск с начальной точки
dfs(start, [start])
return all_paths
cmb = []
#Циклом обходим все возможные длины последовательностей и наполняем список cmb возможными вариациями пути пользователя
for i in range(3, 11):
cmb = cmb + find_fixed_length_paths([comb(), i])
Таким образом удалось сократить количество комбинаций той же длины примерно в сотни раз (теперь их всего 1587), не затронув при этом те пользовательские пути, которыми пользователи действительно могут доходить до шага payment.
Подсчет воронок на Polars
Помимо количества комбинаций, мы также можем оптимизировать и скорость подсчета комбинаций. Наиболее простой и короткий в записи способ расчета воронки пользовательских событий — это использование функции funnel библиотеки Retentioneering:
import pandas as pd
import time
from retentioneering.eventstream import Eventstream
from retentioneering.tooling import Funnel
# Формируем стрим событий для использования в библитеке retentioneering
stream = Eventstream(pd.read_csv('user_events.csv'))
start_time = time.time()
# Обсчитываем воронку
print(stream.funnel(
stages=['start', 'main_page_view', 'catalogue_page_view', 'main_page_view', 'item_page_view', 'catalogue_page_view', 'cart_page_view', 'checkout_page_view', 'cart_page_view', 'payment'],
stage_names=['start', 'main_page_view', 'catalogue_page_view', 'main_page_view', 'item_page_view', 'catalogue_page_view', 'cart_page_view', 'checkout_page_view', 'cart_page_view', 'payment'],
show_plot=False,
funnel_type='closed'
).values)
print("--- %s seconds ---" % (time.time() - start_time))
Правда проблема заключается в том, что ее расчет занимает относительно много времени. К примеру, обсчет воронки длиной 10 шагов на моем устройстве занимает 0.2 секунды. Если мы будем анализировать 128000 воронок, то обсчет займет 7 часов.
Чтобы не ставить скрипт прогружаться всю ночь можем использовать библиотеку Polars, в которой реализуем аналогичный функционал:
import networkx as nx
import pandas as pd
import polars as pl
from multiprocessing import Pool
import numpy as np
import time
from tqdm import tqdm
# Функция для создания карты пользовательского пути, путем создания графа
def comb():
df = pd.read_csv('pairs.csv')
g = nx.from_pandas_edgelist(df, 'source', "target", create_using=nx.DiGraph())
return g
# Функция для поиска возможных путей в рамках графа из точки start в точку end
def find_fixed_length_paths(n, start='start', end='payment'):
graph = n[0]
length = n[1]
# Хранение всех найденных путей
all_paths = []
# Вспомогательная функция для поиска путей
def dfs(current_node, path):
# Если достигли конечной точки и длина пути соответствует заданной
if len(path) == length + 1: # +1, чтобы включить конечный узел
if current_node == end:
all_paths.append(path.copy())
return
# Продолжаем поиск по соседям
for neighbor in graph.neighbors(current_node):
path.append(neighbor)
dfs(neighbor, path)
path.pop() # Возврат к предыдущему состоянию
# Начинаем поиск с начальной точки
dfs(start, [start])
return all_paths
# Функция для подсчета глубины прохождения воронки пользователем
def pattern_calc(x, y):
step = 0
for i in x:
if step == len(y):
return str(step)
if i == y[step]:
step += 1
return str(step)
def calcing(par):
true_df = pl.DataFrame(schema={'funnel_name': pl.String, 'stages': pl.String, 'unique_users': pl.UInt32}) # Инициализация пустого DataFrame для результатов
lst = par[0] # Извлечение списка этапов из кортежа
df = par[1] # Извлечение DataFrame событий пользователей из кортежа
for i in lst: # Проход по каждому этапу
start_time = time.time() # Запись времени начала обработки этапа
df_inner = df.group_by('user_id').agg(pl.col("event")) # Группировка событий по пользователям
df_inner = df_inner.with_columns(pl.col('event').map_elements(lambda x: pattern_calc(x, i), return_dtype=pl.String).alias("stages")) # Применение функции pattern_calc для подсчета стадий
res = df_inner.group_by('stages').agg(pl.count("user_id")).sort("stages") # Подсчет уникальных пользователей по стадиям
steps = pl.DataFrame({'stages': [str(i) for i in range(1, len(i) + 1)]}) # Создание DataFrame со стадиями
res = steps.join(res, on='stages', how='left').fill_null(0) # Объединение результатов с стадиями и заполнение пропусков нулями
res = res.with_columns(pl.col("user_id").cum_sum(reverse=True).alias("unique_users")) # Подсчет уникальных пользователей по стадиям
res = res.with_columns(pl.lit(' -> '.join(i)).alias('funnel_name')).drop('user_id') # Добавление имени воронки и удаление столбца user_id
true_df = pl.concat([true_df, res[['funnel_name', 'stages', 'unique_users']]]) # Объединение результатов с общим DataFrame
print("--- %s seconds ---" % (time.time() - start_time)) # Вывод времени обработки этапа
return true_df # Возврат итогового DataFrame с результатами
def main(res):
df = pl.read_csv('user_events.csv') # Чтение данных о событиях пользователей из CSV-файла
df = df.with_columns(df["user_id"].cast(pl.String)) # Приведение столбца user_id к строковому типу
flow = [(i, df) for i in np.array_split(np.asarray(res, dtype=object), 8)] # Разделение этапов на группы для параллельной обработки
with Pool(8) as p: # Создание пула процессов для параллельной обработки
results = p.map(calcing, flow) # Применение функции calcing ко всем группам этапов
final_df = pl.concat(results) # Объединение всех результатов в один DataFrame
final_df.write_csv('file.csv', separator=",") # Сохранение итогового DataFrame в CSV-файл
if __name__ == '__main__':
z = [] # Инициализация списка для хранения всех найденных путей
with Pool(8) as p: # Создание пула процессов для параллельного выполнения
x = p.map(find_fixed_length_paths, [(comb(), 6), (comb(), 7), (comb(), 8)]) # Поиск путей фиксированной длины в графе
for i in x: # Проход по всем найденным путям
for j in i:
z.append(j) # Добавление найденных путей в общий список
main(z) # Вызов основной функции для обработки данных и сохранения результатов
На этот раз обсчет 1 воронки занимает все 0.02 секунды, что в 10 раз быстрее исходного варианта. Таким образом аналогичное количество воронок будет считать не 7 часов, а всего 42 минуты.
Используя эти два методы мы кратно сократили время на обсчет воронок и теперь можем обрабатывать десятки тысяч воронок в течении нескольких часов. Но есть одна условность, на которую вы уже могли обратить внимание. При исходном подсчете комбинаций мы установили максимальную длину воронки на 10 шагах. В рамках крупного приложения 10 шагов — это совсем небольшой путь. Если же представить, что кол-во шагов увеличить хотя бы до 12, то даже с учетом оптимизации кол-во воронок составит 59 миллионов и даже с учетом оптимизированного скрипта в Polars их обсчет займет 2 недели. Что уж и говорить о более длинных воронках. И тут нам на помощь приходят алгоритмы.
Алгоритмический подход
PrefixSpan
Первый алгоритм для анализа пользовательских путей — это PrefixSpan (от Prefix-Projected Pattern Growth). На вход этот алгоритм принимает 2 аргумента:
Набор последовательностей
Минимальное количество раз, которое паттерн должен встречаться в данных (поддержка)
Для понимания возьмем в качестве примера такие параметры:
Последовательности:
[[‘a’, ‘b’, ‘c’]
[‘b’, ‘c’, ‘d’]
[‘a’, ‘c’]
[‘b’, ‘c’, ‘a’]]
Поддержка: 2
Далее выполняется поиск элементов, которые встречаются в наших последовательностях чаще чем заданная поддержка. В нашем примере, таких элементов всего 3:
‘a’ (встречается 3 раза)
‘b’ (встречается 3 раза)
‘c’ (встречается 4 раза)
Затем для каждого такого элемента ищется его проекция. Под проекцией понимаются “хвосты”, которые встречаются в последовательностях после выбранного элемента. Если мы берем элемент ‘a’ (он становится префиксом), то его проекциями в нашем примере будут:
[‘b’, ‘c’]
‘c’
Таким образом, мы получаем пары префикс-проекция, из которой мы можем создавать более длинные префиксы и считать частоту их появления в наших последовательностях. Процесс будет выполняться рекурсивно до тех пор, пока не будет исчерпаны все варианты префикс-проекций.
Попробуем использовать его для определения наиболее частых последовательностей в нашем датасете:
import pandas as pd
import time
from prefixspan import PrefixSpan
df = pd.read_csv('user_events.csv')
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values(by=['user_id', 'timestamp'])
df_gr = df.groupby('user_id')['event'].apply(list).reset_index(name='steps')
df_gr['ln'] = df_gr['steps'].apply(len)
steps = list(df_gr['steps'].values)
start_time = time.time()
ps = PrefixSpan(steps)
a = ps.frequent(800, filter=lambda patt, matches: (len(patt) >=7)&(patt[-1]=='payment')&(patt[0]=='start')&(not any(patt[i] == patt[i + 1] for i in range(len(patt) - 1))), closed=True)
print(time.time()-start_time)
df = pd.DataFrame(a)
df.to_excel('ps.xlsx', index=False)
На моем устройстве скорость выполнения данного скрипта составила 35 минут и на выходе был получен файл, который содержит информацию о пользовательских путях и частоте их появления у пользователей. За счет установки параметра минимальной поддержки на 100 удалось отсечь редкие паттерны, а также существенно сократить время работы алгоритма.
Казалось бы, найден идеальный способ для обработки пользовательских путей, но есть одна проблема — в основном пути наших пользователей были не очень длинные. В моем датасете длина пользовательского пути в среднем составляет 18-20 шагов. Если же попробовать передать в PrefixSpan более длинные последовательности, время выполнения запроса может кратно возрасти.
Чтобы этого избежать в алгоритме нужно реализовать ограничения, которые позволяет еще более эффективно обрабатывать последовательности. Как раз это и реализовано в алгоритме Seq2Pat.
Seq2Pat
По своей сути, Seq2Pat не является отдельным алгоритмом, а скорее является фреймворком, в рамках которого можно использовать различные алгоритмы. Тем не менее, концептуально, Seq2Pat работает также, как и PrefixSpan — собираются частые элементы и рекурсивно выстраиваются пары префикс-проекция. Отличие заключается в том, что Seq2Pat позволяет настраивать дополнительные ограничения при подсчетах, такие как:
Среднее значение атрибута
Разброс значений
Медиана
Разрыв между событиями
Это позволяет существенно увеличить эффективность работы алгоритма. Проверим на нашем кликстриме:
import pandas as pd
import time
from datetime import datetime
from sequential.seq2pat import Seq2Pat
df = pd.read_csv('user_events3.csv')
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values(by=['user_id', 'timestamp'])
df_gr = df.groupby('user_id')['event'].apply(list).reset_index(name='steps')
steps = list(df_gr['steps'].values)
if __name__ == '__main__':
start_time = time.time()
seq2pat = Seq2Pat(sequences=steps, max_span=20)
patterns_batch = [[i[:-1], i[-1]] for i in seq2pat.get_patterns(min_frequency=800) if i[:-1][0] == 'start' and i[:-1][-1] == 'payment' and len(i[:-1]) >= 7]
new_df = pd.DataFrame(data=patterns_batch, columns=['pattern', 'freq'])
new_df['pattern'] = new_df['pattern'].apply(lambda x: list(map(str, x)))
new_df.to_csv('sq.csv', index=False)
print(time.time()-start_time)
В то время как PrefixSpan обрабатывал датасет несколько минут, Seq2Pat управился за 6 секунд! В основном такой прирост производительности обусловлен параметром max_span. Он отвечает за максимальное число элементов, которые могут разделять два события. Соответственно, чем он меньше, тем быстрее работает скрипт. Но из-за этого итоговый вывод двух алгоритмов может отличаться.
Таким образом, Seq2Pat является более гибким в настройке инструментом и может существенно ускорить ваши расчеты, но при условии, что мы знаете где можно наложить ограничения.
Заключение
В рамках статьи мы прошли путь от максимально прямолинейного подхода с анализом всех возможных вариантов последовательностей; убедились, что оптимизация этого подхода может ускорить расчеты, но недостаточно для масштабирования на более длинные последовательности и изучили два алгоритма, которые выполняют анализ частоты последовательностей максимально эффективным образом.
С помощью этих инструментов можно в рамках разумного срока изучить самые разнообразные последовательности:
Пути пользователей внутри сервиса
Паттерны потребительской корзины
Поведение фродового трафика
И иные наборы последовательных элементов, в которых важно обнаружить паттерн. В нашей команде мы уже успели на практике применить Seq2Pat и смогли обнаружить в поведении пользователей приложения «Магнит» негативный паттерн: значительная часть пользователей перед оформлением заказа по несколько раз заходила и выходила из корзины. У некоторых число таких переключений могло доходить до 20 раз. По итогу исследования, были сформулированы продуктовые гипотезы, которые далее ушли на A/B тестирование.
Если вам интересно решать подобные задачи, присоединяйтесь к команде Magnit Tech: magnit.tech/vacancies