Как стать автором
Обновить

Свойство типа Controller Service в кастомном процессоре NiFi

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров1.7K

Преамбула

В одном из самодельных процессоров Apache NiFi у меня возникла необходимость работать с файлами сертификатов. Так как в проекте уже были настроенные службы сертификатов (типа StandardRestrictedSSLContextService), я решил сделать у процессора свойство с типом SSLContextService, чтобы можно было подставлять уже настроенные в NiFi службы контроллера и брать данные сертификатов оттуда. Изучил матчасть, ничего сложного. Добавил свойство в процессор, закинул его на flow. Но... процессор не видит мои StandardRestrictedSSLContextService. Пересмотрел и перечитал множество статей, в том числе от уважаемого Pierre Villard, но никак. Пока не наткнулся на реализацию похожего кейса на Github.

Возможно, это банальная проблема, но у меня она вызвала приличные затруднения, поэтому я решил об этом написать. Ниже приведу примеры кода, а так же покажу, как подсунуть свою заглушку в интеграционный тест процессора (что тоже оказалось не совсем очевидной задачей).

Добавление свойства в процессор

Добавляем в процессор свойство, через которое можно будет указывать стандартные службы, обеспечивающие доступ к сертификатам.

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    .name("SSL Context Service")
    .description("SSL Context Service provides trusted certificates and client certificates for TLS communication.")
    .required(false)
    .identifiesControllerService(SSLContextService.class)
    .build();

В коде метода @OnTrigger процессора доступ к значению свойства будем получать вот так:

SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

Теперь наверно самый важный участок этой маленькой статьи, ради которого все и затевалось. Правильное добавление зависимостей!

1) В pom.xml основного модуля процессора добавим две зависимости:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-ssl-context-service-api</artifactId>
    <version>${nifi.version}</version>
</dependency>
 
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>

2) В pom.xml nar-модуля добавим одну зависимость:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>

nifi-ssl-context-service-api - с помощью этой зависимости в модуле процессора у нас появляется возможность использовать интерфейс SSLContextService.class в коде основного модуля процессора

nifi-standard-services-api-nar - эта зависимость в обоих модулях позволит процессору использовать стандартные службы NiFi

Собственно, вот и весь фокус.

Если надо понять, какую зависимость придется использовать, когда нужно добавить в процессор свойство, отличное по типу от SSLContextService, то помочь в этом может анализ исходников NiFi.

Тестирование процессора, в котором используется служба в качестве одного из свойств

Небольшая проблема заключается в том, что для тестирования процессора, у которого одно из свойств это Controller Service, нам надо обязательно создать некую заглушку этого сервиса. А потом и указать ее в качестве одного из свойств.

Подготовим вспомогательные тестовые данные:

DummySSLContextService.class - заглушка для службы
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import lombok.Builder;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;

@Builder
public class DummySSLContextService extends AbstractControllerService implements SSLContextService {
    private final String keyStoreFile;
    private final String keyStorePassword;
    private final String keyStoreType;
    private final String trustStoreFile;
    private final String trustStorePassword;
    private final String trustStoreType;

    @Override
    public TlsConfiguration createTlsConfiguration() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createContext() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public X509TrustManager createTrustManager() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getTrustStoreFile() {
        return trustStoreFile;
    }

    @Override
    public String getTrustStoreType() {
        return trustStoreType;
    }

    @Override
    public String getTrustStorePassword() {
        return trustStorePassword;
    }

    @Override
    public boolean isTrustStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getKeyStoreFile() {
        return keyStoreFile;
    }

    @Override
    public String getKeyStoreType() {
        return keyStoreType;
    }

    @Override
    public String getKeyStorePassword() {
        return keyStorePassword;
    }

    @Override
    public String getKeyPassword() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public boolean isKeyStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getSslAlgorithm() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
        // ничего не делаем
    }
}

TestData.class - константы и экземпляр службы-заглушки для тестов
import java.nio.file.Path;

import org.apache.nifi.ssl.SSLContextService;

public class TestData {
    static final String CLIENT_CERT_FILE_NAME = Path.of("src/test/resources/client-identity.jks").toAbsolutePath().toString();
    static final String CLIENT_TRUST_FILE_NAME = Path.of("src/test/resources/truststore.jks").toAbsolutePath().toString();
    public static final String KEYSTORE_COMMON_PASSWORD = "changeit";
    public static final String KEYSTORE_COMMON_TYPE = "JKS";

    public static final SSLContextService TEST_SSL_CONTEXT_SERVICE = DummySSLContextService.builder()
            .keyStoreFile(CLIENT_CERT_FILE_NAME)
            .keyStorePassword(KEYSTORE_COMMON_PASSWORD)
            .keyStoreType(KEYSTORE_COMMON_TYPE)
            .trustStoreFile(CLIENT_TRUST_FILE_NAME)
            .trustStorePassword(KEYSTORE_COMMON_PASSWORD)
            .trustStoreType(KEYSTORE_COMMON_TYPE)
            .build();
}

И собственно тест (тут SSL_CONTEXT_SERVICE это дескриптор свойства процессора):

    @Test
    void should_run_with_service_property() throws InitializationException {
        final TestRunner testRunner = TestRunners.newTestRunner(MyProcessor.class);

        // устанавливаем значение для одного из свойств тестового процессора
        testRunner.setProperty(SOME_PROPERTY_NAME, SOME_PROPERTY_VALUE);
      
        // устанавливаем для тестового процессора свойство, в котором указывается Controller Service
        // это делается в три шага
        testRunner.addControllerService("TestSSLContextService", TEST_SSL_CONTEXT_SERVICE);
        testRunner.setProperty(SSL_CONTEXT_SERVICE, "TestSSLContextService");
        testRunner.enableControllerService(TEST_SSL_CONTEXT_SERVICE);

        // создаем набор аттрибутов для тестового flow-файла
        Map<String, String> attributes = new HashMap<>();
        attributes.put(SOME_ATTR_NAME, SOME_ATTR_VALUE);

        // добавляем контент и аттрибуты в тестовый flow-файл
        // вместо строки сюда может быть передан поток
        testRunner.enqueue("Flowfile content", attributes);

        // When
        testRunner.run();

        // Then
        List<MockFlowFile> originalFlowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
        List<MockFlowFile> failureFlowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);

        assertThat(originalFlowFiles.size()).isEqualTo(1);
        assertThat(failureFlowFiles.size()).isZero();

        Map<String, String> actualAttributes = originalFlowFiles.get(0).getAttributes();
        assertThat(actualAttributes)
                .isNotNull()
                .containsEntry(SOME_ATTR_NAME, SOME_ATTR_VALUE);
    }

Заключение

Вот и все. Надеюсь эта маленькая статья будет полезна. Замечания и указания на неточности в комментариях очень приветствуются))

Спасибо, что дочитали до конца.

Теги:
Хабы:
Всего голосов 5: ↑4 и ↓1+3
Комментарии0

Публикации

Истории

Работа

Data Scientist
78 вакансий
Java разработчик
347 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань