Riak это NoSQL решение, честная DHT (key/value storage) с дополнительными возможностями для разруливания конфликтов.
У распределенной хеш таблицы есть как плюсы, так и минусы. DHT хорошо масштабируется, но возможны потери данных из-за конфликтов конкурентного доступа, рассмотрим следующий пример:
Получилось, что клиент b переписал данные клиента a и никто об этом не знает (ни a, ни b, ни тот, кто прочтет данные по этому ключу позже).
Так как многие NoSQL базы данных в своей основе имеют DHT, интересно смотреть как они пытаются решить проблему конкурентного доступа.
Например, MongoDB использует compare-and-swap стратегию: с каждым документом (значением) храниться его версия, при обновлении указывается версия «предка» измененного документа, если в базе в момент обновления храниться предок, то обновление проходит, если нет, то нет: обновляющая сторона получает сообщение, и пытается провести обновление снова — аналог STM. Такой подход хорошо работает с шардами, но плохо с репликацией.
Riak решает проблему конкурентного доступа подобно системам контроля версий, он, как бы, сохраняет конфликтные версии в разных бранчах, предоставляя программе при следующей выборке провести merge. Такой подход позволяет разрешать конфликты, связанные не только с конкурентным доступом, но и с времянной изолированостью части кластера (partition tolerance: кластер машин может распаться на две части, обе части будут работать и смогут без проблем объединиться в будущем).
Riak накладыват больше условий на разработку, но обеспечивает масштабируемость и надежность данных при работе с большим объемом информации. Статья опишет, как «обойти» ограничения Riak при разработке типичных web приложений.
Рассмотрим первый примитив, реализованный на базе Riak — append-only список небольшого размера.
Представим, что мы пишем блог, к каждому посту которого будет не очень много комментариев и комментарий нельзя изменять после добавления. В этом случае, разумно в качестве значения хранить весь пост с комментариями, так операция чтения будет проходить за O(1). Опишем схему для данных:
Теперь представим, что два пользователя «одновременно» добавили комментарий:
Теперь в базе с id «post/13» хранятся две записи; и первый, кто обратиться по этому ключу, получит их обе и должен будет самостоятельно их смерджить. Для простоты, предположим, что пост редактировать нельзя, поэтому подойдет пост из любой «ветки», а так как комментарии могут только добавляться, то списки комментариев к обоим постам имеют общий префикс, следовательно, нужно его выделить и создать новый список из префикса, его дополнения первого списка и его дополнения второго списка. Операция мерджа будет следущая:
Где mergeLists определена следующем образом:
Очевидно, что mergeLists очень похожа на объединение множеств, а следовательно, если какой-то элемент был в a или b, то он будет и в результирующем списке, следовательно, при слиянии нет потерь данных. Получается, что сейчас мы научились писать в список в Riak, избегая проблем конкурентного доступа.
Если нужно смерджить несколько постов, то используем merge внутри fold комбинатора.
Следующий примитив, который мы рассмотрим будет списком фиксированного размера с возможностью удаления. В случае, когда он достигает максимального размера, из него выкидывается какой-нибудь элемент (например, самый старый, хотя в распределенных системах это понятие весьма условное). Так как размер списка фиксированный, будем хранить весь список опять как одно значение.
На этот примитив хорошо ложатся всевозможные уведомления. Во-первых, уведомление об событии намного менее важно, чем само событие; во-вторых, вряд ли пользователь хочет видеть уведомление о старом событии, если он давно не заходил в систему; в третьих, если пользователь уже изучает информацию о событии, то уведомление стоит удалить.
В отличии от предыдущей схемы, где в качестве значения хранился один список объектов, здесь будут храниться два списка: список «оповещений» и список удаленных «оповещений». В случае мержа соответствующие списки будут объединяться и, таким образом, удаленный объект останется в списке удаленных объектов, а добавленный в списке добавленных (естественно после мержа из добавленных нужно будет вычесть удаленные). Запишем более формально:

Проблема в том, что при таком определении операций, наши списки неограничено растут, хотя реализуют необходимый примитив. Попробуем ограничить эти списки: если список добавленных объектов вырос до максимума — переносим любой объект в список удаленных, если вырос список удаленных — удаляем из него какой-нибудь объект, опять чуть более формально:

После каждой операции add, delete и merge нужно выполнять операцию ram. Понятно, что граничив длинну списков мы в чем-то потеряли и, скорее всего, преобрели нежелательное поведение. Попробуем это померить. В нашем случае (когда пропажа объектов штатное дело) единственным нежелательным поведением является появление уже удаленного оповещения. Для измерения этого показателя я смоделировал процесс и произвел серию наблюдений. Вполне очевидно, что количество ошибок должно зависить от длинны списка и от произведения длительности обработки запроса на частоту запросов (это действительно так, я проверял), назовем этот параметр ключевым. Ниже идут несколько графиков, по которым можно понять динамику:
График отражает долю изменений списка, после которых удаленное «оповещение» вновь появлялось как новое, в зависимости от максимального числа элементов в списке. Ключевой параметр был фиксирован (0.8 и 2), что соответсвует примерно 8 запросам в сек и 20 запросам в сек. Ниже будет написано, что это не так мало, как кажется.

На графике отображается динамика процента ошибок в зависимости от ключевого параметра при фиксированной длинне списка (30 и 130 элементов соответственно).

По оси абсцисс отложен ключевой параметр, красная линия отвечает за значение 1; по оси ординат отложена длинна списка, красные линии отвечают за 100, 200 и 300 элементов. Черным отмечена зона параметров, при которых ошибка менее 1%.

