Pull to refresh

Taskurotta или управление процессами в распределенной системе

Reading time12 min
Views6.4K
Добрый день, хабраюзер!

Есть у нас задача связывать различные сервисы и существующие системы в управляемые процессы. Скорость нужна не космическая (т.е. не по биржевым котировкам отклик создавать), но зато процессов много и компонент (систем) которые нужно использовать тоже порядочно вырисовывается. Не хочется делать p2p связывание. Хочется чего-то красивого и управляемого.

Просмотрев рынок, было принято решение сделать реплику по мотивам Amazon Simple Workflow, так как использовать его напрямую мы не можем. Свойства фреймворка которые нам подходят:

  • Низкий порог старта (хороший программист нынче дорог). Низкий порог тут больше в части начала программирования так как все делается на высоком уровне — почти на уровне взаимодействия с интерфейсами. Но чтобы достойно асинхронно управлять процессом надо конечно нарастить опыт
  • При сохранении параметров задач и результатов выполнения получаем анализ и базу для регрессионного тестирования исполнителей задач процесса
  • Концентрация логики управления процессом в определенных местах (в Координаторе). Это может на первый взгляд не очевидно, но это величайшее благо по сравнению с возможной альтернативой, когда каждый актер имеет свою логику — какие другие компоненты далее вызвать (передать управление). Часто приводит к усложнению системы и невозможности переиспользовать компоненты

Это минимум который хотелось бы, но как показывает практика, плюсов больше. Проект назвали Taskurotta в честь «Task» — задачи, и суслика по фински, которому все равно, которого не видно, но он есть. Открытый исходный код доступен на GitHub. Проект реализован с помощью Hazelcast для формирования общего пространства памяти и среды выполнения между серверами, Dropwizard для быстрой и удобной реализации REST сервисов и друзей из Amazon которые были первыми и сделали отличный продукт вдохновив нас на собственную разработку. С документацией пока тяжело, но скоро поправим.

Давайте перейдем от теории к тому, что сейчас есть на реальном примере.

Предположим что нам необходимо разработать приложение, отсылающее строковое сообщение пользователю. На вход мы получаем Id пользователя и набор символов. Из его профиля (по Id) достаем данные о предпочтении — получать сообщения по email или номеру телефона. Номер телефона и email также доступны в профиле. Далее отправляем сообщение нужным транспортом. В случае, если отправка не удалась (по причине не верного адреса или номера), необходимо отметить это в профиле для предотвращения повторных попыток в будущем.


Добавим еще к этому не функциональное требование, о том, что сервисы отправляющие сообщения уже существуют, находятся в других подсетях и их нужно переиспользовать.

PS: Весь исходный код описываемого примера доступен также на GitHub taskurotta\taskurotta-getstarted


Taskurotta позволяет реализовывать компоненты системы (Актеров), взаимодействующих между собой привычным для разработчика способом — путем вызовов методов друг друга, но в асинхронной манере. Актеры делятся на два вида — Исполнителей и Координаторов. Исполнители должны четко выполнять поставленные перед ними задачи. Они представляют собой максимально независимые модули, и соответственно — максимально переиспользуемые. Исполнители могут взаимодействовать с внешним миром (любые потоки ввода вывода) выполняя задачу таким образом и так долго, как этого требуется. С другой стороны, Координаторы не выполняют задач, связанных с внешним миром. Они должны отрабатывать как можно быстрее и не спотыкаться на прямом взаимодействии с БД, сетью и другими потенциально не стабильными компонентами. Их обязанность ставить задачи исполнителям, координировать их действия и тем самым обеспечить реализацию (описание) процесса. Координаторы могут ставить задачи другим координаторам, реализуя парадигму переиспользуемых подпроцессов.

Задача Координатора как можно быстрее раздать известные в данный момент задачи. Т.е. он не должен блокироваться на ожидании результата. Он должен построить зависимости между известными ему задачами и при необходимости сформировать асинхронные точки определения дальнейших действий.

Для нашего процесса Координатор должен сделать следующее:

  1. Запросить профиль пользователя
  2. Дождаться получения профиля
  3. Отправить сообщение пользователю
  4. Дождаться результата отправления


Кодировать данную последовательность действий с помощью автомата состояний, методами изменений одного сообщения несколькими исполнителями и другими привычными, но костлявыми способами мы не будем. Сделаем это просто и красиво с помощью сущности Promise и нашей системы, следящей за действиями Координатора.

Promise<Profile> profilePromise = userProfileService.get(userId);
Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);


