Новый синхронизатор Phaser

  • Tutorial
Phaser (Этапщик) — мощная и гибкая реализация паттерна синхронизации Барьер. Включен в JDK 7 в составе пакета java.util.concurrent.

Поскольку в документации, как говорится, без ста грамм не разберешься, опишу тут принцип действия, неочевидные моменты и приведу несколько примеров использования.



Phaser вполне естественно расширяет функциональность предшественника из JDK 1.5, CyclicBarrier (про него можно почитать тут):
  • Количество участников барьера может меняться.
  • Поток не обязан ожидать, пока все участники соберутся на барьере. Достаточно только сообщить о готовности своей работы.
  • Наоборот, потоку необязательно быть участником барьера, чтобы ожидать его преодоления.
Кстати, это расширение настолько общо, что согласуется и с контрактом третьего барьера из стандартной библиотеки, CountDownLatch.

 
Состояние этапщика включает
  1. номер этапа (фазы, цикла синхронизации) | int phase
  2. количество участников | int parties
  3. количество участников, которые заявили/не заявили о своей готовности | int arrived, unarrived
  4. состояние завершения | boolean terminated
Всегда верно:
terminated = false   →   phase = [реальный номер этапа, счет ведется с нуля] % (231 − 1)
parties  =  arrived + unarrived
0  ≤  unarrived, arrived  ≤  parties

Все элементы состояния снабжены геттерами.

 
0. Если этапщик находится в терминальном состоянии (terminated = true), он не изменяем; вызов любого управляющего метода возвращает немедленно. Phase имеет отрицательное значение, parties, arrived, unarrived — значение в момент завершения.

 
1. Основные управляющие методы:
register() зарегистрировать участника
arrive() сообщить этапщику о своей готовности, не ожидая открытия барьера
arriveAndAwaitAdvance() классическое прибытие на барьер. Точный аналог CyclicBarrier.await()
arriveAndDeregister() отменить свое участие
Ясное дело, как и в других синхронизаторах из JDK, вызывающий поток в управляющих методах не отслеживается, поэтому термины вроде «потока-участника» и «своей регистрации» условны. Т. е. заряженный пистолет уже у нас в руках, осталось направить его на ногу и спустить курок :-) Впрочем, несложно написать обертку, исправляющую эту опасную ситуацию.

 
2. Барьер открывается сразу после всякого уменьшения unarrived до нуля.1 То есть, в том числе, когда снимается последний участник, однако при создании «пустого» этапщика (new Phaser() или new Phaser(0)) «ворота закрыты».

Так или иначе, преодолеть барьер можно только с вызовом одного из методов, начинающихся на «arrive». В контексте потока, который это сделает, выполняется protected-метод onAdvance(phase, parties) — если он возвращает true, этапщик завершает свою работу (terminated ← true). Этот механизм позволяет управлять жизненным циклом изнутри класса. В дефолтной реализации phaser умирает как раз если с барьера ушли все участники (parties = 0).

Открытие барьера есть переход на новый этап: phase ← phase + 1.

Как это примерно работает:
final Phaser ph = new Phaser(4);
ph.arrive();
ph.register();
new Thread() {public void run(){ ph.arriveAndAwaitAdvance(); }}.start();
ph.arrive();
ph.arrive();
ph.arriveAndDeregister(); 
// phase number = 1 
// Thread-0 released

image
Над состоянием написан метод, который к нему привел

 
3. arriveAndDeregister — вредно думать, что поток сначала заявляет о готовности, и лишь потом снимается, о чем намекает название метода. Суть отражало бы «arriveToDeregister», «приехал, чтобы положить партбилет на стол» :-) Такой подход устраняет неоднозначность в понимании (см. иллюстрацию выше): перед переходом на новый этап вызван onAdvance() с аргументом parties = 4 или 5?

 
4. Пример изменения логики завершения в методе onAdvance. «Неубиваемый» этапщик конструируется так:
new Phaser() {
	protected boolean onAdvance(int phase, int parties) {
		return false;
	}
};

