Pull to refresh

Повторяем когортный анализ. Комплексный подход — Python, SQL, Power BI

Reading time7 min
Views5.8K

Добрый день уважаемые читатели! Данная статья является продолжением публикации "Повторяем когортный анализ, выполненный в Power BI, силами Python" (ссылка). Настоятельно рекомендую познакомиться с ней хотя бы бегло, иначе последующее повествование будет вам малопонятным. С момента ее выхода на Хабр прошло достаточно времени. Я основательно пересмотрел методологию решения подобных задач. Первым желанием было просто переписать старый материал, но после недолгих размышлений я пришел к выводу, что более разумным шагом будет оформить наработки в новую рукопись.

Какова основная причина моего "недовольства" Python и Power BI? Язык Python/R c тематическими библиотеками и Power BI (Tableau, Qlik) могут на 70-80% закрыть потребности бизнеса в расчете сложных метрик и построении визуализаций. Но только если речь идет об обработке относительно небольших датасетов с уже агрегированными данными. Если мы говорим о предварительном манипулировании данными в промышленном масштабе, то здесь игра переходит на сторону сервера с БД и используется SQL. Данный момент я не осветил в предыдущей публикации, поэтому решил ликвидировать это упущение здесь.

Для разработки и тестирования запросов SQL я выбрал БД PostgreSQL. Данную БД установил локально на ноутбук. Никаких специфических настроек не проводил, оставил все параметры как есть. Для повторения действий, описанных в материале, подойдет и запуск контейнера c PostgreSQL, если вы дружите с Docker.

Датасет в формате csv и файлы со скриптами вы можете найти на GitHub (ссылка). Так как информация была заранее подготовлена для непосредственной загрузки, то мне оставалось только воспользоваться встроенной программой pgAdmin. Время загрузки чуть более 1 млн строк в режиме графического редактора 4-5 сек. Данный показатель стал эталоном, так как мне не удалось превзойти его с помощью кода Python. Загрузку данных в PostgreSQL с помощью скриптов для нужд демо-примера можно было бы и не реализовывать, но ведь мы не ищем легких путей в аналитике.

На первом этапе создаем таблицу sales. Сам код предельно прост и не требует каких-либо дополнительных комментариев.

import psycopg2

# Подключение к БД
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")

print("Database opened successfully")

# Создаем курсор
cursor = conn.cursor()

with conn:
    cursor.execute("""
            DROP TABLE IF EXISTS sales;
        """)

    cursor.execute("""
            CREATE TABLE IF NOT EXISTS sales (
              id SERIAL PRIMARY KEY,
              date DATE NOT NULL, 
              promo TEXT NOT NULL,
              site TEXT NOT NULL,
              user_id TEXT NOT NULL,
              transaction_id INTEGER NOT NULL,
              amount INTEGER NOT NULL);
        """)


print("Operation done successfully")

# Закрываем соединение и курсор
cursor.close()
conn.close()

Таблица сформирована, запускаем следующий скрипт на запись данных в БД. Pandas и sqlalchemy работают в паре. Параллельно замеряем время с помощью datetime.

import os
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from datetime import datetime

start_time = datetime.now()

# Подключение к БД
engine = create_engine('postgresql://postgres:gfhjkm@localhost:5432/db')

print("Database opened successfully")

# Путь к исходнику с данными
path_to_data = "C:/Users/Pavel/PycharmProjects/database/"
# Считываем данные в датафрейм
sale_records = pd.read_csv(os.path.join(path_to_data, "СohortAnalysis_2016_2018.csv"),
                           sep=";", parse_dates=["date"], dayfirst=True)
postgresql_table = "sales"
# Записываем датасет в БД
sale_records.to_sql(postgresql_table, engine, if_exists='append', index=False)

print("Operation done successfully")

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

Получаем 3 минуты 26 секунд. Очень долго. Я решил, что во всем виновата библиотека sqlalchemy и написал новый код без нее.

import psycopg2
from datetime import datetime

start_time = datetime.now()

# Подключение к БД
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")
print("Database opened successfully")

# Создаем курсор
cursor = conn.cursor()

# Путь к исходнику с данными
path_to_data = "C:/Users/Pavel/PycharmProjects/database/"
# Считываем данные в датафрейм
sale_records = pd.read_csv(os.path.join(path_to_data, "СohortAnalysis_2016_2018.csv"),
                           sep=";", parse_dates=["date"], dayfirst=True)

query = "INSERT INTO sales (date, promo, site, user_id, transaction_id, amount) values (%s, %s, %s, %s, %s, %s)"
dataset_for_db = sale_records.values.tolist()

cursor.executemany(query, dataset_for_db)
conn.commit()

print("Operation done successfully")

# Закрываем соединение и курсор
cursor.close()
conn.close()

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

В итоге я получил прирост в производительности в 10 секунд. Следующий кандидат на выбывание – pandas.

import psycopg2
from datetime import datetime

start_time = datetime.now()

# Подключение к БД
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")
print("Database opened successfully")

# Создаем курсор
cursor = conn.cursor()


# Открываем файл. Считываем его построчно с записью в БД
with open('СohortAnalysis_2016_2018.csv', 'r', encoding='UTF8') as f:
    next(f)
    cursor.copy_from(f, 'sales', sep=';', columns=('date','promo','site','user_id','transaction_id','amount'))
    conn.commit()

f.close()

print("Operation done successfully")

# Закрываем соединение и курсор
cursor.close()
conn.close()

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

Лучший замер дал цифру 7 секунд. Это хороший результат. На нем я и решил остановиться. В качестве промежуточного итога отмечу следующее. Если вы строите пайплайн для целей дата инжиниринга, то кусок кода с библиотекой pandas может стать узким горлышком в плане быстродействия.

