Pull to refresh

Создание компонента Apache Camel

Level of difficultyEasy
Reading time6 min
Views1.1K

Приветствую, сообщество!

Меня зовут Александр, я java разработчик в компании БФТ-Холдинг. Тружусь я на проекте СМЭВ-адаптера, где мы занимаемся транзитивной обработкой сообщений. В нашу зону ответственности входит несколько микросервисов, которые обрабатывают очень много сообщений, почти ничего не пишут в БД, но часто обращаются в сторонние прикладные сервисы.

Для отслеживания пути сообщения через наши микросервисы мы используем Zipkin. Помимо этого в проекте задействован Apache Camel, с помощью которого мы выстраиваем цепочку обработки сообщения в одном конкретном микросервисе. Стандартные средства для работы с Zipkin обычно позволяют легко добавить к трассе вход, выход в сервис и запись в БД, но,
т.к. к нас не совсем стандартное поведение у сервисов, нам хотелось выделять в Zipkin и обращение в сторонние сервисы.

Хотелось эту логику как-то элегантно встроить в роут Camel, но существующие средства такой возможности не предоставляли.

Было принято решение написать свой компонент для Apache Camel. Делали мы это впервые и, к сожалению, полноценного гайда в интернетах найти не удалось...

Встречайте! Гайд по написанию собственного Camel-компонента!

Основные составляющие camel-компонента:

* Component - отвечает за создание Endpointa и является входной точкой в ваш компонент.
* Endpoint - отвечает за создание Producer и Consumer. Также хранит в себе параметры из урла.
* Producer - принимает запросы к вашему компоненту (`to("ref")`).
* Consumer - отправляет сообщения для слушателей вашего компонента (`from("ref")`).

Теперь подробнее про каждого.

Component

Класс компонента необходимо аннотировать @Component и передать имя вашего camel-компонента. Также отнаследовать его от абстрактного класса DefaultComponent и переопределить метод createEndpoint. Как можно было догадаться этот метод отвечает за создание Endpoint и важно позаботиться о том, чтобы все необходимые зависимости попали в него, если вы не желаете их получать потом обходными путями. Кроме этого, в этом методе определяются параметры из урла.

@Component("zipkintrace")
public class ZipkinTraceComponent extends DefaultComponent {

    // Зависимости
    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;


    public ZipkinTraceComponent(
        CamelContext context, ZipkinService zipkinService, ZipkinTraceProperties zipkinTraceProperties,
        ZipkinTraceCache zipkinTraceCache
    ) {
        super(context);
        this.zipkinService = zipkinService;
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }

    @Override
    protected ZipkinTraceEndpoint createEndpoint(
        String uri, String remaining, Map<String, Object> parameters
    ) throws Exception {

        ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint(
            uri, this, zipkinTraceProperties, zipkinTraceCache
        );

        // Сохранения параметров из урла
        setProperties(endpoint, parameters);
        endpoint.setAction(remaining);

        return endpoint;
    }
}

Endpoint

Этот класс поинтереснее. Тут описывается вся необходимая информация для Apache Camel.

Прежде всего аннотируем его @UriEndpoint. Аннотация принимает множество параметров, описание которых вы найдёте в javaDoc её файла.

Если вы не хотите полностью настраивать Endpoint для сamel, наследуемся от DefaultEndpoint и имплементируем AsyncEndpoint, чтобы дать понять фреймворку, что Endpoint поддерживает асинхронную обработку сообщений.

В полях класса определяем все возможные параметры, которые можно передать в урле и помечаем их соответствующими аннотациями.

Важно! У каждого такого поля должен быть геттер и сеттер с описанным JavaDoc для них. Иначе camel-компонент не собрать.

В этом же классе переопределяем методы создания Producer и Consumer

@UriEndpoint(
    firstVersion = "3.21.0",
    scheme = "zipkintrace",
    syntax = "zipkintrace:action",
    title = "zipkintrace",
    category = Category.LOG,
    producerOnly = true,
    headersClass = ZipkinTraceConstants.class
)
public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint {
    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;

    @UriPath
    @Metadata(required = true)
    private String action;
    @UriParam
    private String route;
    @UriParam
    private String processor;
    @UriParam
    private String messageId;
    @UriParam
    private String originalMessageId;
    @UriParam
    private String iisId;
    @UriParam
    private boolean buildTraceContext;
    @UriParam(description = "Трасса, которую необходимо продолжить")
    private String traceContext;

    public ZipkinTraceEndpoint(String endpointUri, Component component,
                               ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) {
        super(endpointUri, component);
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }

    @Override
    public Producer createProducer() throws Exception {
        return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache);
    }
    
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        throw new IllegalArgumentException("zipkintraser has no consumer, so you cannot use get any data from him");
    }
    
    /**
     * Действие относительно трассы zipkin.
     * Перечень в ZipkinTraceAction
     */
    public String getAction() {
        return action;
    }
    
    /**
     * Действие относительно трассы zipkin.
     * Перечень в ZipkinTraceAction
     */
    public void setAction(String action) {
        this.action = action;
    }
    
    // остальные геттеры и сеттеры
}

