Riak это NoSQL решение, честная DHT (key/value storage) с дополнительными возможностями для разруливания конфликтов.
У распределенной хеш таблицы есть как плюсы, так и минусы. DHT хорошо масштабируется, но возможны потери данных из-за конфликтов конкурентного доступа, рассмотрим следующий пример:
Получилось, что клиент b переписал данные клиента a и никто об этом не знает (ни a, ни b, ни тот, кто прочтет данные по этому ключу позже).
Так как многие NoSQL базы данных в своей основе имеют DHT, интересно смотреть как они пытаются решить проблему конкурентного доступа.
Например, MongoDB использует compare-and-swap стратегию: с каждым документом (значением) храниться его версия, при обновлении указывается версия «предка» измененного документа, если в базе в момент обновления храниться предок, то обновление проходит, если нет, то нет: обновляющая сторона получает сообщение, и пытается провести обновление снова — аналог STM. Такой подход хорошо работает с шардами, но плохо с репликацией.
Riak решает проблему конкурентного доступа подобно системам контроля версий, он, как бы, сохраняет конфликтные версии в разных бранчах, предоставляя программе при следующей выборке провести merge. Такой подход позволяет разрешать конфликты, связанные не только с конкурентным доступом, но и с времянной изолированостью части кластера (partition tolerance: кластер машин может распаться на две части, обе части будут работать и смогут без проблем объединиться в будущем).
Riak накладыват больше условий на разработку, но обеспечивает масштабируемость и надежность данных при работе с большим объемом информации. Статья опишет, как «обойти» ограничения Riak при разработке типичных web приложений.
Рассмотрим первый примитив, реализованный на базе Riak — append-only список небольшого размера.
Представим, что мы пишем блог, к каждому посту которого будет не очень много комментариев и комментарий нельзя изменять после добавления. В этом случае, разумно в качестве значения хранить весь пост с комментариями, так операция чтения будет проходить за O(1). Опишем схему для данных:
Теперь представим, что два пользователя «одновременно» добавили комментарий:
Теперь в базе с id «post/13» хранятся две записи; и первый, кто обратиться по этому ключу, получит их обе и должен будет самостоятельно их смерджить. Для простоты, предположим, что пост редактировать нельзя, поэтому подойдет пост из любой «ветки», а так как комментарии могут только добавляться, то списки комментариев к обоим постам имеют общий префикс, следовательно, нужно его выделить и создать новый список из префикса, его дополнения первого списка и его дополнения второго списка. Операция мерджа будет следущая:
Где mergeLists определена следующем образом:
Очевидно, что mergeLists очень похожа на объединение множеств, а следовательно, если какой-то элемент был в a или b, то он будет и в результирующем списке, следовательно, при слиянии нет потерь данных. Получается, что сейчас мы научились писать в список в Riak, избегая проблем конкурентного доступа.
Если нужно смерджить несколько постов, то используем merge внутри fold комбинатора.
Следующий примитив, который мы рассмотрим будет списком фиксированного размера с возможностью удаления. В случае, когда он достигает максимального размера, из него выкидывается какой-нибудь элемент (например, самый старый, хотя в распределенных системах это понятие весьма условное). Так как размер списка фиксированный, будем хранить весь список опять как одно значение.
На этот примитив хорошо ложатся всевозможные уведомления. Во-первых, уведомление об событии намного менее важно, чем само событие; во-вторых, вряд ли пользователь хочет видеть уведомление о старом событии, если он давно не заходил в систему; в третьих, если пользователь уже изучает информацию о событии, то уведомление стоит удалить.
В отличии от предыдущей схемы, где в качестве значения хранился один список объектов, здесь будут храниться два списка: список «оповещений» и список удаленных «оповещений». В случае мержа соответствующие списки будут объединяться и, таким образом, удаленный объект останется в списке удаленных объектов, а добавленный в списке добавленных (естественно после мержа из добавленных нужно будет вычесть удаленные). Запишем более формально:
Проблема в том, что при таком определении операций, наши списки неограничено растут, хотя реализуют необходимый примитив. Попробуем ограничить эти списки: если список добавленных объектов вырос до максимума — переносим любой объект в список удаленных, если вырос список удаленных — удаляем из него какой-нибудь объект, опять чуть более формально:
После каждой операции add, delete и merge нужно выполнять операцию ram. Понятно, что граничив длинну списков мы в чем-то потеряли и, скорее всего, преобрели нежелательное поведение. Попробуем это померить. В нашем случае (когда пропажа объектов штатное дело) единственным нежелательным поведением является появление уже удаленного оповещения. Для измерения этого показателя я смоделировал процесс и произвел серию наблюдений. Вполне очевидно, что количество ошибок должно зависить от длинны списка и от произведения длительности обработки запроса на частоту запросов (это действительно так, я проверял), назовем этот параметр ключевым. Ниже идут несколько графиков, по которым можно понять динамику:
График отражает долю изменений списка, после которых удаленное «оповещение» вновь появлялось как новое, в зависимости от максимального числа элементов в списке. Ключевой параметр был фиксирован (0.8 и 2), что соответсвует примерно 8 запросам в сек и 20 запросам в сек. Ниже будет написано, что это не так мало, как кажется.
На графике отображается динамика процента ошибок в зависимости от ключевого параметра при фиксированной длинне списка (30 и 130 элементов соответственно).
По оси абсцисс отложен ключевой параметр, красная линия отвечает за значение 1; по оси ординат отложена длинна списка, красные линии отвечают за 100, 200 и 300 элементов. Черным отмечена зона параметров, при которых ошибка менее 1%.
Почему 10 запросов в сек это не так мало, как кажется. Во-первых, учитываются только запросы на запись, во-вторых, это не общее количество запросов, а кол-ко запросов к одному объекту. В случае если мы проектируем, например, google+, то 10 запросов в сек это не кол-во обращений ко всему гуглу, а прогнозируемое частота комментариев к одной записи.
Последний паттерн в этой статье — большой список, с возможностью добавлять записи в конец, читать записи как с конца, так и с начала, а так же получать за один запрос несколько записей. Предполагается, что удалений из списка будет очень мало.
Этот примитив хорошо описывает ленту в твиттере, а так же стены в социальных сетях, но для него можно придумать и другие применения.
В отличие от предыдущих схем, которые описывались одной парой ключ-значение, этот паттерн описывается несколькими — служебной парой ключ-значением с информацией о начале и конце списка, а так же собственно чанками данных, в которых хранится отрезок фиксированного размера из списка. Определим дата модель, а так же операцию merge для каждого типа данных:
Думаю, операции очевидны. Если хотим добавить элемент в список:
Как видно из статьи, на Riak можно положить распространенные в web схемы работы с данными и впоследствии получить безболезненное распределение этих данных по нескольким узлам. Это достигается это за счет того набора примитивов, которые предоставляет Riak программисту.
Мне подход Riak понравился из-за прозрачного подхода к разрешению конфликтов и гибкости при выборе ограничений (CAP) при каждом запросе.
У распределенной хеш таблицы есть как плюсы, так и минусы. DHT хорошо масштабируется, но возможны потери данных из-за конфликтов конкурентного доступа, рассмотрим следующий пример:
client a: def o-value = DHT.get("some-key");
client a: def a-value = changeValue(o-value);
client b: def o-value = DHT.get("some-key");
client a: DHT.put("some-key", a-value);
client b: def b-value = changeValue(o-value);
client b: DHT.put("some-key", b-value);
Получилось, что клиент b переписал данные клиента a и никто об этом не знает (ни a, ни b, ни тот, кто прочтет данные по этому ключу позже).
Так как многие NoSQL базы данных в своей основе имеют DHT, интересно смотреть как они пытаются решить проблему конкурентного доступа.
Например, MongoDB использует compare-and-swap стратегию: с каждым документом (значением) храниться его версия, при обновлении указывается версия «предка» измененного документа, если в базе в момент обновления храниться предок, то обновление проходит, если нет, то нет: обновляющая сторона получает сообщение, и пытается провести обновление снова — аналог STM. Такой подход хорошо работает с шардами, но плохо с репликацией.
Riak решает проблему конкурентного доступа подобно системам контроля версий, он, как бы, сохраняет конфликтные версии в разных бранчах, предоставляя программе при следующей выборке провести merge. Такой подход позволяет разрешать конфликты, связанные не только с конкурентным доступом, но и с времянной изолированостью части кластера (partition tolerance: кластер машин может распаться на две части, обе части будут работать и смогут без проблем объединиться в будущем).
Riak накладыват больше условий на разработку, но обеспечивает масштабируемость и надежность данных при работе с большим объемом информации. Статья опишет, как «обойти» ограничения Riak при разработке типичных web приложений.
Блог
Рассмотрим первый примитив, реализованный на базе Riak — append-only список небольшого размера.
Представим, что мы пишем блог, к каждому посту которого будет не очень много комментариев и комментарий нельзя изменять после добавления. В этом случае, разумно в качестве значения хранить весь пост с комментариями, так операция чтения будет проходить за O(1). Опишем схему для данных:
public class Post {
public static class Comment implements Comparable<Comment> {
public String text;
public int ts; //timestamp
/* equals, hashCode, compareTo*/
}
public String text;
public List<Comment> comments;
}
Теперь представим, что два пользователя «одновременно» добавили комментарий:
client a: def o-post = DHT.get("post/13");
client a: def a-post = addComment(o-value, "забыл хабракат и все, что после него");
client b: def o-post = DHT.get("some-key");
client a: DHT.put("post/13", a-post);
client b: def b-post = addComment(o-post, "автор посчитал это очевидным");
client b: DHT.put("post/13", b-post);
Теперь в базе с id «post/13» хранятся две записи; и первый, кто обратиться по этому ключу, получит их обе и должен будет самостоятельно их смерджить. Для простоты, предположим, что пост редактировать нельзя, поэтому подойдет пост из любой «ветки», а так как комментарии могут только добавляться, то списки комментариев к обоим постам имеют общий префикс, следовательно, нужно его выделить и создать новый список из префикса, его дополнения первого списка и его дополнения второго списка. Операция мерджа будет следущая:
public static Post merge(Post a, Post b) {
Post c = new Post();
c.text = a.text;
c.comments = Mergers.mergeLists(a.comments, b.comments);
return c;
}
Где mergeLists определена следующем образом:
public static <T extends Comparable<T>> List<T> mergeLists(List<T> a, List<T> b) {
List<T> result = new ArrayList<T>();
List<T> rest = new ArrayList<T>();
int max = Math.min(a.size(), b.size());
int i=0;
// выделяем общий префикс
for(;i<max && a.get(i).equals(b.get(i));i++) {
result.add(a.get(i));
}
// собираем хвосты
for(int j=i;j<a.size();j++) {
rest.add(a.get(j));
}
for(int j=i;j<b.size();j++) {
rest.add(b.get(j));
}
// сортируем хвосты
Collections.sort(rest);
// добовляем хвост
for(T item : rest) {
result.add(item);
}
return result;
}
Очевидно, что mergeLists очень похожа на объединение множеств, а следовательно, если какой-то элемент был в a или b, то он будет и в результирующем списке, следовательно, при слиянии нет потерь данных. Получается, что сейчас мы научились писать в список в Riak, избегая проблем конкурентного доступа.
Если нужно смерджить несколько постов, то используем merge внутри fold комбинатора.
Оповещения (сообщения, обновления)
Следующий примитив, который мы рассмотрим будет списком фиксированного размера с возможностью удаления. В случае, когда он достигает максимального размера, из него выкидывается какой-нибудь элемент (например, самый старый, хотя в распределенных системах это понятие весьма условное). Так как размер списка фиксированный, будем хранить весь список опять как одно значение.
На этот примитив хорошо ложатся всевозможные уведомления. Во-первых, уведомление об событии намного менее важно, чем само событие; во-вторых, вряд ли пользователь хочет видеть уведомление о старом событии, если он давно не заходил в систему; в третьих, если пользователь уже изучает информацию о событии, то уведомление стоит удалить.
В отличии от предыдущей схемы, где в качестве значения хранился один список объектов, здесь будут храниться два списка: список «оповещений» и список удаленных «оповещений». В случае мержа соответствующие списки будут объединяться и, таким образом, удаленный объект останется в списке удаленных объектов, а добавленный в списке добавленных (естественно после мержа из добавленных нужно будет вычесть удаленные). Запишем более формально:
Проблема в том, что при таком определении операций, наши списки неограничено растут, хотя реализуют необходимый примитив. Попробуем ограничить эти списки: если список добавленных объектов вырос до максимума — переносим любой объект в список удаленных, если вырос список удаленных — удаляем из него какой-нибудь объект, опять чуть более формально:
После каждой операции add, delete и merge нужно выполнять операцию ram. Понятно, что граничив длинну списков мы в чем-то потеряли и, скорее всего, преобрели нежелательное поведение. Попробуем это померить. В нашем случае (когда пропажа объектов штатное дело) единственным нежелательным поведением является появление уже удаленного оповещения. Для измерения этого показателя я смоделировал процесс и произвел серию наблюдений. Вполне очевидно, что количество ошибок должно зависить от длинны списка и от произведения длительности обработки запроса на частоту запросов (это действительно так, я проверял), назовем этот параметр ключевым. Ниже идут несколько графиков, по которым можно понять динамику:
Процент ошибок записи от длинны списка
График отражает долю изменений списка, после которых удаленное «оповещение» вновь появлялось как новое, в зависимости от максимального числа элементов в списке. Ключевой параметр был фиксирован (0.8 и 2), что соответсвует примерно 8 запросам в сек и 20 запросам в сек. Ниже будет написано, что это не так мало, как кажется.
Процент ошибок записи от ключевого параметра
На графике отображается динамика процента ошибок в зависимости от ключевого параметра при фиксированной длинне списка (30 и 130 элементов соответственно).
1% зона ошибок
По оси абсцисс отложен ключевой параметр, красная линия отвечает за значение 1; по оси ординат отложена длинна списка, красные линии отвечают за 100, 200 и 300 элементов. Черным отмечена зона параметров, при которых ошибка менее 1%.
Почему 10 запросов в сек это не так мало, как кажется. Во-первых, учитываются только запросы на запись, во-вторых, это не общее количество запросов, а кол-ко запросов к одному объекту. В случае если мы проектируем, например, google+, то 10 запросов в сек это не кол-во обращений ко всему гуглу, а прогнозируемое частота комментариев к одной записи.
Поток (стена в вконтакте или лента в твиттер)
Последний паттерн в этой статье — большой список, с возможностью добавлять записи в конец, читать записи как с конца, так и с начала, а так же получать за один запрос несколько записей. Предполагается, что удалений из списка будет очень мало.
Этот примитив хорошо описывает ленту в твиттере, а так же стены в социальных сетях, но для него можно придумать и другие применения.
В отличие от предыдущих схем, которые описывались одной парой ключ-значение, этот паттерн описывается несколькими — служебной парой ключ-значением с информацией о начале и конце списка, а так же собственно чанками данных, в которых хранится отрезок фиксированного размера из списка. Определим дата модель, а так же операцию merge для каждого типа данных:
class Info {
public String key;
public String prefix;
public int lastChunk = 0;
public String getChunkKey(int chunk) {
return prefix + chunk;
}
public Info mergeWith(Info brother) {
Info info = new Info();
info.key = key;
info.prefix = prefix;
info.lastChunk = Math.max(lastChunk, brother.lastChunk);
return info;
}
}
class Chunk<T extends Comparable<T>> {
public String key;
List<T> added = new ArrayList<T>();
List<T> deleted = new ArrayList<T>();
public void add(T obj) {
added.add(obj);
}
public void delete(T obj) {
deleted.add(obj);
}
public Iterable<T> getData() {
List<T> data = new ArrayList<T>(added);
data.removeAll(deleted);
return data;
}
public Chunk<T> mergeWith(Chunk<T> brother) {
Chunk<T> chunk = new Chunk();
chunk.key = key;
chunk.added = mergeLists(added, brother.added);
chunk.deleted = mergeLists(deleted, brother.deleted);
return chunk;
}
}
Думаю, операции очевидны. Если хотим добавить элемент в список:
- достаем info, которая соответствует каждому списку
- определяем и достаем последний чанк
- если он не полон добавляем запись и сохраняем чанк
- иначе создаем новый чанк, добавляем запись и сохраняем его; кроме того правим «указатель» на последний чанк в info и так же сохраняем
Заключение
Как видно из статьи, на Riak можно положить распространенные в web схемы работы с данными и впоследствии получить безболезненное распределение этих данных по нескольким узлам. Это достигается это за счет того набора примитивов, которые предоставляет Riak программисту.
Мне подход Riak понравился из-за прозрачного подхода к разрешению конфликтов и гибкости при выборе ограничений (CAP) при каждом запросе.