В примере видно, что в результате вызова сервисов мы получаем не реальный объект, а некий Promise — ссылку на результат выполнения задачи. Этот Promise мы можем передавать в качестве аргумента другим сервисам (т.е. задачам). Вызовы других сервисов будут перехвачены системой (т.е. реального синхронного вызова не произойдет) и выстроена зависимость между ними. Задачи не поступят на выполнение к сервисам до тех пор, пока все их аргументы типа Promise не будут готовы, т.е. пока не будут выполнены все необходимые предварительные задачи.

Таким образом, управление процессом выполняется совместно координатором и нашей системой. Координатор выстраивает зависимости между задачами, а система берет на себя, кроме всего прочего, функцию ожидания выполнения предварительных задач и последующего запуска зависимых от них задач.

Давайте теперь раскроем, что такое асинхронные точки определения дальнейших действий.

Нам необходимо убедиться в том, что отправка уведомления была успешна и если нет то нужно заблокировать отправку уведомлений пользователю.


В данном случае после отправки уведомления и перед дальнейшими действиями необходимо проанализировать результат отправки. Т.е. дождаться выполнения задачи, произвести анализ и в зависимости от результата, блокировать или нет. Для решения такой проблемы у координатора есть возможность создать задачу на самого себя — т.е. точку определения дальнейших действий, в которую передать необходимые Promise. Ниже представлено как это выглядит.

    public void start(String userId, String message) {
           Promise<Profile> profilePromise = userProfileService.get(userId);
           Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
           decider.blockOnFail(userId, sendResultPromise);
       }

     @Asynchronous
     public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
            logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
            if (!sendResultPromise.get()) {
                userProfileService.blockNotification(userId);
            }
     }


Метод start() — это старт процесса. Далее идет постановка трех задач. Первая на получение профиля, вторую и третью Координатор ставит сам себе для последующего анализа результата (вызов методов sendToTransport и blockOnFail). Таким образом Координатор как бы ждет решения первой задачи, но без блокировки. Как только задача решена, система Taskurotta вызывает метод координатора sendToTransport, передавая в него готовый Promise объект, из которого можно получить реальные данные методом get(). После выполнения задачи sendToTransport запускается задача blockOnFail где мы ставим задачу сервису userProfileService на блокировку сообщений для пользователя userId если произошла ошибка при отправке уведомления.

С помощью точек определения дальнейших действий можно реализовать различные поведения процесса:
  • Распараллеливание на различные ветки
  • Дальнейшее слияние независимых потоков процесса в одной точке с помощью проброса Promise и @NoWait аннотации
  • Асинхронную рекурсию
  • Распараллеливание выполнения однотипных задач, например проверки ЭЦП всех файлов и ожидания результатов выполнения в одной точке принятия решений
  • и т.д.

P.S.: Вызов задачи blockOnFail происходит через объект decider. Это искусственный объект, перехватывающий вызов, но реально не вызывающий метод blockOnFail. Нам нужно поставить задачу, а не вызвать ее синхронно.

Так как по сценарию у нас уже есть Исполнители для отправки email и sms, то нам остается только создать Исполнителя для работы с профилем. У данного Исполнителя две задачи:

  1. Вернуть профиль по идентификатору пользователя
  2. Сделать в профиле отметку о невозможности отправки сообщений для конкретного пользователя


Начинаем с объявления его интерфейса. С этим интерфейсом будет работать Координатор. Здесь и далее, для компактности опущены комментарии и другие не существенные части кода.

    @Worker
    public interface UserProfileService {

        public Profile get(String userId);

        public void blockNotification(String userId);

    }


Аннотация @Worker определяет этот интерфейс как Исполнителя. У аннотации есть необязательные атрибуты определяющие его имя и версию (контракта). По умолчанию, именем является полное имя интерфейса, а версия — «1.0». Исполнители различных версий могут одновременно работать для разных процессов без каких либо конфликтов.

Перейдем к реализации интерфейса.

    public class UserProfileServiceImpl implements UserProfileService {

        private static final Logger logger = LoggerFactory.getLogger(UserProfileServiceImpl.class);

        @Override
        public Profile get(String userId) {
            return ProfileUtil.createRandomProfile(userId);
        }

        @Override
        public void blockNotification(String userId) {
            logger.info(".blockNotification(userId = [{}]", userId);
        }
    }


Тут мы опустили инициализацию менеджера профилей (ProfileUtil). Он может работать с БД, LDAP или другим реестром. Данный пример нам показывает, что Исполнитель получает задачи (вызовы) и делегирует их реальному модулю.

На этом создание Исполнителя завершается.

