Привет, Хабр!
Меня зовут Евгений Пантелеев. Я занимаюсь аналитикой в Авито Авто в сегменте Resellers.
Каждый день мы с командой сталкиваемся с необходимостью измерения небольшого инкремента (в районе 1%) от CRM-коммуникаций на изменчивой выборке пользователей в условиях долгосрочного эксперимента (до 6 месяцев). При этом нам важно минимизировать размер контрольной группы, не жертвуя статистической значимостью результата.
В этой статье я покажу, как нам удалось усилить классический метод CUPED за счет эмпирического подбора оптимального предпериода и применить пост-стратификацию на основе бизнес-логики. Этот подход позволил нам получить статистически значимый результат и запланировать дальнейшее сокращение контрольной группы.
Если вы знакомы с основами CUPED и пост-стратификации (о них подробно писали коллеги из X5: тут и тут), то вы узнаете, как адаптировать эти методы для решения сложных, «неидеальных» задач.

Контекст
Одна из моих регулярных задач - оценка инкремента в денежном выражении от селлерских CRM-коммуникаций нашей команды (количество коммуникаций исчисляется несколькими десятками): push, e-mail, чат-бот и т.п.
Как правило, я провожу такую оценку с помощью обратного эксперимента: тестовая группа получает все CRM-коммуникации компании, контрольная - все, за исключением коммуникаций нашей команды. В результате можно получить аплифт, и рассчитать из него инкремент по формуле: increment = revenue_test - revenue_test / (1 + effect / 100).
При этой оценке важно получать статзначимый результат, и сделать это на контрольной группе минимального размера.
Это достаточно непростая задача по нескольким причинам:
1.Эффект от CRM-коммуникаций отдельной команды незначителен (в районе 1%), так как эти коммуникации отправляются поверх остальных коммуникаций компании.
2.Сегмент Resellers имеет относительно небольшой размер (несколько сотен тысяч пользователей) и достаточно изменчивый (каждый день его пополняют и покидают наши клиенты).
3. Требуется проводить длительные эксперименты (от 3 до 6 месяцев). Как следствие, нет возможности учитывать ARPU пользователей на протяжении одного и того же периода. В эксперименте есть пользователи с выручкой за 1 день, а есть - за 180 дней.
4. Отсутствует возможность точно фиксировать экспоужер (exposure) для контрольной группы, так как для каждого пользователя стоит ограничение на количество отправляемых коммуникаций в течение дня с определенным сложным алгоритмом ранжирования.
5. Как известно, увеличение диспропорции групп в эксперименте (в нашем случае, сокращение размера контрольной группы) ведет к экспоненциальному росту MDE.