Еще в методе можно выполнить т. н. барьерное действие — композицию результатов работы участников и т. п.

Конкретнее о том, что происходит при переходе на следующий этап:
image

 

Использование


Чтобы получить из Phaser CyclicBarrier, достаточно ограничиться использованием метода arriveAndAwaitAdvance().

Эмуляция CountDownLatch немногим сложнее:
// aka countDown()
phaser.arriveAndDeregister(); // просто arrive() для многоразовой защелки

// aka await()
if (!phaser.isTerminated()) // необязательно
	phaser.awaitAdvance( phaser.getPhase() );

Конструкция awaitAdvance(getPhase()) безопасна (хоть и не атомарна), потому что метод awaitAdvance(int phaseNumber) возвращает сразу, если указанный этап уже пройден.

 
Вообще, наличие счетчика этапа значительно упрощает жизнь и увеличивает количество применений класса.

Распараллеливание N одинаковых действий подряд:
import java.util.concurrent.Phaser;

public class FBIEasterEgg {
	
	static int lines = 10;
	static String alphabet = "abcdefghijklmnopqrstuvwxyz";
	static StringBuffer randomAlphabet = new StringBuffer();

	public static void main(String[] args) {
		
		final Phaser phaser = new Phaser() { 
			protected boolean onAdvance(int phase, int parties) {
				// John Nash mode
				/* print "deviations"
				for (int i = 0; i < alphabet.length(); i++) {
					System.out.printf("%d ", randomAlphabet.indexOf(
							  alphabet.charAt(i) + "") - i);
				}
				System.out.println();
				*/
				System.out.println(randomAlphabet);
				//
				randomAlphabet = new StringBuffer();
				
				return phase >= lines; //loop count managing here
			}
		};
		
		// everyone have to wait for the main thread
		phaser.register(); 
		
		for (int i = 0; i < alphabet.length(); i++) {
			
			final char next = alphabet.charAt(i);
			phaser.register(); // ticket for the following thread
			
			new Thread() {
				public void run() {
					do {
						randomAlphabet.append(next);
						phaser.arriveAndAwaitAdvance(); // checkout
					} while ( !phaser.isTerminated() );
				}
			}.start();
		}
		
		System.out.println("Write this by hand and carry in the pocket:");
		// some additional preparations may be done here
		// release
		phaser.arriveAndDeregister(); 
	}
}


Ожидать, пока случится заданное число каких-то событий:
class EventCounter {
	private Phaser count = new Phaser(1);
	
	public void eventOccured() {
		count.arrive();
	}
	
	public void waitFor(int events) {
		count.register();
		
		for (int i = 0; i < events; i++) {
			count.arriveAndAwaitAdvance();
		}
		
		count.arriveAndDeregister();
	}
}


 

Реализация


Даг Ли использовал остроумную идею: хранить все состояние этапщика в одном лонге, и изменять через атомарные compare-and-set операции.
Phaser from JDK 7
Благодаря этому в реализации почти не используются обычные блокировки.

 

Ссылки


Phaser javadoc
JDK 7 download page
Concurrency JSR-166 Interest Site — там есть ссылка на скачивание нового java.util.concurrent для JDK 6.

 
П. С. Название «Этапщик», конечно, корявое, если кто-то знает/придумает лучше — не таите.
1 — при условии, что данный Phaser является корнем иерархии. В этом обзоре механизм масштабирования не рассматривается (все равно бы наврал), см. javadoc.
  • +36
  • 35.4k
  • 4
Share post

Similar posts

Comments 4

    +1
    Делал в сильно упрощённом виде для С++ github.com/HounD/tsw
      –2
      расширение настолько общо? Общее м.б?
        0
        А вообще спасибо за обзор, полезно
          0
          Спасибо. Жаль что в проектах, где я учавствую вся многопоточность разруливает блоками синхронизации. То есть даже java.util.concurrent.* из 1.5 не используется толком.

          Но будет больше стимула написать «проект выходного дня» и показать разницу в производительности.

          Only users with full accounts can post comments. Log in, please.