Реализация консенсус-алгоритма RAFT для распределенного K-V хранилища на Java

    И снова здравствуйте. Несколько дней назад началось обучение в новой группе по курсу «Архитектор ПО», а сегодня мы хотели бы поделиться статьей, которую написал один из студентов курса — Плешаков Антон (руководитель направления разработки в компании «Программная логистика» и co-founder в Clusterra).




    В настоящее время распределенные микросервисные системы стали фактически промышленным стандартом, причем не только в мире энтерпрайза. Преимущества использования распределенных систем описывались и обсуждались уже неоднократно. Плюсы микросервисов всем давно известны: технологии под задачу, composability, scalability, масштабирование разработки, снижение TTM и так далее. Очевидно что разработка распределенных приложений дает больше опций для своевременной реакции на растущие запросы бизнеса и цифровизацию всего вокруг.

    Важно также отметить, что на текущий момент очень важным фактором, влияющим на выбор стратегии разработки в пользу микросервисов, является — наличие всевозможных готовых инфраструктурных решений, берущих на себя решение проблем, связанных с дополнительными издержками на эксплуатацию распределенной системы. Речь идет о системах контейнерной оркестрации, service mash, средствах распределенной трассировки, мониторинга, логирования и прочее прочее. Можно смело утверждать что большинство факторов, ранее упоминавшихся как минусы микросервисного подхода, на сегодняшний день не имеют такого большого влияния, как пару лет назад.

    Исходя из современных реалий, большинство разработчиков стремятся при первой же возможности перейти от монолитной структуры к микросервисной. Один из первых шагов, который можно сделать, не прибегая к тотальному рефакторингу и серьезной декомпозиции это добиться от системы горизонтальной масштабируемости. То есть превратить свое монолитное приложение в кластер, возможно даже состоящий из тех же самых монолитов, но позволяющий динамически варьировать их количество.

    При попытке достигнуть горизонтальной масштабируемости, очень быстро и очень остро встает вопрос синхронизации данных внутри кластера. К счастью, все современные СУБД так или иначе поддерживают репликацию данных между узлами. Разработчику нужно просто подобрать СУБД под задачу и определиться, какие свойства системы (согласно CAP теоремы) ему необходимы, CP или AP, и вопрос решен. В том случае, когда требуется именно CP и требования к консистентности высокие, одним из методов решения проблемы синхронизации данных является использование кластера, поддерживающего консенсус-алгоритм RAFT.

    Этот достаточно новый алгоритм (был разработан в 2012 году) дает высокую гарантию консистентности и очень популярен. Я решил разобраться, как он работает, и написал свою реализацию консистентного key-value хранилища на Java (Spring Boot).

    Есть ли смысл реализовывать какой-либо распределенный алгоритм самостоятельно? Понятно что можно взять готовую реализацию какого-либо распределенного алгоритма, и с высочайшей долей вероятности эта реализация будет лучше самодельного “велосипеда”. Например, можно использовать СУБД, поддерживающую необходимый уровень консистентности. Или же можно развернуть Zookeeper. Или можно найти подходящий для вашего языка фреймворк. Для java есть Atomix, который отлично решает проблемы синхронизации распределенных данных.

    Но с другой стороны. Если брать готовое решение, то использование внешнего приложения — это как правило добавление дополнительной точки отказа в вашу систему. А фреймворки могут быть избыточны или сложны в эксплуатации и изучении, а могут и вовсе не существовать для вашего языка программирования. Кроме того самостоятельная реализация консенсус алгоритма это крайне интересная инженерная задача, которая расширяет ваш кругозор и дает понимание, как решать более оптимальным методом проблемы, возникающие при взаимодействии сервисов в кластере.

    Поскольку спецификация алгоритма содержит в себе комплекс мер по поддержанию целостности данных, вы можете воспользоваться полученным знаниями и даже использовать алгоритм не целиком. Любая часть алгоритма может быть полезна в реальной жизни. Допустим у вас есть набор воркеров для параллельного парсинга файлов. Воркеры равнозначны, но вы хотите назначить один из воркеров в качестве координатора и при падении воркера-координатора назначать координатором любой другой свободный воркер. В этом вам поможет первая половина алгоритма RAFT, в которой описывается как выбирать лидера среди равнозначных узлов. Или же например, если у вас есть всего два узла в отношении master-slave, вы вполне можете воспользоваться правилами репликации, описанных спецификации RAFT для организации обмена данными в вашем более простом случае.

    Статья по сути является практическим руководством как реализовать RAFT самостоятельно. Сам алгоритм и теоретические аспекты его работы разбираться не будут. Можно почитать краткое описание вот в этой отличной статье или же изучить полную спецификацию здесь. Там же можно найти очень наглядную визуализацию работы алгоритма.

    Общее описание решения


    В статье разобрана та часть кода, которая непосредственно связана с реализацией алгоритма. В конце статьи есть ссылка на репозиторий, там можно посмотреть весь код целиком.

    Задача ставилась следующая. Разработать распределенную систему, позволяющую хранить данные в key-value БД. Данные каждой ноды должны быть согласованы, а именно, если данные попали в БД одной ноды и большинство нод подтвердило, что ими эти данные тоже получены, то рано или поздно эти данные окажутся в БД каждой ноды. При отключении части кластера и при его подключении обратно ноды, которые были вне кластера, должны догнать основной кластер и синхронизироваться. Каждая нода предоставляет REST API для записи и чтения данных БД. Система состоит из двух модулей для двух типов нод: клиент и сервер. Ниже мы рассмотрим особенности реализации непосредственно сервера. Код клиента есть в репозитории.

    Серверная нода может работать в трех состояниях:

    • Фоловер(follower). Принимает запросы на чтение от клиента. Принимает heartbeat от лидера
    • Кандидат(candidate). Принимает запросы на чтение от клиента. Рассылает vote запросы другим нодам
    • Лидер(leader). Принимает запросы на чтение и на запись. Рассылает heartbeat запросы другим нодам. Рассылает append запросы данными другим нодам.

    Период “лидерства” одного из узлов называется раунд (term). Новый кандидат открывает новый раунд.

    Хранение данных


    Каждая нода обеспечивает доступ к хранилищу лога операций, в котором последовательно фиксируются операции по изменению данных.

    https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/operations/OperationsLog.java


    public interface OperationsLog {
       void append(Operation operation);
       Operation get(Integer index);
       List<Operation> all();
    
       Long getTerm(Integer index);
       Integer getLastIndex();
       Long getLastTerm();
    
       void removeAllFromIndex(int newOperationIndex);
    }

    Каждая операция, кроме данных и типа (вставка, изменение, удаление), содержит в себе номер раунда, в рамках которого она создана. Помимо этого, у каждой операции есть индекс, который последовательно возрастает. Важно, что все операции вставляются в логи фолловеров в том же порядке, в котором они вставлялись в лог лидера.

    Каждая нода имеет доступ к БД, в которой хранятся непосредственно данные.

    https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/storage/Storage.java

    public interface Storage {
       List<Entry> all();
       String get(Long key);
       void insert(Long key, String val);
       void update(Long key, String val);
       void delete(Long key);
    }

    В текущей реализации используются embedded in-memory решения как для лога, так и для БД (обычные конкурентные List и Map). В случае необходимости можно просто имплементировать соответствующий интерфейс для поддержки иных типов хранилищ.

    Применение операций из лога на БД осуществляет распределенная машина состояния (state machine). Машина состояния — это такой механизм, который отвечает за изменение состояния кластера, ограничивая применение неправильных изменений (операции не по порядку или отключившийся узел, считающий себя лидером). Для того чтобы изменения считались валидными и для того чтобы их можно было применить к БД они должны пройти ряд проверок и соответствовать определенным критериям, что как раз и обеспечивается машиной состояния.

    Для лидера операция применяется к БД, если большинство узлов подтвердили тот факт, что операция реплицирована в их лог тоже. Для фолловера операция применяется к БД, если от лидера получен сигнал о том, что она попала в его БД.

    Таймеры


    Каждая нода обеспечивает обмен данными с другими нодами.

    Поддерживаются два типа запросов:

    • vote при проведении раунда голосования
    • append, он же heartbeat (если без данных), для репликации данных лога фолловерам и для предотвращения старта нового раунда голосования.

    Факт наступления того или иного события определяется таймером. На ноде запущены два вида таймеров:

    • vote. Для запуска раунда голосовании. У каждой ноды определен свой интервал, по истечении которого он попытается начать новое голосование. Отсчет начинается заново при получении heartbeat от лидера.
    • heartbeat. Для отправки лидером append запроса фолловерам. Если узел не получает heartbeat и таймер голосования истек, он становится кандидатом и инициирует выборы, повышает номер раунда голосования и рассылает vote запросы другим нодам. Если нода соберет большинство голосов, то она становится лидером и начинает рассылать heartbeat.

    Текущее состояние узла


    Каждая нода хранит данные о текущем состоянии.

    https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/context/Context.java

    public interface Context {
       Integer getId(); // идентификатор текущей ноды 
       State getState();//статус: лидер, кандидат, фоловер 
       Integer getVotedFor(); 
                   //Идентификатор узла за который был отдан голос в текущем раунде 
       Long getCurrentTerm(); //текущий раунд 
       Integer getCommitIndex(); //индекс последней примененной операции 
       List<Peer> getPeers(); //список нод с которыми ведется обмен 
    }

    Нода-лидер также хранит метаданные тех узлов, которым она реплицирует данные.

    https://github.com/pleshakoff/raft/blob/master/server/src/main/java/com/raft/server/node/peers/Peer.java

    public interface Peer {
       Integer getId(); //идентификатор узла 
       Integer getNextIndex(); //индекс следующей операции, которую готов принять узел
       Integer getMatchIndex();//индекс последней примененной операции
       Boolean getVoteGranted(); //голосовал ли узел в этом раунде
    }

    Метаданные узлов обновляются лидером при получении ответов от фолловеров. Они используются для определения лидером, какую следующую операцию по индексу готов принять фолловер и какие операции уже добавлены в лог фолловера.

    Голосование


    За проведение голосования отвечает класс ElectionService

    public interface ElectionService {
       void processElection();
       AnswerVoteDTO vote(RequestVoteDTO requestVoteDTO);
    } 

    Отправка запроса на голосование


    Если узел является фолловером и не получает heartbeat в течение заданного для ожидания периода, то он повышает свой текущий раунд, объявляет себя кандидатом и начинает рассылать vote запросы другим узлам. Если ему удастся собрать кворум и большинство узлов отдаст ему голос, то он станет новым лидером. В терминах RAFT, кворум — это больше половины всех узлов (51%).

    Разберем метод processElection класса ElectionServiceImpl, который вызывается vote-таймером при наступлении срока голосования и отправляет узлам запрос на голосование.

    //1
    context.setState(CANDIDATE); 
    Long term = context.incCurrentTerm(); 
    context.setVotedFor(context.getId()); 
    
    List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
    long voteGrantedCount = 1L;
    long voteRevokedCount = 0L;
    
    //2
    while (checkCurrentElectionStatus(term)) {
       List<AnswerVoteDTO> answers = getVoteFromAllPeers(term, peersIds);
       peersIds = new ArrayList<>();
       for (AnswerVoteDTO answer : answers) {
           //3
           if (answer.getStatusCode().equals(OK)) {
               //4
               if (answer.getTerm()>context.getCurrentTerm()) {
                   context.setTermGreaterThenCurrent(answer.getTerm());
                   return;
               }
               if (answer.isVoteGranted()) {
                   //5 
                   context.getPeer(answer.getId()).setVoteGranted(true);
                   voteGrantedCount++;
               } else
                   //6 
                   voteRevokedCount++;
           } else {
              peersIds.add(answer.getId());
           }
       }
      //7
      if (voteGrantedCount >= context.getQuorum()) {
           winElection(term);
           return;
       } else if (voteRevokedCount >= context.getQuorum()) {
           loseElection(term);
           return;
       } 

    1. Устанавливаем статус “Кандидат”. Повышаем номер раунда и голосуем за себя.
    2. Запросы узлам отправляются, пока большая часть узлов не ответит (не важно положительно или отрицательно). Если какие-то узлы недоступны, то запросы будут им отправляться до тех пор, пока не будет получен ответ или же пока не придет heartbeat от лидера и кандидат тогда превратится в фолловера или пока не запустится следующий раунд голосования по таймеру.
    3. Если какой-либо ответ получен, то анализируется его содержимое. Если же ответа нет, то узел добавляется в список узлов, которым надо отправить запрос по-новой.
    4. Если текущий раунд отвечающего нам узла больше нашего, значит он начал голосование раньше и у него больше шансов стать лидером или он уже лидер или наш узел был отключен от основного кластера и отстал от жизни. Устанавливаем текущий раунд равным полученному, превращаемся в фолловеров и ждем heartbeat или начала нового раунда голосования.
    5. Узел проголосовал за нас! Увеличиваем количество отдавших голос за нас узлов и фиксируем, что этот узел голосовал за нас.
    6. Проголосовали не за нас, тоже считаем.
    7. Если кворум собран и узел выиграл выборы, устанавливаем статус “Лидер”. В противном случае становимся фолловером и ждем.

    Также надо отметить, что когда узел становится лидером, то в хранящемся у лидера списке узлов для каждого устанавливается Next Index, равный последнему индексу в логе лидера плюс 1. Начиная с этого индекса, лидер будет пытаться обновлять логи фолловеров. На самом деле этот хранимый у лидера индекс может не соответствовать реальному индексу лога фолловера и актуальное значение будет получено только при обмене данными с фолловером и будет скорректировано. Но нужна какая-то отправная точка.

      private void winElection(Long term) {
           context.setState(LEADER);
           context.getPeers().forEach(peer ->
                   peer.setNextIndex(operationsLog.getLastIndex()+1)
    
           );
       }

    Обработка запроса на голосование


    При голосовании каждый узел получает от кандидата запрос вот такого вида:

    class RequestVoteDTO {
       private final Long term; //текущий раунд кандидата   
       private final Integer candidateId; //идентификатор кандидата 
       private final Integer lastLogIndex; //индекс последней записи в логе кандидата
       private final Long lastLogTerm; //раунд последней записи в логе кандидата  
    }

    Теперь давайте рассмотрим процедуру vote класса ElectionServiceImpl, она обрабатывает vote запрос от кандидата и возвращает решение по поводу его кандидатуры на роль лидера.

    https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/election/ElectionServiceImpl.java#L178


    public AnswerVoteDTO vote(RequestVoteDTO dto) {
       
           boolean termCheck;
           //1
           if (dto.getTerm() < context.getCurrentTerm())
               return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),false);
           else //2
           if (dto.getTerm().equals(context.getCurrentTerm())) {
               termCheck = (context.getVotedFor() == null||
                              context.getVotedFor().equals(dto.getCandidateId()));
           }
           else
           {   //3
               termCheck = true;
                 context.setTermGreaterThenCurrent(dto.getTerm());
           }
    
           //4  
           boolean logCheck = !((operationsLog.getLastTerm() > dto.getLastLogTerm()) ||
                   ((operationsLog.getLastTerm().equals(dto.getLastLogTerm())) &&
                           (operationsLog.getLastIndex() > dto.getLastLogIndex())));
    
    
           boolean voteGranted = termCheck&&logCheck;
    
           //5
           if (voteGranted) {
               context.setVotedFor(dto.getCandidateId());
           }
           //6   
           return new AnswerVoteDTO(context.getId(),context.getCurrentTerm(),voteGranted);
       }

    При получении запроса от кандидата узел делает две проверки: проверяет раунд кандидата и длину его лога. Если раунд кандидата выше и его лог длиннее или равен, то узел отдает свой узел голос за кандидата

    1. Если текущий раунд узла больше, чем раунд кандидата, отвечаем отказом, потому что это запрос какого-то отставшего узла, который, видимо, был какое-то время вне кластера и начал процедуру выборов из-за того, что не видел действующего лидера.
    2. Может сложиться такая ситуация, что раунды равны, например, узел уже успел получить запрос от кандидата и проголосовал за него, и поднял свой раунд до уровня раунда кандидата, но его ответ не дошел, и кандидат прислал ему запрос еще раз; в данной ситуации проверка раунда считается пройденным. Или же узел уже успел проголосовать за другого кандидата с таким же раундом — тогда проверка не пройдена.
    3. Если раунд кандидата больше раунда узла, то проверка раунда пройдена
    4. Проверка лога. Если раунд последней операции лога кандидата больше или равен раунду последней операции лога узла или же если раунды равны, но индекс последней операции кандидата больше или равен индексу последней операции узла, иными словами, если лог кандидата длиннее или равен логу узла, то проверка лога считается пройденной.
    5. При положительном исходе фиксируем факт того, что узел принял участие в выборах и отдал голос за кандидата.
    6. Отправляем результат обратно кандидату

    Наверняка условия можно было бы записать как-то покороче и поэлегантнее, но я оставил такой, более “наивный”, вариант, чтобы и самому не запутаться, и никого не запутать.

    Репликация


    Лидер по таймеру отправляет всем узлам фолловерам heartbeat, чтобы сбросить их таймеры голосований. Так как лидер хранит у себя в метаданных индексы последних операций у всех фолловеров, то он может оценить, требуется ли отправка операции узлам. Если лог операций лидера становится длиннее лога какого-либо фолловера, то он вместе с heartbeat последовательно отправляет ему недостающие операции. Назовем это append запросом. Если большинство узлов подтвердили получение новых операций, то лидер применяет эти операции на свою БД и повышает индекс последней примененной операции. Этот индекс тоже отправляется фолловерам вместе с heartbeat запросом. И если индекс лидера выше индекса фолловеров, то фолловер тоже применяет операции к своей БД, чтобы уравнять индексы.

    Вот такого вида append запрос отправляет лидер фолловеру

    class RequestAppendDTO {
       private final Long term; //текущий раунд лидера 
       private final Integer leaderId; //идентификатор лидера  
    
       private final Integer prevLogIndex;//индекс операции предшествующий передаваемой
       private final Long prevLogTerm;//раунд операции предшествующий передаваемой
       private final Integer leaderCommit;//индекс последней примененной на БД операции 
       private final Operation operation; //операция
    }
    

    Существуют реализации, в которых операции передаются пачками по несколько штук за запрос. В текущей реализации за один запрос можно передать только одну операцию

    За отправку и обработку heartbeat-append запроса отвечает класс:

    https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationService.java

    public interface ReplicationService {
       void appendRequest();
       AnswerAppendDTO append(RequestAppendDTO requestAppendDTO);
    }

    Отправка запроса на изменение данных


    Рассмотрим фрагмент метода sendAppendForOnePeer класса ReplicationServiceImpl

    Метод отвечает за формирование запроса фолловеру и его отправку.

    private CompletableFuture<AnswerAppendDTO> sendAppendForOnePeer(Integer id) {
       return CompletableFuture.supplyAsync(() -> {
           try {
               //1
               Peer peer = context.getPeer(id);
    
               Operation operation;
               Integer prevIndex;
               //2    
               if (peer.getNextIndex() <= operationsLog.getLastIndex()) {
                   operation = operationsLog.get(peer.getNextIndex());
                   prevIndex = peer.getNextIndex() - 1;
               } else 
               //3  
               {
                   operation = null;
                   prevIndex = operationsLog.getLastIndex();
               }
    
    
               RequestAppendDTO requestAppendDTO = new RequestAppendDTO(
                       context.getCurrentTerm(), //текущий раунд лидера 
                       context.getId(), //текущий идентификатор лидера
                       prevIndex,//индекс после которого должна быть вставлена операция
                       operationsLog.getTerm(prevIndex),//раунд этого индекса
                       context.getCommitIndex(),
                                   //индекс последней примененной на БД операции 
                       Operation //операция
               );
    
    ...
    /*дальше идет отправка http запроса и обратка возможных исключений*/
    }

    1. Метаданные фолловера
    2. Оцениваем индекс следующей операции, которую ожидает фолловер. Если индекс последней операции лидера больше или равен индексу операции ожидаемой фолловером (лог операций лидера длиннее лога операций фолловера), то подготавливаем недостающую операцию к отправке и вычисляем индекс операции, которая должна предшествовать этой новой операции, присланной лидером, в логе фолловера. Когда фолловер получит запрос, он сравнит свой последний индекс с тем, что прислал лидер и, если он совпадает, то новую операцию можно добавить
    3. Если лог лидера короче, то операцию не передаем, но фолловеру сообщаем индекс последней операции лидера; для фолловера это может стать сигналом о том, что операции в его логе, индекс которых выше последнего индекса лога лидера, невалидны

    Далее рассмотрим метод appendRequest класса ReplicationServiceImpl, который отвечает за отправку append запроса и обработку результата всем фолловерам.

    https://github.com/pleshakoff/raft/blob/eba5ea1984e2623702f4c299cf1b0af7a6ba0d14/server/src/main/java/com/raft/server/replication/ReplicationServiceImpl.java#L109

    public void appendRequest() {
           List<Integer> peersIds = context.getPeers().stream().map(Peer::getId).collect(Collectors.toList());
    
           //1 
           while (peersIds.size() > 0) {
               //2 
               List<AnswerAppendDTO> answers = sendAppendToAllPeers(peersIds);
               peersIds = new ArrayList<>();
               for (AnswerAppendDTO answer : answers) {
                   //3
                   if (answer.getStatusCode().equals(OK)) {
                       //4
                       if (answer.getTerm() > context.getCurrentTerm()) {
                            context.setTermGreaterThenCurrent(answer.getTerm());
                           return;
                       }
                       Peer peer = context.getPeer(answer.getId());
                       //5     
                       if (answer.getSuccess()) {                      
                           peer.setNextIndex(answer.getMatchIndex() + 1);
                           peer.setMatchIndex(answer.getMatchIndex());
                           if (peer.getNextIndex() <= operationsLog.getLastIndex())
                               peersIds.add(answer.getId());
                       //6      
                       } else {
                           peer.decNextIndex();
                           peersIds.add(answer.getId());
                       }
                   }
               }
               //7
               tryToCommit();
           }
    }
    

    1. Повторяем запрос, пока не получим ответ от всех фолловеров, что репликация прошла успешно. Поскольку отправляется одна операция за запрос, может потребоваться несколько итераций, чтобы синхронизировать логи фолловеров
    2. Отправили запросы всем фолловерам и получили список с ответами
    3. Рассматриваем ответы только от доступных фолловеров
    4. Если выяснилось, что раунд одного из фолловеров больше раунда лидера, все останавливаем и превращаемся в фолловера
    5. Если фолловер ответил, что все прошло успешно, обновляем метаданные фолловера: сохраняем последний индекс лога фолловера и индекс следующей ожидаемой фолловером операции.
    6. Если фолловер отвечает, что репликация не удалась, это значит, что индекс последней операции фолловера не равен предшествующему индексу текущей операции, который мы ему отправили и он не может вставить эту операцию. Другими словами, лог фолловеров отстал от лога лидера более чем на одну операцию и фолловер не может принять последнюю операцию, потому что еще не принял предыдущие. Для данного фолловера меняем метаданные, снижаем ему индекс следующей ожидаемой операции и повторяем отправку. Эта процедура будет повторяться, пока лог фолловера не придет в соответствие с логом лидера.
    7. Если большинство узлов подтвердили занесение операции себе в лог, то она будет применена к БД. Метод подробно будет рассмотрен ниже.

    Обработка запроса на изменение данных


    Теперь рассмотрим, как именно фолловер обрабатывает append запрос от лидера.
    Метод append класса ReplicationServiceImpl

    public AnswerAppendDTO append(RequestAppendDTO dto) {
         
           //1     
           if (dto.getTerm() < context.getCurrentTerm()) {
               return new AnswerAppendDTO(context.getId(),context.getCurrentTerm(),false, null);
           } else if (dto.getTerm() > context.getCurrentTerm()) {
               //2 
               context.setCurrentTerm(dto.getTerm());
               context.setVotedFor(null);
           }
           //3  
           applicationEventPublisher.publishEvent(new ResetElectionTimerEvent(this));
    
           if (!context.getState().equals(FOLLOWER)) {
               context.setState(FOLLOWER);
           }
            
           //4  
           if ((dto.getPrevLogIndex() > operationsLog.getLastIndex()) ||                                                                                        !dto.getPrevLogTerm().equals(operationsLog.getTerm(dto.getPrevLogIndex()))) {
                          return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), false, null);
           }
    
    
           Operation newOperation = dto.getOperation();
           if (newOperation != null) {
               int newOperationIndex = dto.getPrevLogIndex() + 1;
               
             synchronized (this) {
                   //5
                   if ((newOperationIndex <= operationsLog.getLastIndex()) &&
                          (!newOperation.getTerm().equals(operationsLog.getTerm(newOperationIndex)))){
                       operationsLog.removeAllFromIndex(newOperationIndex);
                   }
                   //6
                   if (newOperationIndex <= operationsLog.getLastIndex())
                   {
                     return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true,      operationsLog.getLastIndex());
                   }
                   //7
                   operationsLog.append(newOperation);
               }
            }
            //8 
            if (dto.getLeaderCommit() > context.getCommitIndex()) {
               context.setCommitIndex(Math.min(dto.getLeaderCommit(), operationsLog.getLastIndex()));
           }
    
                     
           return new AnswerAppendDTO(context.getId(), context.getCurrentTerm(), true, operationsLog.getLastIndex());
       }

    1. Если раунд лидера меньше, чем раунд фолловера, то отправляем лидеру свой раунд и признак того, что его запрос отвергнут. Как только лидер получит в ответе раунд больше своего, он превратится в фолловера
    2. Если раунд лидера больше раунда фолловера, устанавливаем этот раунд фолловеру
    3. Поскольку получен запрос от лидера, независимо от того, есть ли данные там или нет, сбрасываем vote таймер и, если не были фолловером, становимся им
    4. Если индекс предыдущей операции, полученный от лидера, больше реального индекса последнего элемента лога фолловера или их терм не совпадает, это значит, что перед тем, как вставить присланную операцию, надо вставить предыдущие. Отвечаем отказом и отправляем текущий последний индекс лога, лидер должен повторить попытку, но уже с предыдущей операцией
    5. В логе фолловера уже есть операция с таким индексом и она не совпадает с операцией, полученной от лидера. Удаляем эту операцию в логе фолловера и все операции с индексом больше. Такое может произойти, если лидер отключился от кластера, по-прежнему считает, что он лидер, и принимает новые операции в лог, но на БД их не применяет, потому что не может собрать кворум. Когда узел вернется в кластер и получит подтвержденные кворумом данные от истинного лидера, невалидные операции будут удалены.
    6. Повторно прилетела операция, которая уже вставлялась. Просто отвечаем, что все ОК
    7. Все проверки пройдены, добавляем операцию в лог
    8. Если индекс последней примененной на БД операции фолловера меньше, чем индекс лидера, применяем операции на БД фолловера из лога, пока индекс на сравняется.

    Изменение данных в БД


    Осталось только разобраться, каким образом лидер применяет операции из лога на БД. В процессе отправки операций фолловерам и обработки ответов от них лидер обновляет метаданные узлов. Как только количество узлов, у которых индекс последней операции в логе больше, чем индекс последней примененной на БД операции у лидера, становится равным кворуму, то мы можем утверждать, что большинство узлов операцию получило и мы можем применить её на БД лидера. Иными словами, если лидер отправил фолловерам операцию и большинство ее вставило в свой лог и ответило лидеру, то мы можем эту операцию применить к БД лидера и повысить индекс последней примененной операции. Этот индекс со следующим append-heartbeat запросом прилетит фолловеру и он применит операцию с таким же индексом из своего лога на свою БД.

    Разберем метод tryToCommit класса ReplicationServiceImpl

      private void tryToCommit() {
           while (true) {
               //1
               int N = context.getCommitIndex() + 1;
               //2
               Supplier<Long> count = () ->
                   context.getPeers().stream().map(Peer::getMatchIndex).
                           filter(matchIndex -> matchIndex >= N).count() + 1;
    
               //3 
               if (operationsLog.getLastIndex() >= N &&
                       operationsLog.getTerm(N).equals(context.getCurrentTerm())&&
                          count.get()>=context.getQuorum()
               )
               {
                   context.setCommitIndex(N);
               } else
                   return;
           }
       }

    1. Получаем следующий индекс применяемой к БД операции
    2. Считаем, сколько фолловеров имеют у себя в логе операцию с таким индексом, и не забываем прибавить лидера
    3. Если число таких фолловеров составляют кворум и операция с таким индексом есть в логе лидера, и раунд этой операции эквивалентен текущему, то лидер применяет операцию к БД и повышает индекс последней примененной операции. Операции из предыдущего раунда не могут быть применены, т.к за них отвечал другой лидер и может возникнуть коллизия. Каждый лидер применяет операции только своего текущего раунда.

    Заключение


    Любой распределенный алгоритм, представителем семейства которых является RAFT — это мощное комплексное решение, гарантирующее достижение результата, при соблюдении всех описываемых в спецификации правил.

    Распределенных алгоритмов много и они разные. Есть ZAB, который реализован в Zookeeper и используется, например, для синхронизации данных в Kafka. Есть алгоритмы с менее жесткими требованиями к консистентности, например масса имплементаций Gossip протокола, которые применяются в AP системах. Существуют алгоритмы, которые следуют принципам RAFT, и при этом используют gossip протокол для обмена логами например MOKKA, который причем еще и использует шифрование.

    Я считаю что попытаться разобраться в каком-либо из этих алгоритмов крайне полезно для любого разработчика, и как я уже упоминал выше, решения могут быть интересны как комплексно так и отдельными частями. И очевидно обязательно надо посмотреть в эту сторону тем, чья деятельность связана разработкой распределенных систем и касается вопросов синхронизации данных, даже если они используют стандартные промышленные решения.

    Ссылки



    Надеемся материал был вам полезен. А если вы хотите успеть на курс, то это можно сделать прямо сейчас.
    OTUS. Онлайн-образование
    Цифровые навыки от ведущих экспертов

    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое