Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.
Пакет
В дочернем пакете
Для примера возьмем два массива
Результат выполнения будет ожидаемым:
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
Рассмотрим использование
Как видно из результатов «ошибочных» попыток было не так уж и много:
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные
На основе
В отличие от syncronized блокировок,
Несмотря на то, что
Не вдаваясь в подробности JMM: использовать
Дополняет свойства
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (
Ключ-значение структура, основанная на
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
Двунаправленная
Однонаправленная
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
Однонаправленная `BlockingQueue`, реализующая
Барьер (
Барьер (
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
Метод
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
Расширяет функционал
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
Пакет
java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:- Atomic
- Locks
- Collections
- Synchronization points
- Executors
- Accumulators _jdk 1.8_
Atomic
В дочернем пакете
java.util.concurrent.atomic находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false.Для примера возьмем два массива
long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:class Sum {
static monitor = new Object()
static volatile long sum = 0
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
synchronized (Sum.monitor) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
Sum.sum += it
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Результат выполнения будет ожидаемым:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
- попытка блокирования монитора
- блокировка потока
- разблокировка монитора
- разблокировка потока
Рассмотрим использование
AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:class Sum {
static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
while(true) {
long localSum = Sum.sum.get()
if (Sum.sum.compareAndSet(localSum, localSum + it)) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
break;
} else {
println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
}
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Как видно из результатов «ошибочных» попыток было не так уж и много:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные
compare-and-set, и тем чаще придется выполнять это действие повторно.На основе
compare-and-set может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write блокировки вступают в силу, только если проверка версии провалилась.class Transaction {
long debit
}
class Account {
AtomicLong version = new AtomicLong()
ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
List<Transaction> transactions = new ArrayList<Transaction>()
}
long balance(Account account) {
ReentrantReadWriteLock.ReadLock locked
while(true) {
long balance = 0
long version = account.version.get()
account.transactions.each {balance += it.debit}
//volatile write for JMM
if (account.version.compareAndSet(version, version)) {
if (locked) {locked.unlock()}
return balance
} else {
locked = account.readWriteLock.readLock()
}
}
}
void modifyTransaction(Account account, int position, long newDebit) {
def writeLock = account.readWriteLock.writeLock()
account.version.incrementAndGet()
account.transactions[position].debit = newDebit
writeLock.unlock()
}
Locks
ReentrantLock
В отличие от syncronized блокировок,
ReentrantLock позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock также снабжен этим механизмом.Несмотря на то, что
syncronized и ReentrantLock блокировки очень похожи — реализация на уровне JVM отличается довольно сильно. Не вдаваясь в подробности JMM: использовать
ReentrantLock вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock уступает механизму блокировок JVM.ReentrantReadWriteLock
Дополняет свойства
ReentrantLock возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.StampedLock _jdk 1.8_
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
double distanceFromOriginV1() { // A read-only method
long stamp;
if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
double currentX = x;
double currentY = y;
if (sl.validate(stamp))
return Math.sqrt(currentX * currentX + currentY * currentY);
}
stamp = sl.readLock(); // fall back to read lock
try {
double currentX = x;
double currentY = y;
return Math.sqrt(currentX * currentX + currentY * currentY);
} finally {
sl.unlockRead(stamp);
}
}
Collections
ArrayBlockingQueue
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (
put() take()) и неблокирующие (offer() pool()) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.ConcurrentHashMap
Ключ-значение структура, основанная на
hash функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel степени 2.ConcurrentSkipListMap
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
ConcurrentSkipListSet
ConcurrentSkipListMap без значений.CopyOnWriteArrayList
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
CopyOnWriteArraySet
CopyOnWriteArrayList без значений.DelayQueue
PriorityBlockingQueue разрешающая получить элемент только после определенной задержки (задержк�� объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.LinkedBlockingDeque
Двунаправленная
BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.LinkedBlockingQueue
Однонаправленная
BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.LinkedTransferQueue
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
PriorityBlockingQueue
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
SynchronousQueue
Однонаправленная `BlockingQueue`, реализующая
transfer() логику для put() методов.Synchronization points
CountDownLatch
Барьер (
await()), ожидающий конкретного (или больше) кол-ва вызовов countDown(). Состояние барьера не может быть сброшено.CyclicBarrier
Барьер (
await()), ожидающий конкретного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.Exchanger
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Phaser
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Semaphore
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
Executors
ExecutorService пришел на замену new Thread(runnable) чтобы упростить работу с потоками. ExecutorService помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable пул использует интерфейс Callable (умеет возвращать результат и кидать ошибки).ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
Object call() throws Exception {
println("In thread")
return "From thread"
}
})
println("From main")
println(future.get())
try {
pool.submit(new Callable() {
Object call() throws Exception {
throw new IllegalStateException()
}
}).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()
Метод
invokeAll отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny возвращает результат первой успешно выполненной задачи, отменяя все последующие.ThreadPoolExecutor
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
ScheduledThreadPoolExecutor
Расширяет функционал
ThreadPoolExecutor возможностью выполнять задачи отложенно или регулярно.ThreadPoolExecutor
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
class LNode {
List<LNode> childs = []
def object
}
class Finder extends RecursiveTask<LNode> {
LNode node
Object expect
protected LNode compute() {
if (node?.object?.equals(expect)) {
return node
}
node?.childs?.collect {
new Finder(node: it, expect: expect).fork()
}?.collect {
it.join()
}?.find {
it != null
}
}
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
node: new LNode(
childs: [
new LNode(object: "ivalid"),
new LNode(
object: "ivalid",
childs: [new LNode(object: "test")]
)
]
),
expect: "test"
))
print("${invoke?.object}")
Accumulators _jdk 1.8_
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