Таким образом, требуется оценить незначительный эффект в изменчивой выборке на протяжении длительного времени без окна атрибуции и четкого экспоужера с минимально возможным размером контрольной группы.
Дизайн
Самый важный шаг к повышению чувствительности, на котором закладывается фундамент итогового результата, — это дизайн эксперимента.
В качестве базового соотношения выбрали 70/30 (тестовая группа/контрольная группа). Это оптимальное для начала соотношение между размером тестовой группы и MDE.
Длительность эксперимента: 6 месяцев.
Механика: каждый день новые реселлеры, попадающие в сегмент триггеров CRM-кампаний, добавляются в эксперимент. В качестве экспоужера учитываем попадание пользователя в сегмент одного из триггеров.
Учет метрик: учитываем метрики пользователя с первого дня попадания пользователя в эксперимент на всем его протяжении (не принимаем во внимание выход пользователя из сегмента Resellers).
Усиление эффекта: ортогонально проверяем новые гипотезы, и в случае получения успешного результата, добавляем их триггеры в наш основной эксперимент для усиления общего аплифта.
Целевая метрика эксперимента: ARPU по общей выручке.
Дополнительные метрики: ARPU по отдельным продуктам, доля платящих пользователей, доля платящих пользователей по отдельным продуктам.
Контр-метрика: доля отписавшихся от CRM-коммуникаций пользователей.
При пропорции контрольной и тестовой групп 30/70 и альфе на уровне 0,05, с учетом применения CUPED, получен MDE по целевой метрике: 1,92% (без учета экспоужера и пост-стратификации). Это будет наш ориентир по верхней границе.
Расчет MDE с учетом экспоужера для несколькоих десятков кампаний на длительном промежутке времени крайне ресурсоемкий, и выглядит нецелесообразным. Поэтому такие расчеты не проводились.
Выбор оптимального предпериода для CUPED
Традиционно считается, что оптимальная длительность предпериода для CUPED сопоставима с длительностью эксперимента. Однако в случае длительных экспериментов (например, 6 месяцев) такой подход может быть неоптимальным: за это время поведение пользователей и сами данные могут существенно измениться, что приводит к снижению полезности ковариаты.
Поэтому мы попробуем эмпирически подобрать оптимальную длительность предпериода для CUPED, не подсматривая в экспериментальные данные.
Методология:
В качестве связи с экспериментальным периодом мы используем только длительность наблюдения пользователя в эксперименте — количество дней, в течение которых пользователь присутствовал в эксперименте.
Значения целевой метрики в эксперименте, принадлежность к группе и любые экспериментальные эффекты при этом не используются.
Для разных вариантов длительности предпериода (30, 40, 50, 60, 70, 80 и 90 дней) мы считаем коэффициенты корреляции между:
значением целевой метрики в предпериоде;
значением целевой метрики в периоде, длина которого соответствует предполагаемой длительности наблюдения пользователя в эксперименте.
Таким образом, мы оцениваем, насколько хорошо ковариата, рассчитанная на разных временных окнах, предсказывает будущие значения метрики, не используя при этом ни экспериментальные эффекты, ни разбиение на группы.

