Рейт лимиты с помощью Python и Redis

Original author: Andrea Stagi
  • Translation
  • Tutorial

В этой статье мы рассмотрим некоторые алгоритмы рейт лимитов на основе Python и Redis, начиная с самой простой реализации и заканчивая продвинутым обобщённым алгоритмом контроля скорости передачи ячеек (Generic Cell Rate Algorithm, GCRA).

Для взаимодействия с Redis (pip install redis) мы будем пользоваться redis-py. Предлагаю клонировать мой репозиторий для экспериментирования с ограничениями запросов.

Ограничение по времени


Первый подход к ограничению количества запросов за период заключается в использовании алгоритма с ограничением по времени: для каждого ограничивающего ключа (rate-key, что-то уникальное, вроде ника или IP-адреса) отдельно хранятся счётчик (изначально задающий предельное значение) и срок действия (период), которые уменьшаются по мере получения запросов.

С помощью Python и Redis можно реализовать этот алгоритм следующим образом:

  1. Проверяем существование ограничивающего ключа.
  2. Если он существует, то инициализируем его с предельным значением (Redis SETNX) и сроком действия (Redis EXPIRE).
  3. Уменьшаем это значение с каждым последующим запросом (Redis DECRBY).
  4. Запросы останавливаются, только когда значение падает ниже нуля.
  5. Спустя заданный период времени ключ автоматически удаляется.

from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    if r.setnx(key, limit):
        r.expire(key, int(period.total_seconds()))
    bucket_val = r.get(key)
    if bucket_val and int(bucket_val) > 0:
        r.decrby(key, 1)
        return False
    return True

Можете посмотреть работу этого кода при эмуляции ограничения в 20 запросов в 30 секунд (чтобы было понятнее, я поместил функцию в модуль).

import redis
from datetime import timedelta
from ratelimit import time_bucketed

r = redis.Redis(host='localhost', port=6379, db=0)
requests = 25

for i in range(requests):
    if time_bucketed.request_is_limited(r, 'admin', 20, timedelta(seconds=30)):
        print ('Request is limited')
    else:
        print ('Request is allowed')

Ограничиваться не будут лишь первые 20 запросов, после них придётся ждать 30 секунд, чтобы снова можно было слать новые запросы.

Недостаток этого подхода в том, что он ограничивает не частоту, а количество сделанных пользователем запросов за определённый период. Если весь лимит будет сразу исчерпан, придётся ждать окончания периода.

Алгоритм текущего ведра (Leaky bucket)


Есть подход, при котором мы можем очень аккуратно ограничивать частоту: это алгоритм текущего ведра. Принцип работы такой же, как у самого настоящего протекающего ведра: мы наливаем воду (запросы) в ведро по самые края, пока какая-то часть воды постоянно вытекает через отверстие в днище (обычно реализуется с помощью фонового процесса). Если количество вливаемой в ведро воды больше, чем количество выливающейся, то ведро наполняется, и когда оно наполнится, новые запросы перестают приниматься.

Как работает алгоритм

Можно пропустить этот подход ради более элегантного решения, не требующего отдельного процесса для эмулирования утечки: обобщённого алгоритма контроля скорости передачи ячеек (Generic Cell Rate Algorithm).

Обобщённый алгоритм контроля скорости передачи ячеек


GCRA был создан в телекоммуникационной отрасли, где его называют режимом асинхронной передачи (Asynchronous Transfer Mode, ATM). Он использовался диспетчерах ATM-сетей для задержки или отбрасывания ячеек — маленьких пакетов данных фиксированного размера, — которые приходили с частотой выше заданного лимита.

GCRA отслеживает оставшийся лимит с помощью так называемого теоретического времени прибытия (Theoretical Arrival Time, TAT) каждого запроса:

tat = current_time + period

и ограничивает следующий запрос, если время прибытия меньше текущего ТАТ. Это хорошо работает, если частота равна 1 запрос/период, когда запросы разделены по периодам. Но в реальности частоты обычно вычисляется как лимит/период. Например, если частота равна 10 запросов/60 сек, то пользователю можно будет делать 10 запросов в первые 6 секунд. А с частотой 1 запрос/6 сек ему придётся ждать по 6 секунд между запросами.

