Привет, Хабр! Сегодня я расскажу о своем опыте создания сервиса для отправки сообщений в Apache Kafka с использованием Spring Boot, аутентификацией SASL/Kerberos и применением Avro Schema Registry для продюсера. В процессе разработки я столкнулся с рядом проблем, решение которых потребовало усилий и времени. Надеюсь, мой опыт будет полезен разработчикам, которым предстоит решать подобные задачи.
Ведение
По техзаданию необходимо было создать сервис, который в зависимости от топика отправлял бы сообщения или на один инстанс Kafka (строку, с простой авторизацией с помощью SSL), или на другой, но уже с сериализацией и аутентификацией через Kerberos.
Обмен данными с обоими серверами Kafka использует шифрование с помощью сертификатов, второй использует дополнительную аутентификацию клиента с помощью Kerberos.
Приложение деплоится на Томкат.
Немного теории
Apache Kafka — популярная платформа потоковой передачи данных, часто используемая для создания высоконагруженных распределенных систем. Для обеспечения безопасности могут применятся механизмы аутентификации SASL/Kerberos. Avro Schema Registry используется для управления схемами данных и обеспечивает совместимость версий сообщений и валидации отправляемых\принимаемых данных.

Kerberos — это протокол аутентификации, разработанный для обеспечения безопасного доступа к сети. Он работает на основе системы тикетов, предоставляемых центральным сервером аутентификации (KDC — Key Distribution Center).