Переходим к написанию SQL скриптов, которые станут основой для итоговых отчетов. Первый отчет это разделение всех платежей пользователей на когорты исходя из даты их первой транзакции в сервисе и дельты между первой покупкой и последующими платежами. И в расчетах на базе ноутбука Python и в модели Power BI мы начинали с того, что находили дату первой покупки для каждого пользователя системы. В SQL мы также не будем отступать от этой традиции.

SELECT s3.date,
	s3.user_id,
	s3.date - s2.first_date AS delta_days,
	ceil((s3.date - s2.first_date)::real/30::real)*30 AS cohort_days,
	to_char(s2.first_date,'YYYY-MM') AS first_transaction
	s3.amount
FROM public.sales AS s3
LEFT JOIN
				(SELECT s1.user_id,
						MIN(s1.date) AS first_date
					FROM public.sales AS s1
					GROUP BY s1.user_id) AS s2 ON s3.user_id = s2.user_id
ORDER BY s3.user_id,
	s3.date


SELECT  s.date,
		s.user_id,
		s.date - FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date) AS delta_days,
		ceil((s.date - FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date))::real/30::real)*30 AS cohort_days,
		to_char(FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date),'YYYY-MM') AS first_transaction,
		s.amount
FROM public.sales AS s
ORDER BY s.user_id,
	s.date

Я сознательно привел два примера решения, так как среди читателей могут найтись люди, которые работают с БД, где пока еще не реализована поддержка оконных функций. После нахождения даты первой транзакции мы приводим ее к текстовому виду. Для этих целей в PostgreSQL есть функция to_char().

С отнесением временной дельты (разница между датой транзакции и датой первой покупки) к когорте дела обстоят несколько сложнее. Когда речь заходит о проверке попадания того или иного числового значения в какие-либо рамки, люди обычно вспоминают CASE. Данная конструкция выглядит относительно неплохо, но только если нужно одновременно реализовать не более 3 условий. В нашем случае их десятки и она просто технически невозможна. Вторая мысль, которая меня посетила, это найти решение среди оконных функций. Среди синтаксического сахара PostgreSQL также не было ответа на искомый вопрос. Решение этой нетривиальной для меня задачи лежало совершенно в другой области - в элементарной математике.

Порассуждаем вместе. Шаг когорты – 30 дней. Значит все значения нужно делить на 30. Берем 0 и делим на 30, получаем 0, это желаемый результат. Далее берем любое значение от 0 до 30 и делим на 30. Получаем дробь, но ее обязательно нужно округлить до ближайшего целого. Еще шаг, 30 делим на 30, получаем 1, и ничего округлять не нужно. Следовательно, нам нужна функция, которая округляет дроби до целого в большую сторону. В PostgreSQL это ceil(). Если результат округления умножить на 30 получается номер когорты.

Остается заострить ваше внимание еще на одном интересном нюансе. Если число INTEGER разделить на число INTEGER, то мы получим только целочисленный остаток. Но ведь нам критически важна дробная часть! Это не беда, просто добавляем в нужные места конструкцию ::real и все будет считаться правильно.

Ключевую мысль по данному разделу можно сформулировать следующим образом: если вам сходу не удается подобрать ключик к решению на языке SQL просто посмотрите на проблему под другим углом.

Полученный запрос оформим в отдельное представление, чтобы иметь возможность обращаться к нему как к отдельной физической таблице БД.

Второй отчет сводится к тому, что нужно посчитать нарастающий итог по строке в процентном отношении.

SELECT r2.first_transaction,
		r2.cohort_days,
		--r2.total_amount,
		--sum(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days) as cumsum_amount,
		--first_value(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days) as first_total_amount,
		round((sum(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days)/ 
		first_value(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days)-1),3) as percent_cumsum_amount
FROM 
		(SELECT r.first_transaction, r.cohort_days, sum(r.amount) AS total_amount		
		FROM public.report_cohort_analysis AS r
		GROUP BY r.first_transaction, r.cohort_days
		ORDER BY r.first_transaction, r.cohort_days) as r2

В данном скрипте нет подводных камней, все решение строится на оконных функциях. Сначала проводим группировку, чтобы найти агрегированную сумму. В полученной таблице рассчитываем нарастающий итог в рамках окна (год-месяц первой транзакции и когорта). Каждую строчку делим на итоговые продажи нулевой когорты согласно год-месяц первой транзакции. Данный алгоритм расчета полностью дублирует ноутбук из предыдущей рукописи (ссылка).

В итоге мы получаем второе представление, которое можно использовать в других аналитических изысканиях.

По идее оба отчета нужно выводить конечному пользователю в виде сводной таблицы. Но данный вопрос – слабое место SQL. В PostgreSQL есть функция CROSSTAB, но как с помощью нее быстро и легко создать таблицу с десятками столбцов я себе не представляю. Поэтому на финальном этапе работы я решил обратиться за помощью к BI платформе. Забрать данные из БД для Power BI не представляет никакого труда, достаточно при первом подключении правильно прописать все параметры (значения аналогичны тем, что мы указывали при коннекте к БД через скрипт Python). Представления отображаются как физические таблицы (не нужно повторно вводить скрипт SQL). Нам остается только отметить необходимые отчеты галочками и дождаться загрузки данных.

Завершить данную публикацию мне хотелось бы следующей мыслью. Лучшие решения для аналитики строятся на основе оптимального сочетания возможностей различных платформ, а не за счет выжимания всех соков из одного инструмента.

На этом все. Всем здоровья, удачи и профессиональных успехов!

Tags:
Hubs:
+5
Comments0

Articles

Change theme settings