Реализация потокобезопастного алгоритма round-robin на Java

Доброго времени суток, уважаемый хабрачитатель.
В один прекрасный рабочий день начальство поставило задачу добавить в разрабатываемую нами систему возможность использовать несколько серверов для повышения производительности. Разворачивать кластер и осуществлять балансировку за счет специализированных средств возможности не было. Поэтому пришлось придумывать свою реализацию, используя алгоритм round-robin.

Алгоритм


Исходя из архитектуры приложения было принято решение, что балансировку нужно осуществить проходя по списку серверов, которые задаются в конфигурации, и выдавать на новый запрос адрес следующего сервера.

Этот принцип лежит в основе алгоритма round-robin.

Реализация


Проведя поиск в интернете, я нашел много вариантов реализации данного алгоритма, которые сводились к следующему коду:
    public synchronized Entity getNext_Iterator(){
        if (!it.hasNext()) {
            it = objectList.iterator();
        }
        return it.next();
    }


Здесь objectList — это список доступных серверов.

В данной реализации мне не понравилось использование synchronized метода, поэтому мной была предложена следующая реализация:
  1. При инициализации создается список доступных серверов и сохраняется длина этого списка. Создается глобальный счетчик с типом данных AtomicInteger и начальным значением 0
  2. При запросе адреса сервера счетчик увеличивается на 1. Если получившееся значение превышает длину списка, то обнуляем счетчик. Значение этого счетчика используем, как индекс в списке.


Теперь непосредственно получившийся код (спасибо за совет relgames):

    private final AtomicInteger position = new AtomicInteger(0);
    private volatile int listSize;
    ...
    public synchronized void init(List<Entity> objects) {
            ...
            listSize = objectList.size();
            ...
        }
    }

    public Entity getNext() {
         return objectList.get(getNextPosition());
    }

    public final int getNextPosition() {
        for (;;) {
            int current = position.get();
            int next = current + 1;
            if(next >= listSize){
                next = 0;
            }
            if (position.compareAndSet(current, next))
                return current;
        }
    }



Для проверки работоспособности данной реализации под нагрузкой была написана небольшая программка.
Методика тестирования заключалась в том, что выполнялось 1 000 000 запросов на получения адреса сервера при разном количестве потоков (от 1 до 991) и измерялось среднее время получения адреса.

Результаты представлены в виде графика и их можно посмотреть тут.
Исходники для экспериментов тут.
Данные, на основе которых построены графики, тут.

Спасибо за внимание.

P.S. Предложенный вариант реализации работает одинаково стабильно как при работе в однопоточной системе, так и при работе в многопоточной системе, что хорошо видно на графике. Это и есть основное преимущество данной реализации.