Почему 10 запросов в сек это не так мало, как кажется. Во-первых, учитываются только запросы на запись, во-вторых, это не общее количество запросов, а кол-ко запросов к одному объекту. В случае если мы проектируем, например, google+, то 10 запросов в сек это не кол-во обращений ко всему гуглу, а прогнозируемое частота комментариев к одной записи.
Последний паттерн в этой статье — большой список, с возможностью добавлять записи в конец, читать записи как с конца, так и с начала, а так же получать за один запрос несколько записей. Предполагается, что удалений из списка будет очень мало.
Этот примитив хорошо описывает ленту в твиттере, а так же стены в социальных сетях, но для него можно придумать и другие применения.
В отличие от предыдущих схем, которые описывались одной парой ключ-значение, этот паттерн описывается несколькими — служебной парой ключ-значением с информацией о начале и конце списка, а так же собственно чанками данных, в которых хранится отрезок фиксированного размера из списка. Определим дата модель, а так же операцию merge для каждого типа данных:
Думаю, операции очевидны. Если хотим добавить элемент в список:
Как видно из статьи, на Riak можно положить распространенные в web схемы работы с данными и впоследствии получить безболезненное распределение этих данных по нескольким узлам. Это достигается это за счет того набора примитивов, которые предоставляет Riak программисту.
Мне подход Riak понравился из-за прозрачного подхода к разрешению конфликтов и гибкости при выборе ограничений (CAP) при каждом запросе.
У распределенной хеш таблицы есть как плюсы, так и минусы. DHT хорошо масштабируется, но возможны потери данных из-за конфликтов конкурентного доступа, рассмотрим следующий пример:
client a: def o-value = DHT.get("some-key");
client a: def a-value = changeValue(o-value);
client b: def o-value = DHT.get("some-key");
client a: DHT.put("some-key", a-value);
client b: def b-value = changeValue(o-value);
client b: DHT.put("some-key", b-value);Получилось, что клиент b переписал данные клиента a и никто об этом не знает (ни a, ни b, ни тот, кто прочтет данные по этому ключу позже).
Так как многие NoSQL базы данных в своей основе имеют DHT, интересно смотреть как они пытаются решить проблему конкурентного доступа.
Например, MongoDB использует compare-and-swap стратегию: с каждым документом (значением) храниться его версия, при обновлении указывается версия «предка» измененного документа, если в базе в момент обновления храниться предок, то обновление проходит, если нет, то нет: обновляющая сторона получает сообщение, и пытается провести обновление снова — аналог STM. Такой подход хорошо работает с шардами, но плохо с репликацией.
Riak решает проблему конкурентного доступа подобно системам контроля версий, он, как бы, сохраняет конфликтные версии в разных бранчах, предоставляя программе при следующей выборке провести merge. Такой подход позволяет разрешать конфликты, связанные не только с конкурентным доступом, но и с времянной изолированостью части кластера (partition tolerance: кластер машин может распаться на две части, обе части будут работать и смогут без проблем объединиться в будущем).
Riak накладыват больше условий на разработку, но обеспечивает масштабируемость и надежность данных при работе с большим объемом информации. Статья опишет, как «обойти» ограничения Riak при разработке типичных web приложений.
Блог
Рассмотрим первый примитив, реализованный на базе Riak — append-only список небольшого размера.
Представим, что мы пишем блог, к каждому посту которого будет не очень много комментариев и комментарий нельзя изменять после добавления. В этом случае, разумно в качестве значения хранить весь пост с комментариями, так операция чтения будет проходить за O(1). Опишем схему для данных:
public class Post { public static class Comment implements Comparable<Comment> { public String text; public int ts; //timestamp /* equals, hashCode, compareTo*/ } public String text; public List<Comment> comments; }
Теперь представим, что два пользователя «одновременно» добавили комментарий:
client a: def o-post = DHT.get("post/13");
client a: def a-post = addComment(o-value, "забыл хабракат и все, что после него");
client b: def o-post = DHT.get("some-key");
client a: DHT.put("post/13", a-post);
client b: def b-post = addComment(o-post, "автор посчитал это очевидным");
client b: DHT.put("post/13", b-post);Теперь в базе с id «post/13» хранятся две записи; и первый, кто обратиться по этому ключу, получит их обе и должен будет самостоятельно их смерджить. Для простоты, предположим, что пост редактировать нельзя, поэтому подойдет пост из любой «ветки», а так как комментарии могут только добавляться, то списки комментариев к обоим постам имеют общий префикс, следовательно, нужно его выделить и создать новый список из префикса, его дополнения первого списка и его дополнения второго списка. Операция мерджа будет следущая:
public static Post merge(Post a, Post b) { Post c = new Post(); c.text = a.text; c.comments = Mergers.mergeLists(a.comments, b.comments); return c; }
Где mergeLists определена следующем образом:
public static <T extends Comparable<T>> List<T> mergeLists(List<T> a, List<T> b) { List<T> result = new ArrayList<T>(); List<T> rest = new ArrayList<T>(); int max = Math.min(a.size(), b.size()); int i=0; // выделяем общий префикс for(;i<max && a.get(i).equals(b.get(i));i++) { result.add(a.get(i)); } // собираем хвосты for(int j=i;j<a.size();j++) { rest.add(a.get(j)); } for(int j=i;j<b.size();j++) { rest.add(b.get(j)); } // сортируем хвосты Collections.sort(rest); // добовляем хвост for(T item : rest) { result.add(item); } return result; }
Очевидно, что mergeLists очень похожа на объединение множеств, а следовательно, если какой-то элемент был в a или b, то он будет и в результирующем списке, следовательно, при слиянии нет потерь данных. Получается, что сейчас мы научились писать в список в Riak, избегая проблем конкурентного доступа.
Если нужно смерджить несколько постов, то используем merge внутри fold комбинатора.
Оповещения (сообщения, обновления)
Следующий примитив, который мы рассмотрим будет списком фиксированного размера с возможностью удаления. В случае, когда он достигает максимального размера, из него выкидывается какой-нибудь элемент (например, самый старый, хотя в распределенных системах это понятие весьма условное). Так как размер списка фиксированный, будем хранить весь список опять как одно значение.
На этот примитив хорошо ложатся всевозможные уведомления. Во-первых, уведомление об событии намного менее важно, чем само событие; во-вторых, вряд ли пользователь хочет видеть уведомление о старом событии, если он давно не заходил в систему; в третьих, если пользователь уже изучает информацию о событии, то уведомление стоит удалить.
В отличии от предыдущей схемы, где в качестве значения хранился один список объектов, здесь будут храниться два списка: список «оповещений» и список удаленных «оповещений». В случае мержа соответствующие списки будут объединяться и, таким образом, удаленный объект останется в списке удаленных объектов, а добавленный в списке добавленных (естественно после мержа из добавленных нужно будет вычесть удаленные). Запишем более формально:

Проблема в том, что при таком определении операций, наши списки неограничено растут, хотя реализуют необходимый примитив. Попробуем ограничить эти списки: если список добавленных объектов вырос до максимума — переносим любой объект в список удаленных, если вырос список удаленных — удаляем из него какой-нибудь объект, опять чуть более формально:

После каждой операции add, delete и merge нужно выполнять операцию ram. Понятно, что граничив длинну списков мы в чем-то потеряли и, скорее всего, преобрели нежелательное поведение. Попробуем это померить. В нашем случае (когда пропажа объектов штатное дело) единственным нежелательным поведением является появление уже удаленного оповещения. Для измерения этого показателя я смоделировал процесс и произвел серию наблюдений. Вполне очевидно, что количество ошибок должно зависить от длинны списка и от произведения длительности обработки запроса на частоту запросов (это действительно так, я проверял), назовем этот параметр ключевым. Ниже идут несколько графиков, по которым можно понять динамику:
Процент ошибок записи от длинны списка
График отражает долю изменений списка, после которых удаленное «оповещение» вновь появлялось как новое, в зависимости от максимального числа элементов в списке. Ключевой параметр был фиксирован (0.8 и 2), что соответсвует примерно 8 запросам в сек и 20 запросам в сек. Ниже будет написано, что это не так мало, как кажется.

Процент ошибок записи от ключевого параметра
На графике отображается динамика процента ошибок в зависимости от ключевого параметра при фиксированной длинне списка (30 и 130 элементов соответственно).

1% зона ошибок
По оси абсцисс отложен ключевой параметр, красная линия отвечает за значение 1; по оси ординат отложена длинна списка, красные линии отвечают за 100, 200 и 300 элементов. Черным отмечена зона параметров, при которых ошибка менее 1%.

Почему 10 запросов в сек это не так мало, как кажется. Во-первых, учитываются только запросы на запись, во-вторых, это не общее количество запросов, а кол-ко запросов к одному объекту. В случае если мы проектируем, например, google+, то 10 запросов в сек это не кол-во обращений ко всему гуглу, а прогнозируемое частота комментариев к одной записи.
Поток (стена в вконтакте или лента в твиттер)
Последний паттерн в этой статье — большой список, с возможностью добавлять записи в конец, читать записи как с конца, так и с начала, а так же получать за один запрос несколько записей. Предполагается, что удалений из списка будет очень мало.
Этот примитив хорошо описывает ленту в твиттере, а так же стены в социальных сетях, но для него можно придумать и другие применения.
В отличие от предыдущих схем, которые описывались одной парой ключ-значение, этот паттерн описывается несколькими — служебной парой ключ-значением с информацией о начале и конце списка, а так же собственно чанками данных, в которых хранится отрезок фиксированного размера из списка. Определим дата модель, а так же операцию merge для каждого типа данных:
class Info { public String key; public String prefix; public int lastChunk = 0; public String getChunkKey(int chunk) { return prefix + chunk; } public Info mergeWith(Info brother) { Info info = new Info(); info.key = key; info.prefix = prefix; info.lastChunk = Math.max(lastChunk, brother.lastChunk); return info; } } class Chunk<T extends Comparable<T>> { public String key; List<T> added = new ArrayList<T>(); List<T> deleted = new ArrayList<T>(); public void add(T obj) { added.add(obj); } public void delete(T obj) { deleted.add(obj); } public Iterable<T> getData() { List<T> data = new ArrayList<T>(added); data.removeAll(deleted); return data; } public Chunk<T> mergeWith(Chunk<T> brother) { Chunk<T> chunk = new Chunk(); chunk.key = key; chunk.added = mergeLists(added, brother.added); chunk.deleted = mergeLists(deleted, brother.deleted); return chunk; } }
Думаю, операции очевидны. Если хотим добавить элемент в список:
- достаем info, которая соответствует каждому списку
- определяем и достаем последний чанк
- если он не полон добавляем запись и сохраняем чанк
- иначе создаем новый чанк, добавляем запись и сохраняем его; кроме того правим «указатель» на последний чанк в info и так же сохраняем
Заключение
Как видно из статьи, на Riak можно положить распространенные в web схемы работы с данными и впоследствии получить безболезненное распределение этих данных по нескольким узлам. Это достигается это за счет того набора примитивов, которые предоставляет Riak программисту.
Мне подход Riak понравился из-за прозрачного подхода к разрешению конфликтов и гибкости при выборе ограничений (CAP) при каждом запросе.
