Pull to refresh

Comments 39

Делать wait / notify на BlockingQueue — это оскорбление всего Java Concurrency.
BlockingQueue на то и blocking, что вызов take() будет ждать, пока в очередь не поступит элемент из другого потока.
Спасибо, ни в коем случае не хотел оскарблять никого, тем более Java Concurrency :)
Добавил новый метод. Блокировку на объекте очереди все же оставил, в качестве наглядного примера блокировки на объекте.
Убрал из первого метода BlockingQueue вообще.
Добавьте четвертый пример: изменить queue.poll() на queue.take(), не игнорировать InterruptedException и убрать всю прочую синхронизацию на барьерах, CDL и т.д.
Добавить Thread.currentThread().interrupt() в обработку InterruptedException? Просто у каждого же может быть своя логика обработки исключений. Остальное сделал, спасибо.
Да, у каждого потока может быть своя логика обработки InterruptedException, но т.к. вы используете стороннюю библиотеку, я думаю необходимо дать ей (библиотеке) решать что же в итоге делать и как на это событие реагировать т.к. у нее может быть как раз таки своя логика.
Согласен, добавил.
Вот кстати еще один вариант, который чуточку короче:

class A {
    public void asyncMethod(final Callback callback) {
        newFixedThreadPool(1).execute(new Runnable() {
            @Override
            public void run() {
                parkNanos(TimeUnit.SECONDS.toNanos(5));
                callback.onFinish("Finished!");
            }
        });
    }



    public Object syncMethod() {
        final Exchanger<Object> ex = new Exchanger<Object>();

        asyncMethod(new Callback() {
            @Override
            public void onFinish(Object o) {
                try {
                    ex.exchange(o);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });


        try {
            return ex.exchange(null);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }
}
Спасибо, добавил метод в статью.
Вот еще интересный короткий метод, хотя и не для всех случаев годится:

static Object syncMethod() {
    final Object[] result = new Object[1];
    final Thread caller = Thread.currentThread();

    A.asyncMethod(new Callback() {
        @Override
        public void onFinish(Object obj) {
            result[0] = obj;
            LockSupport.unpark(caller);
        }
    });

    LockSupport.park();
    return result[0];
}
Что-то мне этот метод не очень нравится. Судя по документации нить может быть продолжена после метода LockSupport.park() в любое время. («The park method may also return at any other time, for „no reason“»)
Тогда нужно делать что-то вроде:
while (result[0] == null){
	LockSupport.park();
}

А в таком виде может возникнуть дедлок. Тоесть скорее даже как-то так прийдется:
static Object syncMethod() {
	final Object[] result = new Object[1];
	final Thread caller = Thread.currentThread();

	A.asyncMethod(new Callback() {
		@Override
		public void onFinish(Object obj) {
			synchronized(result) {
				result[0] = obj;
				LockSupport.unpark(caller);
			}
		}
	});

	while (true){
		synchronized(result) {
			if (result[0] == null) LockSupport.park();
			else break;
		}
	}
	return result[0];
}
В этом случае можно проще через volatile и Object result вместо массива :) Но это будет работать только в том случае, если наш aSync никогда не должен вернуть null.
Кстати зачем вообще BlockingQueue там, где мы используем CDL или CB? Да и в первом примере зачем List, если нам по-сути нужен просто один объект?

И вообще, из всех предложенных вариантов, ИМХО, самый «элегантный» — с Exchanger.
А вообще решение этой задачи можно развивать и развивать, как говориться, сколько умов — столько и решений :)
Кстати зачем вообще BlockingQueue там, где мы используем CDL или CB?

Согласен, не за чем. Убрал.

Да и в первом примере зачем List, если нам по-сути нужен просто один объект?

Люблю списки, не люблю массивы. В даном случае не такой уж и большой оверхед будет.

И вообще, из всех предложенных вариантов, ИМХО, самый «элегантный» — с Exchanger.

Согласен :)
И вообще, из всех предложенных вариантов, ИМХО, самый «элегантный» — с Exchanger.
Согласен :)
А я не совсем :) Exchanger служит для двустороннего обмена, а в данном примере имеет место односторонний (producer-consumer). ИМХО лучше всего подходит SynchronousQueue. И по длине кода столько же: один exchange меняется на put, а второй — на take.
А если возвращаемый объект вдруг окежется null-ом?
Тут стоит отметить что в этом случае и метод 4 с BlockingQueue (например с реализацией SynchronousQueue) некорректно сработает.
The park method may also return at any other time, for «no reason»
На практике этого никогда не произойдет, т.к. HotSpot фильтрует «случайные» пробуждения.
Но synchronized в любом случае не нужен. В том-то и «фишка» LockSupport: park() можно вызывать даже после того, как случился unpark() — дедлока не будет.
Все синхронизационные примитивы java.util.concurrent (ReentrantLock, Semaphore, ArrayBlockingQueue, Exchanger и т.д.) реализованы на основе LockSupport.
На практике этого никогда не произойдет, т.к. HotSpot фильтрует «случайные» пробуждения.

Не то чтобы я вам не доверяю, просто предпочитаю делать так как написано в документации. :)
Теоретически реализация HotSpot-а может поменяться, а Java API это договор, который я и разработчики Java договорились соблюдать.
В любом случае я не могу другим такое советовать.

Насчет LockSupport понял, спасибо.
Это да. Экзотический вариант. Я поэтому и сделал оговорку, что не везде подойдет.
А сам бы я реализовал, как уже сказал, на SynchronousQueue.
Разве park()/unpark() имеют какую-то memory visibility semantics? В javadoc я такого не вижу. Понятно, что в реализации скорее всего такая семантика есть неявно, но раз она в спецификации не прописана, то этот код не корректен для написания кем угодно, кроме Дага Ли.
OK, согласен: Object[] лучше заменить на AtomicReference.

А насчет «код некорректен для написания кем угодно» у меня есть свое мнение: код имеет право на существование до тех пор, пока он решает конкретную задачу в конкретном случае. Мартин Томпсон, которого ты часто цитируешь, тому в пример. Скажем, в многопоточной программе, написанной согласно Single Writer Principle, можно добиться огромного прироста производительности, заменив volatile store на обычный store. Программа будет абсолютно правильно работать на x86 архитектуре, хотя, согласно JMM, будет некорректной.
Мне кажется, это все же разные вещи. Мартин, конечно, тот еще волюнтарист и безбожник, но .lazySet() — это не store, это store:memory_order_release, и замена memory_order_seq_con на release в случае single writer вполне легальна. Да, строгая семантика .lazySet не прописана в JMM, но она такая потому что именно для этого lazySet и вводился. А что JMM уже который год обновить политической воли нет — так это грустно, конечно, но что ж теперь, сидеть и куковать? То есть lazySet — это инструмент, предназначенный для того, для чего его использует Мартин, но формально недоспецифицированный.

В случае с park/unpark же ситуация иная — их назначение это взаимодействие с системным планировщиком задач. То, что у них есть еще какая-то memory visibility семантика — это деталь реализации, к их назначению никакого отношения не имеющая, в общем-то — случайность. Закладываться на нее может только тот, кто имеет возможность держать руку на пульсе деталей реализации classlib на разных платформах, и вмешиваться в принимаемые по ним решения. То есть Даг Ли :)

Когда Даг пишет SequenceLock на неспецифицированных особенностях Unsafe — я апплодирую, потому что для меня появляется отличный примитив синхронизации, и это ответственность Дага сменить его реализацию, если у нужных методов Unsafe сменится семантика. Когда я делаю то же самое — это моя ответственность. Вот только моим мнением при этих изменениях вряд ли кто поинтересуется, и вряд ли меня предупредят о них — в отличие от Дага.
заменив volatile store на обычный store. Программа будет абсолютно правильно работать на x86 архитектуре, хотя, согласно JMM, будет некорректной.

Программа будет корректно работать в том смысле, что x86 HMM ничего гадостного не сделает. Но тут прибегает HotSpot и опаньки. Уж ты то должен это знать.
Я думаю, он имел в виду не прямо ref=, а ref.lazySet(), который на x86 транслируется в store + compilerBarrier
UFO just landed and posted this here
Возможно и катит, напишите свою реализацию. Только учитывайте что Callback не является реализацией интерфейсов ни Callable ни Runnable. И доступа к исходному коду библиотеки и темболее A.asyncMethod() нету.
UFO just landed and posted this here
Звучит запутанно. А какие преимущества мы получим, используя данный подход? Потому что я уже вижу пару недостатков.
Преимущества в том, что Future «рекомендованный» способ работы с результатами асинхронных операций в яве (хотя лично я от него не сильно в восторге, но рекомендованный — значит рекомендованный, стандарты все же много значат).

То есть вам, по-сути, предлагают двухэтапный способ решения вашей задачи: на первом этапе вы оборачиваете Callback-based интерфейс библиотеки в нечто, предоставляющее взамен Future-based интерфейс. А Future уже можно использовать и для синхронного вычисления — просто вызывать .get(). Но можно и для асинхронного — есть методы типа .get(timeout), и прочее.

Но вы правы в том, что реализовывать обертку Callback -> Future действительно не фунт изюма. В качестве упражнения на способность делать непростые но корректные и эффективные реализации многопоточных примитивов — подойдет. Для однократного решения вашей задачи — я бы не стал возиться.
A.asyncMethod(new Callback() {
            @Override
            public void onFinish(Object o) {
                synchronized (queue) {
                    queue.add(o);
                    queue.notify();
                }
            }
        });
        if (queue.size() == 0) {
            synchronized (queue) {
                if (queue.size() == 0) queue.wait();
            }
        }
        return queue.poll();

— этот код некорректен. Пройдя по ветке queue.size != 0 вы вернете из списка объект, который может быть не до конца инициализирован. Ведь между сохранением объекта в список в другом потоке, и возвратом из списка в этом потоке у вас нет никаких ребер hb. Это типичный DCL
Что-то я не пойму, какой объект может быть не до конца инциализирован? Который Object o? Он инициализируется еще до вызова метода onFinish.
Предлагаете как-то вот так сделать?
synchronized (queue) { return queue.poll(); }
>Он инициализируется еще до вызова метода onFinish.

В каком смысле вы здесь пишете «до»? Это он в потоке 2 инициализируется «до» — в program order того потока. Первому потоку на этот program order плевать с высокой колокольни. С точки зрения первого потока вы можете увидеть полу-инициализированный объект, или увидеть queue в несогласованном состоянии.
synchronized (queue) {
                while (queue.size() == 0) {
                   queue.wait();
                }
                return queue.poll();
}

Вот этот код корректен. Я очень рекомендую вам разобраться, в чем разница, и почему это важно — до того, как начнете выкладывать свой многопоточный код в продакшн.
В прямом смысле. Инициализация объекта «о» и вызов метода onFinish происходит в одном потоке (потоке выполнения асинхронного метода), поэтому совершенно корректно говорить «до».
Давайте посмотрим — вы говорите что в случае когда queue.size() != 0 могут возникнуть проблемы. Это значение не равно нулю только тогда, кода в очереди уже что-то есть. Что-то (объект «о») запихивается в очередь только внутри метода onFinish, который вызывается в потоке, в котором создается и инициализируется объект «о», только по завершению этой инициализации. Из этого следует, что queue.size() != 0 не может быть до инициализации объекта «о».
Или вы имеете ввиду что операция queue.add(o) не атомарная и может случиться ситуация когда размер очереди уже увеличен, а переменная еще не добавлена в очередь? В этом случае, соглашусь насчет возможности увидеть queue в несогласованом состояни. (ох не зря я там вначале BlockingQueue ставил :) )
С вашим решением соглашусь, хотя и не понимаю зачем там while, когда хватает if-а. Главный поток продолжится только после queue.notify() и тогда уж точно queue.size() != 0.
>Из этого следует, что queue.size() != 0 не может быть до инициализации объекта «о».

Если вы правда так считаете, то вы совершенно не понимаете модель исполнения многопоточного кода. Очень рекомендую почитать литературу на тему JMM, чтобы примерно быть в курсе того, как один поток может выполнять действия в одном порядке, а другой можеть видеть соответствующие изменения в памяти совсем в другом порядке. И каким образом расстановка действий синхронизации позволяет договариваться о глобально видимом порядке исполнения. Я уверен, что пока человек этого не понимает — его к написанию многопоточного кода на пушечный выстрел нельзя подпускать.

Чтобы понимать зачем while в идиомах ожидания состояния — нужно понимать откуда они выросли, и какой смысл у каждой операции в этом цикле. Но поскольку это довольно сложно для новичков, им обычно предлагают версию о spurious wakeups. Я же предлагаю просто подумать, сколько времени вы будете искать причину бага, если когда-нибудь .wait() проснется не по той причине, что вы задумали.
один поток может выполнять действия в одном порядке, а другой можеть видеть соответствующие изменения в памяти совсем в другом порядке.

Теперь понял о чем вы.
Я уверен, что пока человек этого не понимает — его к написанию многопоточного кода на пушечный выстрел нельзя подпускать.

Как понять если не писать код? Вот такой вот мир не совершенный.

Изменил код в статье в певом методе. Спасибо.

Впрочем, конечной целью было выработать надежное решение задачи, к чему, я надеюсь, мы и приближаемся :) Глядишь кому-то и будет полезно.

А Вас я призываю написать статью по многопоточности в Java. Я бы с удовольствием почитал.
Может покажусь банальным, но уже все давным давно написано: «Java Concurrency in Practice».
Про многопоточность в яве у меня целый блог, который я начал вести задолго до того, как мне дали сюда инвайт. Написать отдельный цикл статей на хабр по модели памяти хочу давно, но написать в личный блог небольшую статью по той теме, что меня прямо сейчас интересует гораздо естественнее, чем поднимать большой пласт информации, и пытаться его как-то понятно организовать. Руки пока не доходят.

Кроме того, как правильно заметил предыдущий оратор, на эту тему и без меня много всего написано.
Нашел ваш блог. Довольно интересный, и, имхо, вот именно такого рода статей, более углубленных, как раз не хватает на хабре.
Так что, был бы рад почитать :)
Буквально сегодня ночью пришлось решать такую задачу. Почитал шпаргалку и реализовал через SynchronousQueue (null значения были не нужны, да и параллельное выполнение нескольких асинхронных задач не требовалось)
Sign up to leave a comment.

Articles