Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.

Пакет java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:
  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators _jdk 1.8_


Atomic

В дочернем пакете java.util.concurrent.atomic находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false.

Для примера возьмем два массива long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}

class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")


Результат выполнения будет ожидаемым:

pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0


Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
  • попытка блокирования монитора
  • блокировка потока
  • разблокировка монитора
  • разблокировка потока


Рассмотрим использование AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break;
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")


Как видно из результатов «ошибочных» попыток было не так уж и много:

[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0


При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные compare-and-set, и тем чаще придется выполнять это действие повторно.

На основе compare-and-set может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write блокировки вступают в силу, только если проверка версии провалилась.

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()
}

long  balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked
    while(true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each {balance += it.debit}
        //volatile write for JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) {locked.unlock()}
            return balance
        } else {
            locked = account.readWriteLock.readLock()
        }
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    account.version.incrementAndGet()
    account.transactions[position].debit = newDebit
    writeLock.unlock()
}


Locks

ReentrantLock

В отличие от syncronized блокировок, ReentrantLock позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock также снабжен этим механизмом.

Несмотря на то, что syncronized и ReentrantLock блокировки очень похожи — реализация на уровне JVM отличается довольно сильно.
Не вдаваясь в подробности JMM: использовать ReentrantLock вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock уступает механизму блокировок JVM.

ReentrantReadWriteLock

Дополняет свойства ReentrantLock возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.

StampedLock _jdk 1.8_

Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 }
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
     return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {
   sl.unlockRead(stamp);
 }
}


Collections

ArrayBlockingQueue

Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (put() take()) и неблокирующие (offer() pool()) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.

ConcurrentHashMap

Ключ-значение структура, основанная на hash функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel степени 2.

ConcurrentSkipListMap

Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.

ConcurrentSkipListSet

ConcurrentSkipListMap без значений.

CopyOnWriteArrayList

Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.

CopyOnWriteArraySet

CopyOnWriteArrayList без значений.

DelayQueue

PriorityBlockingQueue разрешающая получить элемент только после определенной задержки (задержк�� объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.

LinkedBlockingDeque

Двунаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedBlockingQueue

Однонаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedTransferQueue

Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.

PriorityBlockingQueue

Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.

SynchronousQueue

Однонаправленная `BlockingQueue`, реализующая transfer() логику для put() методов.

Synchronization points

CountDownLatch

Барьер (await()), ожидающий конкретного (или больше) кол-ва вызовов countDown(). Состояние барьера не может быть сброшено.

CyclicBarrier

Барьер (await()), ожидающий конкретного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.

Exchanger

Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.

Phaser

Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.

Semaphore

Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.

Executors


ExecutorService пришел на замену new Thread(runnable) чтобы упростить работу с потоками. ExecutorService помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable пул использует интерфейс Callable (умеет возвращать результат и кидать ошибки).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}

pool.shutdown()


Метод invokeAll отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny возвращает результат первой успешно выполненной задачи, отменяя все последующие.

ThreadPoolExecutor

Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.

ScheduledThreadPoolExecutor

Расширяет функционал ThreadPoolExecutor возможностью выполнять задачи отложенно или регулярно.

ThreadPoolExecutor

Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))

print("${invoke?.object}")


Accumulators _jdk 1.8_

Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.