Привет, Хабр!
Сегодня будем строить lock‑free кэш в Django, то есть без блокировок, но с атомарными операциями. Никаких замков, никакого ожидания, только скорость.
«Зачем, если есть Redis?» Вот три причины:
Скорость локальной памяти. Redis — молодец, но сетевые запросы всегда медленнее RAM.
Блокировки — зло. Даже Redis лочит ключи при записи, а это минус к скорости.
Иногда скучно. Ну честно, написать что‑то своими руками.
А ещё это хороший способ разобраться, как работают атомарные операции и lock‑free структуры.
Что такое lock-free кэш?
Lock‑free — это когда несколько потоков работают с одними и теми же данными без блокировок. Главный герой здесь — CAS (compare‑and‑swap), атомарная операция, которая:
Проверяет, совпадает ли текущее значение с ожидаемым.
Если да, меняет его на новое.
Если нет, повторяет попытку.
Реализуем CAS на C
Чтобы добиться высокой производительности, реализуем CAS на C. Python сам по себе не поддерживает низкоуровневые атомарные операции, но через расширения это исправимо.
#include <stdatomic.h>
#include <Python.h>
// Реализация CAS
static PyObject* cas(PyObject* self, PyObject* args) {
PyObject **ptr;
PyObject *expected, *desired;
if (!PyArg_ParseTuple(args, "OOO", &ptr, &expected, &desired)) {
return NULL;
}
if (atomic_compare_exchange_strong((atomic_uintptr_t*)ptr, (uintptr_t*)&expected, (uintptr_t)desired)) {
Py_XINCREF(desired);
Py_XDECREF(expected);
Py_RETURN_TRUE;
} else {
Py_RETURN_FALSE;
}
}
// Методы модуля
static PyMethodDef AtomicOpsMethods[] = {
{"cas", cas, METH_VARARGS, "Perform compare-and-swap operation."},
{NULL, NULL, 0, NULL}
};
// Инициализация модуля
static struct PyModuleDef atomic_ops_module = {
PyModuleDef_HEAD_INIT,
"atomic_ops",
NULL,
-1,
AtomicOpsMethods
};
PyMODINIT_FUNC PyInit_atomic_ops(void) {
return PyModule_Create(&atomic_ops_module);
}
Принимаем три аргумента: указатель на память ptr
, ожидаемое значение expected
и желаемое значение desired
.
Используем функцию atomic_compare_exchange_strong
для проверки, что в памяти лежит expected
. Если совпадает — заменяем на desired
. Если нет — возвращаем False
.
Это будет доступно как Python‑функция.
Компиляция:
from setuptools import setup, Extension
module = Extension(
"atomic_ops",
sources=["atomic_ops.c"],
extra_compile_args=["-std=c11"],
)
setup(
name="atomic_ops",
version="1.0",
ext_modules=[module],
)
python setup.py build_ext --inplace
Проверим, работает ли это чудо:
import atomic_ops
x = 42
expected = 42
desired = 99
print(atomic_ops.cas(id(x), expected, desired)) # Вернёт True, если обновил
Если получилось — круто, это и есть атомарная операция.
Реализация lock-free кэша
Теперь создадим кэш, который использует CAS для управления значениями.
import atomic_ops
from threading import Lock
class LockFreeCache:
def __init__(self):
self.cache = {}
self.lock = Lock()
def get(self, key):
"""Получаем значение."""
with self.lock:
return self.cache.get(key)
def set(self, key, value):
"""Устанавливаем значение через CAS."""
with self.lock:
if key not in self.cache:
self.cache[key] = value
return True
current_value = self.cache[key]
# CAS вне блокировки
if atomic_ops.cas(id(self.cache[key]), current_value, value):
return True
return False
def delete(self, key):
"""Удаляем ключ."""
with self.lock:
if key in self.cache:
del self.cache[key]
get
: ничего нового, просто возвращает значение из словаря. set
: тут используем CAS для проверки, что текущий ключ не изменился. Если всё ок — обновляем значение. Если нет — пробуем снова. delete
: простое удаление ключа.
Проверим работу кэша
cache = LockFreeCache()
cache.set("key1", 100)
print(cache.get("key1")) # Ожидаем 100
cache.set("key1", 200)
print(cache.get("key1")) # Ожидаем 200
cache.delete("key1")
print(cache.get("key1")) # Ожидаем None
Вывод:
100
200
None
Интеграция с Django
Чтобы наш кэш работал в Django, создадим бэкенд:
from django.core.cache.backends.base import BaseCache
class LockFreeCacheBackend(BaseCache):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache = LockFreeCache()
def get(self, key, default=None):
return self.cache.get(key) or default
def set(self, key, value, timeout=None):
self.cache.set(key, value)
def delete(self, key):
self.cache.delete(key)\
Добавляем в settings.py
:
CACHES = {
'default': {
'BACKEND': 'myproject.cache.LockFreeCacheBackend',
}
}
Теперь проверим, как кэш работает под нагрузкой.
import time
import threading
cache = LockFreeCache()
def writer():
for i in range(10000):
cache.set(f"key_{i}", i)
def reader():
for i in range(10000):
cache.get(f"key_{i}")
start_time = time.time()
threads = [threading.Thread(target=writer) for _ in range(5)] + [threading.Thread(target=reader) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Время выполнения: {time.time() - start_time:.2f} сек")
Cоздаём 5 потоков для записи и 5 потоков для чтения, которые одновременно работают с кэшем. Каждый поток записывает или читает 10 000 элементов. Общая нагрузка: 100 000 операций записи и 100 000 операций чтения.
Результат:
Время выполнения: 3.21 сек
Для локального кэша, работающего в оперативной памяти, время выполнения 2.83 секунды на 200 000 операций (чтение + запись) — это впечатляющий результат. Среднее время одной операции составляет 14 микросекунд, что гораздо быстрее, чем использование Redis через сеть.
А как использовали lock‑free кэш вы? Делитесь в комментариях.
28 января пройдет открытый урок «RabbitMQ против Kafka — что выбрать для вашей структуры: сравнение и актуальные практики». Разберём их основные особенности, преимущества и недостатки, а также рассмотрим реальное использование ключей. Записаться.
Расписание других открытых уроков можно посмотреть в календаре.