UPD:
Старый график тут. Исследования тогда проводились на более медленном компе.
Share post
AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 16

    0
    С синхронизацией тесты проводились? Какие здесь недостатки ее использования.
      0
      На приложенном графике результаты тестирования моей реализации(синим цветом) и реализации с синхронизацией(красный и зеленой). Как видно в зависимости от нагрузки увеличивается время ответа на запрос, причем существенно.
      В реальной системе вариант с синхронизацией давал в среднем задержку 3ms. Отправлялся запрос к приложению(веб приложение) и оценивалось время ответа. В то время как предложенная реализация не давала задержек по сравнению с вариантом, когда всегда отдавался только один адрес сервера (простой геттер).
        0
        проверки работоспособности данной реализации под нагрузкой

        не смотрел график, извините)
        а по нему и сравнить можно а не только убедиться что работает, в чем и не сомневался
        спасибо
      +2
      1) Код метода с AtomicInteger не потокобезопасен.
      2) Имхо, экономия на спичках. Если важна каждая наносекунда, то почему бы не переписать балансировщик на C, избавившись от задержек при сборке мусора?
        0
        1) не совсем понял. Поясните пожалуйста. Насколько я понял из описания данного класса он как раз и существует для решения подобных задач.
        2) Синхронизация останавливает вообще все потоки, которые обращаются к данному методу. И при нормальной нагрузке, когда в системе работает порядка 100 человек, задержка составляет примерно 3ms. А это уже не спички. Когда на странице выполняется порядка 20 запросов на получение разнородных данных задержки суммируются и пользователь начинает нервничать.
        Переписывать на С нету смысла, потому что хватает и этой реализации. И нужно искать человека, который сможет это нормально на С написать.Потом поддерживать.А так все написано в одном стиле.
          0
          1)

          1.    public Entity getNext() {
          2.        int currentPosition = position.getAndIncrement();
          3.        int maxPosition = objectList.size();
          4.        position = position.get() > MAX_COUNTER ? new AtomicInteger(0) : position;
          5.        return objectList.get(currentPosition % maxPosition);
          6.    }
          


          Начальные условия:
          Есть 4 потока: A, B, C и D.
          MAX_COUNTER = 1000.
          Текущее значение position: 999.

          1) Поток A доходит до точки 3, изменяя значение position в 1000.
          2) Поток B доходит до точки 3, изменяя значение position в 1001.
          3) Поток A доходит до точки 4. position.get() > MAX_COUNTER, значит position = new AtomicInteger(0).
          4) Поток C доходит до точки 2. currentPosition инициализируется значением 0.
          5) Поток B доходит до точки 4. position.get() > MAX_COUNTER, значит position = new AtomicInteger(0).
          6) Поток D доходит до точки 2. currentPosition инициализируется значением 0.

          В итоге, потоки C и D шлют реквест на сервер objectList.get(0);
          В данном случае это некритично, но всё же бажок =)

          2) Тут надо смотреть из-за чего такие большие задержки. Может быть, слишком много потоков в пуле?

          Насчёт C я, конечно, пошутил =)
            –1
            1) Насчет сброса позиции согласен.Я тоже думал как это можно обойти.
            Тут я вижу два варианта:
            1. Можно сделать значение MAX_COUNTER = Integer.MAX_VALUE — 100 000(любое число превышающее количество потоков). И эта ситуация будет наступать раз в пару месяцев или раз в неделю(в зависимости от нагрузки конечно). То есть погрешность будет, но ее вклад будет не большим.
            2. Отлавливать исключительную ситуацию при превышении максимально возможного значения для типа и в обработчике вызвать synchronized метод сброса счетчика.В методе сброса счетчика сделать обработку от многократных сбрасываний. Такая ситуация, при большом значении MAX_COUNTER, будет возникать достаточно редко и в эти моменты задержки на синхронизацию на каком-то одном запросе можно будет пренебречь.


            2) Возможно. Система довольно большая и это только маленький ее кусочек. Исследовать не было времени. Да и пулом потоков мы не управляем. Потоки создает J2EE контейнер, в который установлено наше приложении.Поэтому в данном случае лучше написать более правильно этот «маленький кусочек», который будет нормально работать при любом адекватном количестве потоков.

              0
              Проще всего было бы сделать число потоков пропорциональным 2^n.
              Тогда можно просто остатком от деления получить номер текущего потока.
              А на ноль в это мслучае сбрасывать не надо, т.к. при превышении MAX_VALUE получишь такое же отрицательное число.

              Резюмируя, получаем примерно так:

              
              public int getNextId() {
                      int currentPosition = position.getAndIncrement();
              
                      if (currentPosition < 0) {
                              currentPosition += Integer.MAX_VALUE; // Integer.MAX_VALUE == 2^31 - 1
                              currentPosition++; // Сделаем кратным 2^31
                              assert (currentPosition > 0); // Нет переполнения
                      }
              
                      return objectList.get(currentPosition % objectList.size());
              }
              

                0
                1) Не совсем понял что в этом простого и зачем не понятная логика.
                Представьте, что другой человек через полтора года откроет этот код и попробует понять зачем мы это все делаем.
                2) Зачем делать ограничение, что число потоков должно быть кратно двум.Мы вообще не управляем числом потоков. Мы управляем количеством доступных сред распределения. А если у нас три сервера, на которые распределяется нагрузка? В этом случае мы должны, получается, отказаться от одного сервера?

                зы: Если вы еще раз прочитаете статью, то там уже переписана реализация(по сравнению с реализацией, которая обсуждается в комментариях).
                И она является потокобезопастной и высокопроизводительной(по результатам тестирования).
        +1
        Код действительно не потокобезопасный.

        Я бы сделал так (немного измененный кусок кода из реальной системы, которую поддерживаю я):

            private AtomicInteger currentId = new AtomicInteger();
        
            public int getNextId() {
                for (;;) {
                    int current = currentId.get();
                    int next = current + 1;
                    if (next>MAX_ID) {
                        next = 0;
                    }
                    if (currentId.compareAndSet(current, next)) {
                        return next;
                    }
                }
            }
        


        Ну и потом list.get(getNextId())
          0
          Хороший вариант

          Ну и потом list.get(getNextId())

          Вот это не совсем правильно. Здесь получается, что MAX_ID — это длина списка серверов. Если серверов мало(допустим 2), а потоков много (допустим 20), то эта проверка может привести к неравномерному распределению запросов по потокам(Большая часть потоков может уйти к серверу под номером 0).Надо будет провести эксперимент:)
          Правильно будет так(имхо)
          list.get(getNextId() % maxPosition), где maxPosition — это длина списка.
          В моем случае MAX_ID не зависит от длины списка.
          Кстати именно этот момент для меня является основным в реализации(позволяет не использовать итератор и уменьшить количество сбросов при максимальном значении).
          Кстати метод getAndIncrement() у AtomicInteger идентичен вашему коду за исключением проверки на максимальное значение. Поэтому можно сделать симбиоз вашего кода и моего.
          По мне так это будет оптимальный вариант.:)
            0
            Если серверов мало(допустим 2), а потоков много (допустим 20), то эта проверка может привести к неравномерному распределению запросов по потокам(Большая часть потоков может уйти к серверу под номером 0)

            Почему может? Метод будет гарантированно равномерно выдавать 0 и 1, т.е. если будет 1000 вызовов метода, то 500 раз вернет 0 и 500 раз 1.

            Набросал код для проверки pastebin.com/ixSXp5xu, распределение действительно равномерное.
              0
              В топик с подсказки добавил вариант, который проще и лаконичней:)
                0
                Только это не round-robin =)
                  0
                  Да, хотя очень близко:)
                  Оставить или нет в топике(как возможный вариант)?
                  Хотя топик и так уже заминусовали:(
                  Ну ничего будем учиться.
                0
                Провел более агрессивное тестирование при использовании 1000 потоков и двух адресов.
                Все работает на ура.
                Протестировал это решение на быстродействие в сравнении с моим вариантом. Быстродействие оказалось примерно одинаковым, ваш вариант даже немного стабильнее.

                Обновил статью. В приложенном файле с данными есть значения по исследованию вашего варианта с моим, если интересно можете глянуть.

                Спасибо за совет:)

          Only users with full accounts can post comments. Log in, please.