Phaser (Этапщик) — мощная и гибкая реализация паттерна синхронизации Барьер. Включен в JDK 7 в составе пакета java.util.concurrent.
Поскольку в документации, как говорится, без ста грамм не разберешься, опишу тут принцип действия, неочевидные моменты и приведу несколько примеров использования.
Phaser вполне естественно расширяет функциональность предшественника из JDK 1.5, CyclicBarrier (про него можно почитать тут):
Состояние этапщика включает
terminated = false → phase = [реальный номер этапа, счет ведется с нуля] % (231 − 1)
parties = arrived + unarrived
0 ≤ unarrived, arrived ≤ parties
Все элементы состояния снабжены геттерами.
0. Если этапщик находится в терминальном состоянии (terminated = true), он не изменяем; вызов любого управляющего метода возвращает немедленно. Phase имеет отрицательное значение, parties, arrived, unarrived — значение в момент завершения.
1. Основные управляющие методы:
Ясное дело, как и в других синхронизаторах из JDK, вызывающий поток в управляющих методах не отслеживается, поэтому термины вроде «потока-участника» и «своей регистрации» условны. Т. е. заряженный пистолет уже у нас в руках, осталось направить его на ногу и спустить курок :-) Впрочем, несложно написать обертку, исправляющую эту опасную ситуацию.
2. Барьер открывается сразу после всякого уменьшения unarrived до нуля.1 То есть, в том числе, когда снимается последний участник, однако при создании «пустого» этапщика (new Phaser() или new Phaser(0)) «ворота закрыты».
Так или иначе, преодолеть барьер можно только с вызовом одного из методов, начинающихся на «arrive». В контексте потока, который это сделает, выполняется protected-метод onAdvance(phase, parties) — если он возвращает true, этапщик завершает свою работу (terminated ← true). Этот механизм позволяет управлять жизненным циклом изнутри класса. В дефолтной реализации phaser умирает как раз если с барьера ушли все участники (parties = 0).
Открытие барьера есть переход на новый этап: phase ← phase + 1.
Как это примерно работает:

Над состоянием написан метод, который к нему привел
3. arriveAndDeregister — вредно думать, что поток сначала заявляет о готовности, и лишь потом снимается, о чем намекает название метода. Суть отражало бы «arriveToDeregister», «приехал, чтобы положить партбилет на стол» :-) Такой подход устраняет неоднозначность в понимании (см. иллюстрацию выше): перед переходом на новый этап вызван onAdvance() с аргументом parties = 4 или 5?
4. Пример изменения логики завершения в методе onAdvance. «Неубиваемый» этапщик конструируется так:
Еще в методе можно выполнить т. н. барьерное действие — композицию результатов работы участников и т. п.
Конкретнее о том, что происходит при переходе на следующий этап:

Чтобы получить из Phaser CyclicBarrier, достаточно ограничиться использованием метода arriveAndAwaitAdvance().
Эмуляция CountDownLatch немногим сложнее:
Конструкция awaitAdvance(getPhase()) безопасна (хоть и не атомарна), потому что метод awaitAdvance(int phaseNumber) возвращает сразу, если указанный этап уже пройден.
Вообще, наличие счетчика этапа значительно упрощает жизнь и увеличивает количество применений класса.
Распараллеливание N одинаковых действий подряд:
Ожидать, пока случится заданное число каких-то событий:
Даг Ли использовал остроумную идею: хранить все состояние этапщика в одном лонге, и изменять через атомарные compare-and-set операции.

Благодаря этому в реализации почти не используются обычные блокировки.
Phaser javadoc
JDK 7 download page
Concurrency JSR-166 Interest Site — там есть ссылка на скачивание нового java.util.concurrent для JDK 6.
П. С. Название «Этапщик», конечно, корявое, если кто-то знает/придумает лучше — не таите.
1 — при условии, что данный Phaser является корнем иерархии. В этом обзоре механизм масштабирования не рассматривается (все равно бы наврал), см. javadoc.
Поскольку в документации, как говорится, без ста грамм не разберешься, опишу тут принцип действия, неочевидные моменты и приведу несколько примеров использования.
Phaser вполне естественно расширяет функциональность предшественника из JDK 1.5, CyclicBarrier (про него можно почитать тут):
- Количество участников барьера может меняться.
- Поток не обязан ожидать, пока все участники соберутся на барьере. Достаточно только сообщить о готовности своей работы.
- Наоборот, потоку необязательно быть участником барьера, чтобы ожидать его преодоления.
Состояние этапщика включает
- номер этапа (фазы, цикла синхронизации) | int phase
- количество участников | int parties
- количество участников, которые заявили/не заявили о своей готовности | int arrived, unarrived
- состояние завершения | boolean terminated
terminated = false → phase = [реальный номер этапа, счет ведется с нуля] % (231 − 1)
parties = arrived + unarrived
0 ≤ unarrived, arrived ≤ parties
Все элементы состояния снабжены геттерами.
0. Если этапщик находится в терминальном состоянии (terminated = true), он не изменяем; вызов любого управляющего метода возвращает немедленно. Phase имеет отрицательное значение, parties, arrived, unarrived — значение в момент завершения.
1. Основные управляющие методы:
register() | зарегистрировать участника |
arrive() | сообщить этапщику о своей готовности, не ожидая открытия барьера |
arriveAndAwaitAdvance() | классическое прибытие на барьер. Точный аналог CyclicBarrier.await() |
arriveAndDeregister() | отменить свое участие |
2. Барьер открывается сразу после всякого уменьшения unarrived до нуля.1 То есть, в том числе, когда снимается последний участник, однако при создании «пустого» этапщика (new Phaser() или new Phaser(0)) «ворота закрыты».
Так или иначе, преодолеть барьер можно только с вызовом одного из методов, начинающихся на «arrive». В контексте потока, который это сделает, выполняется protected-метод onAdvance(phase, parties) — если он возвращает true, этапщик завершает свою работу (terminated ← true). Этот механизм позволяет управлять жизненным циклом изнутри класса. В дефолтной реализации phaser умирает как раз если с барьера ушли все участники (parties = 0).
Открытие барьера есть переход на новый этап: phase ← phase + 1.
Как это примерно работает:
final Phaser ph = new Phaser(4);
ph.arrive();
ph.register();
new Thread() {public void run(){ ph.arriveAndAwaitAdvance(); }}.start();
ph.arrive();
ph.arrive();
ph.arriveAndDeregister();
// phase number = 1
// Thread-0 released

Над состоянием написан метод, который к нему привел
3. arriveAndDeregister — вредно думать, что поток сначала заявляет о готовности, и лишь потом снимается, о чем намекает название метода. Суть отражало бы «arriveToDeregister», «приехал, чтобы положить партбилет на стол» :-) Такой подход устраняет неоднозначность в понимании (см. иллюстрацию выше): перед переходом на новый этап вызван onAdvance() с аргументом parties = 4 или 5?
4. Пример изменения логики завершения в методе onAdvance. «Неубиваемый» этапщик конструируется так:
new Phaser() {
protected boolean onAdvance(int phase, int parties) {
return false;
}
};
Еще в методе можно выполнить т. н. барьерное действие — композицию результатов работы участников и т. п.
Конкретнее о том, что происходит при переходе на следующий этап:

Использование
Чтобы получить из Phaser CyclicBarrier, достаточно ограничиться использованием метода arriveAndAwaitAdvance().
Эмуляция CountDownLatch немногим сложнее:
// aka countDown()
phaser.arriveAndDeregister(); // просто arrive() для многоразовой защелки
// aka await()
if (!phaser.isTerminated()) // необязательно
phaser.awaitAdvance( phaser.getPhase() );
Конструкция awaitAdvance(getPhase()) безопасна (хоть и не атомарна), потому что метод awaitAdvance(int phaseNumber) возвращает сразу, если указанный этап уже пройден.
Вообще, наличие счетчика этапа значительно упрощает жизнь и увеличивает количество применений класса.
Распараллеливание N одинаковых действий подряд:
import java.util.concurrent.Phaser;
public class FBIEasterEgg {
static int lines = 10;
static String alphabet = "abcdefghijklmnopqrstuvwxyz";
static StringBuffer randomAlphabet = new StringBuffer();
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) {
// John Nash mode
/* print "deviations"
for (int i = 0; i < alphabet.length(); i++) {
System.out.printf("%d ", randomAlphabet.indexOf(
alphabet.charAt(i) + "") - i);
}
System.out.println();
*/
System.out.println(randomAlphabet);
//
randomAlphabet = new StringBuffer();
return phase >= lines; //loop count managing here
}
};
// everyone have to wait for the main thread
phaser.register();
for (int i = 0; i < alphabet.length(); i++) {
final char next = alphabet.charAt(i);
phaser.register(); // ticket for the following thread
new Thread() {
public void run() {
do {
randomAlphabet.append(next);
phaser.arriveAndAwaitAdvance(); // checkout
} while ( !phaser.isTerminated() );
}
}.start();
}
System.out.println("Write this by hand and carry in the pocket:");
// some additional preparations may be done here
// release
phaser.arriveAndDeregister();
}
}
Ожидать, пока случится заданное число каких-то событий:
class EventCounter {
private Phaser count = new Phaser(1);
public void eventOccured() {
count.arrive();
}
public void waitFor(int events) {
count.register();
for (int i = 0; i < events; i++) {
count.arriveAndAwaitAdvance();
}
count.arriveAndDeregister();
}
}
Реализация
Даг Ли использовал остроумную идею: хранить все состояние этапщика в одном лонге, и изменять через атомарные compare-and-set операции.

Благодаря этому в реализации почти не используются обычные блокировки.
Ссылки
Phaser javadoc
JDK 7 download page
Concurrency JSR-166 Interest Site — там есть ссылка на скачивание нового java.util.concurrent для JDK 6.
П. С. Название «Этапщик», конечно, корявое, если кто-то знает/придумает лучше — не таите.
1 — при условии, что данный Phaser является корнем иерархии. В этом обзоре механизм масштабирования не рассматривается (все равно бы наврал), см. javadoc.