Как стать автором
Обновить

Кэш в асинхронных python приложениях

Время на прочтение8 мин
Количество просмотров15K

Предисловие

Кеширование играет важную роль в вебе. Без него невозможен современный Интернет. Кэш присутствует на всех уровнях работы веб приложения: клиентский кэш браузера, CDN ускоряет загрузку статики, в базах данных буферы и кэши ускоряют выполнение популярных запросов. На серверах кэш ускоряет чтение популярных данных. И конечно, на уровне приложения используется кэш. К сожалению, встречается это не часто. Во-первых, чаще всего разработчики рассматривают кэш как способ ускорить приложение и прибегают к нему только в крайнем случае. Конечно они правы, но использование кэша может быть и архитектурным решением. Во-вторых, использование кэша вносит дополнительные трудности: потребность сохранения согласованного состояния кэша с основным источником и необходимость поддерживать код, который этот кэш обслуживает.

В статье поговорим об использование кэша с точки зрения кода асинхронного Python приложения. Начнем с рассмотрения самостоятельной реализации, а в конце посмотрим на библиотеки и “пропиаримся”. 

Почему async? Просто потому, что популярно. Да и сама возможность асинхронного взаимодействия с кэшом имеет свои плюсы: мы можем “отстреливать” корутины на обновления кэша или иметь отдельные корутины обслуживающие наш кэш. Также, для каноничных приложений уже все давно есть – просто бери да используй.

Базовая реализация кэша

И так начнем. Рано или поздно вы задумаетесь об использовании кэша, и самый просто способ – реализовать его "в лоб". Например, как-нибудь так:

cache = {}
KEY_TEMPLATE = "user_account-{user_id}-{account_id}"

async def get_user_account(user: User, account_id: int):
    key = KEY_TEMPLATE.format(user_id=user.id, account_id=account_id)
    if key not in cache:
        cache[key] = await _get_user_account(user, account_id)
    return cache[key]

Довольно популярное и простое решение, но тут есть ряд проблем. 

Часть связана с кодом: 

  1. возможно в некоторых случаях понадобится получить свежие данные и тогда придется или добавить параметр к функции, что-то типа use_cache=False, или иметь две функции: get_user_account_with_cache и get_user_account, что не очень удобно в поддержке. 

  2. придется бороться с кэшом при написании юнит тестов для покрытия таких функций.

Другая важная проблема – отсутствие ограничений на размер кэша. Он будет расти и это приведет к чрезмерному росту потребления памяти, если конечно в системе не 100 пользователей. Нужно "умное" хранилище с одним из механизмов вытеснения – lru, mru, lfu, ttl. Конечно, можно написать кучу кода и реализовать умное in-memory хранилище самому или взять готовую библиотеку а-ля async-lru. Однако, когда приложения запускаются в Кубернетисах и контейнерах по 5-10 экземпляров и перезапускаются по несколько раз за день, такой кэш будет малоэффективным. 

Современные key-value базы данных умеют из коробки ограничивать кэш. Яркий представитель такого хранилища Redis, который мало того что дает указывать ttl (“time to live”) для ключа, но также имеет разные механизмы вытеснения, которые включаются путем простых настроек. 

Неизбежно наш код превратится во что-то такое:

KEY_TEMPLATE = "user_account-{user_id}-{account_id}" 
CACHE_TTL_SECONDS = 24 * 60 * 60  # 24 hours
cache = CacheStorage()

async def get_user_account(user: User, account_id: int):
    key = KEY_TEMPLATE.format(user_id=user.id, account_id=account_id)
    account = await cache.get(key)
    if account:
        return account
    account = await _get_user_account(user, account_id)
    await cache.set(key, account, ttl=CACHE_TTL_SECONDS)
    return account

Хорошо, если account это просто строка, однако он может быть объектом. В таком случае придется добавить код для сериализации и десериализации объекта account. Как правило, для этого используют или json или pickle. Давайте в дальнейшем считать, что это реализовано на уровне CacheStorage. При n-ой реализации такого кэширования часто допускаются ошибки. Можно забыть проставить значение в кеш или промахнуться с ttl. Ну и хотя мы избавились от ряда проблем, все равно такой код плохо масштабируется: поэтому ... напишем декоратор!

# never use it 
import asyncio
import json
from functools import wraps

CACHE_TTL_HOURS = 24
_cache = CacheStorage()

def cache(function):
    @wraps(function)
    async def _function(*args, **kwargs):
        cache_key = function.__module__ + function.__name__ + json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        result = await _cache.get(cache_key)
        if result:
            return result
        result = await function(*args, **kwargs)
        asyncio.create_task(_cache.set(cache_key, result, ttl=CACHE_TTL_HOURS))
        return result
    return _function