SSL (Secure Sockets Layer) используется для шифрования данных и аутентификации сервера и/или клиента. Чтобы настроить SSL для соединения с Kafka с помощью JKS (Java KeyStore), необходимо создать и настроить truststore и keystore файлы. Truststore содержит сертификаты доверенных удостоверяющих центров (CA), а keystore содержит закрытые ключи и сертификаты для идентификации сервера или клиента. В Kafka клиентской конфигурации необходимо указать местоположения этих файлов и их пароли.
Настройка Spring Boot и зависимостей
Первым шагом добавляем необходимые зависимости в pom.xml:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>6.2.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies>
Настройка SSL
В конфигурации соединения необходимо указать путь к jks файлу и пароля к нему. В примере ниже это producerRetailFactory.
Настройка Kerberos
Настройка Kerberos требует наличия конфигурационного файла krb5.conf и указания пути к keytab. Файл krb5.conf должен содержать информацию о KDC (Key Distribution Center). В примере ниже это producerMainFactory.
[libdefaults] default_realm = TEST_DOMAIN.TRUS.TCOMPANY [realms] TEST_DOMAIN.TRUS.TCOMPANY = { kdc = DC00.TEST_DOMAIN.trus.tCOMPANY admin_server = S000.TEST_DOMAIN.trus.tCOMPANY kdc = DC01.TEST_DOMAIN.TRUS.TCOMPANY } [domain_realm] .TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY TEST_DOMAIN.trus.tCOMPANY = TEST_DOMAIN.TRUS.TCOMPANY
keytab это двоичный файл содержащий пары Kerberos принципалов и их ключей (полученных с использованием Kerberos пароля). Эти файлы используются для аутентификации, без ввода пароля.
Обычно эти данные предоставляются администраторами сервисов.
Конфигурация для двух серверов Kafka
Так как у нас два разных брокера Kafka, требующих разные типы аутентификации, мы создадим специальный класс для конфигурирования. Один будет использовать SSL для аутентификации, другой — SSL, SASL/Kerberos.
Важно, в классе конфигурации, путь к
krb5.confдолжен быть отдельно указан (иначе будет использован системный) и указывать его нужно не через props (как остальные параметры), а через установку переменной окружения.
System.setProperty("java.security.krb5.conf", krb5ConfFile);
@Configuration @EnableKafka @Slf4j public class KafkaConfig { private final Environment environment; private final JMServiceDao jmServiceDao; public KafkaConfig(Environment environment, JMServiceDao jmServiceDao) { this.environment = environment; this.jmServiceDao = jmServiceDao; } @Bean public ProducerFactory<String, Object> producerRetailFactory() { Map<String, Object> props = new HashMap<>(); String hostname = Optional.ofNullable(System.getenv("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost")); props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name")); props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaRetailBrokerHost", jmServiceDao.getKafkaRetailBrokerHost())); props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put("security.protocol", "SSL"); props.put("ssl.enabled.protocols", "TLSv1.2"); props.put("ssl.truststore.location", environment.getProperty("KafkaRetailSSLKeyStoreLocation" + "kafka.retail.truststore.jks", environment.getProperty("catalina.base") + jmServiceDao.getKafkaRetailSSLKeyStoreLocation())); props.put("ssl.truststore.password", environment.getProperty("KafkaRetailSSLKeyStorePwd", jmServiceDao.getKafkaRetailSSLKeyStorePassword())); return new DefaultKafkaProducerFactory<>(props); } @Bean public ProducerFactory<String, Object> producerMainFactory() { // важно! путь к keyTab должен быть экраннированным для windows path String keyTabFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKeytabLocation()).toAbsolutePath().normalize().toString().replace("\\", "/"); String krb5ConfFile = Paths.get(environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainKrb5ConfLocation()).toAbsolutePath().normalize().toString().replace("\\", "/"); String jksFile = Paths.get(environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.retail.truststore.jks", environment.getProperty("catalina.base") + jmServiceDao.getKafkaMainSSLKeyStoreLocation())).toAbsolutePath().normalize().toString().replace("\\", "/"); Map<String, Object> props = new HashMap<>(); String hostname = Optional.ofNullable(environment.getProperty("HOSTNAME")).filter(Predicate.not(String::isBlank)).orElse(environment.getProperty("COMPUTERNAME", "Localhost")); props.put(CLIENT_ID_CONFIG, hostname + "_" + environment.getProperty("spring.application.name")); props.put(BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("KafkaMainBrokerHost", jmServiceDao.getKafkaMainBrokerHost())); props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("specific.avro.reader", "true"); props.put("sasl.mechanism", "GSSAPI"); props.put("sasl.jaas.config", """ com.sun.security.auth.module.Krb5LoginModule required serviceName="kafka" useKeyTab=true storeKey=true keyTab="%s" principal="%s"; """.formatted(keyTabFile, jmServiceDao.getKafkaMainPrincipal())); props.put("session.timeout.ms", "45000"); props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl()); // параметр устанавливается не через props System.setProperty("java.security.krb5.conf", krb5ConfFile); props.put("security.protocol", "SASL_SSL"); props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1"); props.put("ssl.truststore.location", environment.getProperty("KafkaMainSSLKeyStoreLocation" + "kafka.main.truststore.jks", jksFile)); props.put("ssl.truststore.password", environment.getProperty("KafkaMainSSLKeyStorePwd", jmServiceDao.getKafkaMainSSLKeyStorePassword())); //log.debug("kafka props {}", props); //log.debug("system env {}", System.getProperties()); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, Object> kafkaTemplateForRetail() { return new KafkaTemplate<>(producerRetailFactory()); } @Bean public KafkaTemplate<String, Object> kafkaTemplateForMain() { return new KafkaTemplate<>(producerMainFactory()); } }
В этом классе необходимо обратить внимание (кроме обязательных параметров), на параметр sasl.jaas.config и здесь я столкнулся с первой проблемой, оказалось, что нигде нет проверки на правильность пути к keyTab файлу. Разработка велась в ОС Windows, и windows путь, например C:\ra_projects\Java\apache-tomcat-10.1.10\conf\keystore\tkafka.keytab из-за отсутствия экранирования слешей (путь хранится в БД), преобразуется во внутренностях класса связанного с авторизацией, в C:a_projectsJava�pache-tomcat-10.1.10confkeystore kafka.keytab, в Exception я получал только, что авторизация не удалась:
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
И только в режиме Debug я увидел кривой путь к файлу
keyTab. Добавил экранирование. Так же завел issue.
Вторая проблема, я рандомно получал ошибку:
javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will retry.
Оказалось, что мне предоставили неправильный файл
krb5.conf, содержащий неправильные пути к KDC доменам. А в последствии, что распарсенный файлkrb5.confхранится кэшированным, и даже если его содержимое поменять, он не будет перечитан до перезапуска Tomcat сервера, на который деплоилось мое приложение.
Сериализация данных
В конфигурационном классе KafkaConfig необходимо указать домен к хранилище схем:
props.put("schema.registry.url", jmServiceDao.getKafkaMainSchemaRegistryUrl());
Для отправки данных в Kafka был написан класс, который использует две конфигурации в зависимости от данных, которые были получены для отправки:
@Service @Slf4j public class JMServiceImpl implements JMService { private enum KafkaType { KAFKA_RETAIL, KAFKA_MAIN } private final JMServiceDao jmServiceDao; // авторизация через SSL, тут продюсер для отправки "сырых" данных private final KafkaTemplate<String, Object> kafkaTemplateForRetail; // авторизация через SSL,Kerberos, тут продюсер для отправки отправки сериализованных данных private final KafkaTemplate<String, Object> kafkaTemplateForMain; @Autowired public JMServiceImpl(JMServiceDao plcPackageDao, @Qualifier("kafkaTemplateForRetail") KafkaTemplate<String, Object> kafkaTemplateForRetail, @Qualifier("kafkaTemplateForMain") KafkaTemplate<String, Object> kafkaTemplateForMain) { this.jmServiceDao = plcPackageDao; this.kafkaTemplateForRetail = kafkaTemplateForRetail; this.kafkaTemplateForMain = kafkaTemplateForMain; } @Override public Mono<KafkaSendOKResponse> sendMessage(QueueMessageId queueMessageId) throws IOException { log.info("Попытка отправить сообщение queueMessageId = {}", queueMessageId.getQueueMessageId()); QueueMessage message = jmServiceDao.getQueueMessage(queueMessageId.getQueueMessageId()); if (message == null) { throw new ResponseStatusException(NOT_FOUND, "В теле запроса указан несуществующий идентификатор сообщения в очереди"); } else { KafkaType messagingSystem = KafkaType.valueOf(message.getMessagingSystemCode()); Object messageForSend; KafkaTemplate<String, Object> kafkaTemplate = switch (messagingSystem) { case KAFKA_MAIN -> { ObjectMapper objectMapper = new ObjectMapper(); //строка с данными в JSON JsonNode rootNode = objectMapper.readTree(message.getMessageData()); //нужна сериализация SendPTI ptiMessage = new SendPTI(); ptiMessage.setSiebelId(rootNode.get("siebel_id").asText()); ptiMessage.setPti(ByteBuffer.wrap(rootNode.get("pti").asText().getBytes(StandardCharsets.US_ASCII))); ptiMessage.setPtiDate(Instant.parse(rootNode.get("pti_date").asText())); messageForSend = ptiMessage; yield kafkaTemplateForMain; } case KAFKA_RETAIL -> { //отправляем просто строку messageForSend = message.getMessageData(); yield kafkaTemplateForRetail; } default -> throw new IllegalArgumentException("Неподдерживаемая система: " + messagingSystem); }; log.debug("Отправляем в топик {} данные {}", message.getTopic(), messageForSend); return Mono.fromFuture(kafkaTemplate.send(message.getTopic(), messageForSend)) .map(result -> { log.debug("Результат {} отправки сообщения {}", result, queueMessageId.getQueueMessageId()); LocalDateTime localDateTime = result.getRecordMetadata().hasTimestamp() ? LocalDateTime.ofInstant(Instant.ofEpochMilli(result.getRecordMetadata().timestamp()), ZoneId.systemDefault()) : null; return new KafkaSendOKResponse() .setQueueMessageId(queueMessageId.getQueueMessageId()) .setSendStatus(1) .setRecordMetadata(result.getRecordMetadata().toString()) .setSentDate(localDateTime); }) .doOnError(ex -> { log.error("Произошла ошибка {} при отправке сообщения {}", ex, queueMessageId.getQueueMessageId()); }); } } }
Выборка нужной конфигурации (бина) осуществляется с помощью аннотации @Qualifier. Для KAFKA_MAIN используется Avro сериализатор, для KAFKA_RETAIL отправляется строка.
Для генерации класса SendPTI для сериализации отправляемых\принимаемых данных используется Maven плагин:
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.3</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin>
В нем нужно указать путь к *.avcs файлу, содержащему исходную схему, мне был предоставлен такой:
{ "type": "record", "name": "SendPTI", "namespace": "com.technology.kafkaproducer.jmessageservice.SendPTI", "fields": [ { "name": "siebel_id", "type": "string" }, { "name": "pti", "type": { "type": "bytes", "scale": 5, "precision": 7, "connect.version": 1, "connect.parameters": { "scale": "5", "connect.decimal.precision": "7" }, "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" } }, { "name": "pti_date", "type": { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" } } ], "connect.name": "ru.COMPANY.schema.SCORF.SendPTI" }
При сборке проекта будет запускаться плагин и будет сгенери��ован POJO класс, в моем примере выше это класс SendPTI.
Когда все уже было готово, я столкнулся с другой проблемой; доступ в сеть у меня осуществляется через рабочий прокси, но при попытке отправить сообщение в Kafka, происходила безуспешная попытка подключения к Schema Registry и приложение падало с ошибкой:
2024-05-29 11:09:15.754 [http-nio-8081-exec-2] ERROR i.c.k.s.client.rest.RestService - Failed to send HTTP request to endpoint: https://schema-kafka-test/subjects/SCORF.SendPTI-value/versions/latest/ java.io.IOException: Unable to tunnel through proxy. Proxy returns "HTTP/1.1 407 Proxy Authentication Required"
Решением стало добавление дополнительных параметром при запуске моего локального (разработческого) сервера Томкат:
-Dhttps.proxyHost=proxy.host -Dhttps.proxyPort=8080 -Dhttps.proxyUser=user -Dhttps.proxyPassword=superSecretPassword
После этого я смог успешно отправлять данные из одного сервиса на два разных сервера Kafka с разными типами авторизации и с сериализацией и без.
Мира!
