Как стать автором
Обновить
1098.52
OTUS
Цифровые навыки от ведущих экспертов

Разбор основных концепций параллелизма

Время на прочтение17 мин
Количество просмотров71K
Автор оригинала: Igor Sorokin, Alex Miller
Всем кофе!

Завтра у нас плавненько стартует практически юбилейный поток курс «Разработчик Java» — уже шестой по счёту начиная с апреля прошлого года. А это значит, что мы снова подобрали, перевели интереснейший материал, которым делимся с вами.

Поехали!

Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения. Вы ознакомьтесь с ключевыми аспектами языка Java со ссылками на стандартную библиотеку.

РАЗДЕЛ 1

Вступление

С момента своего создания Java поддерживает ключевые концепции параллелизма, такие как потоки и блокировки. Эта памятка поможет Java-разработчикам, работающим с многопоточными программами, понять основные концепции параллелизма и способы их применения.

РАЗДЕЛ 2

Концепции

Концепция Описание
Atomicity (атомарность) Атомарная операция — это операция, которая выполняется полностью или не выполняется совсем, частичное выполнение невозможно.
Visibility (видимость) Условия, при которых один поток видит изменения, сделанные другим потоком

Таблица 1: Концепции параллелизма



Состояние гонки (Race condition)

Состояние гонки возникает, когда один и тот же ресурс используется несколькими потоками одновременно, и в зависимости от порядка действий каждого потока может быть несколько возможных результатов. Код, приведенный ниже, не является потокобезопасным, и переменная value может быть инициализирована больше, чем один раз, так как check-then-act (проверка на null, а затем инициализация), которая лениво инициализирует поле, не является атомарной:

class Lazy <T> {
 private volatile T value;
 T get() {
   if (value == null)
     value = initialize();
   return value;
 }
}

Гонка данных (Data race)

Гонка данных возникает, когда два или более потока пытаются получить доступ к одной и той же не финальной переменной без синхронизации. Отсутствие синхронизации может привести к внесению изменений, которые не будут видны другим потокам, из-за этого возможно чтение устаревших данных, что, в свою очередь, приводит к бесконечным циклам, поврежденным структурам данных или неточным вычислениям. Этот код может привести к бесконечному циклу, потому что считывающий поток может так и не заметить изменения, внесенные перезаписывающими потоками:

class Waiter implements Runnable {
 private boolean shouldFinish;
 void finish() { shouldFinish = true; }
 public void run() {
   long iteration = 0;
   while (!shouldFinish) {
     iteration++;
   }
   System.out.println("Finished after: " + iteration);
 }
}
class DataRace {
 public static void main(String[] args) throws InterruptedException {
   Waiter waiter = new Waiter();
   Thread waiterThread = new Thread(waiter);
   waiterThread.start();
   waiter.finish();
   waiterThread.join();
 }
}

РАЗДЕЛ 3

Модель памяти Java: отношение happens-before

Модель памяти Java определяется с точки зрения таких действий, как чтение/запись полей и синхронизация в мониторе. Действия упорядочены с помощью отношения happens-before (выполняется прежде), которое может быть использовано для объяснения того, когда поток видит результат действий другого потока, и что представляет собой правильно синхронизированная программа.

ОТНОШЕНИЯ HAPPENS-BEFORE ИМЕЮТ СЛЕДУЮЩИЕ СВОЙСТВА:

  • Вызов Thread#start происходит до любого действия в этом потоке.
  • Возврат монитора происходит до любого последующего захвата этого же монитора.
  • Запись в volatile-переменную происходит до любого последующего считывания volatile-переменной.
  • Запись в final-переменную происходит до публикации ссылки объекта.
  • Все действия в потоке выполняются до возвращения из Thread#join в этом потоке.

На Изображении 1 Action X происходит до Action Y, поэтому в Thread 2 все операции справа от Action Y будут видеть все операции слева от Action X в Thread 1.


Изображение 1: Пример happens-before


РАЗДЕЛ 4

Стандартные функции синхронизации

Ключевое слово synchronized