Эта наивная реализация кэширующего декоратора решает множество проблем: 

  • не нужно дублировать код, 

  • создаем ключ динамически 

  • единая реализация, которую легко изменить. 

Но у нас до сих пор есть и проблемы: 

  • ключ получается ужасный так как в нем по сути json

  • ключ получится разный в зависимости от сигнатуры вызова функции (происходит ли передача по ключу или по позиционным аргументам) и от типа передаваемых значений

  • не все объекты можно сериализовать в json из коробки

  • мы не можем управлять форматом ключа и ttl. 

не используйте hash

не используйте hash(obj) в качестве ключа для кэша – в алгоритме хеширования есть соль и она уникальна для каждого процесса. Использование других хешей, таких как, md5 или sha, в качестве ключей тоже не рекомендовано, так как затрудняет траблшутинг и ручной менеджмент кэша.

Чуть более сложная реализация

Давайте попробуем это исправить тем, что дадим пользователю нашего декоратора возможность передавать функцию, которая будет возвращать ключ для кэша:


import asyncio
from functools import wraps
from datetime import timedelta

def cache(key_function: Callable, ttl: timedelta):
    def _decorator(function):
        @wraps(function)
        async def _function(*args, **kwargs):
            cache_key = key_function(*args, **kwargs)
            result = await _cache.get(cache_key)
            if result:
                return result
            result = await function(*args, **kwargs)
            asyncio.create_task(_cache.set(cache_key, result, ttl=ttl.total_seconds()))
            return result
        return _function
    return _decorator


def _key_function(user, account_id):
    return f"user-account-{user.id}-{account_id}"


@cache(_key_function, timedelta(hours=3))
async def get_user_account(*, user: User, account_id: int):
    ...

Такой декоратор более предсказуем и прост в использовании, но придется адаптировать сигнатуру функции, чтобы облегчить описание функции формирования ключа. Плюс, возникает ряд вопросов. Как вызвать функцию без кэша? Можно ли понять, получили значение из кэша или путем вызова декорируемой функции? Как принудительно инвалидировать кэш? Эти сложности и вопросы наводят на то, что было бы намного проще использовать готовые реализации.

обидная бага

На одном из проектов мы столкнулись со сложной багой, причем обнаружили ее только через полгода после реализации. Ошибка была в использовании кэша:

async def get_avaliable_currencies():
    currencies = await cache.get(“currencies”)
    if not currencies:
        currencies = await _get_from_origin()
    await cache.set(“currencies”, ttl=_ONE_HOUR)
    return currencies

Так как функция вызывалась чаще чем раз в час, то и инвалидации кэша не происходило. Получалось, что мы достаем из кэша данные и кладем их туда снова продлевая тем самым ttl.

Кэширующие Либы

Безусловно, многие разработчики предпочтут использовать OpenSource библиотеки для функциональности, которая описана выше и это грамотное решение. Однако, на просторах Интернета не так много библиотек, которые “заточены” под работу с асинхронным Python. С той же проблемой столкнулся и я.

Наиболее популярная библиотека управления кэшом - aiocache. Она предоставляет много возможностей:

  • поддержка разных бекендов: redis, memory, memcached

  • разные механизмы сериализации: pickle, json

  • декоратор с примерно такими же функционалом который описали ранее

  • механизм плагинов

К сожалению, эта библиотека не дает ответы на вопросы адресованные ранее, а именно:

  • возможность вызова функции без кэша, причем динамически

  • API для того чтобы понять, был ли результат получен из кэша или нет

  • встроенные механизмы принудительной инвалидации

Своя реализация на “максималках”. Встречайте, Cashews!

Все недостатки существующих библиотек и дополнительные потребности побудили меня к написанию своей библиотеки. Пример использования:

from cashews import cache

cache.setup("redis://")

@cache(ttl="3h", key="user:{user.id}:{account_id}")
async def get_user_account(user: User, account_id: int) -> Account:
  ...

Хотелось бы сразу показать, как эта библиотека помогает справиться с теми проблемами, о которых я говорил выше:

1) возможность вызова функции без кэша, причем динамически

Пример использования: у нас есть веб сервер и есть endpoint на котором есть кэш. Появились кейсы, когда клиенту нужно показать свежие данные (без кэша):

from cashews import cache

@app.middleware("http")
async def disable_cache_for_no_store(request: Request, call_next):
    if request.method.lower() != "get":
        return await call_next(request)
    if request.headers.get("Cache-Control") in ("no-store", "no-cache"):
        with cache.disabling("get", "set"):
            return await call_next(request)
    return await call_next(request)

