Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.
Пакет
В дочернем пакете
Для примера возьмем два массива
Результат выполнения будет ожидаемым:
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
Рассмотрим использование
Как видно из результатов «ошибочных» попыток было не так уж и много:
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные
На основе
В отличие от syncronized блокировок,
Несмотря на то, что
Не вдаваясь в подробности JMM: использовать
Дополняет свойства
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (
Ключ-значение структура, основанная на
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
Двунаправленная
Однонаправленная
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
Однонаправленная `BlockingQueue`, реализующая
Барьер (
Барьер (
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
Метод
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
Расширяет функционал
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
Пакет
java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:- Atomic
- Locks
- Collections
- Synchronization points
- Executors
- Accumulators _jdk 1.8_
Atomic
В дочернем пакете
java.util.concurrent.atomic находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false.Для примера возьмем два массива
long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:class Sum {
static monitor = new Object()
static volatile long sum = 0
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
synchronized (Sum.monitor) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
Sum.sum += it
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Результат выполнения будет ожидаемым:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
- попытка блокирования монитора
- блокировка потока
- разблокировка монитора
- разблокировка потока
Рассмотрим использование
AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:class Sum {
static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
while(true) {
long localSum = Sum.sum.get()
if (Sum.sum.compareAndSet(localSum, localSum + it)) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
break;
} else {
println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
}
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Как видно из результатов «ошибочных» попыток было не так уж и много:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные
compare-and-set, и тем чаще придется выполнять это действие повторно.На основе
compare-and-set может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write блокировки вступают в силу, только если проверка версии провалилась.class Transaction {
long debit
}
class Account {
AtomicLong version = new AtomicLong()
ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
List<Transaction> transactions = new ArrayList<Transaction>()
}
long balance(Account account) {
ReentrantReadWriteLock.ReadLock locked
while(true) {
long balance = 0
long version = account.version.get()
account.transactions.each {balance += it.debit}
//volatile write for JMM
if (account.version.compareAndSet(version, version)) {
if (locked) {locked.unlock()}
return balance
} else {
locked = account.readWriteLock.readLock()
}
}
}
void modifyTransaction(Account account, int position, long newDebit) {
def writeLock = account.readWriteLock.writeLock()
account.version.incrementAndGet()
account.transactions[position].debit = newDebit
writeLock.unlock()
}
Locks
ReentrantLock
В отличие от syncronized блокировок,
ReentrantLock позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock также снабжен этим механизмом.Несмотря на то, что
syncronized и ReentrantLock блокировки очень похожи — реализация на уровне JVM отличается довольно сильно. Не вдаваясь в подробности JMM: использовать
ReentrantLock вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock уступает механизму блокировок JVM.ReentrantReadWriteLock
Дополняет свойства
ReentrantLock возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.StampedLock _jdk 1.8_
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
double distanceFromOriginV1() { // A read-only method
long stamp;
if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
double currentX = x;
double currentY = y;
if (sl.validate(stamp))
return Math.sqrt(currentX * currentX + currentY * currentY);
}
stamp = sl.readLock(); // fall back to read lock
try {
double currentX = x;
double currentY = y;
return Math.sqrt(currentX * currentX + currentY * currentY);
} finally {
sl.unlockRead(stamp);
}
}
Collections
ArrayBlockingQueue
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (
put() take()) и неблокирующие (offer() pool()) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.ConcurrentHashMap
Ключ-значение структура, основанная на
hash функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel степени 2.ConcurrentSkipListMap
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
ConcurrentSkipListSet
ConcurrentSkipListMap без значений.CopyOnWriteArrayList
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
CopyOnWriteArraySet
CopyOnWriteArrayList без значений.DelayQueue
PriorityBlockingQueue разрешающая получить элемент только после определенной задержки (задержка объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.LinkedBlockingDeque
Двунаправленная
BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.LinkedBlockingQueue
Однонаправленная
BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.LinkedTransferQueue
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
PriorityBlockingQueue
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
SynchronousQueue
Однонаправленная `BlockingQueue`, реализующая
transfer() логику для put() методов.Synchronization points
CountDownLatch
Барьер (
await()), ожидающий конкретного (или больше) кол-ва вызовов countDown(). Состояние барьера не может быть сброшено.CyclicBarrier
Барьер (
await()), ожидающий конкретного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.Exchanger
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Phaser
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Semaphore
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
Executors
ExecutorService пришел на замену new Thread(runnable) чтобы упростить работу с потоками. ExecutorService помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable пул использует интерфейс Callable (умеет возвращать результат и кидать ошибки).ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
Object call() throws Exception {
println("In thread")
return "From thread"
}
})
println("From main")
println(future.get())
try {
pool.submit(new Callable() {
Object call() throws Exception {
throw new IllegalStateException()
}
}).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()
Метод
invokeAll отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny возвращает результат первой успешно выполненной задачи, отменяя все последующие.ThreadPoolExecutor
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
ScheduledThreadPoolExecutor
Расширяет функционал
ThreadPoolExecutor возможностью выполнять задачи отложенно или регулярно.ThreadPoolExecutor
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
class LNode {
List<LNode> childs = []
def object
}
class Finder extends RecursiveTask<LNode> {
LNode node
Object expect
protected LNode compute() {
if (node?.object?.equals(expect)) {
return node
}
node?.childs?.collect {
new Finder(node: it, expect: expect).fork()
}?.collect {
it.join()
}?.find {
it != null
}
}
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
node: new LNode(
childs: [
new LNode(object: "ivalid"),
new LNode(
object: "ivalid",
childs: [new LNode(object: "test")]
)
]
),
expect: "test"
))
print("${invoke?.object}")
Accumulators _jdk 1.8_
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