Результат: коэффициент корреляции рос вплоть до длительности 60 дней, а затем начинал снижаться.
Вывод: для нашего эксперимента оптимальная длина предпериода для CUPED — 60 дней.
Пример функции для расчета
def compare_cuped_periods(df):
data = df.copy()
# Словарь для хранения результатов
results = []
# Определяем метрики и периоды для анализа
metrics = {
'revenue': 'revenue'
}
periods = [90, 80, 70, 60, 50, 40, 30]
# Для каждой метрики и периода считаем корреляцию
for metric_name, metric_col in metrics.items():
for period in periods:
# Название CUPED-столбца
cuped_col = f"{metric_name}_cuped_{period}"
# Проверяем, что столбец существует
if cuped_col not in data.columns:
print(f"Предупреждение: Столбец {cuped_col} не найден")
continue
# Рассчитываем корреляцию Пирсона
correlation = data[metric_col].corr(data[cuped_col])
# Рассчитываем снижение дисперсии (%)
var_original = data[metric_col].var()
var_cuped = data[cuped_col].var()
variance_reduction = ((var_original - var_cuped) / var_original * 100
if var_original > 0 else 0)
# Добавляем результат
results.append({
'Метрика': metric_name,
'Период_CUPED': period,
'Корреляция_Пирсона': round(correlation, 4),
'Снижение_дисперсии_%': round(variance_reduction, 2)
})
# Создаем DataFrame с результатами
results_df = pd.DataFrame(results)
# Сортируем по метрике и корреляции (по убыванию)
results_df = results_df.sort_values(['Метрика', 'Корреляция_Пирсона'],
ascending=[True, False])
return results_dfВыбор оптимальных страт для пост-стратификации
Пост‑стратификация — один из методов уменьшения дисперсии оценок эффекта эксперимента. Основная идея: разделить наблюдения на страты так, чтобы внутри каждой страты вариативность метрики была минимальна, а различия между стратами были информативны для оценки эффекта.
Для реализации этого метода важно осуществить два последовательных шага: выбор критерия для стратификации и выбор границ страт.
1. Выбор мощного критерия (ковариаты)
Выбор переменной, которая максимально предсказывает вариативность целевой метрики (без использования результатов эксперимента) — ключевой момент. Это требует глубокого погружения в бизнес. В нашем кейсе наиболее подходящим критерием оказался: "Количество объявлений в логической категории б/у автомобилей за последние полгода до участия в эксперименте".
Распределение его значений неоднородно. Большинство значений смещено к нулю, но при этом есть длинный "правый" хвост, достигающий нескольких тысяч.
2. Определение границ страт
После того как критерий выбран, нужно определить конкретные границы страт. Мы это делаем автоматически с помощью алгоритма:
Значения критерия сортируются по возрастанию.
Начальная страта формируется из первых значений.
Добавление новых наблюдений проверяется по росту внутристратной дисперсии целевой метрики на предпериоде.
Если добавление новых значений увеличивает дисперсию выше заранее установленного порога (var_growth_threshold = 1.15), формируется новая страта.
Параметр min_bin_size (установили на уровне 2 000) гарантирует, что ни одна страта не будет слишком мала, а хвостовые редкие значения объединяются с соседними стратами.
Границы страт определялись исключительно на основе предпериодных данных, что исключает утечку информации о эффекте и обеспечивает корректность оценки.
Функция разбиения на страты
def build_strata_from_preperiod_target(
x_pre,
y_pre,
min_bin_size=2000,
var_growth_threshold=1.15,
max_bins=10,
):
df = pd.DataFrame({
"x": np.asarray(x_pre),
"y": np.asarray(y_pre),
})
# агрегируем предпериод по значениям ковариаты
grouped = (
df.groupby("x")
.agg(
count=("y", "size"),
y_values=("y", list)
)
.sort_index()
)
bins = []
current_x_vals = []
current_y_vals = []
# --- основной greedy-проход ---
for x_val, row in grouped.iterrows():
y_vals = row["y_values"]
candidate_x = current_x_vals + [x_val] * len(y_vals)
candidate_y = current_y_vals + y_vals
current_var = np.var(current_y_vals) if len(current_y_vals) > 1 else 0.0
candidate_var = np.var(candidate_y) if len(candidate_y) > 1 else 0.0
start_new_bin = False
if len(current_y_vals) >= min_bin_size and current_var > 0:
if candidate_var / current_var > var_growth_threshold:
start_new_bin = True
if start_new_bin:
bins.append({
"x_vals": current_x_vals,
"y_vals": current_y_vals
})
current_x_vals = [x_val] * len(y_vals)
current_y_vals = y_vals.copy()
else:
current_x_vals += [x_val] * len(y_vals)
current_y_vals += y_vals
if current_x_vals:
bins.append({
"x_vals": current_x_vals,
"y_vals": current_y_vals
})
# --- ограничение на число страт ---
if len(bins) > max_bins:
tail = bins[max_bins - 1 :]
merged_tail = {
"x_vals": sum([b["x_vals"] for b in tail], []),
"y_vals": sum([b["y_vals"] for b in tail], [])
}
bins = bins[: max_bins - 1] + [merged_tail]
# --- гарантируем минимальный размер последней страты ---
def bin_size(b):
return len(b["y_vals"])
while len(bins) >= 2 and bin_size(bins[-1]) < min_bin_size:
bins[-2]["x_vals"].extend(bins[-1]["x_vals"])
bins[-2]["y_vals"].extend(bins[-1]["y_vals"])
bins.pop()
# --- строим отображение x -> bin ---
bin_map = {}
bin_edges = []
for i, b in enumerate(bins):
unique_x = sorted(set(b["x_vals"]))
for v in unique_x:
bin_map[v] = i
bin_edges.append((min(unique_x), max(unique_x)))
bin_id = np.array([bin_map.get(v, len(bins) - 1) for v in df["x"]])
return bin_id, bin_edgesВ нашем случае были выделены следующие страты:

Сравнение метрик
Далее рассмотрим влияние каждого метода повышения чувствительности на основные метрики эксперимента.
Посмотрим на динамику доверительного интервала и p-value по целевой метрике в трех разрезах в нашем эксперименте:
Без применения CUPED и пост-стратификации;
С применением CUPED;
С применением CUPED и пост-стратификации.