Здесь контекст менеджер cache.disabling("get", "set") отключает операции get и set у кэша. Тем самым обращение к кэшу не происходит, если в запросе есть заголовок Cache-Control равный no-cache.

2) API для того чтобы понять, был ли результат получен из кэша или нет

Пример использования: у нас есть веб-приложение и мы хотим добавить в заголовки признак того, что для результата запроса использовался кэш. Сделаем middleware:

from cashews import cache

@app.middleware("http")
async def add_from_cache_headers(request: Request, call_next):
    with cache.detect as detector:
        response = await call_next(request)
        if request.method.lower() != "get":
            return response
        if detector.calls:
            response.headers["X-From-Cache-keys"] = ";".join(detector.calls.keys())
    return response

Здесь контекст менеджер cache.detect отслеживает успешные вызовы кэша и помещает их в возвращаемый объект detector. А далее проверяем, если в него что-то попало, то значит был успешный “хит” по кэшу. Из детектора можно получить список ключей – его и добавляем в наш кастомный заголовок.

3) встроенные механизмы принудительной инвалидации

Пример использования: у нас есть CRUD API  На read у нас есть кэш, но его нужно инвалидировать после операций create, update или delete

from cashews import cache

@app.get("/friends")
@cache(ttl="10h")
async def get_fiends(user: User = Depends(get_current_user)):
    pass


@app.post("/friends")
@cache.invalidate(get_fiends)
async def create_friend(user: User = Depends(get_current_user)):
    pass

Суть в том, что при успешном вызове create_friend библиотека сама сформирует ключ, который используется для кэша и удалит ключ или ключи.

Другие возможности Cashews

Ну и немного хотел рассказать про крутые возможности этой либы:

1) На данный момент библиотека поддерживает следующие хранилища("бэкенды") :

cache.setup("mem://", size=1000)  # inmemory (LRU + ttl)
cache.setup("disk://?directory=/tmp/cache")  # use filesystem
cache.setup("redis://redis_host/0")  # use redis

2) Client side cache. В общем-то этих бэкендов бы хватило, но есть еще один который появился благодаря новой фиче добавленной в Redis 6. А именно, client side cache – это механизм, который предоставляет Redis для того, чтобы держать локальную копию кэша согласованной. Все очень просто. При записи кладем пару ключ-значение в Redis и в память, и держим отдельное подключение, которое подписано на события (broadcasting mode) инвалидации ключей. При получении сообщения стираем в своем локальном кэше значение по ключу. Этот простой механизм дает возможность иметь очень быстрый кэш, который не становится холодным после перезапуска приложения:

cache.setup("redis://0.0.0.0", client_side=True)

3) Использование разных бекендов в зависимости от префикса ключа

cache.setup("redis://redis")
cache.setup("mem://",  prefix="users:")


await cache.get("key")  # use redis backend
await cache.get("users:1")  #  use memory backend

4) Разные "стратегии" использования кэша:

@cache(ttl="5h") - самый обычный кэш, как мы описывали в начале статьи.

@cache.hit(ttl="1d", cache_hits=500, update_after=100) – кроме как экспирации по ttl добавляется экспирация по количеству хитов, при указании update_after кэш обновится в бекграунде после указанного количества хитов и сбросит счетчик. Решает проблему cache stampede эффекта.

@cache.early(ttl="10m", early_ttl="7m") – кэш который кроме обычного ttl имеет ttl ранней экпирации, после которого кэш обновится в бекграунде и сбросит ttl. Решает проблему cache stampede эффекта.

@cache.failover(ttl="2h", exceptions=(ValueError, MyException)) – можно сказать, что это не кэш. Суть в том, что этот декоратор сохраняет результат ответа почти всегда. Однако, если вызов функции закончится заданными исключениями, то вернется ранее сохраненное значение. Это особенно полезно для внешних вызовов когда сервис часто недоступен, а хотелось бы его падения “сгладить”

5) "Джентльменский" набор утилит-декораторов для микросервисов: Circuit breaker, Shared Lock, Rate limit

6) При использовании Redis в качестве хранилища используется Pickle для сериализации. Для защиты от атаки через pickle возможно включить защитную проверку, которая к значению также будет добавлять его хеш с секретной солью и сверять его при получении

В общем, если хотите эффективно использовать кэш – используйте cashews :)

Ссылки:

Теги:
Хабы:
Всего голосов 22: ↑22 и ↓0+22
Комментарии22

Публикации

Истории

Работа

Data Scientist
61 вакансия
Python разработчик
137 вакансий

Ближайшие события