Целью данной публикации не является полный анализ синхронизаторов из пакета java.util.concurrent. Пишу её, прежде всего, как справочник, который облегчит вхождение в тему и покажет возможности практического применения классов для синхронизации потоков (далее поток = thread).
В java.util.concurrent много различных классов, которые по функционалу можно поделить на группы: Concurrent Collections, Executors, Atomics и т.д. Одной из этих групп будет Synchronizers (синхронизаторы).
Синхронизаторы – вспомогательные утилиты для синхронизации потоков, которые дают возможность разработчику регулировать и/или ограничивать работу потоков и предоставляют более высокий уровень абстракции, чем основные примитивы языка (мониторы).
Синхронизатор Semaphore реализует шаблон синхронизации Семафор. Чаще всего, семафоры необходимы, когда нужно ограничить доступ к некоторому общему ресурсу. В конструктор этого класса (
Доступ управляется с помощью счётчика: изначально значение счётчика равно
Официальная документация по Semaphore.
CountDownLatch (замок с обратным отсчетом) предоставляет возможность любому количеству потоков в блоке кода ожидать до тех пор, пока не завершится определенное количество операций, выполняющихся в других потоках, перед тем как они будут «отпущены», чтобы продолжить свою деятельность. В конструктор CountDownLatch (
Блокировка потоков снимается с помощью счётчика: любой действующий поток, при выполнении определенной операции уменьшает значение счётчика. Когда счётчик достигает 0, все ожидающие потоки разблокируются и продолжают выполняться (примером CountDownLatch из жизни может служить сбор экскурсионной группы: пока не наберется определенное количество человек, экскурсия не начнется).
Официальная документация по CountDownLatch.
CyclicBarrier реализует шаблон синхронизации Барьер. Циклический барьер является точкой синхронизации, в которой указанное количество параллельных потоков встречается и блокируется. Как только все потоки прибыли, выполняется опционное действие (или не выполняется, если барьер был инициализирован без него), и, после того, как оно выполнено, барьер ломается и ожидающие потоки «освобождаются». В конструктор барьера (
Барьер похож на CountDownLatch, но главное различие между ними в том, что вы не можете заново использовать «замок» после того, как его счётчик достигнет нуля, а барьер вы можете использовать снова, даже после того, как он сломается. CyclicBarrier является альтернативой метода
Официальная документация по CyclicBarrier.
Exchanger (обменник) может понадобиться, для того, чтобы обменяться данными между двумя потоками в определенной точки работы обоих потоков. Обменник — обобщенный класс, он параметризируется типом объекта для передачи.
Обменник является точкой синхронизации пары потоков: поток, вызывающий у обменника метод
Официальная документация по Exchanger.
Phaser (фазер), как и CyclicBarrier, является реализацией шаблона синхронизации Барьер, но, в отличии от CyclicBarrier, предоставляет больше гибкости. Этот класс позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Как и CyclicBarrier, Phaser является точкой синхронизации, в которой встречаются потоки-участники. Когда все стороны прибыли, Phaser переходит к следующей фазе и снова ожидает ее завершения.
Если сравнить Phaser и CyclicBarrier, то можно выделить следующие важные особенности Phaser:
Объект Phaser создается с помощью одного из конструкторов:
Параметр parties указывает на количество сторон-участников, которые будут выполнять фазы действия. Первый конструктор создает объект Phaser без каких-либо сторон, при этом барьер в этом случае тоже «закрыт». Второй конструктор регистрирует передаваемое в конструктор количество сторон. Барьер открывается когда все стороны прибыли, или, если снимается последний участник. (У класса Phaser еще есть конструкторы, в которые передается родительский объект Phaser, но мы их рассматривать не будем.)
Основные методы:
Официальная документация по Phaser.
Кстати, функционалом фазера можно воспроизвести работу CountDownLatch.
Если кому-нибудь пригодилось, то я очень рад=)
Более подробно о Phaser здесь.
Почитать ещё о синхронизаторах и посмотреть примеры можно здесь.
Отличный обзор java.util.concurrent смотрите здесь.
В java.util.concurrent много различных классов, которые по функционалу можно поделить на группы: Concurrent Collections, Executors, Atomics и т.д. Одной из этих групп будет Synchronizers (синхронизаторы).
Синхронизаторы – вспомогательные утилиты для синхронизации потоков, которые дают возможность разработчику регулировать и/или ограничивать работу потоков и предоставляют более высокий уровень абстракции, чем основные примитивы языка (мониторы).
Semaphore
Синхронизатор Semaphore реализует шаблон синхронизации Семафор. Чаще всего, семафоры необходимы, когда нужно ограничить доступ к некоторому общему ресурсу. В конструктор этого класса (
Semaphore(int permits)
или Semaphore(int permits, boolean fair)
) обязательно передается количество потоков, которому семафор будет разрешать одновременно использовать заданный ресурс.Доступ управляется с помощью счётчика: изначально значение счётчика равно
int permits
, когда поток заходит в заданный блок кода, то значение счётчика уменьшается на единицу, когда поток его покидает, то увеличивается. Если значение счётчика равно нулю, то текущий поток блокируется, пока кто-нибудь не выйдет из блока (в качестве примера из жизни с permits = 1
, можно привести очередь в кабинет в поликлинике: когда пациент покидает кабинет, мигает лампа, и заходит следующий пациент).Официальная документация по Semaphore.
Пример использования Semaphore
Рассмотрим следующий пример. Существует парковка, которая одновременно может вмещать не более 5 автомобилей. Если парковка заполнена полностью, то вновь прибывший автомобиль должен подождать пока не освободится хотя бы одно место. После этого он сможет припарковаться.
Семафор отлично подходит для решения такой задачи: он не дает автомобилю (потоку) припарковаться (зайти в заданный блок кода и воспользоваться общим ресурсом) если мест на парковке нет (счётчик равен 0) Стоит отметить, что класс Semaphore поддерживает захват и освобождение более чем одного разрешения за раз, но в данном задаче это не нужно.
import java.util.concurrent.Semaphore;
public class Parking {
//Парковочное место занято - true, свободно - false
private static final boolean[] PARKING_PLACES = new boolean[5];
//Устанавливаем флаг "справедливый", в таком случае метод
//aсquire() будет раздавать разрешения в порядке очереди
private static final Semaphore SEMAPHORE = new Semaphore(5, true);
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 7; i++) {
new Thread(new Car(i)).start();
Thread.sleep(400);
}
}
public static class Car implements Runnable {
private int carNumber;
public Car(int carNumber) {
this.carNumber = carNumber;
}
@Override
public void run() {
System.out.printf("Автомобиль №%d подъехал к парковке.\n", carNumber);
try {
//acquire() запрашивает доступ к следующему за вызовом этого метода блоку кода,
//если доступ не разрешен, поток вызвавший этот метод блокируется до тех пор,
//пока семафор не разрешит доступ
SEMAPHORE.acquire();
int parkingNumber = -1;
//Ищем свободное место и паркуемся
synchronized (PARKING_PLACES){
for (int i = 0; i < 5; i++)
if (!PARKING_PLACES[i]) { //Если место свободно
PARKING_PLACES[i] = true; //занимаем его
parkingNumber = i; //Наличие свободного места, гарантирует семафор
System.out.printf("Автомобиль №%d припарковался на месте %d.\n", carNumber, i);
break;
}
}
Thread.sleep(5000); //Уходим за покупками, к примеру
synchronized (PARKING_PLACES) {
PARKING_PLACES[parkingNumber] = false;//Освобождаем место
}
//release(), напротив, освобождает ресурс
SEMAPHORE.release();
System.out.printf("Автомобиль №%d покинул парковку.\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Результат работы программы
Автомобиль №1 подъехал к парковке.
Автомобиль №1 припарковался на месте 0.
Автомобиль №2 подъехал к парковке.
Автомобиль №2 припарковался на месте 1.
Автомобиль №3 подъехал к парковке.
Автомобиль №3 припарковался на месте 2.
Автомобиль №4 подъехал к парковке.
Автомобиль №4 припарковался на месте 3.
Автомобиль №5 подъехал к парковке.
Автомобиль №5 припарковался на месте 4.
Автомобиль №6 подъехал к парковке.
Автомобиль №7 подъехал к парковке.
Автомобиль №1 покинул парковку.
Автомобиль №6 припарковался на месте 0.
Автомобиль №2 покинул парковку.
Автомобиль №7 припарковался на месте 1.
Автомобиль №3 покинул парковку.
Автомобиль №4 покинул парковку.
Автомобиль №5 покинул парковку.
Автомобиль №6 покинул парковку.
Автомобиль №7 покинул парковку.
Автомобиль №1 припарковался на месте 0.
Автомобиль №2 подъехал к парковке.
Автомобиль №2 припарковался на месте 1.
Автомобиль №3 подъехал к парковке.
Автомобиль №3 припарковался на месте 2.
Автомобиль №4 подъехал к парковке.
Автомобиль №4 припарковался на месте 3.
Автомобиль №5 подъехал к парковке.
Автомобиль №5 припарковался на месте 4.
Автомобиль №6 подъехал к парковке.
Автомобиль №7 подъехал к парковке.
Автомобиль №1 покинул парковку.
Автомобиль №6 припарковался на месте 0.
Автомобиль №2 покинул парковку.
Автомобиль №7 припарковался на месте 1.
Автомобиль №3 покинул парковку.
Автомобиль №4 покинул парковку.
Автомобиль №5 покинул парковку.
Автомобиль №6 покинул парковку.
Автомобиль №7 покинул парковку.
Семафор отлично подходит для решения такой задачи: он не дает автомобилю (потоку) припарковаться (зайти в заданный блок кода и воспользоваться общим ресурсом) если мест на парковке нет (счётчик равен 0) Стоит отметить, что класс Semaphore поддерживает захват и освобождение более чем одного разрешения за раз, но в данном задаче это не нужно.
CountDownLatch
CountDownLatch (замок с обратным отсчетом) предоставляет возможность любому количеству потоков в блоке кода ожидать до тех пор, пока не завершится определенное количество операций, выполняющихся в других потоках, перед тем как они будут «отпущены», чтобы продолжить свою деятельность. В конструктор CountDownLatch (
CountDownLatch(int count)
) обязательно передается количество операций, которое должно быть выполнено, чтобы замок «отпустил» заблокированные потоки.Блокировка потоков снимается с помощью счётчика: любой действующий поток, при выполнении определенной операции уменьшает значение счётчика. Когда счётчик достигает 0, все ожидающие потоки разблокируются и продолжают выполняться (примером CountDownLatch из жизни может служить сбор экскурсионной группы: пока не наберется определенное количество человек, экскурсия не начнется).
Официальная документация по CountDownLatch.
Пример использования CountDownLatch
Рассмотрим следующий пример. Мы хотим провести автомобильную гонку. В гонке принимают участие пять автомобилей. Для начала гонки нужно, чтобы выполнились следующие условия:
CountDownLatch может быть использован в самых разных схемах синхронизации: к примеру, чтобы пока один поток выполняет работу, заставить другие потоки ждать или, наоборот, чтобы заставить поток ждать других, чтобы выполнить работу.
- Каждый из пяти автомобилей подъехал к стартовой прямой;
- Была дана команда «На старт!»;
- Была дана команда «Внимание!»;
- Была дана команда «Марш!».
import java.util.concurrent.CountDownLatch;
public class Race {
//Создаем CountDownLatch на 8 "условий"
private static final CountDownLatch START = new CountDownLatch(8);
//Условная длина гоночной трассы
private static final int trackLength = 500000;
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 5; i++) {
new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start();
Thread.sleep(1000);
}
while (START.getCount() > 3) //Проверяем, собрались ли все автомобили
Thread.sleep(100); //у стартовой прямой. Если нет, ждем 100ms
Thread.sleep(1000);
System.out.println("На старт!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
Thread.sleep(1000);
System.out.println("Внимание!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
Thread.sleep(1000);
System.out.println("Марш!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
//счетчик становится равным нулю, и все ожидающие потоки
//одновременно разблокируются
}
public static class Car implements Runnable {
private int carNumber;
private int carSpeed;//считаем, что скорость автомобиля постоянная
public Car(int carNumber, int carSpeed) {
this.carNumber = carNumber;
this.carSpeed = carSpeed;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к стартовой прямой.\n", carNumber);
//Автомобиль подъехал к стартовой прямой - условие выполнено
//уменьшаем счетчик на 1
START.countDown();
//метод await() блокирует поток, вызвавший его, до тех пор, пока
//счетчик CountDownLatch не станет равен 0
START.await();
Thread.sleep(trackLength / carSpeed);//ждем пока проедет трассу
System.out.printf("Автомобиль №%d финишировал!\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Результат работы программы
Автомобиль №1 подъехал к стартовой прямой.
Автомобиль №2 подъехал к стартовой прямой.
Автомобиль №3 подъехал к стартовой прямой.
Автомобиль №4 подъехал к стартовой прямой.
Автомобиль №5 подъехал к стартовой прямой.
На старт!
Внимание!
Марш!
Автомобиль №4 финишировал!
Автомобиль №1 финишировал!
Автомобиль №3 финишировал!
Автомобиль №5 финишировал!
Автомобиль №2 финишировал!
Автомобиль №2 подъехал к стартовой прямой.
Автомобиль №3 подъехал к стартовой прямой.
Автомобиль №4 подъехал к стартовой прямой.
Автомобиль №5 подъехал к стартовой прямой.
На старт!
Внимание!
Марш!
Автомобиль №4 финишировал!
Автомобиль №1 финишировал!
Автомобиль №3 финишировал!
Автомобиль №5 финишировал!
Автомобиль №2 финишировал!
CountDownLatch может быть использован в самых разных схемах синхронизации: к примеру, чтобы пока один поток выполняет работу, заставить другие потоки ждать или, наоборот, чтобы заставить поток ждать других, чтобы выполнить работу.
CyclicBarrier
CyclicBarrier реализует шаблон синхронизации Барьер. Циклический барьер является точкой синхронизации, в которой указанное количество параллельных потоков встречается и блокируется. Как только все потоки прибыли, выполняется опционное действие (или не выполняется, если барьер был инициализирован без него), и, после того, как оно выполнено, барьер ломается и ожидающие потоки «освобождаются». В конструктор барьера (
CyclicBarrier(int parties)
и CyclicBarrier(int parties, Runnable barrierAction)
) обязательно передается количество сторон, которые должны «встретиться», и, опционально, действие, которое должно произойти, когда стороны встретились, но перед тем когда они будут «отпущены». Барьер похож на CountDownLatch, но главное различие между ними в том, что вы не можете заново использовать «замок» после того, как его счётчик достигнет нуля, а барьер вы можете использовать снова, даже после того, как он сломается. CyclicBarrier является альтернативой метода
join()
, который «собирает» потоки только после того, как они выполнились.Официальная документация по CyclicBarrier.
Пример использования CyclicBarrier
Рассмотрим следующий пример. Существует паромная переправа. Паром может переправлять одновременно по три автомобиля. Чтобы не гонять паром лишний раз, нужно отправлять его, когда у переправы соберется минимум три автомобиля.
Когда три потока достигают метода
import java.util.concurrent.CyclicBarrier;
public class Ferry {
private static final CyclicBarrier BARRIER = new CyclicBarrier(3, new FerryBoat());
//Инициализируем барьер на три потока и таском, который будет выполняться, когда
//у барьера соберется три потока. После этого, они будут освобождены.
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 9; i++) {
new Thread(new Car(i)).start();
Thread.sleep(400);
}
}
//Таск, который будет выполняться при достижении сторонами барьера
public static class FerryBoat implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("Паром переправил автомобили!");
} catch (InterruptedException e) {
}
}
}
//Стороны, которые будут достигать барьера
public static class Car implements Runnable {
private int carNumber;
public Car(int carNumber) {
this.carNumber = carNumber;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к паромной переправе.\n", carNumber);
//Для указания потоку о том что он достиг барьера, нужно вызвать метод await()
//После этого данный поток блокируется, и ждет пока остальные стороны достигнут барьера
BARRIER.await();
System.out.printf("Автомобиль №%d продолжил движение.\n", carNumber);
} catch (Exception e) {
}
}
}
}
Результат работы программы
Автомобиль №0 подъехал к паромной переправе.
Автомобиль №1 подъехал к паромной переправе.
Автомобиль №2 подъехал к паромной переправе.
Автомобиль №3 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №2 продолжил движение.
Автомобиль №1 продолжил движение.
Автомобиль №0 продолжил движение.
Автомобиль №4 подъехал к паромной переправе.
Автомобиль №5 подъехал к паромной переправе.
Автомобиль №6 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №5 продолжил движение.
Автомобиль №4 продолжил движение.
Автомобиль №3 продолжил движение.
Автомобиль №7 подъехал к паромной переправе.
Автомобиль №8 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №8 продолжил движение.
Автомобиль №6 продолжил движение.
Автомобиль №7 продолжил движение.
Автомобиль №1 подъехал к паромной переправе.
Автомобиль №2 подъехал к паромной переправе.
Автомобиль №3 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №2 продолжил движение.
Автомобиль №1 продолжил движение.
Автомобиль №0 продолжил движение.
Автомобиль №4 подъехал к паромной переправе.
Автомобиль №5 подъехал к паромной переправе.
Автомобиль №6 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №5 продолжил движение.
Автомобиль №4 продолжил движение.
Автомобиль №3 продолжил движение.
Автомобиль №7 подъехал к паромной переправе.
Автомобиль №8 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №8 продолжил движение.
Автомобиль №6 продолжил движение.
Автомобиль №7 продолжил движение.
Когда три потока достигают метода
await()
, барьерное действие запускается, и паром переправляет три автомобиля из скопившихся. После этого начинается новый цикл.Exchanger<V>
Exchanger (обменник) может понадобиться, для того, чтобы обменяться данными между двумя потоками в определенной точки работы обоих потоков. Обменник — обобщенный класс, он параметризируется типом объекта для передачи.
Обменник является точкой синхронизации пары потоков: поток, вызывающий у обменника метод
exchange()
блокируется и ждет другой поток. Когда другой поток вызовет тот же метод, произойдет обмен объектами: каждая из них получит аргумент другой в методе exchange()
. Стоит отметить, что обменник поддерживает передачу null
значения. Это дает возможность использовать его для передачи объекта в одну сторону, или, просто как точку синхронизации двух потоков.Официальная документация по Exchanger.
Пример использования Exchanger
Рассмотрим следующий пример. Есть два грузовика: один едет из пункта A в пункт D, другой из пункта B в пункт С. Дороги AD и BC пересекаются в пункте E. Из пунктов A и B нужно доставить посылки в пункты C и D. Для этого грузовики в пункте E должны встретиться и обменяться соответствующими посылками.
Как мы видим, когда один грузовик (один поток) приезжает в пункт Е (достигает точки синхронизации), он ждет пока другой грузовик (другой поток) приедет в пункт Е (достигнет точки синхронизации). После этого происходит обмен посылками (String) и оба грузовика (потока) продолжают свой путь (работу).
import java.util.concurrent.Exchanger;
public class Delivery {
//Создаем обменник, который будет обмениваться типом String
private static final Exchanger<String> EXCHANGER = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
String[] p1 = new String[]{"{посылка A->D}", "{посылка A->C}"};//Формируем груз для 1-го грузовика
String[] p2 = new String[]{"{посылка B->C}", "{посылка B->D}"};//Формируем груз для 2-го грузовика
new Thread(new Truck(1, "A", "D", p1)).start();//Отправляем 1-й грузовик из А в D
Thread.sleep(100);
new Thread(new Truck(2, "B", "C", p2)).start();//Отправляем 2-й грузовик из В в С
}
public static class Truck implements Runnable {
private int number;
private String dep;
private String dest;
private String[] parcels;
public Truck(int number, String departure, String destination, String[] parcels) {
this.number = number;
this.dep = departure;
this.dest = destination;
this.parcels = parcels;
}
@Override
public void run() {
try {
System.out.printf("В грузовик №%d погрузили: %s и %s.\n", number, parcels[0], parcels[1]);
System.out.printf("Грузовик №%d выехал из пункта %s в пункт %s.\n", number, dep, dest);
Thread.sleep(1000 + (long) Math.random() * 5000);
System.out.printf("Грузовик №%d приехал в пункт Е.\n", number);
parcels[1] = EXCHANGER.exchange(parcels[1]);//При вызове exchange() поток блокируется и ждет
//пока другой поток вызовет exchange(), после этого произойдет обмен посылками
System.out.printf("В грузовик №%d переместили посылку для пункта %s.\n", number, dest);
Thread.sleep(1000 + (long) Math.random() * 5000);
System.out.printf("Грузовик №%d приехал в %s и доставил: %s и %s.\n", number, dest, parcels[0], parcels[1]);
} catch (InterruptedException e) {
}
}
}
}
Результат работы программы
В грузовик №1 погрузили: {посылка A->D} и {посылка A->C}.
Грузовик №1 выехал из пункта A в пункт D.
В грузовик №2 погрузили: {посылка B->C} и {посылка B->D}.
Грузовик №2 выехал из пункта B в пункт C.
Грузовик №1 приехал в пункт Е.
Грузовик №2 приехал в пункт Е.
В грузовик №2 переместили посылку для пункта C.
В грузовик №1 переместили посылку для пункта D.
Грузовик №2 приехал в C и доставил: {посылка B->C} и {посылка A->C}.
Грузовик №1 приехал в D и доставил: {посылка A->D} и {посылка B->D}.
Грузовик №1 выехал из пункта A в пункт D.
В грузовик №2 погрузили: {посылка B->C} и {посылка B->D}.
Грузовик №2 выехал из пункта B в пункт C.
Грузовик №1 приехал в пункт Е.
Грузовик №2 приехал в пункт Е.
В грузовик №2 переместили посылку для пункта C.
В грузовик №1 переместили посылку для пункта D.
Грузовик №2 приехал в C и доставил: {посылка B->C} и {посылка A->C}.
Грузовик №1 приехал в D и доставил: {посылка A->D} и {посылка B->D}.
Как мы видим, когда один грузовик (один поток) приезжает в пункт Е (достигает точки синхронизации), он ждет пока другой грузовик (другой поток) приедет в пункт Е (достигнет точки синхронизации). После этого происходит обмен посылками (String) и оба грузовика (потока) продолжают свой путь (работу).
Phaser
Phaser (фазер), как и CyclicBarrier, является реализацией шаблона синхронизации Барьер, но, в отличии от CyclicBarrier, предоставляет больше гибкости. Этот класс позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Как и CyclicBarrier, Phaser является точкой синхронизации, в которой встречаются потоки-участники. Когда все стороны прибыли, Phaser переходит к следующей фазе и снова ожидает ее завершения.
Если сравнить Phaser и CyclicBarrier, то можно выделить следующие важные особенности Phaser:
- Каждая фаза (цикл синхронизации) имеет номер;
- Количество сторон-участников жестко не задано и может меняться: поток может регистрироваться в качестве участника и отменять свое участие;
- Участник не обязан ожидать, пока все остальные участники соберутся на барьере. Чтобы продолжить свою работу достаточно сообщить о своем прибытии;
- Случайные свидетели могут следить за активностью в барьере;
- Поток может и не быть стороной-участником барьера, чтобы ожидать его преодоления;
- У фазера нет опционального действия.
Объект Phaser создается с помощью одного из конструкторов:
Phaser()
Phaser(int parties)
Параметр parties указывает на количество сторон-участников, которые будут выполнять фазы действия. Первый конструктор создает объект Phaser без каких-либо сторон, при этом барьер в этом случае тоже «закрыт». Второй конструктор регистрирует передаваемое в конструктор количество сторон. Барьер открывается когда все стороны прибыли, или, если снимается последний участник. (У класса Phaser еще есть конструкторы, в которые передается родительский объект Phaser, но мы их рассматривать не будем.)
Основные методы:
- int register() — регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;
- int getPhase() — возвращает номер текущей фазы;
- int arriveAndAwaitAdvance() — указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог
CyclicBarrier.await()
. Возвращает номер текущей фазы; - int arrive() — сообщает, что сторона завершила фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;
- int arriveAndDeregister() — сообщает о завершении всех фаз стороной и снимает ее с регистрации. Возвращает номер текущей фазы;
- int awaitAdvance(int phase) — если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.
Официальная документация по Phaser.
Пример использования Phaser
Рассмотрим следующий пример. Есть пять остановок. На первых четырех из них могут стоять пассажиры и ждать автобуса. Автобус выезжает из парка и останавливается на каждой остановке на некоторое время. После конечной остановки автобус едет в парк. Нам нужно забрать пассажиров и высадить их на нужных остановках.
import java.util.ArrayList;
import java.util.concurrent.Phaser;
public class Bus {
private static final Phaser PHASER = new Phaser(1);//Сразу регистрируем главный поток
//Фазы 0 и 6 - это автобусный парк, 1 - 5 остановки
public static void main(String[] args) throws InterruptedException {
ArrayList<Passenger> passengers = new ArrayList<>();
for (int i = 1; i < 5; i++) { //Сгенерируем пассажиров на остановках
if ((int) (Math.random() * 2) > 0)
passengers.add(new Passenger(i, i + 1));//Этот пассажир выходит на следующей
if ((int) (Math.random() * 2) > 0)
passengers.add(new Passenger(i, 5)); //Этот пассажир выходит на конечной
}
for (int i = 0; i < 7; i++) {
switch (i) {
case 0:
System.out.println("Автобус выехал из парка.");
PHASER.arrive();//В фазе 0 всего 1 участник - автобус
break;
case 6:
System.out.println("Автобус уехал в парк.");
PHASER.arriveAndDeregister();//Снимаем главный поток, ломаем барьер
break;
default:
int currentBusStop = PHASER.getPhase();
System.out.println("Остановка № " + currentBusStop);
for (Passenger p : passengers) //Проверяем, есть ли пассажиры на остановке
if (p.departure == currentBusStop) {
PHASER.register();//Регистрируем поток, который будет участвовать в фазах
p.start(); // и запускаем
}
PHASER.arriveAndAwaitAdvance();//Сообщаем о своей готовности
}
}
}
public static class Passenger extends Thread {
private int departure;
private int destination;
public Passenger(int departure, int destination) {
this.departure = departure;
this.destination = destination;
System.out.println(this + " ждёт на остановке № " + this.departure);
}
@Override
public void run() {
try {
System.out.println(this + " сел в автобус.");
while (PHASER.getPhase() < destination) //Пока автобус не приедет на нужную остановку(фазу)
PHASER.arriveAndAwaitAdvance(); //заявляем в каждой фазе о готовности и ждем
Thread.sleep(1);
System.out.println(this + " покинул автобус.");
PHASER.arriveAndDeregister(); //Отменяем регистрацию на нужной фазе
} catch (InterruptedException e) {
}
}
@Override
public String toString() {
return "Пассажир{" + departure + " -> " + destination + '}';
}
}
}
Результат работы программы
Пассажир{1 -> 2} ждёт на остановке № 1
Пассажир{1 -> 5} ждёт на остановке № 1
Пассажир{2 -> 3} ждёт на остановке № 2
Пассажир{2 -> 5} ждёт на остановке № 2
Пассажир{3 -> 4} ждёт на остановке № 3
Пассажир{4 -> 5} ждёт на остановке № 4
Пассажир{4 -> 5} ждёт на остановке № 4
Автобус выехал из парка.
Остановка № 1
Пассажир{1 -> 5} сел в автобус.
Пассажир{1 -> 2} сел в автобус.
Остановка № 2
Пассажир{2 -> 3} сел в автобус.
Пассажир{1 -> 2} покинул автобус.
Пассажир{2 -> 5} сел в автобус.
Остановка № 3
Пассажир{2 -> 3} покинул автобус.
Пассажир{3 -> 4} сел в автобус.
Остановка № 4
Пассажир{4 -> 5} сел в автобус.
Пассажир{3 -> 4} покинул автобус.
Пассажир{4 -> 5} сел в автобус.
Остановка № 5
Пассажир{1 -> 5} покинул автобус.
Пассажир{2 -> 5} покинул автобус.
Пассажир{4 -> 5} покинул автобус.
Пассажир{4 -> 5} покинул автобус.
Автобус уехал в парк.
Пассажир{1 -> 5} ждёт на остановке № 1
Пассажир{2 -> 3} ждёт на остановке № 2
Пассажир{2 -> 5} ждёт на остановке № 2
Пассажир{3 -> 4} ждёт на остановке № 3
Пассажир{4 -> 5} ждёт на остановке № 4
Пассажир{4 -> 5} ждёт на остановке № 4
Автобус выехал из парка.
Остановка № 1
Пассажир{1 -> 5} сел в автобус.
Пассажир{1 -> 2} сел в автобус.
Остановка № 2
Пассажир{2 -> 3} сел в автобус.
Пассажир{1 -> 2} покинул автобус.
Пассажир{2 -> 5} сел в автобус.
Остановка № 3
Пассажир{2 -> 3} покинул автобус.
Пассажир{3 -> 4} сел в автобус.
Остановка № 4
Пассажир{4 -> 5} сел в автобус.
Пассажир{3 -> 4} покинул автобус.
Пассажир{4 -> 5} сел в автобус.
Остановка № 5
Пассажир{1 -> 5} покинул автобус.
Пассажир{2 -> 5} покинул автобус.
Пассажир{4 -> 5} покинул автобус.
Пассажир{4 -> 5} покинул автобус.
Автобус уехал в парк.
Кстати, функционалом фазера можно воспроизвести работу CountDownLatch.
Пример из CountDownLatch с использованием Phaser
import java.util.concurrent.Phaser;
public class NewRace {
private static final Phaser START = new Phaser(8);
private static final int trackLength = 500000;
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 5; i++) {
new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start();
Thread.sleep(100);
}
while (START.getRegisteredParties() > 3) //Проверяем, собрались ли все автомобили
Thread.sleep(100); //у стартовой прямой. Если нет, ждем 100ms
Thread.sleep(100);
System.out.println("На старт!");
START.arriveAndDeregister();
Thread.sleep(100);
System.out.println("Внимание!");
START.arriveAndDeregister();
Thread.sleep(100);
System.out.println("Марш!");
START.arriveAndDeregister();
}
public static class Car implements Runnable {
private int carNumber;
private int carSpeed;
public Car(int carNumber, int carSpeed) {
this.carNumber = carNumber;
this.carSpeed = carSpeed;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к стартовой прямой.\n", carNumber);
START.arriveAndDeregister();
START.awaitAdvance(0);
Thread.sleep(trackLength / carSpeed);
System.out.printf("Автомобиль №%d финишировал!\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Если кому-нибудь пригодилось, то я очень рад=)
Более подробно о Phaser здесь.
Почитать ещё о синхронизаторах и посмотреть примеры можно здесь.
Отличный обзор java.util.concurrent смотрите здесь.