Producer

Тут всё проще. Как и раньше, если не хотим полностью настраивать Producer, используем стандартный абстрактный класс - DefaultProducer или DefaultAsyncProducer. Переопределяем getEndpoint, чтобы не получать стандартный интерфейс, и метод полезный работы process. В асинхронном варианте последний метод будет иметь в параметрах callback для завершения потока.

public class ZipkinTraceProduces extends DefaultAsyncProducer {
  
    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;
    
    public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint,
                               ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache
    ) {
        super(endpoint);
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }
    
    @Override
    public ZipkinTraceEndpoint getEndpoint() {
        return (ZipkinTraceEndpoint) super.getEndpoint();
    }
    
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
    
        if (!isRunAllowed()) {
            return shutDownWithException(exchange, callback);
        }
    
        try {
    
            // полезная работа
            
            callback.done(true);
            return true;
        } catch (Throwable e) {
            exchange.setException(e);
            callback.done(true);
    
            return true;
        }
    }
    
    private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) {
    
        if (isNull(exchange.getException())) {
            exchange.setException(new RejectedExecutionException());
        }
        callback.done(true);
        return true;
    }

}

Consumer

В этом классе определяется логика, которая будет отправлять сообщения слушателям. Например, может запускаться слушатель очереди или какая-то крон-задача.

Для этого используем класс DefaultConsumer в качестве родительского и переопределяем методы doStartdoStop.

Если поток сообщений может быть приостановлен (не полное отключение), нужно пометить класс интерфейсом Suspendable. Методы для обработки этого поведения doSuspend и doResume

public class ZipkinTraceConsumer extends DefaultConsumer {
  
    public ZipkinTraceConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }
    
    @Override
    protected void doStart() throws Exception {
        super.doStart();
    }
    
    @Override
    protected void doStop() throws Exception {
        super.doStop();
    }
    
    @Override
    protected void doSuspend() {
    }
    
    @Override
    protected void doResume() throws Exception {
    }

}

Теперь неочевидное

К сожалению, чтобы фреймворк заметил ваш компонент и позволил его использовать, действий, описанных выше, недостаточно. Помимо всего этого нужно добавить плагин, который сгенерирует метаинформацию по вашему компоненту во время компиляции кода.

<plugins>
    <plugin>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-component-maven-plugin</artifactId>
        <version>${camel-version}</version>
        <executions>
            <execution>
                <id>generate</id>
                <goals>
                    <goal>generate</goal>
                </goals>
                <phase>process-classes</phase>
            </execution>
        </executions>
    </plugin>
</plugins>

Также создать сам файл мета информации в пакете

resources/META-INF/services/org/apache/camel/component/

Файл назвать по имени компонента.

Содержимое файла

class=ru.gov.pfr.ecp.iis.smev.adapter.zipkin.camel.component.ZipkinTraceComponent

И вот теперь camel признает все ваши труды.

Что получилось у нас

 from("direct:" + FSSP_REPORT_ARREST_PROCESSING_ROUTE)
    .routeId(FSSP_REPORT_ARREST_PROCESSING_ROUTE)
    .log(LoggingLevel.INFO, FSSP_REPORT_ARREST_PROCESSING_ROUTE + ".start")
    .to("zipkintrace:scoped?processor=ReportArrestXmlEACreateProcessor") <-- Обращение в условный S3
    .process(reportArrestXmlEACreateProcessor)
    .to("zipkintrace:scoped?processor=ReportArrestXmlSignProcessor") <-- Обращение в прикладной сервис
    .process(reportArrestXmlSignProcessor)
    .to("zipkintrace:scoped?processor=ReportArrestArchiveCreateProcessor") <-- Сохранение результата в S3
    .process(reportArrestArchiveCreateProcessor)
    .to("zipkintrace:end")
    .process(convertProcessor);

Надеюсь, эта статья будет полезна и убережёт вас от подводных каменей Apache Camel.

Полезные материалы

  1. Код примера

  2. Документация фреймворка

  3. Другие компоненты в открытом доступе

  4. Stackoverflow

Tags:
Hubs:
Total votes 7: ↑6 and ↓1+9
Comments1

Articles

Information

Website
bftcom.com
Registered
Founded
Employees
1,001–5,000 employees
Location
Россия