Совместное применение двух этих методов в нашем конкретном кейсе позволило, во-первых, получить статзначимый результат за прошедший период, а, во-вторых, осознанно запланировать снижение контрольной группы до 20% в следующем полугодии.
Так как у меня нет возможности делиться с широкой публикой остальными реальными показателями, приведу пример использования функций с синтетическим датасетом.
Скрипт для симуляции
import numpy as np
import pandas as pd
from typing import Optional
import scipy.stats as stats
np.random.seed(42)
# Генерация данных
def generate_data(
n_control=5000,
n_test=5000,
effect=0.02, # 2% uplift
rho=0.7 # корреляция ковариаты с метрикой
):
strata = np.array(["A", "B", "C"])
strata_probs = np.array([0.5, 0.3, 0.2])
def sample_group(n, is_test=False):
stratum = np.random.choice(strata, size=n, p=strata_probs)
# базовые средние по стратам
base_mean = np.select(
[stratum == "A", stratum == "B", stratum == "C"],
[100, 200, 400]
)
# предпериод
pre_metric = base_mean + np.random.normal(0, 50, size=n)
# основная метрика, коррелированная с предпериодом
noise = np.random.normal(0, 50 * np.sqrt(1 - rho**2), size=n)
metric = (
base_mean
+ rho * (pre_metric - base_mean)
+ noise
)
if is_test:
metric *= (1 + effect)
return pd.DataFrame({
"metric": metric,
"pre_metric": pre_metric,
"stratum": stratum
})
control = sample_group(n_control, is_test=False)
test = sample_group(n_test, is_test=True)
return control, test
# Применение CUPED
def cuped_adjustment(
control_metric,
test_metric,
control_covariate,
test_covariate,
eps: float = 1e-10
):
control_metric = np.asarray(control_metric)
test_metric = np.asarray(test_metric)
control_covariate = np.asarray(control_covariate)
test_covariate = np.asarray(test_covariate)
all_cov = np.concatenate([control_covariate, test_covariate])
cov_mean = np.mean(all_cov)
control_cov_c = control_covariate - cov_mean
test_cov_c = test_covariate - cov_mean
var_c = np.var(control_cov_c, ddof=1)
var_t = np.var(test_cov_c, ddof=1)
total_var = var_c + var_t
if total_var < eps:
return control_metric.copy(), test_metric.copy(), 0.0
cov_control = np.cov(control_metric, control_cov_c, ddof=1)[0, 1]
cov_test = np.cov(test_metric, test_cov_c, ddof=1)[0, 1]
theta = (cov_control + cov_test) / total_var
return (
control_metric - theta * control_cov_c,
test_metric - theta * test_cov_c,
theta
)
# t-тест для оценки относительного эффекта с использованием дельта-метода
def relative_ttest(
control,
test,
alpha: float = 0.05
):
control = np.asarray(control)
test = np.asarray(test)
mean_c = control.mean()
mean_t = test.mean()
if abs(mean_c) < 1e-10:
return None
delta = (mean_t - mean_c) / mean_c
var_c = np.var(control, ddof=1) / len(control)
var_t = np.var(test, ddof=1) / len(test)
var_delta = (
var_t / mean_c**2 +
(mean_t**2 / mean_c**4) * var_c
)
se = np.sqrt(var_delta)
z = delta / se if se > 0 else 0.0
p_value_one_sided = 1 - stats.norm.cdf(z)
z_crit = stats.norm.ppf(1 - alpha / 2)
ci_lower = delta - z_crit * se
ci_upper = delta + z_crit * se
return {
'effect': delta * 100,
'p_value_one_sided': p_value_one_sided,
'ci_length': (ci_upper - ci_lower) * 100,
'left_bound': ci_lower * 100,
'right_bound': ci_upper * 100
}
# Применение пост-стратификации
def relative_ttest_post_strat(
control_values,
test_values,
control_strata,
test_strata,
alpha=0.05,
min_n=30,
min_mean=1e-6
):
control_values = np.asarray(control_values)
test_values = np.asarray(test_values)
control_strata = np.asarray(control_strata)
test_strata = np.asarray(test_strata)
all_strata = np.concatenate([control_strata, test_strata])
unique_strata = np.unique(all_strata)
pop_weights = {s: np.mean(all_strata == s) for s in unique_strata}
effect_sum = 0.0
var_sum = 0.0
weight_sum = 0.0
strata_log = []
for s in unique_strata:
c = control_values[control_strata == s]
t = test_values[test_strata == s]
n_c, n_t = len(c), len(t)
log_row = {"stratum": s, "n_control": n_c, "n_test": n_t, "included": False}
if n_c < min_n or n_t < min_n:
strata_log.append(log_row)
continue
mean_c = c.mean()
mean_t = t.mean()
if mean_c < min_mean:
strata_log.append(log_row)
continue
var_c = np.var(c, ddof=1) / n_c
var_t = np.var(t, ddof=1) / n_t
delta_s = (mean_t - mean_c) / mean_c
var_delta_s = (
var_t / mean_c**2 +
(mean_t**2 / mean_c**4) * var_c
)
w = pop_weights[s]
effect_sum += w * delta_s
var_sum += w**2 * var_delta_s
weight_sum += w
log_row.update({"relative_effect": delta_s * 100, "included": True})
strata_log.append(log_row)
if weight_sum == 0:
raise ValueError("No valid strata")
delta = effect_sum / weight_sum
se = np.sqrt(var_sum) / weight_sum
z = delta / se if se > 0 else 0.0
p_value_one_sided = 1 - stats.norm.cdf(z)
z_crit = stats.norm.ppf(1 - alpha / 2)
ci_lower = delta - z_crit * se
ci_upper = delta + z_crit * se
return {
'effect': delta * 100,
'p_value_one_sided': p_value_one_sided,
'ci_length': (ci_upper - ci_lower) * 100,
'left_bound': ci_lower * 100,
'right_bound': ci_upper * 100,
'strata_log': strata_log
}
# Собираем результат
def relative_ttest_comparison(
control_df,
test_df,
metric_column,
covariate_column: Optional[str] = None,
stratification_column: Optional[str] = None,
metric_type: str = "numeric", # "numeric" | "binary"
alpha: float = 0.05
):
control_metric = control_df[metric_column].values
test_metric = test_df[metric_column].values
# ---------- BASELINE ----------
baseline = relative_ttest(control_metric, test_metric, alpha)
# ---------- POST-STRAT ----------
post_strat = None
if stratification_column is not None:
if metric_type == "binary":
post_strat = relative_ttest_post_strat_binary(
control_metric,
test_metric,
control_df[stratification_column].values,
test_df[stratification_column].values,
alpha
)
else:
post_strat = relative_ttest_post_strat(
control_metric,
test_metric,
control_df[stratification_column].values,
test_df[stratification_column].values,
alpha
)
# ---------- CUPED ----------
cuped = None
post_strat_cuped = None
theta = None
if metric_type == "numeric" and covariate_column is not None:
control_cup, test_cup, theta = cuped_adjustment(
control_metric,
test_metric,
control_df[covariate_column].values,
test_df[covariate_column].values
)
cuped = relative_ttest(control_cup, test_cup, alpha)
if stratification_column is not None:
post_strat_cuped = relative_ttest_post_strat(
control_cup,
test_cup,
control_df[stratification_column].values,
test_df[stratification_column].values,
alpha
)
return {
# baseline
'effect': baseline['effect'] if baseline else None,
'p_value_one_sided': baseline['p_value_one_sided'] if baseline else None,
'ci_length': baseline['ci_length'] if baseline else None,
'left_bound': baseline['left_bound'] if baseline else None,
'right_bound': baseline['right_bound'] if baseline else None,
# post-strat
'effect_post_strat': post_strat['effect'] if post_strat else None,
'p_value_one_sided_post_strat': post_strat['p_value_one_sided'] if post_strat else None,
'ci_length_post_strat': post_strat['ci_length'] if post_strat else None,
'left_bound_post_strat': post_strat['left_bound'] if post_strat else None,
'right_bound_post_strat': post_strat['right_bound'] if post_strat else None,
# cuped
'effect_cuped': cuped['effect'] if cuped else None,
'p_value_one_sided_cuped': cuped['p_value_one_sided'] if cuped else None,
'ci_length_cuped': cuped['ci_length'] if cuped else None,
'left_bound_cuped': cuped['left_bound'] if cuped else None,
'right_bound_cuped': cuped['right_bound'] if cuped else None,
# cuped + post-strat
'effect_cuped_post_strat': post_strat_cuped['effect'] if post_strat_cuped else None,
'p_value_one_sided_cuped_post_strat': post_strat_cuped['p_value_one_sided'] if post_strat_cuped else None,
'ci_length_cuped_post_strat': post_strat_cuped['ci_length'] if post_strat_cuped else None,
'left_bound_cuped_post_strat': post_strat_cuped['left_bound'] if post_strat_cuped else None,
'right_bound_cuped_post_strat': post_strat_cuped['right_bound'] if post_strat_cuped else None,
'cuped_post_strat_strata_log': post_strat_cuped['strata_log'] if post_strat_cuped else None,
'theta': theta
}
df_result = pd.DataFrame(columns=[
'ci_length',
'ci_length_cuped',
'ci_length_cuped_post_strat',
'effect',
'effect_cuped',
'effect_cuped_post_strat',
'p_value_one_sided',
'p_value_one_sided_cuped',
'p_value_one_sided_cuped_post_strat'])
for i in range(20):
control_df, test_df = generate_data(effect=0.001)
results = relative_ttest_comparison(
control_df,
test_df,
metric_column="metric",
covariate_column="pre_metric",
stratification_column="stratum",
metric_type="numeric",
alpha=0.05
)
new_row = {
'ci_length': results['ci_length'],
'ci_length_cuped': results['ci_length_cuped'],
'ci_length_cuped_post_strat': results['ci_length_cuped_post_strat'],
'effect': results['effect'],
'effect_cuped': results['effect_cuped'],
'effect_cuped_post_strat': results['effect_cuped_post_strat'],
'p_value_one_sided': results['p_value_one_sided'],
'p_value_one_sided_cuped': results['p_value_one_sided_cuped'],
'p_value_one_sided_cuped_post_strat': results['p_value_one_sided_cuped_post_strat']
}
df_result = pd.concat([df_result, pd.DataFrame([new_row])], ignore_index=True)
df_result
На синтетических данных, как и в реальных кейсах, далеко не всегда есть существенная польза от пост-стратификации в дополнение к CUPED. Выбор критерия стратификации и корректное выделение страт имеют решающее значение в методологии.
Заключение
В этой статье мы разобрали, как с помощью CUPED и пост-стратификации можно значительно повысить чувствительность долгосрочного эксперимента в сложных условиях.
Что важно запомнить:
Не всегда "чем больше, тем лучше": Для CUPED в долгих экспериментах длинный предпериод может потерять релевантность. Оптимальную длину стоит проверять через корреляцию на исторических данных, не связанных с экспериментом.
Мощь в бизнес-понимании: Самый эффективный критерий для стратификации часто лежит в плоскости бизнес-логики, а не просто в исторических значениях целевой метрики. Погружайтесь в контекст.
Автоматизация с контролем: Алгоритмическое определение границ страт (с параметрами min_bin_size и var_growth_threshold) позволяет избежать субъективности, сохраняя при этом контроль над качеством разбиения.
Сила комбинации: CUPED и пост-стратификация — не конкуренты, а союзники. Их совместное применение дает синергетический эффект, снижая дисперсию больше, чем каждый метод по отдельности.
Этот подход не только решил нашу текущую задачу, но и открыл путь для оптимизации: благодаря возросшей чувствительности мы можем уменьшать размер контрольной группы, тем самым экспонируя новые гипотезы на большую аудиторию без потери точности измерений. В условиях, где каждый пользователь на счету, это критически важно.
