Продолжение статьи про структуры данных в memcached. В этой завершающей части мы рассмотрим еще три структуры данных: лог событий, массив и таблицу.
Задача этой структуры данных — хранение событий, произошедших в распределенной системе за последние T секунд. Каждое событие имеет момент времени, когда оно произошло, остальное содержимое события определяется логикой приложения.
Операции над логом событий:
Основная идея лога событий — кольцевой буфер, состоящий из
Выбор параметров
В примере используется некоторый класс
Метод
Приведенная выше сигнатура метода позволяет последовательными обращениями выводить новые события из лога:
В массиве хранится список значений произвольного типа, относительно редко происходит обновление списка, гораздо чаще происходит получение списка целиком.
Операции над массивом:
Приведенный выше способ решения на самом деле не имеет никакого отношения к массивам, а может быть применен для любой структуры данных. Он основан на модели reader-writer, когда есть много читателей и относительно мало писателей. Читатели в любой момент с помощью метода
В данной ситуации можно было бы избежать использования блокировки и воспользоваться командами
Массив хранит список значений некоторого типа, часто происходит операция вида «добавить значение в массив». Относительно редко массив запрашивается целиком. Для простоты реализации в дальнейшем будет рассматриваться массив целых чисел, хотя для решения задачи тип данных не имеет существенного значения.
Операции над массивом:
Эта реализация практически повторяет аналогичный код для лога событий, только упрощенный в силу наличия всего одного ключа. По сравнению с первым вариантом реализации типа данных «массив» уменьшилось число операций memcached, все изменяющие массив процессы могут выполняться без задержек (отсутствие блокировок). Как и в первом варианте, не проверяется наличие дубликатов при добавлении элемента в массив (может быть и хорошо, и плохо, в зависимости от применения).
Возможны следующие улучшения (или расширения) описанного примера:
Необходимо хранить множество строк. Операции над множеством:
Можно рассматривать данную структуру данных как таблицу, в которой осуществляется быстрый поиск нужной строки. Или как хэш, хранящийся в распределенной памяти.
Вообще говоря memcached представляет собой огромную хэш-таблицу, правда в ней отсутствует одна операция, которая необходима для нашей структуры данных: получение списка ключей. Поэтому реализация таблицы использует отдельные ключи для хранения каждого элемента таблицы, и отдельно еще один ключ для хранения списка всех её элементов. Реализация хранения списка всех элементов фактически совпадает с реализацией «массива 1». Для сериализации доступа к списку всех элементов используется блокировка, при этом методы
Проверка наличия ключа в таблице выполняется максимально быстро: проверяется наличие соответствующего ключа в memcached. Любое изменение списка элементов всегда происходит одновременно и в ключе, хранящем весь список, и в отдельных ключах для каждого элемента (которые используются только для проверки).
На основе приведенной схемы можно реализовать полноценный хэш, когда для каждого элемента таблицы будет храниться связанное значение, это значение необходимо будет записывать только в отдельные ключи, соответствующие элементам, а список элементов не будет содержать значений.
Итак, приведем список «приемов» или «трюков», описанных в данной статье:
В статье не рассматривались вопросы оптимизации, специфичной для memcached, например, использование multi-get запросов. Это делалось сознательно, чтобы не перегружать исходный код и рассказ. Во многих ситуациях приведенные выше примеры следует рассматривать скорее как псевдокод, чем как пример идеальной реализации на Python.
Если Вы нашли ошибку, хотите предложить более ясное, более оптимальное решение поставленным задачам, хотите предложить реализацию для какой-то еще структуры данных — буду рад комментариям и критике.
Лог событий
Задача
Задача этой структуры данных — хранение событий, произошедших в распределенной системе за последние T секунд. Каждое событие имеет момент времени, когда оно произошло, остальное содержимое события определяется логикой приложения.
Операции над логом событий:
- добавить сообщение в лог событий (должна быть максимально быстрой);
- получить события, произошедшие в период времени от Tmin до Tmax (должна быть эффективной, но вызывается реже, чем добавление);
Решение
def time():
"""
Текущее время в секундах с любого момента времени (например, UNIX Epoch).
@return: текущее время в секундах
@rtype: C{int}
"""
class Event:
"""
Событие, помещаемое в лог событий.
"""
def when(self):
"""
Момент времени, когда произошло событие (секунды).
"""
def serialize(self):
"""
Сериализовать событие.
@return: сериализованное представление
@rtype: C{str}
"""
@static
def deserialize(serialized):
"""
Десериализовать набор событий.
@param serialized: сериализованное представление одного
или нескольких событий
@type serialized: C{str}
@return: массив десериализованных событий
@rtype: C{list(Event)}
"""
class MCEventLog(MemcacheObject):
def __init__(self, mc, name, timeChunk=10, numChunks=10):
"""
Конструктор.
@param name: имя лога событий
@type name: C{str}
@param timeChunk: емкость одного ключа лога в секундах
@type timeChunk: C{int}
@param numChunks: число выделяемых ключей под лог
@type numChunks: C{int}
"""
super(MCEventLog, self).__init__(mc)
self.keyTemplate = 'messagelog' + name + '_%d';
self.timeChunk = timeChunk
self.numChunks = numChunks
def put(self, event):
"""
Поместить событие в лог.
@param event: событие
@type event: L{Event}
"""
serialized = event.serialize()
key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks)
while True:
try:
self.mc.append(key, serialized)
return
except KeyError:
pass
try:
self.mc.add(key, serialized, self.timeChunk * (self.numChunks-1))
return
except KeyError:
pass
def fetch(self, first=None, last=None):
"""
Получить события из лога за указанный период (или все события).
@param first: минимальное время возвращаемого сообщения
@type first: C{int}
@param last: максимальное время возвращаемого сообщения
@type last: C{int}
@return: массив событий
@rtype: C{list(Event)}
"""
if last is None or last > time():
last = time()
if first is None or last < first or
(last-first) > self.timeChunk * (self.numChunks-1):
first = time() — self.timeChunk * (self.numChunks-1)
firstKey = first / self.timeChunk % self.numChunks
lastKey = last / self.timeChunk % self.numChunks
if firstKey < lastKey:
keyRange = range(firstKey, lastKey+1)
else:
keyRange = range(firstKey, self.numChunks) + range(0, lastKey+1)
keys = [self.keyTemplate % n for n in keyRange]
result = []
for key in keys:
try:
events = Event.deserialize(self.mc.get(key))
except KeyError:
continue
result.extend(filter(lambda e: e.when() >= first and
e.when() <= last, l))
return result
Обсуждение
Основная идея лога событий — кольцевой буфер, состоящий из
numChunks
ключей в memcached. Каждый ключ активен (то есть дополняется значениями) в течение timeChunk
секунд, после чего активным становится следующий ключ (если активным был последний ключ, эта роль переходит к первому ключу). Полный цикл буфера, т.е. период времени между двумя использованиями одного ключа составляет numChunks * timeChunk
секунд, а время жизни каждого ключа — (numChunks - 1) * timeChunk
секунд, таким образом при любом сдвиге времени создания ключа по модулю timeChunk
к моменту времени следующего использования ключ гарантированно будет уничтожен. Таким образом, ёмкость лога событий (или период времени, за который сохраняются события) составляет (numChunks - 1) * timeChunk
секунд. Такое разбиение лога на ключи позволяет при получении событий из лога вынимать лишь те ключи, которые соответствуют интересному нам временному отрезку.Выбор параметров
timeChunk
и numChunks
зависит от применения лога событий: сначала определяется желаемый срок хранения событий, затем по частоте событий выбирается такое значение timeChunk
, чтобы размер каждого ключа лога событий был относительно небольшим (например, 10-20Кб). Из этих соображений можно найти значение и второго параметра, numChunks
.В примере используется некоторый класс
Event
, который обладает единственным интересным для нас свойством — временем, когда произошло событие. В методе put
лога событий предполагается, что событие event
, переданное в качестве параметра, произошло «недавно», то есть с момента event.when()
прошло не более чем (numChunks - 1) * timeChunk
секунд (емкость лога). При работе put
вычисляется ключ, в который должна быть помещена информация о событии, в соответствие с его временной меткой. После этого с помощью уже знакомой по предыдущим примерам техники ключ либо создается, либо к значению уже существующего ключа дописывается сериализованное представление события.Метод
fetch
вычисляет потенциальный набор ключей лога, в которых могут находиться события, произошедшие во временной интервал от first
до last
. Если временные рамки не заданы, last
считается равным текущему моменту времени, а first
— моменту времени, отстоящему от текущего на емкость лога. Набор ключей вычисляется с учетом кольцевой структуры метода, после чего выбираются соответствующие ключи, десериализуются последовательно записанные в них события и проводится дополнительная фильтрация на попадание в отрезок [first, last]
.Приведенная выше сигнатура метода позволяет последовательными обращениями выводить новые события из лога:
- Первый раз вызывается
events = fetch()
. ВычисляетсяlastSeen
какmax(events.when())
. - Все последующие обращения выглядят следующим образом:
events = fetch(first=lastSeen)
, при этом
lastSeen
каждый раз перевычисляется.
Массив
Задача 1
В массиве хранится список значений произвольного типа, относительно редко происходит обновление списка, гораздо чаще происходит получение списка целиком.
Операции над массивом:
- изменить массив (редкая операция);
- получить массив целиком (частая операция).
Решение 1
def serializeArray(array):
"""
Сериализовать массив в бинарное представление.
"""
def deserializeArray(str):
"""
Десериализовать массив из бинарного представления.
"""
class MCArray1(MemcacheObject):
def __init__(self, mc, name):
"""
Конструктор.
@param name: имя массива
@type name: C{str}
"""
super(MCArray1, self).__init__(mc)
self.lock = MCLock(name)
self.key = 'array' + name
def fetch(self):
"""
Получить текущее значение массива.
@return: массив
@rtype: C{list}
"""
try:
return deserializeArray(self.mc.get(self.key))
except KeyError:
return []
def change(self, add_elems=[], delete_elems=[]):
"""
Изменить значение массива, добавив или удалив из него
элементы.
@param add_elems: элементы, которые надо добавить
@type add_elems: C{list}
@param delete_elems: элементы, которые надо удалить
@type delete_elems: C{list}
"""
while not self.lock.try_lock():
pass
try:
try:
array = deserializeArray(self.mc.get(self.key))
except KeyError:
array = []
array = filter(lambda e: e not in delete_elems, array) + add_elems
self.mc.set(self.key, serializeArray(array), 0)
finally:
self.lock.unlock()
Обсуждение 1
Приведенный выше способ решения на самом деле не имеет никакого отношения к массивам, а может быть применен для любой структуры данных. Он основан на модели reader-writer, когда есть много читателей и относительно мало писателей. Читатели в любой момент с помощью метода
fetch
получают содержимое массива, при этом важно, что «писатель» сhange
записывает содержимое одной командой memcached, то есть в силу внутренней атомарности операций get
и set
в memcached и несмотря на отсутствие синхронизации между методами fetch
и сhange
, результат fetch
всегда будет консистентным: это будет значение до или после очередного изменения. Писатели блокируются от одновременного изменения массива с помощью блокировки MCLock
, описанной выше.В данной ситуации можно было бы избежать использования блокировки и воспользоваться командами
gets
, cas
и add
из протокола memcached для того, чтобы гарантировать атомарность изменений с помощью функции change
.Задача 2
Массив хранит список значений некоторого типа, часто происходит операция вида «добавить значение в массив». Относительно редко массив запрашивается целиком. Для простоты реализации в дальнейшем будет рассматриваться массив целых чисел, хотя для решения задачи тип данных не имеет существенного значения.
Операции над массивом:
- добавить значение в массив (частая операция);
- получить массив целиком.
Решение 2
def serializeInt(int):
"""
Сериализовать целое число в бинарное представление (str).
"""
def deserializeIntArray(str):
"""
Десериализовать массив целых чисел из бинарного представления.
"""
class MCArray2(MemcacheObject):
def __init__(self, mc, name):
"""
Конструктор.
@param name: имя массива
@type name: C{str}
"""
super(MCArray2, self).__init__(mc)
self.key = 'array' + name
def fetch(self):
"""
Получить текущее значение массива.
@return: массив
@rtype: C{list}
"""
try:
return deserializeIntArray(self.mc.get(self.key))
except KeyError:
return []
def add(self, element):
"""
Добавить элемент в массив.
@param element: элемент, который необходимо добавить в массив
@type element: C{int}
"""
element = serializeInt(element)
while True:
try:
self.mc.append(self.key, element)
except KeyError:
return
try:
self.mc.add(self.key, element, 0)
except KeyError:
return
Обсуждение 2
Эта реализация практически повторяет аналогичный код для лога событий, только упрощенный в силу наличия всего одного ключа. По сравнению с первым вариантом реализации типа данных «массив» уменьшилось число операций memcached, все изменяющие массив процессы могут выполняться без задержек (отсутствие блокировок). Как и в первом варианте, не проверяется наличие дубликатов при добавлении элемента в массив (может быть и хорошо, и плохо, в зависимости от применения).
Возможны следующие улучшения (или расширения) описанного примера:
- использование нескольких ключей для хранения массива вместо одного, распределение элементов по ключам с использованием хэширования; такой вариант позволит ограничить размер каждого ключа, при условии что массив большой (содержит много элементов);
- реализация в том же стиле операции удаления элемента из массива, тогда массив можно представить
как последовательность операций «удалить» и «добавить», например сериализованное представление
+1 +3 +4 -3 +5
будет после десериализации образовывать массив[1, 4, 5]
; при этом как
операция добавления элемента, так и удаления, будет приводить к дописыванию байт в конец
сериализованного представления (атомарная операцияappend
).
Таблица
Задача
Необходимо хранить множество строк. Операции над множеством:
- проверка принадлежности строки множеству (самая частая операция);
- получение множества целиком, добавление элемента, удаление элемента — редкие операции.
Можно рассматривать данную структуру данных как таблицу, в которой осуществляется быстрый поиск нужной строки. Или как хэш, хранящийся в распределенной памяти.
Решение
def serializeArray(array):
"""
Сериализовать массив в бинарное представление.
"""
def deserializeArray(str):
"""
Десериализовать массив из бинарного представления.
"""
class MCTable(MemcacheObject):
def __init__(self, mc, name):
"""
Конструктор.
@param name: имя таблицы
@type name: C{str}
"""
super(MCTable, self).__init__(mc)
self.lock = MCLock(name)
self.key = 'table' + name
def has(self, key):
"""
Проверка наличия ключа в таблице.
@param key: ключ
@type key: C{str}
@rtype: C{bool}
"""
try:
self.mc.get(self.key + '_v_' + key)
return True
except KeyError:
return False
def fetch(self):
"""
Получить целиком значение элементов таблицы.
@return: значение таблицы
@rtype: C{list(str)}
"""
try:
return deserializeArray(self.mc.get(self.key + '_keys'))
except KeyError:
pass
def add(self, key):
"""
Добавить ключ в таблицу.
@param key: ключ
@type key: C{str}
"""
while not self.lock.try_lock():
pass
try:
try:
array = deserializeArray(self.mc.get(self.key + '_keys'))
except KeyError:
array = []
if key not in array:
array.append(key)
self.mc.set(self.key + '_v_' + key, 1, 0)
self.mc.set(self.key + '_keys', serializeArray(array), 0)
finally:
self.lock.unlock()
def delete(self, key):
"""
Удалить ключ из таблицы.
Реализация аналогична методу add().
"""
Обсуждение
Вообще говоря memcached представляет собой огромную хэш-таблицу, правда в ней отсутствует одна операция, которая необходима для нашей структуры данных: получение списка ключей. Поэтому реализация таблицы использует отдельные ключи для хранения каждого элемента таблицы, и отдельно еще один ключ для хранения списка всех её элементов. Реализация хранения списка всех элементов фактически совпадает с реализацией «массива 1». Для сериализации доступа к списку всех элементов используется блокировка, при этом методы
fetch
и add
не синхронизированы друг с другом, т.к. список всех элементов меняется атомарно и при чтении ключа мы всегда получим некоторое консистентное состояние.Проверка наличия ключа в таблице выполняется максимально быстро: проверяется наличие соответствующего ключа в memcached. Любое изменение списка элементов всегда происходит одновременно и в ключе, хранящем весь список, и в отдельных ключах для каждого элемента (которые используются только для проверки).
На основе приведенной схемы можно реализовать полноценный хэш, когда для каждого элемента таблицы будет храниться связанное значение, это значение необходимо будет записывать только в отдельные ключи, соответствующие элементам, а список элементов не будет содержать значений.
Заключение
Итак, приведем список «приемов» или «трюков», описанных в данной статье:
- атомарность операций с помощью memcached (пара
add
/set
и т.п.); - блокировки;
- теневые ключи;
- кольцевой буфер с автоматическим «отмиранием» ключей;
- блокировки и модель reader-writer.
В статье не рассматривались вопросы оптимизации, специфичной для memcached, например, использование multi-get запросов. Это делалось сознательно, чтобы не перегружать исходный код и рассказ. Во многих ситуациях приведенные выше примеры следует рассматривать скорее как псевдокод, чем как пример идеальной реализации на Python.
Если Вы нашли ошибку, хотите предложить более ясное, более оптимальное решение поставленным задачам, хотите предложить реализацию для какой-то еще структуры данных — буду рад комментариям и критике.