Concurrency: 6 способов жить с shared state

  • Tutorial
concurrency

В многопоточном программировании много сложностей, основными из которых являются работа c разделяемым состоянием и эффективное использование предоставляемых ядер. Об использовании ядер пойдет речь в следующей статье.

С разделяемым состоянием в многопоточной среде существуют два момента, из-за которых возникают все сложности: состояние гонки и видимость изменений. В состоянии гонки, потоки одновременно изменяют состояние, что ведет к недетерменированному поведению. А проблема с видимостью заключаются в том, что результат изменения данных в одном потоке, может быть невидим другому. В статье будут рассказаны шесть способов как бороться с данными проблемами.

Все примеры приведены на Java, но содержат комментарии и я надеюсь будут понятны программистам не знакомым c Java. Данная статья носит обзорный характер и не претендует на полноту. В то же время она наполнена ссылками, которые дают более подробное объяснение терминам и утверждениям.



Shared State



Работу с разделяемым состоянием я покажу на примере вычисления чисел фибоначчи.
Формула для вычисления выглядит так:

f(0) = 0
f(1) = 1
f(n) = f(n - 1) + f(n - 2) , n >= 2


В первую очередь определим интерфейс:

public interface FibonacciGenerator<T> {
    /**
     * Следующее сгенерированное значение
     */
    T next();

    /**
     * Текущее значение в генераторе
     */
    public T val();
}


Наши классы будут реализовывать интерфейс FibonacciGenerator. Из формулы видно, что для предоставления следующего числа, надо хранить два предыдущих — это и будет разделяемое состояние, за которое будет проходить конкуренция. Наша реализация должна быть потоко-безопасной. То есть независимо от одновременного количества потребителей, она будет сохранять свои инварианты и придерживаться контракта. И так, приступим.

Locking



Первый способ сделать класс корректно работающим в многопоточной среде — это использовать блокировки и объявить все методы synchronized. Примером может служить класс Vector.

public class IntrinsicLock implements FibonacciGenerator<BigInteger> {

    private BigInteger curr = BigInteger.ONE;
    private BigInteger next = BigInteger.ONE;

    @Override
    public synchronized BigInteger next() {
        BigInteger result = curr;
        curr = next;
        next = result.add(next);
        return result;
    }

    @Override
    public synchronized BigInteger val() {
        return curr;
    }

}


Мы получили класс, который корректно работает в многопоточной среде, затратив при этом минимум усилий. В первую очередь, мы жертвуем производительностью. Производительность класса такая же, как если бы он запускался в однопоточной среде. К тому же, использование локов приносит проблемы, такие как: deadlock, livelock и т.п.

Fine-Grained locking



Следующий способ — разбить наши структуры на части и оградить локами только те секции, в которых происходит работа с разделяемым состоянием. Примером такого подхода является СoncurrentHashMap. В нем все данные разбиваются на несколько частей. При доступе блокируется только та часть, изменение которой происходит в текущий момент. Также есть вариант использовать более функциональные локи, такие как: StampedLock (java 8), ReadWriteLock. При корректных алгоритме и реализации мы получаем более высокий уровень параллелизма. Пример с использованием ReadWriteLock:

public class FineGrainedLock implements FibonacciGenerator<BigInteger> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private BigInteger curr = BigInteger.ONE;
    private BigInteger next = BigInteger.ONE;

    @Override
    public BigInteger next() {
        BigInteger result;
        lock.writeLock().lock();
        try {
            // Вход другим потокам запрещен
            result = curr;
            curr = next;
            next = result.add(next);
            return result;
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public BigInteger val() {
        lock.readLock().lock();
        try {
            // При отпущенном write lock
            // Допуст`им вход множества потоков
            return curr;
        } finally {
            lock.readLock().unlock();
        }
    }
}


Мы усовершенствовали класс и добились более высокой пропускной способности на чтение. У такой реализации есть все минусы локов. К тому же, из минусов добавилось то, что алгоритм усложнился (добавилось шума, не связанного с логикой работы) и вероятность некорректной реализации возросла.

Lock-free algorithms



Использование локов влечет массу проблем с производительностью и корректностью. Существует альтернатива в виде неблокирующих алгоритмов. Такие алгоритмы построены на атомарных операциях, предоставляемых процессорами. Примером служит метод get в ConcurrentHashMap. Чтобы писать неблокирующие алгоритмы, имеет смысл воспользоваться существующими неблокирующими классами: ConcurrentLinkedQueue, ConcurrentHashMap и т.п. Напишем неблокирующую реализацию нашего класса.

public class LockFree implements FibonacciGenerator<BigInteger> {

    // Инкапсулируем состояние генератора в класс
    private static class State {
        final BigInteger next;
        final BigInteger curr;

        public State(BigInteger curr, BigInteger next) {
            this.next = next;
            this.curr = curr;
        }
    }

    // Сделаем состояние класса атомарным
    private final AtomicReference<State> atomicState = new AtomicReference<>(
            new State(BigInteger.ONE, BigInteger.ONE));

    public BigInteger next() {
        BigInteger value = null;
        while (true) { 
            // сохраняем состояние класса 
            State state = atomicState.get();
            // то что возвращаем если удалось изменить состояние класса
            value = state.curr; 
            // конструируем новое состояние
            State newState = new State(state.next, state.curr.add(state.next));
            // если за время пока мы конструировали новое состояние
            // оно осталось прежним, то заменяем состояние на новое,
            // иначе пробуем сконструировать еще раз
            if (atomicState.compareAndSet(state, newState)) {break;}
        }
        return value;
    }

    @Override
    public BigInteger val() {
        return atomicState.get().curr;
    }
}


Из плюсов такого подхода следует отметить увеличение производительности, по сравнению с блокирующими алгоритмами. А также, что не менее важно, избавление от недостатков локов. Минус в том, что неблокирующий алгоритм придумать гораздо сложнее.

Software Transactional Memory



Альтернативой неблокирующим алгоритмам является применение программной транзакционной памяти. Её использование похоже на использование транзакций при работе с базами данных. Концепция довольно таки новая (1995) и среди популярных языков, нативная поддержка есть только в Clojure. Поддержка STM на уровне библиотек есть во многих языках, в том числе и Java. Я буду использовать STM, реализованный в рамках проекта Akka.

public class STM implements FibonacciGenerator<BigInteger> {

    // Оборачиваем переменные в транзакционные ссылки
    // система будет отслеживать изменения этих переменных внутри транзакции
    private final Ref<BigInteger> curr = new Ref<>(BigInteger.ONE);
    private final Ref<BigInteger> next = new Ref<>(BigInteger.ONE);

    @Override
    public BigInteger next() {
        // Создаем транзакцию
        return new Atomic<BigInteger>() {
            // Изменения внутри метода 
            // будут обладают АСI (https://en.wikipedia.org/wiki/ACID)
            @Override
            public BigInteger atomically() {
                // Все значения отслеживаемых переменных согласованы
                BigInteger result = curr.get();
                // Изменения не видны другим потокам
                curr.set(next.get());
                next.set(result.add(next.get()));
                // Проверяется были ли изменения над отслеживаемыми
                // переменными. Если да, то нас опередили, но мы
                // оптимистичны и повторяем транзакцию еще раз.
                // Если мы первые, то атомарно записываем новые значения.
                return result;
            }
        // и выполняем ее
        }.execute();
    }

    @Override
    public BigInteger val() {
        // Транзакция создается неявно
        return curr.get();
    }

}


Код легко понимается и нет шума, создаваемого неблокирующими алгоритмами и использованием локов. Но за это мы платим более низкой производительностью и ограниченной применимостью, поскольку STM хорошо себя показывает когда читателей гораздо больше, чем писателей.

Immutability



Проблемы с общим доступом к состоянию от того, что оно изменяемое. То есть спроектировав класс неизменяемым, можно без ограничений работать с ним в многопоточной среде, так как он также будет потоко-безопасным. Но неизменяемые структуры данных зачастую требуют функциональных подходов и специальных структур данных, так как расходы на память могут быть очень велики.

public class Immutable {

    private final BigInteger next;
    // Текущее значение
    public final BigInteger val;

    private Immutable(BigInteger next, BigInteger val) {
        this.next = next;
        this.val = val;
    }

    public Immutable next() {
        return new Immutable(val.add(next), next);
    }

    public static Immutable first() {
        return new Immutable(BigInteger.ONE, BigInteger.ONE);
    }

}


Как вы заметили, интерфейс класса изменился и это потребует других способов работы с ним.

Isolated mutability



Идея изолированной изменяемости объектов состоит в том, что доступ к ним ограничен всегда одним потоком. Следовательно у нас не возникнет проблем, характерных для многопоточных программ. Такой подход использует модель акторов. Акторы — это сущности похожие на объекты, у которых есть изменяемое состояние и поведение. Взаимодействие акторов происходит через асинхронную передачу сообщений. Сообщения неизменяемы и актор обрабатывает одно сообщение за раз. Результатом обработки сообщения является изменение поведения, состояния и выполнение других действий. Пример использования акторов будет приведен в следующей статье.

Итог



У каждого подхода есть свои минусы и плюсы и нельзя дать универсального совета.
Комбинация fine-grained локов и неблокирующих алгоритмов, наиболее часто используемый подход в Java. В Clojure же напротив — транзакционная память и неизменяемые структуры данных. Транзакционная память, на мой взгляд, многообещающий инструмент (предлагаю читателю самостоятельно провести аналогию со сборкой мусора). Надеюсь, что при следующем проектировании системы, модуля или класса, вы вспомните подходы, описанные в данной статье, и выберите подходящий именно вам.

Спасибо за внимание. Жду ваших замечаний и предложений.
Поделиться публикацией
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 20

    +5
    Замечание из личного опыта: Isolated mutability гораздо сложнее покрыть тестами, нужно учитывать, что сообщения могут скапливаться в очередях и собранные с акторов мгновенные значения (например, количество обслуживаемых подключений) могут не совпадать с ожидаемыми (сумма активных и неактивных соединений может быть не равна общему числу соединений, как пример).
      +2
      А как покрывать тестами без isolated mutability? Тестировать многопоточные приложения с shared state куда сложнее, тут isolated mutability часто как раз помогает.
        +1
        В остальных методах можно поставить дебажные мьютексы в ключевых местах и после успешного прохождения ассерта отпускать тестируемые потоки. С isolated mutability так сделать сложно, заблокированный worker все еще будет иметь необработанные сообщения в очереди.
          0
          спасибо, интересно было бы посмотреть на примеры таких тестов. Как они работают понятно, но не очень хочется добавлять зависимость от таких мютексов в API.

          А проблем тестирования самих акторов в случае с isolated mutability я не вижу, нужно тестировать реакцию на различные сообщения независимо от наличия очереди, точно так же как в однопоточном варианте.
            0
            Пример взят отсюда, тестирование read-write mutex (сорри, что код на D, но в данном случае от джавы почти не отличается):
                //...
                void readerFn()
                {
                    synchronized (mutex.reader)
                    {
                        atomicOp!"+="(numReaders, 1);
                        rdSemA.notify();
                        rdSemB.wait();
                        atomicOp!"-="(numReaders, 1);
                    }
                }
            
                void writerFn()
                {
                    synchronized (mutex.writer)
                    {
                        atomicOp!"+="(numWriters, 1);
                        wrSemA.notify();
                        wrSemB.wait();
                        atomicOp!"-="(numWriters, 1);
                    }
                }
            
                // ...
            
                scope group = new ThreadGroup;
            
                // 2 simultaneous readers
                group.create(&readerFn); group.create(&readerFn);
                rdSemA.wait(); rdSemA.wait();
                assert(numReaders == 2);
                rdSemB.notify(); rdSemB.notify();
                group.joinAll();
                assert(numReaders == 0);
                foreach (t; group) group.remove(t);
            


            А проблем тестирования самих акторов в случае с isolated mutability я не вижу, нужно тестировать реакцию на различные сообщения независимо от наличия очереди, точно так же как в однопоточном варианте.


            Согласен, когда тестируется внешний API, тесты написать гораздо легче, чем для других методов. Но если юниттесты проверяют правильность общения между воркерами, то там можно напороться на эту особенность.
      +3
      Стоит заметить, что в варианте неблокирующего оптимистичного алгоритма есть существенный недостаток: при высокой конкуренции вероятность получить false на compare-and-swap становится достаточно высока, особенно с большими значениями BigInteger (что приведет к большому числу переповторов впустую), поэтому есть смысл подумать о других вариантах, например synchronized/ReentrantLock.

      Кроме того, пожалуй, метод val() излишний в api, потому что класс в таком виде становится не Thread safe (условно, конечно, но его контракт несколько странный).
        0
        в варианте неблокирующего оптимистичного алгоритма есть существенный недостаток: при высокой конкуренции вероятность получить false на compare-and-swap становится достаточно высока, особенно с большими значениями BigInteger

        Да, вы правы. Я сделал перформанс тесты (спасибо огромное TheShade) и до определенного момента неблокирующий алгоритм выигрывает в 2 раза у блокирующих. Затем деградириует и в конце концов становится медленнее. Здесь он просто приведен для демонстрации подхода.

        Кроме того, пожалуй, метод val() излишний в api, потому что класс в таком виде становится не Thread safe

        В какой реализации? BigInteger же immutable.
          +1
          Я не об этом, между next и val нет синхронизации, но если рассматривать как аналогию AtomicInteger get/incrementAndGet, то для примера пойдет, а в боевой реализации я бы без нужды метод val добавлять не стал. Это все условности, поэтому не стоит придавать этому большое значение.
        +6
        Одну важную вещь всегда опускают, когда говорят про транзакции — это принцип «let it fault». Транзакция не обязана завершиться успешно. В случае, когда возникает коллизия на write, одна из них всегда откатывается. Поэтому приложение должно уметь отлавливать такие случаи и повторять попытку изменений с самого начала. Практически нигде в примерах это даже не упоминается. К сожалению, во всех системах, с которыми я имел дело, подобная проверка отсутствует (если это не делается самой платформой).

        Другое свойство, которое не часто упоминается, это изолированность транзакций. Большинство STM сделано на схеме MVCC, а она не отвечает «full-serialized» уровню, поскольку отсутствуют read locks. Работа на таком уровне чревата неприятным эффектом write skew.

        На самом деле я не вижу сильно ограниченным использование STM, разве что в экзотических решениях. Для несложных структур можно использовать стандартные классы библиотеки java.concurrent, а сложные графы хранить в памяти без persistence как правило нет нужды. В большинстве архитектур shared state реализуется как раз на уровне persistence в базе данных, а все, что находится в памяти — это временные данные: либо immutable объекты, либо «isolated mutability». STM не масштабируются: для этого есть множество других решений типа distributed cache. Обход большого графа в STM достаточно накладен, требуются дополнительные индексирующие структуры для быстрого доступа. Вобщем, in-memory движки баз данных уже имеют весь необходимый функционал и являются более стандартным решением для хранения shared state.

        В одно время в проекте встал вопрос использовать STM или in-memory database, и выбор именно на последний (JPA+H2).
          0
          Спасибо за комментарий!

          Поэтому приложение должно уметь отлавливать такие случаи и повторять попытку изменений с самого начала. Практически нигде в примерах это даже не упоминается

          В комментах упоминается
          Если да, то нас опередили, но мы оптимистичны и повторяем транзакцию еще раз.

          В данном случае изменения повторяются автоматически системой. Поэтому операции внутри транзакции должны быть идемпотентны. Я специально убрал часть конфигурации отвечающую за количество повторений транзакции, чтобы не загромождать код.
          private final TransactionFactory txFactory = new TransactionFactoryBuilder().setMaxRetries(1_000_000) .build();
          

          И да, я не рассматривал случай, когда максимальное количество повторений превышено. Спасибо за замечание.
            0
            Поэтому операции внутри транзакции должны быть идемпотентны.


            Этого часто ой как сложно добиться :-)
          +2
          6 этих способов можно разбить на группы:

          a) универсальные:
          — пессимитические: synchronized, locks
          — оптимистические: lock-free, stm
          b) частные:
          — иммутабельные: обеспечивают только чтение общего состояния
          — isolated mutability: инициирующий поток не нуждается в результате (по крайней мере, немедленно), поэтому может выдать задание (или послать сообщение) и продолжить свою работу. Замечу, выдача задания также требует работы с общим состянием (планом вычислений) и, соответственно, обращения к универсальным методам.
            0
            Что-то мне кажется, что никакой выгоды от применения хитрой синхронизации вместо обычных блокировок вы тут не получите, особенно, если слегка модифицировать первый пример, использовав Atomic<BigInteger> curr (или как там в Java) и отказаться от synchronized в get.

            В таком случае Fine-Grained становится просто не нужен.

            В 'Lock free' вы точно так же блокируетесь на записи, только еще и загружаете все время при этом процессор.

            Транзакции внутри, наверное, тоже используют блокировки.

            Immutable — совершенно другой интерфейс, его просто нельзя ставить в один ряд со всеми остальными.

            Передача сообщений опять же внутри использует блокировки.

            Это я все к тому, что иногда нет смысла придумывать что-то сложное, когда решение лежит на поверхности.

            Как обзор технологий — норм, только пример надо было поудачней подобрать.

              0
              Сообщения можно и без блокировок отправлять, если у каждого потока своя личная очередь исходящих сообщений, в которую пишет только он (хотя это и усложняет чтение входящих)
                0
                Тогда блокировки будут на чтении ) за все надо платить.
              +4
              Раз уж в посте упомянута Akka, не могу не привести ссылку на разбор авторами этого проекта основных моделей многопоточного программирования Concurrency – The good, the bad, the ugly.
                +1
                Есть еще один способ: использовать агенты. Они есть в Clojure и Akka.
                  0
                  LockFree вариант делает лишнюю работу и чем больше нагрузка и сложнее операция, тем больше. В пределе это практически livelocking.
                  Мне кажется, Immutable пример неполный, смысл ведь не в том, чтобы генерировать числа (тогда можно просто брать новый генератор и все), а в том, чтобы не было повторов, в этом случае придется где-то еще хранить ссылку на последний Immutable и иметь все связанные с этим проблемы. Поправьте, если я ошибаюсь.
                    0
                    О каких проблемах идет речь? Если ссылку на immutable объект расшарить между двумя потоками, то потоки, одновременно работающие с ним, не смогут его «испортить», так как объект неизменяемый. В этом суть данного подхода. Конечно он не полностью решает все проблемы и не всегда подходит.
                      0
                      Расшарить не проблема, но как гарантировать последовательность чисел? Новое число будет одинаковым для потоков между которыми расшарен immutable, что отличается от всех остальных примеров в статье.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое