Введение в Акторы на основе Java/GPars, Часть I

  • Tutorial
Кратко рассматривается API библиотеки GPars и решение многопоточной задачи средней сложности, результаты которой могут быть полезны в «народном хозяйстве».

Данная статья написана в ходе исследования различных библиотек акторов, доступных Java-программисту, в процессе подготовки к чтению курса «Multicore programming in Java».

Также я веду курс «Scala for Java Developers» на платформе для онлайн-образования udemy.com (аналог Coursera/EdX).

Это первая статья из цикла статей цель которых сравнить API, быстродействие и реализацию акторов Akka с реализациями в других библиотеках на некоторой модельной задаче. Данная статья предлагает такую задачу и решение на GPars.

GPars — библиотека написанная для Clojure с широкой поддержкой различных подходов к параллельным вычислениям.
Плюсы GPars
  • Исходный код написан на Java (в отличии от Akka, написанной на Scala). Всегда интересно посмотреть «что под капотом» на «родном» языке программирования
  • GPars представляет собой целый «зоопарк» подходов (Actor, Agent, STM, CSP, Dataflow)
  • GPars использует классы из runtime-библиотеки Clojure, написанной на Java. Интересно покопаться


«Установка» GPars


Подключаете в Maven GPars и Groovy
<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>2.2.2</version>
</dependency>


Без Maven просто качайте из репозитория GPars-1.1.0 (sources) и Groovy-2.2.2 (sources) и подключайте к проекту.

Stateless Actor


Начнем с простых примеров.
Посылаем сообщение актору.
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("receive: " + msg);
            }
        }.start();
        actor.send("Hello!");

        System.in.read();
    }
}
>> receive: Hello!


Посылаем сообщение и ждем ответа
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: " + msg);
                getSender().send(msg.toUpperCase());
            }            
        }.start();
        System.out.println("pong: " + actor.sendAndWait("Hello!"));
    }
}
>> ping: Hello!
>> pong: HELLO!


Посылаем сообщение и вешаем на ответ асинхронный callback
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: " + msg);
                getSender().send(msg.toUpperCase());
            }            
        }.start();
        actor.sendAndContinue("Hello!", new MessagingRunnable<String>() {
            protected void doRun(String msg) {
                System.out.println("pong: " + msg);
            }
        });
        
        System.in.read();
    }
}
>> ping: Hello!
>> pong: HELLO!


Делаем pattern matching по типам принятого сообщения
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000 + arg);
            }
        }.start();

        System.out.println("42.0 -> " + actor.sendAndWait(42.0));
    }
}
>> Hello! -> HELLO!
>> 42 -> 1042


Pattern matching не нашел подходящего обработчика
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000 + arg);
            }
        }.start();

        System.out.println("42.0 -> " + actor.sendAndWait(42.0));
    }
}
>> An exception occurred in the Actor thread Actor Thread 1
>> groovy.lang.MissingMethodException: No signature of method:
>> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0]
>> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String)
>> 	at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ...
>>   ...


Что видно
— «pattern matching» делает подбором подходящего перегруженного (overloaded) варианта метода onMessage(<one-arg>), если такового нет, то «получаем» исключение
— акторы работают на основе пула потоков-«демонов», так что нам необходимо как-то подвесить работу метода main() (я использовал System.in.read()) с целью предотвратить преждевременное завершение работы JVM
— на примере метода reply() мы видим, что при наследовании от DynamicDispatchActor в «пространство имен» актора попадает множество методов (reply, replyIfExists, getSender, terminate, ...)

Хотя авторы GPars и называют наследников класса DynamicDispatchActor — акторами-без-состояния (stateless actor), это — обычные экземпляры java-классов, которые могут иметь мутирующие поля и хранить в них свое состояние. Продемонстрируем это
import groovyx.gpars.actor.*;

import java.util.ArrayList;
import java.util.List;

public class StatelessActorTest {
    public static void main(String[] args) throws InterruptedException {
        Actor actor = new DynamicDispatchActor() {
            private final List<Double> state = new ArrayList<>();
            public void onMessage(final Double msg) {
                state.add(msg);
                reply(state);
            }
        }.start();

        System.out.println("answer: " + actor.sendAndWait(1.0));
        System.out.println("answer: " + actor.sendAndWait(2.0));
        System.out.println("answer: " + actor.sendAndWait(3.0));
        System.out.println("answer: " + actor.sendAndWait(4.0));
        System.out.println("answer: " + actor.sendAndWait(5.0));
    }
}
>> answer: [1.0]
>> answer: [1.0, 2.0]
>> answer: [1.0, 2.0, 3.0]
>> answer: [1.0, 2.0, 3.0, 4.0]
>> answer: [1.0, 2.0, 3.0, 4.0, 5.0]


Statefull Actor


Вводя деление stateless/statefull, авторы имею в виду, что Statefull Actor позволяют органично создавать реализации шаблона State. Рассмотрим простой пример (наследники DefaultActor — Statefull Actor-ы)
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(Arrays.asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(Arrays.asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new MessagingRunnable<Object>(this) {
                        protected void doRun(final Object msg) {
                            System.out.println("react: " + msg);
                        }
                    });
                }
            });
        }
    }
}
>> react: A
>> react: 1.0
>> react: [1, 2, 3]
>> react: B
>> react: 2.0
>> react: [4, 5, 6]


Однако, обещанной реализацией шаблона State совсем «не пахнет». Давайте зайдем с такой стороны (Java не лучший язык для таких трюков, на Clojure/Scala этот код выглядит намного компактнее)
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
      protected void act() {
        loop(new Runnable() {
          public void run() {
            react(new MessagingRunnable<String>(this) {
              protected void doRun(final String msg) {
                System.out.println("Stage #0: " + msg);
                react(new MessagingRunnable<Double>() {
                  protected void doRun(final Double msg) {
                    System.out.println("  Stage #1: " + msg);
                    react(new MessagingRunnable<List<Integer>>() {
                      protected void doRun(final List<Integer> msg) {
                        System.out.println("    Stage #2: " + msg + "\n");
                      }
                  });
                }
              });
            }
          });
        }
      });
    }
  }
}
>> Stage #0: A
>>   Stage #1: 1.0
>>     Stage #2: [1, 2, 3]
>> 
>> Stage #0: B
>>   Stage #1: 2.0
>>     Stage #2: [4, 5, 6]


Ну или давайте избавимся от этой жуткой вложенности анонимных классов и «материализуем состояния»
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new Stage0(MyStatefulActor.this));
                }
            });
        }
    }

    private static class Stage0 extends MessagingRunnable<String> {
        private final DefaultActor owner;
        private Stage0(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final String msg) {
            System.out.println("Stage #0: " + msg);
            owner.react(new Stage1(owner));
        }
    }

    private static class Stage1 extends MessagingRunnable<Double> {
        private final DefaultActor owner;
        private Stage1(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final Double msg) {
            System.out.println("  Stage #1: " + msg);
            owner.react(new Stage2());
        }
    }

    private static class Stage2 extends MessagingRunnable<List<Integer>> {
        protected void doRun(final List<Integer> msg) {
            System.out.println("    Stage #2: " + msg + "\n");
        }
    }
}

Да, да, я с Вами полностью согласен, Java — крайне многословный язык.

Вот как выглядит диаграмма переходов (развилок по аргументу мы не делали)
// START
// -----  
//   |
//   |
//   |
//   |  +--------+
//   +->| Stage0 | ---String----+
//      +--------+              |
//         ^                    v
//         |                +--------+
//         |                | Stage1 |
//   List<Integer>          +--------+
//         |                    |
//         |  +--------+      Double
//         +--| Stage2 |<-------+
//            +--------+


Таймер


Для решения моей задачи мне будет необходим таймер — нечто, что можно запрограммировать оповестить меня об окончании некоторого промежутка времени. В «обычной» Java мы используем java.util.concurrent.ScheduledThreadPoolExecutor или java.util.Timer на худой конец. Но мы же в мире акторов!
Это Statefull Actor, который висит в ожидании сообщения в методе react() с таймаутом. Если никакое сообщение не приходит в течении этого промежутка времени, то инфраструктура GPars присылает нам сообщение Actor.TIMEOUT (это просто строка «TIMEOUT») и мы «возвращаем» нашему создателю сообщение из конструктора timeoutMsg. Если же вы хотите «выключить» таймер — пришлите ему любое другое сообщение (я буду присылать ему строку «KILL»)
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Timer<T> extends DefaultActor {
    private final long timeout;
    private final T timeoutMsg;
    private final MessageStream replyTo;

    public Timer(long timeout, T timeoutMsg, MessageStream replyTo) {
        this.timeout = timeout;
        this.timeoutMsg = timeoutMsg;
        this.replyTo = replyTo;
    }

    protected void act() {
        loop(new Runnable() {
            public void run() {
                react(timeout, MILLISECONDS, new MessagingRunnable() {
                    protected void doRun(Object argument) {
                        if (Actor.TIMEOUT.equals(argument)) {
                            replyTo.send(timeoutMsg);
                        }
                        terminate();
                    }
                });
            }
        });
    }
}


Пример использования таймера.
Я создаю два таймера timerX и timerY, которые с задержкой 1000мс вышлют мне сообщения «X» и «Y» соответственно. Но через 500мс я передумал и «прибил» timerX.
import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.impl.MessageStream;

public class TimerDemo {
    public static void main(String[] args) throws Exception {
        Actor timerX = new Timer<>(1000, "X", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerX send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        Actor timerY = new Timer<>(1000, "Y", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerY send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        
        Thread.sleep(500);
        timerX.send("KILL");
        
        System.in.read();
    }
}
>> timerY send timeout message: 'Y'


Постановка задачи и схема решения


Рассмотрим следующую весьма общую задачу.
1. У нас есть много потоков, которые достаточно часто вызывают некоторую функцию.
2. У этой функции есть два варианта: обработка одного аргумента и обработка списка аргументов.
3. Эта функция такова, что обработка списка аргументов потребляет меньше ресурсов системы, чем сумма обработок каждого в отдельности.
4. Задача состоит в том, что бы между потоками и функцией поместить некоторый Batcher, который собирает аргументы от потоков в «пачку», передает функции, она обрабатывает список, Batcher «раздает» результаты потокам отправителям.
5. Batcher передает список аргументов в двух случаях: собрали «пачку» достаточного размера или по истечению времени ожидания, в течении которого не удалось собрать полную «пачку», но потокам уже пора возвращать результаты.

Давайте рассмотрим схему решения.
Таймаут 100мс, максимальный размер «пачки» — 3 аргумента

В момент времени 0 поток T-0 посылает аргумент «A». Batcher находится в «чистом» состоянии, поколение 0
//time:0
//
//  T-0 --"A"----->     +-------+ generationId=0
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]


Спустя мгновение Batcher знает, что надо обсчитать «A» и вернуть потоку T-0. Заведен таймер для поколения 0
//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:0.001                          +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A"]
//  T-2                 +-------+ replyToList=[T-0]


В момент времени 25 миллисекунд поток T-1 посылает на обработку «B»
//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:25                             +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1 ---"B"---->     |Batcher| argList=["A"]
//  T-2                 +-------+ replyToList=[T-0]


Спустя мгновение Batcher знает, что надо обсчитать «A» и «B» и вернуть потокам T-0 и T-1
//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:25.001                         +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2                 +-------+ replyToList=[T-0,T-1]


В момент времени 50 миллисекунд поток T-2 посылает на обработку «С»
//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:50                             +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2 ----"C"--->     +-------+ replyToList=[T-0,T-1]


Спустя мгновение Batcher знает, что надо обсчитать «A», «B» и «C» и вернуть потокам T-0, T-1 и T-2. Выясняет, что «пачка» наполнена и «убивает» таймер
//                                    +-----+ timeoutMsg=0
//                          +-"KILL"->|Timer| timeout=100
//time:50.001               |         +-----+
//                          |
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B","C"]
//  T-2                 +-------+ replyToList=[T-0,T-1,T-2]


Спустя мгновение Batcher отдает данные на обсчет в отдельному актору (anonimous), очищает состояние и меняет поколение с 0 на 1
//time:50.002
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ argList=["A","B","C"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                 +---------+


Спустя мгновение (для «раскадровки» буду считать, что вычисления мгновенны) анонимный актор выполняет действие над списком аргументов [«A»,«B»,«C»] -> [«res#A»,«res#B»,«res#C»]
//time:50.003
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ resultList=["res#A","res#B","res#B"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                 +---------+


Спустя мгновение анонимный актер раздает результаты вычисления потокам
//time:50.004
//
//  T-0 <-----------+   +-------+ generationId=1
//  T-1 <---------+ |   |Batcher| argList=[]
//  T-2 <-------+ | |   +-------+ replyToList=[]
//              | | |
//              | | +---"res#A"--- +---------+
//              | +---"res#B"----- |anonymous|
//              +--"res#C"-------- +---------+


Спустя мгновение система возвращает в исходное «чистое» состояние
//time:50.005
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]


Позже, в момент времени, 75 поток T-2 передает на вычисление «D»
//time:75
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2 ----"D"--->     +-------+ replyToList=[]


Спустя мгновение Batcher знает, что надо обсчитать «D» и вернуть потоку T-2, кроме того запущен таймер для поколения 1
//                                    +-----+ timeoutMsg=1
//                                    |Timer| timeout=100
//time:75.001                         +-----+
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]


Спустя 100мс (в момент времени 175мс) инфраструктура GPars оповещает таймер о истечении периода ожидания
//                                        +--"TIMEOUT"--
//                                        |
//                                        v
//                                    +-----+ timeoutMsg=1
//                                    |Timer| timeout=100
//time:175                            +-----+
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]


Спустя мгновение таймер оповещает Batcher о том, что время ожидания поколения 1 истекло и кончает жизнь самоубийством вызывая terminate()
//                                     +-----+ timeoutMsg=1
//                          +----1-----|Timer| timeout=100
//time:175.001              |          +-----+
//                          v
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]