Для решения поставленной перед нами задачи, Координатор должен передать ссылку на еще не полученный профиль пользователя (объект Promise) в точку определения дальнейших действий. Там он выберет транспорт или не будет ничего отсылать, если отправка сообщений для данного пользователя уже заблокирована.

Однако интерфейс исполнителя, как и сам исполнитель, получают и отдают результат синхронно, а потому не имеют в декларации результатов выполнения в виде объекта Promise, а возвращают чистый объект данных. Это и правильно. Исполнитель не должен знать как его используют. Например, наш Исполнитель по получению профиля можно использовать если уже известен идентификатор пользователя, или если он не известен и нужно передать ссылку на другую задачу, которая этот идентификатор откуда-то получит. Таким образом мы приходим к интерфейсу взаимодействия с Исполнителем. Этот интерфейс определяет сам Координатор для своих нужд. Т.е. он определяется в пакете (проекте) Координатора. Добавим интерфейс взаимодействия с Исполнителем для работы с профилем:

    @WorkerClient(worker = UserProfileService.class)
    public interface UserProfileServiceClient {

        public Promise<Profile> get(String userId);

        public void blockNotification(String userId);
    }


Мы видим интерфейс помеченный аннотацией @WorkerClient. Параметр аннотации ссылается на класс реального интерфейса Исполнителя. Таким образом устанавливается связь между существующим интерфейсом и необходимым интерфейсом для конкретного Координатора. Назовем этот интерфейс «клиентским интерфейсом Исполнителя». Этот клиентский интерфейс должен содержать все необходимые координатору методы (можно не объявлять не используемые) и с идентичной сигнатурой аргументов. Любой аргумент может быть типом Promise, если требуется передавать в качестве аргумента результат еще не завершенной задачи.

Теперь переходим к самому интересному — созданию координатора. Для начала ниже представлен интерфейс координатора, используя который клиенты Taskurotta будут запускать нужные им процессы.

    @Decider
    public interface NotificationDecider {

        @Execute
        public void start(String userId, String message);
    }


Этот интерфейс определен как @Decider — т.е. как Координатор. У этой аннотации есть те же свойства, что и у аннотации @Worker — имя и версия. По умолчанию за имя берется полное имя интерфейса, а за версию — «1.0».

Метод start помечен как @Execute. Это означает что через данный метод можно запускать процесс.

Теперь переходим к реализации координатора

    public class NotificationDeciderImpl implements NotificationDecider {

        private static final Logger logger = LoggerFactory.getLogger(NotificationDeciderImpl.class);

        private UserProfileServiceClient userProfileService;
        private MailServiceClient mailService;
        private SMSServiceClient smsService;
        private NotificationDeciderImpl decider;

        @Override
        public void start(String userId, String message) {
            logger.info(".start(userId = [{}], message = [{}])", userId, message);

            Promise<Profile> profilePromise = userProfileService.get(userId);
            Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
            decider.blockOnFail(userId, sendResultPromise);
        }

        @Asynchronous
        public Promise<Boolean> sendToTransport(Promise<Profile> profilePromise, String message) {
            logger.info(".sendToTransport(profilePromise = [{}], message = [{}])", profilePromise, message);

            Profile profile = profilePromise.get();

            switch (profile.getDeliveryType()) {
                case SMS: {
                    return smsService.send(profile.getPhone(), message);
                }
                case EMAIL: {
                    return mailService.send(profile.getEmail(), message);
                }

            }

            return Promise.asPromise(Boolean.TRUE);
        }


        @Asynchronous
        public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
            logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);

            if (!sendResultPromise.get()) {
                userProfileService.blockNotification(userId);
            }
        }
    }


В данном коде мы также опустили инициализацию приватных объектов. Полный и работающий код примера можно посмотреть в проекте taskurotta-getstarted. Тут только отметим, что значения приватных полей получаются через специальную фабрику прокси объектов для Координатора.

В примере реализации есть две точки ожидания результатов выполнения незавершенных задач Координатором. Это метод sendToTransport и blockOnFail. Данные методы будут вызваны только тогда, когда все их аргументы типа Promise будут готовы,
т.е. выполнены соответствующий задачи.

Объекты полей типа MailServiceClient и SMSServiceClient также являются клиентскими интерфейсами к соответствующим Исполнителям. Их инициализацию можно также посмотреть в проекте taskurotta-getstarted.

На данный момент у нас есть все реализованные Исполнители и Координатор. Перейдем непосредственно к запуску Актеров (т.е. Исполнителей и Координаторов).

