Недавно на Хабре вышла статья за авторством MilashchenkoEA , в которой автор восполняет обнаруженный им пробел в доступных материалах по методам построения кривых плотности распределения вероятности по имеющемуся набору числовых данных. Акцент в статье сделан на методическую сторону получения (оценки) плотности вероятности случайной величины, поэтому автор не преследует цели получения оптимального, с вычислительной точки зрения, алгоритма. Что ж, в данной заметке попытаемся исправить эту ситуацию, а также взглянем под другим углом на способ решения данной задачи.
Постановка задачи
Дан набор значений случайной величины
, сэмплированный из некоторого непрерывного распределения. Необходимо оценить функцию плотности распределения вероятности случайной величины по заданной выборке. Для решения задачи можно использовать только нативные функции python; для построения графиков используется matplotlib; Pandas DataFrame используется как контейнер данных в соответствие с оригинальной статьей (хотя в общем-то можно обойтись и без него).
Подготовка данных
Тестировать методы будем с использованием данных, сгенерированных для 4-х распределений, для которых известны аналитические выражения плотности, как в оригинальной статье: Релея, гамма, Вейбулла и экспоненциального. Для этого используем код MilashchenkoEA с небольшими изменениями (для удобства дальнейшего использования изменены сигнатуры функций, возвращающих значения аналитической функции плотности вероятности на заданном массиве значений случайной переменной):
Код
import random import math import matplotlib.pyplot as plt import pandas import pandas as pd import numpy as np # Понадобится для расчета метрик from time import perf_counter # Понадобится для определения времени выполнения def rel_pdf(rel_sigma: float, X: list) -> pandas.DataFrame: """ Вычисляет Релеевскую кривую плотности распределения вероятности по известной формуле :param rel_sigma: среднеквадратическое отклонение :param X: координаты по оси абсцисс :return: pandas.DataFrame """ pdf_y = [] # Координаты по оси ординат for x in X: pdf_y.append(((2 * x) / rel_sigma) * math.exp((-x ** 2) / rel_sigma)) return pd.DataFrame({'x': X, 'y': pdf_y}) def rel_rand(n: int, rel_sigma: float) -> list: """ Генерирует случайные числа с Релеевской плотностью распределения вероятности :param n: количество отсчетов :param rel_sigma: среднеквадратическое отклонение :return: list """ rel_list = [] for i in range(n): rel_list.append((rel_sigma / math.sqrt(2)) * math.sqrt(-2 * math.log(random.uniform(0, 1)))) return rel_list def gam_pdf(v: float, b: float, X: list) -> pandas.DataFrame: """ Вычисляет кривую плотности Гамма распределения вероятности по известной формуле :param v: параметр формы :param b: масштабный коэффициент :param X: координаты по оси абсцисс :return: pandas.DataFrame """ pdf_y = [] # Координаты по оси ординат for x in X: pdf_y.append(((b ** v) / math.gamma(v) * (x ** (v - 1)) * math.exp(-b * x))) return pd.DataFrame({'x': X, 'y': pdf_y}) def gam_rand(n: int, v: float, b: float) -> list: """ Генерирует случайные числа с гамма распределением плотности вероятности :param n: количество отсчетов :param v: параметр формы :param b: масштабный коэффициент :return: list """ gam_list = [random.gammavariate(v, 1 / b) for i in range(n)] return gam_list def weib_pdf(a: float, b: float, X: list) -> pandas.DataFrame: """ Вычисляет кривую плотности распределения вероятности Вейбулла по известной формуле :param a: масштабный коэффициент :param b: параметр формы :param X: координаты по оси абсцисс :return: pandas.DataFrame """ pdf_y = [] # Координаты по оси ординат for x in X: pdf_y.append((b / a) * ((x / a) ** (b - 1)) * math.exp(-(x / a) ** b)) return pd.DataFrame({'x': X, 'y': pdf_y}) def weib_rand(n: int, a: float, b: float) -> list: """ Генерирует случайные числа с распределением плотности вероятности Вейбулла :param n: количество отсчетов :param a: масштабный коэффициент :param b: параметр формы :return: list """ wei_list = [random.weibullvariate(a, b) for i in range(n)] return wei_list def exp_pdf(l: float, X: list) -> pandas.DataFrame: """ Вычисляет кривую экспоненциальной плотности распределения вероятности по известной формуле :param l: обратный коэффициент масштаба :param n: количество рассчитанных точех :param X: координаты по оси абсцисс :return: pandas.DataFrame """ pdf_y = [] # Координаты по оси ординат for x in X: pdf_y.append(l * math.exp(-l * x)) return pd.DataFrame({'x': X, 'y': pdf_y}) def exp_rand(n: int, l: float) -> list: """ Генерирует случайные числа с экспоненциальным распределением плотности вероятности :param n: количество отсчетов :param l: обратный коэффициент масштаба :return: list """ exp_list = [random.expovariate(l) for i in range(n)] return exp_list
Сгенерируем для каждого распределения наборы случайных величин, и организуем их в словарь:
random_series = { 'rrand': rel_rand(100000, 1), # генерируем случайные числа с распределением Релея 'grand': gam_rand(100000, 0.5, 0.5), # генерируем случайные числа с гамма распределением 'wrand': weib_rand(100000, 1, 5), # генерируем случайные числа с распределением Вейбулла 'exprand': exp_rand(100000, 1.5) # генерируем случайные числа с экспоненциальным распределением }
Далее, эти наборы данных будут использованы для тестирования целиком, либо срезами, содержащими значений случайной величины.
Оптимизируем алгоритм, основанный на бинаризации данных
В качестве алгоритма нахождения плотности распределения автор предлагает довольно широко распространенный метод, основанный на разбиении интервала значений переменной на заданное число
бинов равной ширины и подсчет числа вхождений значений переменной в каждый бин, реализованный в ряде библиотек [numpy, scipy, pandas], а также собственную реализацию в соответствии с условиями (оригинальные комментарии сохранены):
def pdf_original(k: int, rnd_list: list) -> pandas.DataFrame: """ Получает кривую плотности распределения вероятности :param k: количечиво интервалов разбиения гистограммы :param rnd_list: случайный процесс :return: pandas.DataFrame """ pdf_x = [] # Координаты по оси абсцисс pdf_y = [] # Координаты по оси ординат n = len(rnd_list) # количество элементов в рассматриваемой выборке h = (max(rnd_list) - min(rnd_list)) / k # ширина одного интервала a = min(rnd_list) # минимальное значение в рассматриваемой выборке for i in range(0, k): # проход по интервалам count = 0 for j in rnd_list: # подсчет количества вхождений значений из выборки в данный интервал if (a + i * h) < j < (a + (i * h) + h): count = count + 1 pdf_x.append(a + i * h + h / 2) # координата по оси абсцисс полученной кривой плотности распределения # вероятности pdf_y.append(count / (n * h)) # координата по оси ординат полученной кривой плотности распределения # вероятности d = {'x': pdf_x, 'y': pdf_y} return pd.DataFrame(d)
Можно заметить, что сложность данного алгоритма составляет , т.к. внешний цикл пробегает
значений, внутренний же, в худшем случае, пробегает
значений. Грубо оценим время выполнения функции при различных значениях
и
for k in [100, 1000, 10000]: tic = perf_counter() _ = pdf_original(k, rrand) print('%.3f s' % (perf_counter() - tic)) for N in [10000, 20000, 30000]: tic = perf_counter() _ = pdf_original(1000, rrand[:N]) print('%.3f s' % (perf_counter() - tic))
Вывод:
0.726 s 7.006 s 69.418 s 0.783 s 1.429 s 2.145 s
Разумеется, столь удручающие результаты не позволяют использовать данную реализацию метода, благо есть готовые реализации. Однако, можно добиться линейной сложности алгоритма подсчета значений в бинах по и константной по
, существенно, тем самым, снизив вычислительное время, всего лишь добавив предварительную сортировку значений и заменив внутренний цикл по всем значениям случайной величины на цикл лишь по тем значениям, которые попадают в бин:
def pdf_optimized(k: int, rnd_list: list) -> pandas.DataFrame: """ Получает кривую плотности распределения вероятности :param k: количечиво интервалов разбиения гистограммы :param rnd_list: случайный процесс :return: pandas.DataFrame """ pdf_x = [] # Координаты по оси абсцисс pdf_y = [] # Координаты по оси ординат n = len(rnd_list) # количество элементов в рассматриваемой выборке h = (max(rnd_list) - min(rnd_list)) / k # ширина одного интервала a = min(rnd_list) # минимальное значение в рассматриваемой выборке rnd_list = sorted(rnd_list) # сортируем значения j = 0 # индекс значения левой границы интервала for i in range(0, k): # проход по интервалам count = 0 while j < n and (a + i * h) <= rnd_list[j] < (a + (i * h) + h): # подсчитываем количество значений в k-м интервале count = count + 1 j += 1 pdf_x.append(a + i * h + h / 2) # координата по оси абсцисс полученной кривой плотности распределения # вероятности pdf_y.append(count / (n * h)) # координата по оси ординат полученной кривой плотности распределения # вероятности d = {'x': pdf_x, 'y': pdf_y} return pd.DataFrame(d)
Выполним аналогичное тестирование времени выполнения оптимизированной функции при различных значениях и
добавив некоторое число повторений:
N_repeats = 100 for k in [100, 1000, 10000]: tic = perf_counter() for _ in range(N_repeats): _ = pdf_optimized(k, random_series['rrand']) print('%.3f s' % ((perf_counter() - tic) / N_repeats)) for N in [25000, 50000, 100000]: tic = perf_counter() for _ in range(N_repeats): _ = pdf_optimized(10000, random_series['rrand'][:N]) print('%.3f s' % ((perf_counter() - tic) / N_repeats))
Вывод:
0.035 s 0.034 s 0.039 s 0.013 s 0.021 s 0.038 s
С такими результатами гораздо приятнее иметь дело и такую нативную реализацию можно использовать в условиях, когда нет возможности устанавливать дополнительные пакеты. Следует, однако, заметить, что ассимптотическая сложность метода определяется алгоритмом сортировки, и в данном случае составляет .
Примечание:
Представленные выше реализации используют нестрогое равенство для сравнения действительных чисел, что, вообще говоря, некорректно. Вследствие этого могут возникать граничные эффекты, поскольку вхождения минимального/максимального значений случайной величины в первый/последний бины не гарантировано. Следует немного модифицировать реализацию, добавив учет по умолчанию минимального и максимального значений случайной величины в первом и последнем бинах, соответственно, и пробегать в цикле по оставшимся значениям.
Считаем плотность распределения через функцию распределения
Далее рассмотрим несколько иной подход к получению оценки плотности распределения случайной величины. Вспомним, что плотность распределения случайной величины представляет собой первую производную от функции распределения, которая, в свою очередь, является вероятностью обнаружить значение случайной величины меньше либо равное заданному. Для того, чтобы получить оценку функции распределения вероятности, а из нее, в свою очередь, плотность распределения, предлагается выполнить следующие шаги:
сортируем значения переменной
по возрастанию, получаем набор отсортированных значений
;
ставим в соответствие каждому значению в массиве отсортированных значений его порядковый номер
начиная с нуля — с точностью до множителя,
представляет собой оценку функции распределения случайной величины (Рисунок 1, синяя кривая);
строим равномерную шкалу из
значений на интервале от
до
где
— желаемое число точек кривой плотности распределения (
) — аналог числа бинов в гистограмме;
интерполируем значения номеров переменных из шкалы упорядоченного массива значений переменной в равномерную шкалу, полученную в п. 3 (Рисунок 1, оранжевая кривая);
численно берем производную от интерполированной функции по соседним точкам (для этого и было нужно
значений) и делим на
— получаем, таким образом, искомую оценку плотности вероятности.

Ниже представлена реализация метода:
def pdf_custom(k: int, rnd_list: list): """ Получает кривую плотности распределения вероятности :param k: количечиво интервалов разбиения гистограммы :param rnd_list: случайный процесс :return: pandas.DataFrame """ X = sorted(rnd_list) # сортируем значения случайной переменной N = len(X) i = 0 dx = (X[-1] - X[0]) / (k + 2) # находим шаг по аргументу в равномерной шкале result = [] result_x = [] for j in range(k + 2): # пробегаем по точкам x = X[0] + j * dx # находим соответствующее значение аргумента в равномерной шкале result_x.append(x) while True: # с помощью данного цикла находим индекс i такой, # что x лежит в пределах от X[i] до X[i+1] if x > X[i + 1]: i += 1 else: break result.append(i + (x - X[i]) / (X[i + 1] - X[i])) norm = 0.5 / dx / N d = { 'x': result_x[1:-1], 'y': [(right - left) * norm for right, left in zip(result[2:], result[:-2])]} return pd.DataFrame(d)
Сложность представленного алгоритма также составляет . В результате тестирования времени выполнения получим следующие значения для
= [100, 1000, 10000] при
= 100 000 и
= [25000, 50000, 100000] при
= 10 000:
0.020 s 0.020 s 0.024 s 0.009 s 0.014 s 0.024 s
Новый метод оказался несколько быстрее, при той же алгоритмической сложности.
Сравниваем точность представленных методов
Для сравнения точности представленных методов будем использовать расстояние Кульбака-Лейблера от априорной функции плотности распределения до получаемой оценки
:
Для вычисления метрики уже, не мудрствуя лукаво, будем использовать numpy:
def KL_dist(P: list, Q: list): assert len(P) == len(Q) eps = 1e-9 P = np.asarray(P) Q = np.asarray(Q) return np.mean(P * (np.log(P + eps) - np.log(Q + eps)))
Получим оценки плотности распределения случайной величины для разных априорных распределений при разных соотношениях и сравним значений KL-дивергенции:
from functools import partial pdf_function = { 'rrand': partial(rel_pdf, 1), 'grand': partial(gam_pdf, 0.5, 0.5), 'wrand': partial(weib_pdf, 1, 5), 'exprand': partial(exp_pdf, 1.5) } N = 10000 k_values = np.logspace(2, 4, 10, dtype=np.int) metrics_optimized = {} for key, val in random_series.items(): for k in k_values: estimated_pdf = pdf_optimized(k, val[:N]) theoretical_pdf = pdf_function[key](estimated_pdf['x']) metrics_optimized.setdefault(key, []).append( KL_dist(theoretical_pdf['y'], estimated_pdf['y'])) metrics_custom = {} for key, val in random_series.items(): for k in k_values: estimated_pdf = pdf_custom(k, val[:N]) theoretical_pdf = pdf_function[key](estimated_pdf['x']) metrics_custom.setdefault(key, []).append( KL_dist(theoretical_pdf['y'], estimated_pdf['y'])) plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) for key, val in metrics_optimized.items(): plt.scatter(k_values / N, val) plt.legend(metrics_optimized.keys()) plt.xlabel('k / N') plt.ylabel('KL-divergency') plt.title('Метод на основе гистограмм') plt.subplot(1, 2, 2) for key, val in metrics_custom.items(): plt.scatter(k_values / N, val) plt.legend(metrics_custom.keys()) plt.xlabel('k / N') plt.ylabel('KL-divergency') plt.title('Метод на основе функции распределения')
В результате ожидаемо в обоих случаях получаются монотонно возрастающие зависимости метрики от соотношения (Рисунок 2), однако, метод, основанный на численном дифференцировании функции распределения случайной величины дает на порядок меньшее расхождение кривой оценки плотности распределения с теоретической кривой, чем метод, основанный на построении гистограммы, что особенно заметно при больших значениях
.

Заключение
Предложенный альтернативный метод построения оценок плотности распределения случайной величины по заданному набору наблюдений отличается более высокой точностью, по сравнению с методом, основанным на получении гистограмм.
Выражаю благодарность MilashchenkoEA за обсуждение данных проблем и за то, что убедил написать эту небольшую заметку :)