Ключевое слово synchronized используется для предотвращения одновременного выполнения разными потоками одного и того же блока кода. Оно гарантирует, что, если вы получили блокировку (войдя в синхронизированный блок), данные, на которые наложена эта блокировка, обрабатываются в эксклюзивном режиме, поэтому операция может считаться атомарной. Кроме того, оно гарантирует, что другие потоки увидят результат операции после того, как получат такую же блокировку.

class AtomicOperation {
 private int counter0;
 private int counter1;
 void increment() {
   synchronized (this) {
     counter0++;
     counter1++;
   }
 }
}

Ключевое слово synchronized можно также раскрыть на уровне методов.

ТИП МЕТОДА ССЫЛКА, ИСПОЛЬЗУЕМАЯ КАК МОНИТОР
static ссылка на объект Class<?>
non-static this-ссылка

Таблица 2: Мониторы, которые используются, когда весь метод синхронизирован

Блокировка реентерабельна (reentrant), поэтому, если поток уже содержит блокировку, он может успешно получить ее снова.

class Reentrantcy {
 synchronized void doAll() {
   doFirst();
   doSecond();
 }
 synchronized void doFirst() {
   System.out.println("First operation is successful.");
 }
 synchronized void doSecond() {
   System.out.println("Second operation is successful.");
 }
}

Уровень соперничества влияет на способ захвата монитора:

Состояние Описание
init Только что создан, пока никем не был захвачен.
biased Борьбы нет, и код, защищенный блокировкой, выполняется только одним потоком. Самый дешевый для захвата.
thin Монитор захватывается несколькими потоками без борьбы. Для блокировки используется сравнительно дешевый CAS.
fat Возникает борьба. JVM запрашивает мьютексы ОС и позволяет планировщику ОС обрабатывать парковки потоков и пробуждения.

Таблица 3: Состояния мониторов

wait/notify

Методы wait/notify/notifyAll объявляются в классе Object. wait используется, чтобы заставить поток перейти в состояние WAITING или TIMED_WAITING (если передано значение тайм-аута). Чтобы разбудить поток, можно сделать любое из этих действий:

  • Другой поток вызывает notify, который пробуждает произвольный поток, ожидающий на мониторе.
  • Другой поток вызывает notifyAll, который пробуждает все потоки, ожидающие на мониторе.
  • Вызывается Thread#interrupt. В этом случае бросается исключение InterruptedException.

Наиболее распространенным примером является условный цикл:

class ConditionLoop {
 private boolean condition;
 synchronized void waitForCondition() throws InterruptedException {
   while (!condition) {
     wait();
   }
 }
 synchronized void satisfyCondition() {
   condition = true;
   notifyAll();
 }
}

  • Имейте в виду, что для того, чтобы использовать wait/notify/notifyAll для объекта, вам необходимо сначала наложить блокировку на этот объект.
  • Всегда ждите внутри цикла, проверяющего условие, выполнение которого вы ожидаете. Это касается проблемы синхронизации, если другой поток удовлетворяет условию до начала ожидания. Кроме того, это защищает ваш код от побочных пробуждений, которые могут (и будут) происходить.
  • Всегда проверяйте, что вы удовлетворяете условию ожидания перед вызовом notify/notifyAll. Несоблюдение этого требования приведет к уведомлению, но поток не сможет избежать цикла ожидания.

Ключевое слово volatile

volatile решает проблему видимости и делает изменение значения атомарным, потому что здесь есть отношение happens-before: запись в volatile-переменную происходит до любого последующего считывания volatile-переменной. Таким образом, оно гарантирует, что при последующем считывании поля будет видно значение, которое было задано самой последней записью.

class VolatileFlag implements Runnable {
 private volatile boolean shouldStop;
 public void run() {
   while (!shouldStop) {
     //do smth
   }
   System.out.println("Stopped.");
 }
 void stop() {
   shouldStop = true;
 }
 public static void main(String[] args) throws InterruptedException {
   VolatileFlag flag = new VolatileFlag();
   Thread thread = new Thread(flag);
   thread.start();
   flag.stop();
   thread.join();
 }
}

Атомарность

Пакет java.util.concurrent.atomic содержит набор классов, которые поддерживают составные атомарные действия над одним значением без блокировок, подобно volatile.

Используя классы AtomicXXX, можно реализовать атомарную операцию check-then-act:

class CheckThenAct {
 private final AtomicReference<String> value = new AtomicReference<>();
 void initialize() {
   if (value.compareAndSet(null, "Initialized value")) {
     System.out.println("Initialized only once.");
   }
 }
}

И AtomicInteger, и AtomicLong имеют атомарную операцию инкремента/декремента:

class Increment {
 private final AtomicInteger state = new AtomicInteger();
 void advance() {
   int oldState = state.getAndIncrement();
   System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
 }
}

Если вам нужен счетчик и нет необходимости получать его значение атомарно, подумайте об использовании LongAdder вместо AtomicLong/AtomicInteger. LongAdder обрабатывает значение в нескольких ячейках и увеличивает их число, если нужно, и, следовательно, он работает лучше при высокой конкуренции.

ThreadLocal

Один из способов хранить данные в потоке и сделать блокировку необязательной — это использовать хранилище ThreadLocal. Концептуально ThreadLocal действует так, как будто в каждом потоке есть своя версия переменной. ThreadLocal обычно используется для фиксации значений каждого потока, таких как «текущая транзакция», или других ресурсов. Кроме того, они используются для содержания поточных счетчиков, статистики или генераторов идентификаторов.

class TransactionManager {
 private final ThreadLocal<Transaction> currentTransaction
     = ThreadLocal.withInitial(NullTransaction::new);
 Transaction currentTransaction() {
   Transaction current = currentTransaction.get();
   if (current.isNull()) {
     current = new TransactionImpl();
     currentTransaction.set(current);
   }
   return current;
 }
}

РАЗДЕЛ 5

Безопасная публикация

Публикация объекта делает его ссылку доступной за пределами текущей области (например, возврат ссылки из геттера). Обеспечение безопасной публикации объекта (только когда он полностью создан) может потребовать синхронизации. Безопасность публикации может быть достигнута с использованием:

  • Статических инициализаторов. Только один поток может инициализировать статические переменные, поскольку инициализация класса выполняется под исключительной блокировкой.

class StaticInitializer {
 // Публикация неизменяемого объекта без дополнительной инициализации

 public static final Year year = Year.of(2017);
 public static final Set<String> keywords;
 // Использование статического инициализатора для построения сложного объекта

 static {
   // Создание изменяемого множества

   Set<String> keywordsSet = new HashSet<>();
   // Состояние инициализации
   keywordsSet.add("java");
   keywordsSet.add("concurrency");
   // Делаем множество немодифицируемым

   keywords = Collections.unmodifiableSet(keywordsSet);
 }
}

  • Volatile-поля. Считывающий поток всегда будет считывать последнее значение, потому что запись в volatile-переменную происходит до (happens before) любого последующего чтения.

class Volatile {
 private volatile String state;
 void setState(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}

  • Атомарности. Например, AtomicInteger сохраняет значение в volatile-поле, поэтому правило для volatile-переменных здесь тоже применимо.

class Atomics {
 private final AtomicInteger state = new AtomicInteger();
 void initializeState(int state) {
   this.state.compareAndSet(0, state);
 }
 int getState() {
   return state.get();
 }
}

  • Final-полей.

class Final {
 private final String state;
 Final(String state) {
   this.state = state;
 }
 String getState() {
   return state;
 }
}

Убедитесь, что this-ссылка не испарилась во время создания.

class ThisEscapes {
private final String name;
ThisEscapes(String name) {
  Cache.putIntoCache(this);
  this.name = name;
}
String getName() { return name; }
}
class Cache {
private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
static void putIntoCache(ThisEscapes thisEscapes) {
  // 'this' ссылка испарилась прежде, чем объект полностью сконструирован.
  CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
}
}