Чтобы иметь возможность отправлять в течение короткого периода группу запросов и поддерживать ограничение их количества за период с лимитом > 1, каждый запрос нужно разделить отношением период/лимит, и тогда следующее теоретическое время прибытия (new_tat) будет вычисляться иначе. Обозначим время прибытия запроса как t:

  • new_tat = tat + period / limit, если запросы объединяются в группу (t <= tat)
  • new_tat = t + period / limit, если запросы не объединяются в группу (t > tat)

Следовательно:

new_tat = max(tat, t) + period / limit

Запрос будет ограничен, если new_tat превышает сумму текущего времени и периода: new_tat > t + period. При new_tat = tat + period / limit мы получаем tat + period / limit > t + period. Следовательно, нужно ограничивать запросы только при tat - t > period - period / limit.

      period — period / limit
      <----------------------->
--|----------------------------|--->
  t                           TAT

В этом случае нам не нужно обновлять ТАТ, потому что у ограниченных запросов нет теоретического времени прибытия.

Теперь соберём финальную версию кода!

from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    period_in_seconds = int(period.total_seconds())
    t = r.time()[0]
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)
    tat = max(int(r.get(key)), t)
    if tat - t <= period_in_seconds - separation:
        new_tat = max(tat, t) + separation
        r.set(key, new_tat)
        return False
    return True

Мы воспользовались Redis TIME, потому что GCRA зависит от времени, и нужно убедиться, что текущее время консистентно в течение нескольких развёртываний (расхождение часов между разными машинами может привести к ложно положительным срабатываниям).

Этот код демонстрирует работу GCRA при частоте 10 запросов/60 сек.

import redis
from datetime import timedelta
from ratelimit import gcra

r = redis.Redis(host='localhost', port=6379, db=0)
requests = 10

for i in range(requests):
    if gcra.request_is_limited(r, 'admin', 10, timedelta(minutes=1)):
        print ('Request is limited')
    else:
        print ('Request is allowed')

Алгоритм не ограничивает первые 10 запросов, но вам придётся ждать не меньше 6 сек, чтобы сделать следующий запрос. Попробуйте запустить скрипт спустя какое-то время и измените величину лимита и периода (например, limit = 1 и period=timedelta(seconds=6)).

Чтобы сохранить чистоту реализации, я не стал добавлять блокировку между получением и отправкой вызовов. Но она нужна при многочисленных запросах с одинаковым ключом и в одно и то же время. С помощью Lua можно добавить блокировку в виде менеджера контекста.

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    period_in_seconds = int(period.total_seconds())
    t = r.time()[0]
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)
    try:
        with r.lock('lock:' + key, blocking_timeout=5) as lock:
            tat = max(int(r.get(key)), t)
            if tat - t <= period_in_seconds - separation:
                new_tat = max(tat, t) + separation
                r.set(key, new_tat)
                return False
            return True
    except LockError:
        return True

Полный код лежит на GitHub.
Mail.ru Group
Building the Internet

Comments 3

    0

    Между прочим, у Nginx есть встроенный лимитер запросов https://nginx.org/ru/docs/http/ngx_http_limit_req_module.html


    Когда-то писал троттлинг вызовов на Python
    import time
    
    def rate_throttle(key, rate, d={}):
        current = time.monotonic()
        last = d.get(key, 0)
        elapsed = current - last
        sleep = 0
    
        if rate > elapsed > 0:
            sleep = rate - elapsed
    
        d[key] = current + sleep
        return sleep
    
    for i in range(5):
        # No more 5 requests per 1 second
        time.sleep(rate_throttle('', 1/5))
        print('Faster', i)
    
    for i in range(5):
        # No more 1 request per 2 second
        time.sleep(rate_throttle('', 2/1))
        print('Slower', i)
      0
      Спасибо за статью. У соседей-функциональщиков уже есть GenStage, «разогнанный» до безобразия :)
        0

        Лучше везде перевести указанный алгоритм именно как "протекающее ведро". Течь ведро не может, но может протекать. К тому же, "текущее" по умолчанию наводит на значение "конкретное", как текущий элемент в коллекции.

        Only users with full accounts can post comments. Log in, please.