Создается анонимный актор, который выполняет вычисления над списком аргументов (в котором всего 1 аргумент). Поколение с 1 меняется на 2
//time:175.002
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ argList=["D"]
//                                 |anonymous| replyToList=[T-2]
//                                 +---------+


Актор выполнил работу
//time:175.003
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ resultList=["res#D"]
//                                 |anonymous| replyToList=[T-2]
//                                 +---------+


Актор отдает результат
//time:175.004
//
//  T-0                 +-------+generationId=2
//  T-1                 |Batcher|argList=[]
//  T-2 <-------+       +-------+replyToList=[]
//              |
//              |               +---------+
//              +--"res#C"----- |anonymous|
//                              +---------+


Система в исходном «чистом» состоянии
//time:175.005
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]


Решение задачи



BatchProcessor — интерфейс «функции». допускающей «пакетный режим» обработки
import java.util.List;

public interface BatchProcessor<ARG, RES> {
    List<RES> onBatch(List<ARG> argList) throws Exception;
}


Batcher — класс, «пакующий» аргументы. Ядро решения
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import java.util.*;

public class Batcher<ARG, RES> extends DynamicDispatchActor {
    // fixed parameters
    private final BatchProcessor<ARG, RES> processor;
    private final int maxBatchSize;
    private final long batchWaitTimeout;
    // current state
    private final List<ARG> argList = new ArrayList<>();
    private final List<MessageStream> replyToList = new ArrayList<>();
    private long generationId = 0;
    private Actor lastTimer;

    public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) {
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.batchWaitTimeout = batchWaitTimeout;
    }

    public void onMessage(final ARG elem) {
        argList.add(elem);
        replyToList.add(getSender());
        if (argList.size() == 1) {
            lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start();
        } else if (argList.size() == maxBatchSize) {
            lastTimer.send("KILL");
            lastTimer = null;
            nextGeneration();
        }
    }

    public void onMessage(final long timeOutId) {
        if (generationId == timeOutId) {nextGeneration();}
    }

    private void nextGeneration() {
        new DynamicDispatchActor() {
            public void onMessage(final Work<ARG, RES> work) throws Exception {
                List<RES> resultList = work.batcher.onBatch(work.argList);
                for (int k = 0; k < resultList.size(); k++) {
                    work.replyToList.get(k).send(resultList.get(k));
                }
                terminate();
            }
        }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList)));
        argList.clear();
        replyToList.clear();
        generationId = generationId + 1;
    }

    private static class Work<ARG, RES> {
        public final BatchProcessor<ARG, RES> batcher;
        public final List<ARG> argList;
        public final List<MessageStream> replyToList;

        public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) {
            this.batcher = batcher;
            this.argList = argList;
            this.replyToList = replyToList;
        }
    }
}


BatcherDemo — демонстрация работы класса Batcher. Совпадает со схематичным планом
import groovyx.gpars.actor.Actor;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class BatcherDemo {
    public static final int BATCH_SIZE = 3;
    public static final long BATCH_TIMEOUT = 100;
    
    public static void main(String[] args) throws InterruptedException, IOException {
        final Actor actor = new Batcher<>(new BatchProcessor<String, String>() {
            public List<String> onBatch(List<String> argList) {
                System.out.println("onBatch(" + argList + ")");
                ArrayList<String> result = new ArrayList<>(argList.size());
                for (String arg : argList) {
                    result.add("res#" + arg);
                }
                return result;
            }
        }, BATCH_SIZE, BATCH_TIMEOUT).start();

        ExecutorService exec = newCachedThreadPool();
        exec.submit(new Callable<Void>() { // T-0
            public Void call() throws Exception {
                System.out.println(actor.sendAndWait(("A")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-1
            public Void call() throws Exception {
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("B")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-2
            public Void call() throws Exception {
                Thread.sleep(50);
                System.out.println(actor.sendAndWait(("C")));
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("D")));
                return null;
            }
        });        

        exec.shutdown();
    }
}
>> onBatch([A, B, C])
>> res#A
>> res#B
>> res#C
>> onBatch([D])
>> res#D


Заключение


В моем представлении, акторы хороши для программирования многопоточных примитивов, представляющих собой конечные автоматы со сложной диаграммой переходов, которая кроме всего прочего может зависеть от поступающих аргументов.

Некоторые примеры этой статьи являются вариациями кода найденного в сети в различных места, включая gpars.org/guide.

Во второй части мы
  • Измерим скорость работы предложенного решения
  • Ускорим работу с JDBC объединяя запросы различных потоков из отдельных транзакций в одну большую транзакцию RDBMS. То есть сделаем batch не в рамках одного Connection, а между различными Connection-ами.


UPD
Спасибо за замечание Fury:
GPars написана на смеси Java+Groovy.
В исходном коде видно что на Groovy написаны пакеты
— groovyx.gpars.csp.*
— groovyx.gpars.pa.*
— groovyx.gpars.* (частично)

Контакты



Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.

skype: GolovachCourses
email: GolovachCourses@gmail.com
GolovachCourses
Company
Ads
AdBlock has stolen the banner, but banners are not teeth — they will be back

More

Comments 9

    +3
    акторы хороши для программирования многопоточных примитивов
    В приведенном примере Batcher я не вижу никаких синхронизаций, а BatcherDemo, кроме того, что однопоточный, еще и Thread.sleep постоянно вызывает. Что будет, если передача запросов Batcher-у будет многопоточной и высококонкурентной? Могут ли сразу 10 потоков войти в Batcher.onMessage и одновременно пройти проверку argList.size() == 1, что приведет к тому, что после того, как все 10 запросов будут обработаны, lastTimer.send(«KILL») так и не будет вызван? Если GPars где-то внутри не синхронизирует потоки, то многопточность в приведенном примере, видимо, не работает? Или я неправ?
      +3
      Вы не пробовали адаптировать примеры под Java 8? Код должен получиться чище, потому что не будет большинства анонимных классов.
        +3
        Callback hell теперь и в Java!
          0
          Вы, наверное, не писали на Swing?
            0
            Разумеется, писал. И с содроганием вспоминаю об этом опыте.
          +2
          а оно что, не умеет в Ask паттерн и Future (или промисы на худой конец)?
            0
            Нейронную сеть не пробовали сделать на этой библиотечке?
              +1
              Я в них «не верю»:)
              0
              Также я веду курс «Scala for Java Developers» на платформе для онлайн-образования udemy.com (аналог Coursera/EdX).

              Only users with full accounts can post comments. Log in, please.