Облачно-ориентированный обмен сообщениями на платформе Red Hat OpenShift с использованием Quarkus и AMQ Online

    Всем привет! Вот и он – наш заключительный пост из серии про Quarkus! (Кстати, смотрите наш вебинар «Это Quarkus – Kubernetes native Java фреймворк». Покажем, как начать «с нуля» или перенести готовые решения)



    В предыдущем посте мы рассмотрели соответствующие инструменты, с помощью которых можно количественно оценить улучшения, полученные в результате модернизации Java-приложений.

    Начиная с версии 0.17.0, Quarkus поддерживает использование Advanced Message Queuing Protocol (AMQP), который является открытым стандартом передачи бизнес-сообщений между приложениями или организациями.

    Red Hat AMQ Online – это сервис, построенный на основе открытого проекта EnMasse и реализующий механизм обмена сообщений на базе платформы Red Hat OpenShift. Подробнее о том, как он устроен, см. здесь (EN). Сегодня мы покажем, как объединить AMQ Online и Quarkus, чтобы построить современную систему обмена сообщениями на базе OpenShift с использованием двух новых технологий, связанных с обработкой сообщений.

    Предполагается, что вы уже развернули AMQ Online на платформе OpenShift (если нет, то см. руководство по установке).

    Для начала мы создадим приложения Quarkus, которое будет представлять собой простую систему обработки заказов с использованием реактивного обмена сообщениями. Это приложение будет включать в себя генератор заказов, отправляющий заказы в очередь сообщений с фиксированным интервалом, а также обработчик заказов, который будет обрабатывать сообщения из очереди и формировать подтверждения, доступные для просмотра в браузере.

    После создания приложения мы покажем, как внедрять в него конфигурацию системы обмена сообщениями и задействуем AMQ Online, чтобы инициализировать на этой системе нужные нам ресурсы.

    Приложение Quarkus


    Наше Quarkus-приложение выполняется на OpenShift и представляет собой модифицированную версию программы amqp-quickstart. Полный пример клиентской части можно найти здесь.

    Генератор заказов


    Генератор каждые 5 секунд просто монотонно отправляет растущие идентификаторы заказов на адрес «orders».

    @ApplicationScoped
    public class OrderGenerator {
     
        private int orderId = 1;
     
        @Outgoing("orders")
        public Flowable<Integer> generate() {
            return Flowable.interval(5, TimeUnit.SECONDS)
            .map(tick -> orderId++);
        }
    }
    

    Обработчик заказов


    Обработчик заказов еще проще, он всего лишь возвращает идентификатор подтверждения на адрес «confirmations».

    @ApplicationScoped
    public class OrderProcessor {
        @Incoming("orders")
        @Outgoing("confirmations")
        public Integer process(Integer order) {
            // Идентификатор подтверждения равен удвоенному идентификатору заказа <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
            return order * 2;
        }
    }
    

    Ресурсы подтверждения


    Ресурс подтверждения – это конечная точка HTTP для листинга сформированных в результате работы нашего приложения подтверждений.

    @Path("/confirmations")
    public class ConfirmationResource {
     
        @Inject
        @Stream("confirmations") Publisher<Integer> orders;
     
        @GET
        @Produces(MediaType.TEXT_PLAIN)
        public String hello() {
            return "hello";
        }
     
     
        @GET
        @Path("/stream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public Publisher<Integer> stream() {
            return orders;
        }
    }
    

    Настройка


    Для подключения к AMQ Online нашему приложению понадобится некоторые конфигурационные данные, а именно: конфигурация Quarkus-коннектора, сведения о конечной точке AMQP и клиентские учетные данные. Лучше, конечно, держать все конфигурационные данные в одном месте, но мы специально разделим их, чтобы показать возможные варианты настройки приложения Quarkus.

    Коннекторы


    Конфигурацию коннектора можно предоставить на этапе компиляции с помощью файла свойств приложения:

    mp.messaging.outgoing.orders.connector=smallrye-amqp
    mp.messaging.incoming.orders.connector=smallrye-amqp
    

    Чтобы не усложнять, мы будем использовать очередь сообщений только для адреса «orders». А адрес «confirmations» в нашем приложении будет использовать очередь в памяти.

    Конечная точка AMQP


    На этапе компиляции имя хоста и номер порта для конечной точки AMQP неизвестны, поэтому их нужно внедрить. Конечную точку можно задать в configmap, который создается AMQ Online, поэтому мы определим их через переменные среды в манифесте приложения:

    spec:
      template:
        spec:
          containers:
          - env:
            - name: AMQP_HOST
              valueFrom:
                configMapKeyRef:
                  name: quarkus-config
                  key: service.host
            - name: AMQP_PORT
              valueFrom:
                configMapKeyRef:
                  name: quarkus-config
                  key: service.port.amqp
    

    Учетные данные


    Маркер учетной записи сервиса можно использовать для аутентификации нашего приложения в OpenShift. Для этого надо сначала создать пользовательский ConfigSource, который будет читать маркер аутентификации из файловой системы pod’а:

    public class MessagingCredentialsConfigSource implements ConfigSource {
        private static final Set<String> propertyNames;
     
        static {
            propertyNames = new HashSet<>();
            propertyNames.add("amqp-username");
            propertyNames.add("amqp-password");
        }
     
        @Override
        public Set<String> getPropertyNames() {
            return propertyNames;
        }
     
        @Override
        public Map<String, String> getProperties() {
            try {
                Map<String, String> properties = new HashMap<>();
                properties.put("amqp-username", "@@serviceaccount@@");
                properties.put("amqp-password", readTokenFromFile());
                return properties;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
     
        @Override
        public String getValue(String key) {
            if ("amqp-username".equals(key)) {
                return "@@serviceaccount@@";
            }
            if ("amqp-password".equals(key)) {
                try {
                    return readTokenFromFile();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            return null;
        }
     
        @Override
        public String getName() {
            return "messaging-credentials-config";
        }
     
        private static String readTokenFromFile() throws IOException {
            return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
        }
    }
    

    Сборка и развертывание приложения


    Поскольку приложение надо скомпилировать в исполняемый файл, потребуется виртуальная машина GraalVM. Подробнее о том, как настроить для этого окружение, см. соответствующие инструкции в Quarkus Guide.

    Затем, следуя приведенным там инструкциям, надо скачать исходник, провести сборку и выполнить развертывание нашего приложения:

    git clone https://github.com/EnMasseProject/enmasse-example-clients
    cd enmasse-example-clients/quarkus-example-client
    oc new-project myapp
    mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply
    

    После этих команд приложение будет развернуто, но не запустится до тех пор, пока мы не настроим в AMQ Online необходимые нам ресурсы обмена сообщениями.

    Настройка системы обмена сообщениями


    Теперь осталось задать в системе обмена сообщениями ресурсы, которые нужны нашему приложению. Для этого надо создать: 1) адресное пространство, чтобы инициализировать конечную точку системы обмена сообщениями; 2) адрес, чтобы настроить адреса, которые мы используем в приложении; 3) пользователя системы обмена сообщениями, чтобы задать учетные данные клиента.

    Пространство адресов


    Объект AddressSpace в AMQ Online – это группа адресов, которые совместно используют конечные точки подключения, а также политики аутентификации и авторизации. При создании пространства адресов можно задать, как именно будут предоставляться конечные точки системы обмена сообщениями:

    apiVersion: enmasse.io/v1beta1
    kind: AddressSpace
    metadata:
      name: quarkus-example
    spec:
      type: brokered
      plan: brokered-single-broker
      endpoints:
      - name: messaging
        service: messaging
        exports:
        - name: quarkus-config
          kind: configmap
    

    Адреса


    Адреса используются для отправки и приема сообщений. У каждого адреса есть тип, который определяет его семантику, а также план, который задает количество резервируемых ресурсов. Адрес можно определить, например, вот так:

    apiVersion: enmasse.io/v1beta1
    kind: Address
    metadata:
      name: quarkus-example.orders
    spec:
      address: orders
      type: queue
      plan: brokered-queue
    

    Пользователь системы обмена сообщениями


    Чтобы отправлять и получать сообщения на ваши адреса могли только доверенные приложения, в системе обмена сообщениями необходимо создать пользователя. Для приложений, работающих на кластере, клиентов можно аутентифицировать с помощью учетной записи сервиса OpenShift. Пользователя «serviceaccount» можно определить, например, так:

    apiVersion: user.enmasse.io/v1beta1
    kind: MessagingUser
    metadata:
      name: quarkus-example.app
    spec:
      username: system:serviceaccount:myapp:default
      authentication:
        type: serviceaccount
      authorization:
      - operations: ["send", "recv"]
        addresses: ["orders"]
    

    Разрешения для настройки приложения


    Чтобы AMQ Online мог создать configmap, который мы использовали для внедрения сведений о конечной точке AMQP, необходимо задать роль и привязку роли (Role и RoleBinding):

    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: quarkus-config
    spec:
      rules:
      - apiGroups: [ "" ]
        resources: [ "configmaps" ]
        verbs: [ "create" ]
      - apiGroups: [ "" ]
        resources: [ "configmaps" ]
        resourceNames: [ "quarkus-config" ]
        verbs: [ "get", "update", "patch" ]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: quarkus-config
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: quarkus-config
    subjects:
    - kind: ServiceAccount
      name: address-space-controller
      namespace: amq-online-infra
    

    Как применить конфигурации


    Применить конфигурацию системы обмена сообщениями можно вот так:

    cd enmasse-example-clients/quarkus-example-client
    oc project myapp
    oc apply -f src/main/resources/k8s/addressspace
    oc apply -f src/main/resources/k8s/address
    

    Верификация приложения


    Чтобы убедиться, что приложение запустилось, первым делом проверим, создались ли и активны ли соответствующие адреса:

    until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done
    

    Затем проверим URL маршрута приложения (просто откроем это адрес в браузере):

    echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"
    

    В браузере должно быть видно, что тикеты периодически обновляются по мере того, как сообщения отправляются и принимаются AMQ Online.

    Подводим итоги


    Итак, мы написали приложение Quarkus, использующее AMQP для обмена сообщениями, настроили это приложение для работы на платформе Red Hat OpenShift, а также внедрили его конфигурацию на основе конфигурации AMQ Online. Затем мы создали манифесты, необходимые для инициализации системы обмена сообщениями под наше приложение.

    На этом мы завершаем серию про Quarkus, но впереди много нового и интересного, оставайтесь с нами!
    Red Hat
    Программные решения с открытым исходным кодом

    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое