Распределенные структуры данных [часть 1, обзорная]

    Давным давно, когда деревья компьютеры были большими, а процессоры одноядерными, все приложения запускались в один поток и не испытывали сложностей синхронизации.


    Современные же приложения стремятся использовать все имеющиеся ресурсы, в частности, все доступные CPU.


    К сожалению, использовать стандартные структуры данных при многопоточной обработке не представляется возможным, поэтому в Java 5 появились потокобезопасные структуры данных,
    т.е. функционирующие исправно, при использовании из нескольких потоков одновременно, и расположились они в пакете java.util.concurrent.


    Про Vector...

    На самом деле, потокобезопасные, но неэффективные, структуры данных, как, например, Vector и Hashtable, появились еще в Java 1.0.
    В настоящий момент, они не рекомендуются к использованию.


    Однако, не взирая на всю технологическую мощь, заложенную в пакет java.util.concurrent, обработка информации потокобезопасными коллекциями возможна лишь в рамках одного компьютера, а это порождает проблему масштабируемости.


    А что если нужно, в реальном времени, обрабатывать информацию о 100 миллионах клиентов,
    когда датасет занимает 100Тб, а каждую секунду нужно совершить 100+ тысяч операций?
    Вряд ли это возможно, даже на самом крутом современном железе, а если и возможно — только представьте себе его стоимость!


    Намного дешевле добиться такой же вычислительной мощности объединив множество обычных компьютеров в кластер.



    Остается лишь вопрос межкомпьютерного взаимодействия привычными средствами, схожими по API с потокобезопасными коллекциями из пакета java.util.concurrent и дающими те же гарантии, но не на одном компьютере, а на всем кластере.



    Такие возможности и гарантии могут дать распределенные структуры данных.


    Рассмотрим некоторые из распределенных структур данных, позволяющих, без особых сложностей, сделать из многопоточного алгоритма распределенный.


    Дисклеймер

    Рассматриваемые в дальнейших примерах, реализации распределенных структуры данных являются частью функционала распределенного кеша Apache Ignite.


    AtomicReference и AtomicLong


    IgniteAtomicReference предоставляет compare-and-set семантику.


    Предположим, есть 2 компьютера, связаных общей сетью.


    Запустим Apache Ignite на обоих (предварительно подключив библиотеки)


    // Запустим экземпляр (node) Ignite локально.
    // В зависимости от конфигурации, node станет частью кластера хранящего и обрабатывающего данные,
    // либо клиентской node, позволяющей иметь доступ к этому кластеру.
    Ignite ignite = Ignition.ignite();
    
    // Создадим или получим, ранее созданный, IgniteAtomicReference
    // со стартовым значением "someVal"
    IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "someVal", true);

    На обоих компьютерах попробуем изменить хранимое значение


    // Изменим значение если текущее соответствует ожидаемому.
    boolean res = ref.compareAndSet("someVal", "someNewVal"); 
    
    // Изменение, в рамках кластера Ignite, произойдет.
    // Первый вызов изменит значение, и res будет равно true, 
    // Второй вызов получит res равное false, т.к. текущее значение уже не равно "someVal"

    Восстановим оригинальное значение


    ref.compareAndSet("someNewVal", "someVal"); // Изменение произойдет.

    IgniteAtomicLong расширяет семантику IgniteAtomicReference добавляя атомарные increment/decrement операции:


    // Создадим или получим, ранее созданный, IgniteAtomicLong.
    final IgniteAtomicLong atomicLong = ignite.atomicLong("atomicName", 0, true);
    
    // Выведем инкрементированное значение.
    System.out.println("Incremented value: " + atomicLong.incrementAndGet());

    Подробная документация: https://apacheignite.readme.io/docs/atomic-types


    Примеры на github



    AtomicSequence


    IgniteAtomicSequence позволяет получать уникальный идентификатор, причем уникальность гарантируется в рамках всего кластера.


    IgniteAtomicSequence работает быстрее IgniteAtomicLong, т.к. вместо того чтобы синхронизироваться глобально на получении каждого идентификатора, получает сразу диапазон значений и, далее, выдает идентификаторы из этого диапазона.


    // Создадим или получим, ранее созданный, IgniteAtomicSequence.
    final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);
    
    // Получим 20 уникальных идентификаторов.
    for (int i = 0; i < 20; i++) {
      long currentValue = seq.get();
      long newValue = seq.incrementAndGet();  
      ...
    }

    Подробная документация: https://apacheignite.readme.io/docs/id-generator
    Пример на githubIgniteAtomicSequenceExample


    CountDownLatch


    IgniteCountDownLatch позволяет синхронизировать потоки на разных компьютерах в рамках одного кластера.


    Запустим следующий код на 10 компьютерах одного кластера


    // Создадим или получим, ранее созданный, IgniteCountDownLatch
    // установив значение счетчика в 10
    IgniteCountDownLatch latch = ignite.countDownLatch("latchName", 10, false, true);
    
    // Декрементируем счетчик
    latch.countDown();
    
    // Дождемся пока countDown() будет вызван 10 раз
    latch.await();

    В результате, все latch.await() выполнятся гарантированно позже того, как выполнятся все десять latch.countDown().


    Подробная документация: https://apacheignite.readme.io/docs/countdownlatch
    Пример на githubIgniteCountDownLatchExample


    Semaphore


    IgniteSemaphore позволяет лимитировать число одновременных действий в рамках одного кластера.


    // Создадим или получим, ранее созданный, IgniteSemaphore
    // установив значение счетчика в 20
    IgniteSemaphore semaphore = ignite.semaphore("semName", 20,  true,  true);
    
    // Получаем разрешение
    semaphore.acquire();
    
    try {
        // Семафор захвачен, возможно выполнение кода
    }
    finally {
        // Возвращаем разрешение
        semaphore.release();
    }

    Гарантируется, что, одновременно, не более 20 потоков, в рамках одного кластера, будут выполнять код внутри секции try.


    Подробная документация: https://apacheignite.readme.io/docs/distributed-semaphore
    Пример на githubIgniteSemaphoreExample


    BlockingQueue


    IgniteQueue предоставляет те же возможности, что и BlockingQueue, но в рамках целого кластера.


    // Создадим или получим, ранее созданный, IgniteQueue.
    IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);

    Попытаемся получить элемент из очереди


    // Получим первй элемент в очереди
    queue.take();

    Выполнение приостановится на queue.take() до тех пор пока, в рамках того же кластера, не произойдет добавление в очередь


    // Добавим объект в очередь
    queue.put("data");

    Подробная документация: https://apacheignite.readme.io/docs/queue-and-set
    Пример на githubIgniteQueueExample


    Вместо заключения


    В связи с тем, что статья получилась исключительно обзорной, а многим, наверняка, интересно как же все это работает под капотом — в следующей статье я рассмотрю особенности реализации каждой из распределенных структур данных описанных в данной статье.

    GridGain
    100,00
    Компания
    Поделиться публикацией

    Похожие публикации

    Комментарии 13

      0
      А что если нужно, в реальном времени, обрабатывать информацию о 100 миллионах клиентов,
      когда датасет занимает 100Тб, а каждую секунду нужно совершить 100+ тысяч операций?
      Вряд ли это возможно, даже на самом крутом современном железе, а если и возможно — только представьте себе его стоимость!

      А не лучше ли использовать Hadoop Map Reduce, Spark или Hive/Pig для таких задач?

        +1
        Все перечисленные системы накладывают ряд ограничений связанных с парадигмой Map Reduce.
        В случае же с набором, описанным в статье, таких ограничений нет.
        Я бы, даже, сказал что это совершенно разные подходы к решению совершенно разных задач :)

        Например, у вас есть сервер и 1000 бакноматов, вам нужно чтобы все они дождались инициализации сервера.
        Очень простое, с точки зрения дизайна, решение (в одну строку!):
        Вы можете на каждом из них ждать latch.await(), а на сервере сделать latch.countDown().
          0

          Тогда вам лучше поменять пример в статье, потому что для 100 Тб датасета и вычислений над ним как раз лучше всего подходит концепция MapReduce и вышеперечисленные технологии.

            +1
            Я говорю не про вычисления, а именно про операции над данными.
            А операции бывают разные,
            Например, возьмем, все те же, 1000 банкоматов и раз в час нужно разыграть приз — первый кто оплатит на сумму больше тысячи — победитель.
            Подвох в том, что человек должен узнать что он победитель сразу после завершения операции,
            Возможны ли подобные гарантии/способы синхронизации на описанных технологиях?
              +1

              Тогда не понятно, зачем это должно обрабатываться одним кодом на сервере и банкоматах? Чем клиент-серверная архитектура не устраивает? Тем более что тысяча банкоматов для одного сервера — вообще не проблема. Может приведете более актуальный пример?

                +1
                Да, продолжим с 1000 банкоматов.
                С каждого из них может поступить запрос на перевод денег, но заранее известно что можно совершать не более 20 переводов одновременно (ограничение в договоре с платежной системой).
                Представим также, что сервер у нас не один, а 10 и к каждому из них приписано по 100 банкоматов.

                вариант

                semaphore.acquire();
                doPayment(acc1, acc2, amount);
                semaphore.release();

                позволит ограничить нагрузку на платежную систему в соответствии с условиями договора в рамках всей системы.
                  +2

                  Вот теперь стало понятнее, спасибо

                    0
                    Немного не нравится мне Ваш пример. Как правило вводят ограничение на количество операций в какой-то промежуток времени.
                    Есть в Ignite что-нибудь для ограничения одновременного выполнения в минуту/час/день?
                      0
                      Да, конечно, как вариант, вы можете использовать IgniteAtomicLong и выполнять операцию только если после инкремента получили число не превышающее заданное.
                      А раз в минуту/час/день сбрасывать его значение в ноль из отдельного потока.
          +2
          А как медленно работает IgniteAtomicSequence на большом кластере?
            +1
            Если кратко, то — размер кластера не важен, чем больше диапазон значений, выделяемых на один локальный экземпляр IgniteAtomicSequence, тем быстрее, стремясь с скорости простого чтения из памяти.

            Размер кластера не имеет значения, т.к. состояние будет хранится всего на 2-х node (Primary и Backup).
            Важно лишь — сколько будет запросов на изменение состояния в рамках одного кластера в единицу времени.
            Если диапазон значений, выделяемых на одну node, большой — то запросов будет мало, как следствие — не будет контеншена на глобальном изменении состояния IgniteAtomicSequence и получение нового диапазона будет занимать минимум времени.

            Итого, размер кластера не важен, а увеличение числа «клиентов» всегда можно компенсировать увеличением диапазона выделяемых значений.
              +1
              Мне скорее интересно, сколько времени тратится на согласование диапазонов? Ведь несколько нод могут попросить один и тот же диапазон.
                +1
                Подробнее как это работает я планирую рассказать в следующей статье, но, вкратце:

                Допустим, есть 10 локальных экземпляров IgniteAtomicSequence (по одном у и тому же имени, т.к. привязанные к одному глобальному состоянию).

                При создании каждого экземпляра ему выделается, допустим, 100 идентификаторов.
                Итого, первому дают номера с 0 по 99, второму с 100 до 199 и т.д.

                Глобально хранится информация что было выдано 1000 идентификаторов, т.е. не сколько реально было запрошено, а сколько доступно к выдаче (или уже было выдано) на всех локальных экземплярах.

                И, когда, какой то из локальных экземпляров выдает все свои 100 — он запрашивает новые 100 и глобально фиксируется что было выдано не X, а X+100 (например 1000 -> 1100).
                Для каждого следующего случая окончания диапазона ситуация повторяется.

                Единственным минусом подобной имплементации выступает сложность определения сколько идентификаторов реально используется, а сколько томится в ожидании, но уже отмечено как выданные.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое