Всем кофе!
Завтра у нас плавненько стартует практически юбилейный поток курс «Разработчик Java» — уже шестой по счёту начиная с апреля прошлого года. А это значит, что мы снова подобрали, перевели интереснейший материал, которым делимся с вами.
Поехали!
Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения. Вы ознакомьтесь с ключевыми аспектами языка Java со ссылками на стандартную библиотеку.
РАЗДЕЛ 1
Вступление
С момента своего создания Java поддерживает ключевые концепции параллелизма, такие как потоки и блокировки. Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения.
РАЗДЕЛ 2
Концепции
Таблица 1: Концепции параллелизма
Состояние гонки (Race condition)
Состояние гонки возникает, когда один и тот же ресурс используется несколькими потоками одновременно, и в зависимости от порядка действий каждого потока может быть несколько возможных результатов. Код, приведенный ниже, не является потокобезопасным, и переменная
Гонка данных (Data race)
Гонка данных возникает, когда два или более потока пытаются получить доступ к одной и той же не финальной переменной без синхронизации. Отсутствие синхронизации может привести к внесению изменений, которые не будут видны другим потокам, из-за этого возможно чтение устаревших данных, что, в свою очередь, приводит к бесконечным циклам, поврежденным структурам данных или неточным вычислениям. Этот код может привести к бесконечному циклу, потому что считывающий поток может так и не заметить изменения, внесенные перезаписывающими потоками:
РАЗДЕЛ 3
Модель памяти Java: отношение happens-before
Модель памяти Java определяется с точки зрения таких действий, как чтение/запись полей и синхронизация в мониторе. Действия упорядочены с помощью отношения happens-before (выполняется прежде), которое может быть использовано для объяснения того, когда поток видит результат действий другого потока, и что представляет собой правильно синхронизированная программа.
ОТНОШЕНИЯ HAPPENS-BEFORE ИМЕЮТ СЛЕДУЮЩИЕ СВОЙСТВА:
На Изображении 1
Изображение 1: Пример happens-before
РАЗДЕЛ 4
Стандартные функции синхронизации
Ключевое слово
Ключевое слово
Ключевое слово synchronized можно также раскрыть на уровне методов.
Таблица 2: Мониторы, которые используются, когда весь метод синхронизирован
Блокировка реентерабельна (reentrant), поэтому, если поток уже содержит блокировку, он может успешно получить ее снова.
Уровень соперничества влияет на способ захвата монитора:
Таблица 3: Состояния мониторов
Методы
Наиболее распространенным примером является условный цикл:
Ключевое слово
Атомарность
Пакет
Используя классы AtomicXXX, можно реализовать атомарную операцию
И
Если вам нужен счетчик и нет необходимости получать его значение атомарно, подумайте об использовании
Один из способов хранить данные в потоке и сделать блокировку необязательной — это использовать хранилище
РАЗДЕЛ 5
Безопасная публикация
Публикация объекта делает его ссылку доступной за пределами текущей области (например, возврат ссылки из геттера). Обеспечение безопасной публикации объекта (только когда он полностью создан) может потребовать синхронизации. Безопасность публикации может быть достигнута с использованием:
Убедитесь, что this-ссылка не испарилась во время создания.
РАЗДЕЛ 6
Неизменяемые объекты
Одним из самых замечательных свойств неизменяемых объектов является потокобезопасность, поэтому синхронизация для них не нужна. Требования к неизменному объекту:
Пример неизменяемого объекта:
РАЗДЕЛ 7
Потоки
Класс
Таблица 4: Состояния потоков
Таблица 5: Thread coordination methods Методы координации потоков
Как обрабатывать InterruptedException?
Обработка неожиданных исключений
В потоках может указываться
РАЗДЕЛ 8
Жизнеспособность (Liveness)
Пример потенциального дэдлока:
Взаимная блокировка происходит, если в одно и то же время:
Способы предотвращения дэдлока:
JVM способен обнаруживать взаимные блокировки мониторов и выводить информацию о них в дампах потоков.
Livelock и потоковое голодание
Livelock возникает, когда потоки тратят все свое время на переговоры о доступе к ресурсу или обнаруживают и избегают тупиковой ситуации так, что поток фактически не продвигается вперед. Голодание возникает, когда потоки сохраняют блокировку в течение длительных периодов, так что некоторые потоки «голодают» без прогресса.
РАЗДЕЛ 9
Пулы потоков
Основным интерфейсом для пулов потоков является
Таблица 6: Методы статической фабрики
При определении размера пулов потока часто бывает полезно определить размер числа логических ядер в машине, на которой запущено приложение. Получить это значение в Java можно вызвав
Таблица 7: Реализации пула потоков
Задачи отправляются с помощью
Таблица 8: Функциональные интерфейсы задач
Блокировки
Пакет
ReadWriteLock
Пакет
Учтите, что если
Если есть несколько
Параллельные коллекции
Самый простой способ сделать коллекцию потокобезопасной — использование родственных методов
Списки
Таблица 9: Списки в
Таблица 10: Ассоциативные массивы в
Множества
Таблица 11: Множества в
Другим подходом к созданию параллельного множества является обертка параллельного Map:
Очереди
Очереди выступают в качестве труб между «производителями» и «потребителями». Элементы помещаются в один конец трубы и выходят из другого конца трубы в том же порядке «первый зашел, первый вышел» (FIFO). Интерфейс
Таблица 12: Очереди в
THE END
Как всегда ждём пожелания и вопросы.
Спасибо.
Завтра у нас плавненько стартует практически юбилейный поток курс «Разработчик Java» — уже шестой по счёту начиная с апреля прошлого года. А это значит, что мы снова подобрали, перевели интереснейший материал, которым делимся с вами.
Поехали!
Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения. Вы ознакомьтесь с ключевыми аспектами языка Java со ссылками на стандартную библиотеку.
РАЗДЕЛ 1
Вступление
С момента своего создания Java поддерживает ключевые концепции параллелизма, такие как потоки и блокировки. Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения.
РАЗДЕЛ 2
Концепции
Концепция | Описание |
---|---|
Атомарная операция — это операция, которая выполняется полностью или не выполняется совсем, частичное выполнение невозможно. | |
Visibility (видимость) | Условия, при которых один поток видит изменения, сделанные другим потоком |
Таблица 1: Концепции параллелизма
Состояние гонки (Race condition)
Состояние гонки возникает, когда один и тот же ресурс используется несколькими потоками одновременно, и в зависимости от порядка действий каждого потока может быть несколько возможных результатов. Код, приведенный ниже, не является потокобезопасным, и переменная
value
может быть инициализирована больше, чем один раз, так как check-then-act
(проверка на null
, а затем инициализация), которая лениво инициализирует поле, не является атомарной:class Lazy <T> {
private volatile T value;
T get() {
if (value == null)
value = initialize();
return value;
}
}
Гонка данных (Data race)
Гонка данных возникает, когда два или более потока пытаются получить доступ к одной и той же не финальной переменной без синхронизации. Отсутствие синхронизации может привести к внесению изменений, которые не будут видны другим потокам, из-за этого возможно чтение устаревших данных, что, в свою очередь, приводит к бесконечным циклам, поврежденным структурам данных или неточным вычислениям. Этот код может привести к бесконечному циклу, потому что считывающий поток может так и не заметить изменения, внесенные перезаписывающими потоками:
class Waiter implements Runnable {
private boolean shouldFinish;
void finish() { shouldFinish = true; }
public void run() {
long iteration = 0;
while (!shouldFinish) {
iteration++;
}
System.out.println("Finished after: " + iteration);
}
}
class DataRace {
public static void main(String[] args) throws InterruptedException {
Waiter waiter = new Waiter();
Thread waiterThread = new Thread(waiter);
waiterThread.start();
waiter.finish();
waiterThread.join();
}
}
РАЗДЕЛ 3
Модель памяти Java: отношение happens-before
Модель памяти Java определяется с точки зрения таких действий, как чтение/запись полей и синхронизация в мониторе. Действия упорядочены с помощью отношения happens-before (выполняется прежде), которое может быть использовано для объяснения того, когда поток видит результат действий другого потока, и что представляет собой правильно синхронизированная программа.
ОТНОШЕНИЯ HAPPENS-BEFORE ИМЕЮТ СЛЕДУЮЩИЕ СВОЙСТВА:
- Вызов Thread#start происходит до любого действия в этом потоке.
- Возврат монитора происходит до любого последующего захвата этого же монитора.
- Запись в volatile-переменную происходит до любого последующего считывания volatile-переменной.
- Запись в final-переменную происходит до публикации ссылки объекта.
- Все действия в потоке выполняются до возвращения из Thread#join в этом потоке.
На Изображении 1
Action X
происходит до Action Y
, поэтому в Thread 2
все операции справа от Action Y
будут видеть все операции слева от Action X
в Thread 1
.Изображение 1: Пример happens-before
РАЗДЕЛ 4
Стандартные функции синхронизации
Ключевое слово
synchronized
Ключевое слово
synchronized
используется для предотвращения одновременного выполнения разными потоками одного и того же блока кода. Оно гарантирует, что, если вы получили блокировку (войдя в синхронизированный блок), данные, на которые наложена эта блокировка, обрабатываются в эксклюзивном режиме, поэтому операция может считаться атомарной. Кроме того, оно гарантирует, что другие потоки увидят результат операции после того, как получат такую же блокировку.class AtomicOperation {
private int counter0;
private int counter1;
void increment() {
synchronized (this) {
counter0++;
counter1++;
}
}
}
Ключевое слово synchronized можно также раскрыть на уровне методов.
ССЫЛКА, ИСПОЛЬЗУЕМАЯ КАК МОНИТОР | |
---|---|
static | ссылка на объект Class<?> |
non-static | this-ссылка |
Таблица 2: Мониторы, которые используются, когда весь метод синхронизирован
Блокировка реентерабельна (reentrant), поэтому, если поток уже содержит блокировку, он может успешно получить ее снова.
class Reentrantcy {
synchronized void doAll() {
doFirst();
doSecond();
}
synchronized void doFirst() {
System.out.println("First operation is successful.");
}
synchronized void doSecond() {
System.out.println("Second operation is successful.");
}
}
Уровень соперничества влияет на способ захвата монитора:
Описание | |
---|---|
init | Только что создан, пока никем не был захвачен. |
biased | Борьбы нет, и код, защищенный блокировкой, выполняется только одним потоком. Самый дешевый для захвата. |
thin | Монитор захватывается несколькими потоками без борьбы. Для блокировки используется сравнительно дешевый CAS. |
fat | Возникает борьба. JVM запрашивает мьютексы ОС и позволяет планировщику ОС обрабатывать парковки потоков и пробуждения. |
Таблица 3: Состояния мониторов
wait/notify
Методы
wait/notify/notifyAll
объявляются в классе Object
. wait
используется, чтобы заставить поток перейти в состояние WAITING
или TIMED_WAITING
(если передано значение тайм-аута). Чтобы разбудить поток, можно сделать любое из этих действий:- Другой поток вызывает notify, который пробуждает произвольный поток, ожидающий на мониторе.
- Другой поток вызывает notifyAll, который пробуждает все потоки, ожидающие на мониторе.
- Вызывается Thread#interrupt. В этом случае бросается исключение InterruptedException.
Наиболее распространенным примером является условный цикл:
class ConditionLoop {
private boolean condition;
synchronized void waitForCondition() throws InterruptedException {
while (!condition) {
wait();
}
}
synchronized void satisfyCondition() {
condition = true;
notifyAll();
}
}
- Имейте в виду, что для того, чтобы использовать
wait/notify/notifyAll
для объекта, вам необходимо сначала наложить блокировку на этот объект. - Всегда ждите внутри цикла, проверяющего условие, выполнение которого вы ожидаете. Это касается проблемы синхронизации, если другой поток удовлетворяет условию до начала ожидания. Кроме того, это защищает ваш код от побочных пробуждений, которые могут (и будут) происходить.
- Всегда проверяйте, что вы удовлетворяете условию ожидания перед вызовом notify/notifyAll. Несоблюдение этого требования приведет к уведомлению, но поток не сможет избежать цикла ожидания.
Ключевое слово
volatile
volatile
решает проблему видимости и делает изменение значения атомарным, потому что здесь есть отношение happens-before: запись в volatile-переменную происходит до любого последующего считывания volatile-переменной. Таким образом, оно гарантирует, что при последующем считывании поля будет видно значение, которое было задано самой последней записью.class VolatileFlag implements Runnable {
private volatile boolean shouldStop;
public void run() {
while (!shouldStop) {
//do smth
}
System.out.println("Stopped.");
}
void stop() {
shouldStop = true;
}
public static void main(String[] args) throws InterruptedException {
VolatileFlag flag = new VolatileFlag();
Thread thread = new Thread(flag);
thread.start();
flag.stop();
thread.join();
}
}
Атомарность
Пакет
java.util.concurrent.atomic
содержит набор классов, которые поддерживают составные атомарные действия над одним значением без блокировок, подобно volatile
.Используя классы AtomicXXX, можно реализовать атомарную операцию
check-then-act
:class CheckThenAct {
private final AtomicReference<String> value = new AtomicReference<>();
void initialize() {
if (value.compareAndSet(null, "Initialized value")) {
System.out.println("Initialized only once.");
}
}
}
И
AtomicInteger
, и AtomicLong
имеют атомарную операцию инкремента/декремента:class Increment {
private final AtomicInteger state = new AtomicInteger();
void advance() {
int oldState = state.getAndIncrement();
System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
}
}
Если вам нужен счетчик и нет необходимости получать его значение атомарно, подумайте об использовании
LongAdder
вместо AtomicLong/AtomicInteger
. LongAdder
обрабатывает значение в нескольких ячейках и увеличивает их число, если нужно, и, следовательно, он работает лучше при высокой конкуренции.ThreadLocal
Один из способов хранить данные в потоке и сделать блокировку необязательной — это использовать хранилище
ThreadLocal
. Концептуально ThreadLocal
действует так, как будто в каждом потоке есть своя версия переменной. ThreadLocal
обычно используется для фиксации значений каждого потока, таких как «текущая транзакция», или других ресурсов. Кроме того, они используются для содержания поточных счетчиков, статистики или генераторов идентификаторов.class TransactionManager {
private final ThreadLocal<Transaction> currentTransaction
= ThreadLocal.withInitial(NullTransaction::new);
Transaction currentTransaction() {
Transaction current = currentTransaction.get();
if (current.isNull()) {
current = new TransactionImpl();
currentTransaction.set(current);
}
return current;
}
}
РАЗДЕЛ 5
Безопасная публикация
Публикация объекта делает его ссылку доступной за пределами текущей области (например, возврат ссылки из геттера). Обеспечение безопасной публикации объекта (только когда он полностью создан) может потребовать синхронизации. Безопасность публикации может быть достигнута с использованием:
- Статических инициализаторов. Только один поток может инициализировать статические переменные, поскольку инициализация класса выполняется под исключительной блокировкой.
class StaticInitializer {
// Публикация неизменяемого объекта без дополнительной инициализации
public static final Year year = Year.of(2017);
public static final Set<String> keywords;
// Использование статического инициализатора для построения сложного объекта
static {
// Создание изменяемого множества
Set<String> keywordsSet = new HashSet<>();
// Состояние инициализации
keywordsSet.add("java");
keywordsSet.add("concurrency");
// Делаем множество немодифицируемым
keywords = Collections.unmodifiableSet(keywordsSet);
}
}
- Volatile-поля. Считывающий поток всегда будет считывать последнее значение, потому что запись в volatile-переменную происходит до (happens before) любого последующего чтения.
class Volatile {
private volatile String state;
void setState(String state) {
this.state = state;
}
String getState() {
return state;
}
}
- Атомарности. Например,
AtomicInteger
сохраняет значение в volatile-поле, поэтому правило для volatile-переменных здесь тоже применимо.
class Atomics {
private final AtomicInteger state = new AtomicInteger();
void initializeState(int state) {
this.state.compareAndSet(0, state);
}
int getState() {
return state.get();
}
}
- Final-полей.
class Final {
private final String state;
Final(String state) {
this.state = state;
}
String getState() {
return state;
}
}
Убедитесь, что this-ссылка не испарилась во время создания.
class ThisEscapes {
private final String name;
ThisEscapes(String name) {
Cache.putIntoCache(this);
this.name = name;
}
String getName() { return name; }
}
class Cache {
private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
static void putIntoCache(ThisEscapes thisEscapes) {
// 'this' ссылка испарилась прежде, чем объект полностью сконструирован.
CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
}
}
- Правильно синхронизированных полей.
class Synchronization {
private String state;
synchronized String getState() {
if (state == null)
state = "Initial";
return state;
}
}
РАЗДЕЛ 6
Неизменяемые объекты
Одним из самых замечательных свойств неизменяемых объектов является потокобезопасность, поэтому синхронизация для них не нужна. Требования к неизменному объекту:
- Все поля являются final-полями.
- Все поля должны быть либо изменчивыми, либо неизменяемыми объектами, но не выходить за пределы объекта, поэтому состояние объекта не может быть изменено после создания.
- Ссылка this не исчезает во время создания.
- Класс является final-классом, поэтому переопределение его поведения в подклассах невозможно.
Пример неизменяемого объекта:
// Помечается как final - подклассы запрещены
public final class Artist {
// Неизменяемый объект, поле final
private final String name;
// Коллекция неизменяемых объектов, final поле
private final List<Track> tracks;
public Artist(String name, List<Track> tracks) {
this.name = name;
// Защитная копия
List<Track> copy = new ArrayList<>(tracks);
// Превращение изменяемой коллекции в неизменяемую
this.tracks = Collections.unmodifiableList(copy);
// 'this' никуда не передается во время создания
}
// Getters, equals, hashCode, toString
}
//Помечается как final - запрещается наследование
public final class Track {
//Неизменяемый объект, поле final
private final String title;
public Track(String title) {
this.title = title;
}
// Getters, equals, hashCode, toString
}
РАЗДЕЛ 7
Потоки
Класс
java.lang.Thread
используется для представления приложения или потока JVM. Код всегда выполняется в контексте некоторого класса Thread (чтобы получить текущий поток вы можете использовать Thread#currentThread()).
Описание | |
---|---|
NEW | Не запускался. |
Запущен и работает. | |
BLOCKED | Ожидание на мониторе — он пытается получить блокировку и войти в критическую секцию. |
WAITING | Ожидание выполнения определенного действия другим потоком (notify/notifyAll, LockSupport#unpark). |
То же, что и WAITING, но с таймаутом. | |
TERMINATED | Остановлен |
Таблица 4: Состояния потоков
Описание | |
---|---|
start | Запускает экземпляр класса Thread и выполняет метод run(). |
join | Блокирует до окончания потока. |
interrupt | Прерывает поток. Если поток заблокирован в методе, который отвечает на прерывания, в другом потоке будет брошен InterruptedException, в противном случае будет установлен статус прерывания. |
stop, suspend, resume, destroy | Все эти методы устарели. Они выполняют опасные операции в зависимости от состояния рассматриваемого потока. Вместо них используйте Thread#interrupt() или флаг volatile, чтобы указать потоку, что он должен делать |
Таблица 5: Thread coordination methods Методы координации потоков
Как обрабатывать InterruptedException?
- Очистите все ресурсы и завершите выполнение потока, если это возможно на текущем уровне.
- Объявите, что текущий метод бросает InterruptedException.
- Если метод не порождает исключение InterruptedException, прерванный флаг должен быть восстановлен в true, вызывая Thread.currentThread().interrupt() и должно быть порождено исключение, которое является более подходящим на этом уровне. Очень важно вернуть флаг true, чтобы дать возможность обрабатывать прерывания на более высоком уровне.
Обработка неожиданных исключений
В потоках может указываться
UncaughtExceptionHandler
, который получит уведомление о любом неперехваченном исключении, из-за которого поток прерывается.Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((failedThread, exception) -> {
logger.error("Caught unexpected exception in thread '{}'.",
failedThread.getName(), exception);
});
thread.start();
РАЗДЕЛ 8
Жизнеспособность (Liveness)
Deadlock
Deadlock
, или взаимная блокировка, возникает, когда есть несколько потоков и каждый ожидает ресурс, принадлежащий другому потоку, так что формируется цикл из ресурсов и ожидающих их потоков. Наиболее очевидным видом ресурса является монитор объекта, но любой ресурс, который вызывает блокировку (например, wait/notify
), также подходит.Пример потенциального дэдлока:
class Account {
private long amount;
void plus(long amount) { this.amount += amount; }
void minus(long amount) {
if (this.amount < amount)
throw new IllegalArgumentException();
else
this.amount -= amount;
}
static void transferWithDeadlock(long amount, Account first, Account second){
synchronized (first) {
synchronized (second) {
first.minus(amount);
second.plus(amount);
}
}
}
}
Взаимная блокировка происходит, если в одно и то же время:
- Один поток пытается перенести данные с одного аккаунта на другой и уже наложил блокировку на первый аккаунт.
- Другой поток пытается перенести данные со второго аккаунта на первый, и уже наложил блокировку на второй аккаунт.
Способы предотвращения дэдлока:
- Порядок блокировок — всегда накладывайте блокировки в одном и том же порядке.
class Account {
private long id;
private long amount;
// Некоторые методы опущены
static void transferWithLockOrdering(long amount, Account first, Account second){
boolean lockOnFirstAccountFirst = first.id < second.id;
Account firstLock = lockOnFirstAccountFirst ? first : second;
Account secondLock = lockOnFirstAccountFirst ? second : first;
synchronized (firstLock) {
synchronized (secondLock) {
first.minus(amount);
second.plus(amount);
}
}
}
}
- Блокировка с тайм-аутом — не блокируйте бессрочно при наложении блокировки, лучше как можно быстрее снимите все блокировки и попробуйте снова.
class Account {
private long amount;
// Некоторые методы опущены
static void transferWithTimeout(
long amount, Account first, Account second, int retries, long timeoutMillis
) throws InterruptedException {
for (int attempt = 0; attempt < retries; attempt++) {
if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
first.minus(amount);
second.plus(amount);
}
finally {
second.lock.unlock();
}
}
}
finally {
first.lock.unlock();
}
}
}
}
}
JVM способен обнаруживать взаимные блокировки мониторов и выводить информацию о них в дампах потоков.
Livelock и потоковое голодание
Livelock возникает, когда потоки тратят все свое время на переговоры о доступе к ресурсу или обнаруживают и избегают тупиковой ситуации так, что поток фактически не продвигается вперед. Голодание возникает, когда потоки сохраняют блокировку в течение длительных периодов, так что некоторые потоки «голодают» без прогресса.
РАЗДЕЛ 9
java.util.concurrent
Пулы потоков
Основным интерфейсом для пулов потоков является
ExecutorService.java.util.concurrent
также предоставляет статическую фабрику Executors, которая содержит фабричные методы для создания пула потоков с наиболее распространенными конфигурациями.Метод | Описание |
---|---|
newSingleThreadExecutor | Возвращает ExecutorService только с одним потоком. |
newFixedThreadPool | Возвращает ExecutorService с фиксированным количеством потоков. |
newCachedThreadPool | Возвращает ExecutorService с пулом потоков различного размера. |
Возвращает ScheduledExecutorService с одним потоком. | |
newScheduledThreadPool | Возвращает ScheduledExecutorService с основным набором потоков. |
newWorkStealingPool | Возвращает крадущий задачи ExecutorService. |
Таблица 6: Методы статической фабрики
При определении размера пулов потока часто бывает полезно определить размер числа логических ядер в машине, на которой запущено приложение. Получить это значение в Java можно вызвав
Runtime.getRuntime().AvailableProcessors()
.Реализация | Описание |
---|---|
ThreadPoolExecutor | Реализация по умолчанию с изменяющим размер пулом потока, одной рабочей очереди и настраиваемой политикой для отклоненных задач (через RejectedExecutionHandler) и создания потоков (через ThreadFactory). |
Расширение ThreadPoolExecutor, которое обеспечивает возможность планирования периодических задач. | |
ForkJoinPool | Крадущий задачи пул: все потоки в пуле пытаются найти и запустить либо поставленные задачи, либо задачи, созданные другими активными задачами. |
Таблица 7: Реализации пула потоков
Задачи отправляются с помощью
ExecutorService#submit
, ExecutorService#invokeAll
или ExecutorService#invokeAny
, которые имеют несколько перегрузок для разных типов задач.Описание | |
---|---|
Runnable | Представляет задачу без возвращаемого значения. |
Callable | Представляет вычисление с возвращаемым значением. Он также выбрасывает исходный Exeption, поэтому не требуется обертка для проверенного исключения. |
Таблица 8: Функциональные интерфейсы задач
Future
Future
— это абстракция для асинхронного вычисления. Она представляет результат вычисления, который может быть доступен в какой-либо момент: либо вычисленное значение, либо исключение. Большинство методов ExecutorService
используют Future
как возвращаемый тип. Он предоставляет методы для изучения текущего состояния future или блокирует до тех пор, пока не будет доступен результат.ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
String result = future.get(1L, TimeUnit.SECONDS);
System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
throw new RuntimeException(e);
}
assert future.isDone();
Блокировки
Lock
Пакет
java.util.concurrent.locks
имеет стандартный интерфейс Lock
. Реализация ReentrantLock
дублирует функциональность ключевого слова synchronized, но также предоставляет дополнительные функции, такие как получение информации о состоянии блокировки, неблокирующий tryLock()
и прерываемая блокировке. Пример использования явного экземпляра ReentrantLock:class Counter {
private final Lock lock = new ReentrantLock();
private int value;
int increment() {
lock.lock();
try {
return ++value;
} finally {
lock.unlock();
}
}
}
ReadWriteLock
Пакет
java.util.concurrent.locks
также содержит интерфейс ReadWriteLock (и реализацию ReentrantReadWriteLock), который определяется парой блокировок для чтения и записи, обычно позволяя считывать одновременно нескольким читателям, но допуская только одного писателя.class Statistic {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private int value;
void increment() {
lock.writeLock().lock();
try {
value++;
} finally {
lock.writeLock().unlock();
}
}
int current() {
lock.readLock().lock();
try {
return value;
} finally {
lock.readLock().unlock();
}
}
}
CountDownLatch
CountDownLatch
инициализируется счетчиком. Потоки могут вызывать await()
, чтобы ждать, пока счетчик не достигнет 0. Другие потоки (или тот же поток) могут вызвать countDown()
, чтобы уменьшить счетчик. Нельзя использовать повторно, как только счетчик достигнет 0. Используется для запуска неизвестного набора потоков, как только произошло некоторое количество действий.CompletableFuture
CompletableFuture
является абстракцией для произведения асинхронных вычислений. В отличие от простого Future, где единственная возможность получить результат — блокировать, рекомендуется регистрировать обратные вызовы для создания конвейера задач, которые должны выполняться, когда доступен результат или исключение. Либо во время создания (через CompletableFuture#supplyAsync/runAsync
), либо во время добавления обратных вызовов (методы семейства *async
) может быть указан исполнитель, где должно выполняться вычисление (если он не указан стандартным глобальным ForkJoinPool#commonPool
).Учтите, что если
CompletableFuture
уже завершен, обратные вызовы, зарегистрированные с помощью не *async
методов, будут выполняться в вызывающем потоке.Если есть несколько
future
, вы можете использовать CompletableFuture#allOf
, чтобы получить future
, который будет завершен, когда все future
будут завершены, или CompletableFuture#anyOf
, который будет завершен, как только будет завершен какой-либо future
.ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
//Завершено, когда оба future завершены
CompletableFuture<String> waitingForAll = CompletableFuture
.allOf(
CompletableFuture.supplyAsync(() -> "first"),
CompletableFuture.supplyAsync(() -> "second", executor1)
)
.thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
//Использование того же исполнителя
.thenApply(result -> "Java " + result)
//Использование другого исполнителя
.thenApplyAsync(result -> "Dzone " + result, executor1)
//Завершено, когда это и другое future завершено
.thenCombine(waitingForAll, (first, second) -> first + second)
//Неявно использование ForkJoinPool#commonPool как исполнителя
.thenAcceptAsync(result -> {
System.out.println("Result is '" + result + "'.");
})
//Общий обработчик
.whenComplete((ignored, exception) -> {
if (exception != null)
exception.printStackTrace();
});
//Первый блокирующий вызов - блокирует, пока он не будет завершен.
future.join();
future
//Выполняется в текущем потоке (который является основным).
.thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
//Неявное использование ForkJoinPool#commonPool как исполнителя
.thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."));
Параллельные коллекции
Самый простой способ сделать коллекцию потокобезопасной — использование родственных методов
Collections#synchronized*
. Поскольку это решение работает плохо при высокой конкуренции, java.util.concurrent
предоставляет множество структур данных, которые оптимизированы для параллельного использования.Списки
Реализация | Описание |
---|---|
Предоставляет семантику копирования при записи, где каждая модификация структуры данных приводит к новой внутренней копии данных (поэтому запись очень дорогая, тогда как чтение дешевое). Итераторы в структуре данных всегда видят снепшот данных с момента создания итератора. |
Таблица 9: Списки в
java.util.concurrent
Описание | |
---|---|
Обычно выступает в качестве сегментированной хэш-таблицы. Операции чтения, как правило, не блокируют и отражают результаты последней завершенной записи. Запись первого узла в пустой ящик выполняется просто CAS-ом (сравнить и установить), тогда как другим операциям записи требуются блокировки (первый узел сегмента используется как блокировка). | |
Обеспечивает параллельный доступ наряду функциональностью сортированного Map, подобной TreeMap. Границы производительности такие же как у TreeMap, хотя несколько потоков обычно могут читать и записывать из ассоциативного массива без конфликтов, если они не изменяют одну и ту же часть отображения. |
Таблица 10: Ассоциативные массивы в
java.util.concurrent
Множества
Описание | |
---|---|
Подобно CopyOnWriteArrayList, он использует семантику copy-on-write для реализации интерфейса Set. | |
Подобно ConcurrentSkipListMap, но реализует интерфейс Set. |
Таблица 11: Множества в
java.util.concurrent
Другим подходом к созданию параллельного множества является обертка параллельного Map:
Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
Очереди
Очереди выступают в качестве труб между «производителями» и «потребителями». Элементы помещаются в один конец трубы и выходят из другого конца трубы в том же порядке «первый зашел, первый вышел» (FIFO). Интерфейс
BlockingQueue
расширяет Queue
, чтобы предоставить дополнительные варианты того, как обрабатывать сценарий, где очередь может быть заполнена (когда производитель добавляет элемент) или пустой (когда потребитель читает или удаляет элемент). В этих случаях BlockingQueue
предоставляет методы, которые либо блокируют навсегда, либо блокируют в течение определенного периода времени, ожидая изменения условия из-за действий другого потока.Реализация | Описание |
---|---|
Неограниченная неблокирующая очередь, поддерживаемая связанным списком. | |
LinkedBlockingQueue | Опционально ограниченная блокирующая очередь, поддерживаемая связанным списком. |
Неограниченная блокирующая очередь, поддерживаемая минимальной кучей. Элементы удаляются из очереди в порядке, основанном на компараторе Comparator, связанном с очередью (вместо порядка FIFO). | |
DelayQueue | Неограниченная блокирующая очередь элементов, каждый из которых имеет значение задержки. Элементы могут быть удалены только тогда, когда их задержка прошла и удаляются в порядке старейшего истекшего элемента. |
SynchronousQueue | Очередь о-длины, где производитель и потребитель блокируются до тех пор, пока не прибудет другой. Когда оба потока приходят, значение передается напрямую от производителя к потребителю. Полезно при передаче данных между потоками. |
Таблица 12: Очереди в
java.util.concurrent
THE END
Как всегда ждём пожелания и вопросы.
Спасибо.