Выполнение задач может происходить как внутри серверов приложений, так и в виде отдельного java приложения (данный пример использует вариант отдельного приложения из модуля taskurotta\bootstrap). Что делает отдельное приложение:

  • Регистрируется на сервере Taskurotta
  • Запускает пул из N потоков для выполнения задач
  • Получает задачи от серверов Taskurotta
  • Запускает их выполнение
  • Пересылает результат серверу Taskurotta


Для запуска отдельного java приложения используется пакет bootstrap, а конкретнее — класс ru.taskurotta.bootstrap.Main. Ему в качестве аргумента нужно передать местонахождение файла конфигурации в формате YAML.

Как это попробовать запустить? Очень просто. Ниже пошаговая сборка сервера, актеров и их запуск из исходного кода. будьте внимательны, необходимы незначительные изменения если у вас не linux.

Допустим у вас уже есть:
  • jdk 1.7
  • maven 3
  • git


соберем сервер Taskurotta

git clone https://github.com/taskurotta/taskurotta.git
cd taskurotta/


Запустим сборку. Для ускорения отключим тесты.

mvn clean install -DskipTests


Теперь запустим кластер из двух узлов (мы используем одну машину для демонстрационных целей и поэтому различные порты в параметрах запуска). В реальной среде можно запустить необходимое количество машин с одинаковой конфигурацией.

Запускаем первый узел кластера:

java -Xmx64m -Ddw.http.port=8081 -Ddw.http.adminPort=9081 -Ddw.logging.file.currentLogFilename="assemble/target/server1.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml


Запускаем второй узел (Мы намеренно ограничиваем память чтобы выявить ее возможные утечки на ранней стадии. В конфигурации данного примера не используется никакая база данных, поэтому на больших объемах нужно давать больше памяти).

java -Xmx64m -Ddw.http.port=8082 -Ddw.http.adminPort=9082 -Ddw.logging.file.currentLogFilename="assemble/target/server2.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml


Когда оба сервера соединятся друг с другом, в логе будет похожее на такое сообщение:

    Members [2] {
    	Member [192.168.1.2]:7777
    	Member [192.168.1.2]:7778 this
    }


Откройте консоль в браузере. http://localhost:8081/index.html — первый узел или http://localhost:8082/index.html — второй узел.

Вы можете использовать любой узел для работы с консолью. Они отображают преимущественно одну и туже информацию. В настоящий момент пока не все функции консоли функционируют в данной конфигурации (без базы данных). Все работает в конфигурации с БД oracle и mongodb. См. варианты развертывания в документации.

Теперь давайте запустим наш процесс. Клонируем для этого репозиторий taskurotta-getstarted

git clone https://github.com/taskurotta/taskurotta-getstarted.git
cd taskurotta-getstarted/
mvn clean install


Для того чтобы начали работать актеры, надо стартовать процессы. Запустим их например 91 штук.

java -cp target/getstarted-process-1.0-SNAPSHOT.jar ru.taskurotta.example.starter.NotificationModule http://localhost:8081 91


Проверим в консоли http://localhost:8081/index.html. Выберем вкладку «Queues». Увидим что у координатора 91на задача, что соответствует 91ому запущенному процессу.

image

Теперь запустим координатора. В YAML файле конфигурации определен только он — без исполнителей. Поэтому после запуска не отработают все задачи процесса и мы увидим стоящие в очереди задачи исполнителей.

java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-decider.yml


В конфигурационном файле определен первый узел нашего кластера как сервер Taskurotta для координатора

     spreader:
          - Spreader:
              class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
              instance:
                endpoint: "http://localhost:8081"
                threadPoolSize: 10
                readTimeout: 0
                connectTimeout: 3000


Можно обновить список очередей в консоле и увидеть что есть задачи ожидающие исполнителей.

Теперь давайте запустим исполнителей (оставьте координатора работать) и для демонстрации их направим на второй узел кластера. Так как узлы кластера образуют общую память и среду выполнения внутренних задач, то не имеет значения, на какой из серверов придет запрос от исполнителя.

java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-workers.yml


У исполнителей установлен второй узел кластера для взаимодействия:

    spreader:
      - Spreader:
          class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
          instance:
            endpoint: "http://localhost:8082"
            threadPoolSize: 10
            readTimeout: 0
            connectTimeout: 3000


В итоге все процессы должны полностью отработать и это видно по очередям в консоли управления.

image

Вот пока и все чем хотелось бы поделиться на данный момент. Будем рады предложениям и конструктивной критике.
Tags:
Hubs:
Total votes 11: ↑11 and ↓0+11
Comments8

Articles