  • Правильно синхронизированных полей.

class Synchronization {
 private String state;
 synchronized String getState() {
   if (state == null)
     state = "Initial";
   return state;
 }
}

РАЗДЕЛ 6

Неизменяемые объекты

Одним из самых замечательных свойств неизменяемых объектов является потокобезопасность, поэтому синхронизация для них не нужна. Требования к неизменному объекту:

  • Все поля являются final-полями.
  • Все поля должны быть либо изменчивыми, либо неизменяемыми объектами, но не выходить за пределы объекта, поэтому состояние объекта не может быть изменено после создания.
  • Ссылка this не исчезает во время создания.
  • Класс является final-классом, поэтому переопределение его поведения в подклассах невозможно.

Пример неизменяемого объекта:

// Помечается как final - подклассы запрещены
public final class Artist {
 // Неизменяемый объект, поле final
 private final String name;
 // Коллекция неизменяемых объектов, final поле
 private final List<Track> tracks;
 public Artist(String name, List<Track> tracks) {
   this.name = name;
   // Защитная копия
   List<Track> copy = new ArrayList<>(tracks);
   // Превращение изменяемой коллекции в неизменяемую
   this.tracks = Collections.unmodifiableList(copy);
   // 'this' никуда не передается во время создания
 }
 // Getters, equals, hashCode, toString
}
//Помечается как final - запрещается наследование
public final class Track {
 //Неизменяемый объект, поле final
 private final String title;
 public Track(String title) {
   this.title = title;
 }
 // Getters, equals, hashCode, toString
}

РАЗДЕЛ 7

Потоки

Класс java.lang.Thread используется для представления приложения или потока JVM. Код всегда выполняется в контексте некоторого класса Thread (чтобы получить текущий поток вы можете использовать Thread#currentThread()).

Состояние Описание
NEW Не запускался.
RUNNABLE Запущен и работает.
BLOCKED Ожидание на мониторе — он пытается получить блокировку и войти в критическую секцию.
WAITING Ожидание выполнения определенного действия другим потоком (notify/notifyAll, LockSupport#unpark).
TIMED_WAITING То же, что и WAITING, но с таймаутом.
TERMINATED Остановлен

Таблица 4: Состояния потоков

Потоковый метод Описание
start Запускает экземпляр класса Thread и выполняет метод run().
join Блокирует до окончания потока.
interrupt Прерывает поток. Если поток заблокирован в методе, который отвечает на прерывания, в другом потоке будет брошен InterruptedException, в противном случае будет установлен статус прерывания.
stop, suspend, resume, destroy Все эти методы устарели. Они выполняют опасные операции в зависимости от состояния рассматриваемого потока. Вместо них используйте Thread#interrupt() или флаг volatile, чтобы указать потоку, что он должен делать

Таблица 5: Thread coordination methods Методы координации потоков

Как обрабатывать InterruptedException?

  • Очистите все ресурсы и завершите выполнение потока, если это возможно на текущем уровне.
  • Объявите, что текущий метод бросает InterruptedException.
  • Если метод не порождает исключение InterruptedException, прерванный флаг должен быть восстановлен в true, вызывая Thread.currentThread().interrupt() и должно быть порождено исключение, которое является более подходящим на этом уровне. Очень важно вернуть флаг true, чтобы дать возможность обрабатывать прерывания на более высоком уровне.

Обработка неожиданных исключений

В потоках может указываться UncaughtExceptionHandler, который получит уведомление о любом неперехваченном исключении, из-за которого поток прерывается.

Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((failedThread, exception) -> {
 logger.error("Caught unexpected exception in thread '{}'.",
     failedThread.getName(), exception);
});
thread.start();

РАЗДЕЛ 8

Жизнеспособность (Liveness)

Deadlock

Deadlock, или взаимная блокировка, возникает, когда есть несколько потоков и каждый ожидает ресурс, принадлежащий другому потоку, так что формируется цикл из ресурсов и ожидающих их потоков. Наиболее очевидным видом ресурса является монитор объекта, но любой ресурс, который вызывает блокировку (например, wait/notify), также подходит.

Пример потенциального дэдлока:

class Account {
 private long amount;
 void plus(long amount) { this.amount += amount; }
 void minus(long amount) {
   if (this.amount < amount)
     throw new IllegalArgumentException();
   else
     this.amount -= amount;
 }
 static void transferWithDeadlock(long amount, Account first, Account second){
   synchronized (first) {
     synchronized (second) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}

Взаимная блокировка происходит, если в одно и то же время:

  • Один поток пытается перенести данные с одного аккаунта на другой и уже наложил блокировку на первый аккаунт.
  • Другой поток пытается перенести данные со второго аккаунта на первый, и уже наложил блокировку на второй аккаунт.

Способы предотвращения дэдлока:

  • Порядок блокировок — всегда накладывайте блокировки в одном и том же порядке.

class Account {
 private long id;
 private long amount;
 // Некоторые методы опущены
 static void transferWithLockOrdering(long amount, Account first, Account second){
   boolean lockOnFirstAccountFirst = first.id < second.id;
   Account firstLock = lockOnFirstAccountFirst  ? first  : second;
   Account secondLock = lockOnFirstAccountFirst ? second : first;
   synchronized (firstLock) {
     synchronized (secondLock) {
       first.minus(amount);
       second.plus(amount);
     }
   }
 }
}

  • Блокировка с тайм-аутом — не блокируйте бессрочно при наложении блокировки, лучше как можно быстрее снимите все блокировки и попробуйте снова.

class Account {
 private long amount;
// Некоторые методы опущены
 static void transferWithTimeout(
     long amount, Account first, Account second, int retries, long timeoutMillis
 ) throws InterruptedException {
   for (int attempt = 0; attempt < retries; attempt++) {
     if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
     {
       try {
         if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
         {
           try {
             first.minus(amount);
             second.plus(amount);
           }
           finally {
             second.lock.unlock();
           }
         }
       }
       finally {
         first.lock.unlock();
       }
     }
   }
 }
}

JVM способен обнаруживать взаимные блокировки мониторов и выводить информацию о них в дампах потоков.

Livelock и потоковое голодание

Livelock возникает, когда потоки тратят все свое время на переговоры о доступе к ресурсу или обнаруживают и избегают тупиковой ситуации так, что поток фактически не продвигается вперед. Голодание возникает, когда потоки сохраняют блокировку в течение длительных периодов, так что некоторые потоки «голодают» без прогресса.

РАЗДЕЛ 9

java.util.concurrent

Пулы потоков

Основным интерфейсом для пулов потоков является ExecutorService.java.util.concurrent также предоставляет статическую фабрику Executors, которая содержит фабричные методы для создания пула потоков с наиболее распространенными конфигурациями.

Метод Описание
newSingleThreadExecutor Возвращает ExecutorService только с одним потоком.
newFixedThreadPool Возвращает ExecutorService с фиксированным количеством потоков.
newCachedThreadPool Возвращает ExecutorService с пулом потоков различного размера.
newSingleThreadScheduledExecutor Возвращает ScheduledExecutorService с одним потоком.
newScheduledThreadPool Возвращает ScheduledExecutorService с основным набором потоков.
newWorkStealingPool Возвращает крадущий задачи ExecutorService.

Таблица 6: Методы статической фабрики

При определении размера пулов потока часто бывает полезно определить размер числа логических ядер в машине, на которой запущено приложение. Получить это значение в Java можно вызвав Runtime.getRuntime().AvailableProcessors().

Реализация Описание
ThreadPoolExecutor Реализация по умолчанию с изменяющим размер пулом потока, одной рабочей очереди и настраиваемой политикой для отклоненных задач (через RejectedExecutionHandler) и создания потоков (через ThreadFactory).
ScheduledThreadPoolExecutor Расширение ThreadPoolExecutor, которое обеспечивает возможность планирования периодических задач.
ForkJoinPool Крадущий задачи пул: все потоки в пуле пытаются найти и запустить либо поставленные задачи, либо задачи, созданные другими активными задачами.

Таблица 7: Реализации пула потоков

Задачи отправляются с помощью ExecutorService#submit, ExecutorService#invokeAll или ExecutorService#invokeAny, которые имеют несколько перегрузок для разных типов задач.

Интерфейс Описание
Runnable Представляет задачу без возвращаемого значения.
Callable Представляет вычисление с возвращаемым значением. Он также выбрасывает исходный Exeption, поэтому не требуется обертка для проверенного исключения.

Таблица 8: Функциональные интерфейсы задач

Future

Future — это абстракция для асинхронного вычисления. Она представляет результат вычисления, который может быть доступен в какой-либо момент: либо вычисленное значение, либо исключение. Большинство методов ExecutorService используют Future как возвращаемый тип. Он предоставляет методы для изучения текущего состояния future или блокирует до тех пор, пока не будет доступен результат.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
 String result = future.get(1L, TimeUnit.SECONDS);
 System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new RuntimeException(e);
}
catch (ExecutionException e) {
 throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
 throw new RuntimeException(e);
}
assert future.isDone();

Блокировки

Lock

Пакет java.util.concurrent.locks имеет стандартный интерфейс Lock. Реализация ReentrantLock дублирует функциональность ключевого слова synchronized, но также предоставляет дополнительные функции, такие как получение информации о состоянии блокировки, неблокирующий tryLock() и прерываемая блокировке. Пример использования явного экземпляра ReentrantLock:

class Counter {
 private final Lock lock = new ReentrantLock();
 private int value;
 int increment() {
   lock.lock();
   try {
     return ++value;
   } finally {
     lock.unlock();
   }
 }
}

ReadWriteLock

Пакет java.util.concurrent.locks также содержит интерфейс ReadWriteLock (и реализацию ReentrantReadWriteLock), который определяется парой блокировок для чтения и записи, обычно позволяя считывать одновременно нескольким читателям, но допуская только одного писателя.

class Statistic {
 private final ReadWriteLock lock = new ReentrantReadWriteLock();
 private int value;
 void increment() {
   lock.writeLock().lock();
   try {
     value++;
   } finally {
     lock.writeLock().unlock();
   }
 }
 int current() {
   lock.readLock().lock();
   try {
     return value;
   } finally {
     lock.readLock().unlock();
   }
 }
}

CountDownLatch

CountDownLatch инициализируется счетчиком. Потоки могут вызывать await(), чтобы ждать, пока счетчик не достигнет 0. Другие потоки (или тот же поток) могут вызвать countDown(), чтобы уменьшить счетчик. Нельзя использовать повторно, как только счетчик достигнет 0. Используется для запуска неизвестного набора потоков, как только произошло некоторое количество действий.

CompletableFuture

CompletableFuture является абстракцией для произведения асинхронных вычислений. В отличие от простого Future, где единственная возможность получить результат — блокировать, рекомендуется регистрировать обратные вызовы для создания конвейера задач, которые должны выполняться, когда доступен результат или исключение. Либо во время создания (через CompletableFuture#supplyAsync/runAsync), либо во время добавления обратных вызовов (методы семейства *async) может быть указан исполнитель, где должно выполняться вычисление (если он не указан стандартным глобальным ForkJoinPool#commonPool).

Учтите, что если CompletableFuture уже завершен, обратные вызовы, зарегистрированные с помощью не *async методов, будут выполняться в вызывающем потоке.

Если есть несколько future, вы можете использовать CompletableFuture#allOf, чтобы получить future, который будет завершен, когда все future будут завершены, или CompletableFuture#anyOf, который будет завершен, как только будет завершен какой-либо future.

ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
//Завершено, когда оба future завершены
CompletableFuture<String> waitingForAll = CompletableFuture
   .allOf(
       CompletableFuture.supplyAsync(() -> "first"),
       CompletableFuture.supplyAsync(() -> "second", executor1)
   )
   .thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
   //Использование того же исполнителя
   .thenApply(result -> "Java " + result)
   //Использование другого исполнителя
   .thenApplyAsync(result -> "Dzone " + result, executor1)
   //Завершено, когда это и другое future завершено
   .thenCombine(waitingForAll, (first, second) -> first + second)
  //Неявно использование ForkJoinPool#commonPool как исполнителя
   .thenAcceptAsync(result -> {
     System.out.println("Result is '" + result + "'.");
   })
  //Общий обработчик
   .whenComplete((ignored, exception) -> {
     if (exception != null)
       exception.printStackTrace();
   });
//Первый блокирующий вызов - блокирует, пока он не будет завершен.
future.join();
future
  //Выполняется в текущем потоке (который является основным).
   .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
  //Неявное использование ForkJoinPool#commonPool как исполнителя
   .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."));

Параллельные коллекции

Самый простой способ сделать коллекцию потокобезопасной — использование родственных методов Collections#synchronized*. Поскольку это решение работает плохо при высокой конкуренции, java.util.concurrent предоставляет множество структур данных, которые оптимизированы для параллельного использования.

Списки

Реализация Описание
CopyOnWriteArrayList Предоставляет семантику копирования при записи, где каждая модификация структуры данных приводит к новой внутренней копии данных (поэтому запись очень дорогая, тогда как чтение дешевое). Итераторы в структуре данных всегда видят снепшот данных с момента создания итератора.

Таблица 9: Списки в java.util.concurrent

Реализация Описание
ConcurrentHashMap Обычно выступает в качестве сегментированной хэш-таблицы. Операции чтения, как правило, не блокируют и отражают результаты последней завершенной записи. Запись первого узла в пустой ящик выполняется просто CAS-ом (сравнить и установить), тогда как другим операциям записи требуются блокировки (первый узел сегмента используется как блокировка).
ConcurrentSkipListMap Обеспечивает параллельный доступ наряду функциональностью сортированного Map, подобной TreeMap. Границы производительности такие же как у TreeMap, хотя несколько потоков обычно могут читать и записывать из ассоциативного массива без конфликтов, если они не изменяют одну и ту же часть отображения.

Таблица 10: Ассоциативные массивы в java.util.concurrent

Множества

Реализация Описание
CopyOnWriteArraySet Подобно CopyOnWriteArrayList, он использует семантику copy-on-write для реализации интерфейса Set.
ConcurrentSkipListSet Подобно ConcurrentSkipListMap, но реализует интерфейс Set.

Таблица 11: Множества в java.util.concurrent

Другим подходом к созданию параллельного множества является обертка параллельного Map:

Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());

Очереди

Очереди выступают в качестве труб между «производителями» и «потребителями». Элементы помещаются в один конец трубы и выходят из другого конца трубы в том же порядке «первый зашел, первый вышел» (FIFO). Интерфейс BlockingQueue расширяет Queue, чтобы предоставить дополнительные варианты того, как обрабатывать сценарий, где очередь может быть заполнена (когда производитель добавляет элемент) или пустой (когда потребитель читает или удаляет элемент). В этих случаях BlockingQueue предоставляет методы, которые либо блокируют навсегда, либо блокируют в течение определенного периода времени, ожидая изменения условия из-за действий другого потока.

Реализация Описание
ConcurrentLinkedQueue Неограниченная неблокирующая очередь, поддерживаемая связанным списком.
LinkedBlockingQueue Опционально ограниченная блокирующая очередь, поддерживаемая связанным списком.
PriorityBlockingQueue Неограниченная блокирующая очередь, поддерживаемая минимальной кучей. Элементы удаляются из очереди в порядке, основанном на компараторе Comparator, связанном с очередью (вместо порядка FIFO).
DelayQueue Неограниченная блокирующая очередь элементов, каждый из которых имеет значение задержки. Элементы могут быть удалены только тогда, когда их задержка прошла и удаляются в порядке старейшего истекшего элемента.
SynchronousQueue Очередь о-длины, где производитель и потребитель блокируются до тех пор, пока не прибудет другой. Когда оба потока приходят, значение передается напрямую от производителя к потребителю. Полезно при передаче данных между потоками.

Таблица 12: Очереди в java.util.concurrent

THE END

Как всегда ждём пожелания и вопросы.

Спасибо.
Теги:
Хабы:
Всего голосов 17: ↑15 и ↓2+13
Комментарии12

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS